使用队列的生产者/消费者线程

我想创build一些Producer/Consumer线程应用程序。 但是我不确定在两者之间实现一个队列的最佳方式。

所以我有两个想法(这两个可能是完全错误的)。 我想知道哪个更好,如果他们都吸了,那么执行队列的最好方法是什么。 主要是我在这些例子中执行的队列,我很关心。 我正在扩展一个Queue类,它是一个内部类,并且是线程安全的。 以下是每个4个类的两个示例。

主要类 –

 public class SomeApp { private Consumer consumer; private Producer producer; public static void main (String args[]) { consumer = new Consumer(); producer = new Producer(); } } 

消费者阶层 –

 public class Consumer implements Runnable { public Consumer() { Thread consumer = new Thread(this); consumer.start(); } public void run() { while(true) { //get an object off the queue Object object = QueueHandler.dequeue(); //do some stuff with the object } } } 

生产者类 –

 public class Producer implements Runnable { public Producer() { Thread producer = new Thread(this); producer.start(); } public void run() { while(true) { //add to the queue some sort of unique object QueueHandler.enqueue(new Object()); } } } 

队列类 –

 public class QueueHandler { //This Queue class is a thread safe (written in house) class public static Queue<Object> readQ = new Queue<Object>(100); public static void enqueue(Object object) { //do some stuff readQ.add(object); } public static Object dequeue() { //do some stuff return readQ.get(); } } 

要么

主要类 –

 public class SomeApp { Queue<Object> readQ; private Consumer consumer; private Producer producer; public static void main (String args[]) { readQ = new Queue<Object>(100); consumer = new Consumer(readQ); producer = new Producer(readQ); } } 

消费者阶层 –

 public class Consumer implements Runnable { Queue<Object> queue; public Consumer(Queue<Object> readQ) { queue = readQ; Thread consumer = new Thread(this); consumer.start(); } public void run() { while(true) { //get an object off the queue Object object = queue.dequeue(); //do some stuff with the object } } } 

生产者类 –

 public class Producer implements Runnable { Queue<Object> queue; public Producer(Queue<Object> readQ) { queue = readQ; Thread producer = new Thread(this); producer.start(); } public void run() { while(true) { //add to the queue some sort of unique object queue.enqueue(new Object()); } } } 

队列类 –

 //the extended Queue class is a thread safe (written in house) class public class QueueHandler extends Queue<Object> { public QueueHandler(int size) { super(size); //All I'm thinking about now is McDonalds. } public void enqueue(Object object) { //do some stuff readQ.add(); } public Object dequeue() { //do some stuff return readQ.get(); } } 

去!

Java 5+拥有您需要的所有工具。 你会想:

  1. 把你所有的生产者放在一个ExecutorService ;
  2. 把你所有的消费者放在另一个ExecutorService ;
  3. 如有必要,使用BlockingQueue在两者之间进行通信。

(3)我说“如果有必要”,因为根据我的经验,这是一个不必要的步骤。 您所做的只是向消费者执行者服务提交新任务。 所以:

 final ExecutorService producers = Executors.newFixedThreadPool(100); final ExecutorService consumers = Executors.newFixedThreadPool(100); while (/* has more work */) { producers.submit(...); } producers.shutdown(); producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); consumers.shutdown(); consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 

所以producers直接提交给consumers

好的,正如其他人指出的,最好的办法是使用java.util.concurrent包。 我强烈推荐“实践中的Java并发”。 这是一本很好的书,涵盖了你需要知道的几乎所有的东西。

至于你的具体实现,正如我在评论中指出的,不要从构造函数启动线程 – 它可能是不安全的。

除此之外,第二个实施看起来更好。 你不想把队列放在静态字段中。 你可能只是失去了灵活性罢了。

如果你想继续自己的实现(为了学习的目的,我猜?),至less提供一个start()方法。 你应该构造对象(你可以实例化Thread对象),然后调用start()来启动线程。

编辑: ExecutorService有他们自己的队列,所以这可能是混淆..这里有一些让你开始。

 public class Main { public static void main(String[] args) { //The numbers are just silly tune parameters. Refer to the API. //The important thing is, we are passing a bounded queue. ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100)); //No need to bound the queue for this executor. //Use utility method instead of the complicated Constructor. ExecutorService producer = Executors.newSingleThreadExecutor(); Runnable produce = new Produce(consumer); producer.submit(produce); } } class Produce implements Runnable { private final ExecutorService consumer; public Produce(ExecutorService consumer) { this.consumer = consumer; } @Override public void run() { Pancake cake = Pan.cook(); Runnable consume = new Consume(cake); consumer.submit(consume); } } class Consume implements Runnable { private final Pancake cake; public Consume(Pancake cake){ this.cake = cake; } @Override public void run() { cake.eat(); } } 

进一步编辑:对于生产者,而不是while(true) ,你可以做这样的事情:

 @Override public void run(){ while(!Thread.currentThread().isInterrupted()){ //do stuff } } 

这样你可以通过调用.shutdownNow()来closures执行程序。 如果你使用while(true) ,它将不会closures。

另请注意, Producer仍然容易受到RuntimeExceptions (即一个RuntimeException将会暂停处理)

你正在重新发明轮子。

如果您需要持久性和其他企业function使用JMS (我build议ActiveMq )。

如果您需要快速的内存队列,请使用java队列的一个阻塞。

如果您需要支持Java 1.4或更早版本,请使用Doug Lea出色的并发包。

我已经扩展cletusbuild议的答案工作代码示例。

  1. 一个ExecutorService (pes)接受Producer任务。
  2. 一个ExecutorService (ces)接受Consumer任务。
  3. ProducerConsumer股份BlockingQueue
  4. 多个Producer任务生成不同的数字。
  5. 任何Consumer任务都可以消耗由Producer生成的数字

码:

 import java.util.concurrent.*; public class ProducerConsumerWithES { public static void main(String args[]){ BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>(); ExecutorService pes = Executors.newFixedThreadPool(2); ExecutorService ces = Executors.newFixedThreadPool(2); pes.submit(new Producer(sharedQueue,1)); pes.submit(new Producer(sharedQueue,2)); ces.submit(new Consumer(sharedQueue,1)); ces.submit(new Consumer(sharedQueue,2)); // shutdown should happen somewhere along with awaitTermination / * https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */ pes.shutdown(); ces.shutdown(); } } class Producer implements Runnable { private final BlockingQueue<Integer> sharedQueue; private int threadNo; public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) { this.threadNo = threadNo; this.sharedQueue = sharedQueue; } @Override public void run() { for(int i=1; i<= 5; i++){ try { int number = i+(10*threadNo); System.out.println("Produced:" + number + ":by thread:"+ threadNo); sharedQueue.put(number); } catch (Exception err) { err.printStackTrace(); } } } } class Consumer implements Runnable{ private final BlockingQueue<Integer> sharedQueue; private int threadNo; public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) { this.sharedQueue = sharedQueue; this.threadNo = threadNo; } @Override public void run() { while(true){ try { int num = sharedQueue.take(); System.out.println("Consumed: "+ num + ":by thread:"+threadNo); } catch (Exception err) { err.printStackTrace(); } } } } 

输出:

 Produced:11:by thread:1 Produced:21:by thread:2 Produced:22:by thread:2 Consumed: 11:by thread:1 Produced:12:by thread:1 Consumed: 22:by thread:1 Consumed: 21:by thread:2 Produced:23:by thread:2 Consumed: 12:by thread:1 Produced:13:by thread:1 Consumed: 23:by thread:2 Produced:24:by thread:2 Consumed: 13:by thread:1 Produced:14:by thread:1 Consumed: 24:by thread:2 Produced:25:by thread:2 Consumed: 14:by thread:1 Produced:15:by thread:1 Consumed: 25:by thread:2 Consumed: 15:by thread:1 

注意。 如果你不需要多个生产者和消费者,保持单一的生产者和消费者。 我已经添加了多个生产者和消费者,以在多个生产者和消费者中展示BlockingQueue的function。

  1. Java代码“BlockingQueue”已经同步了put和get方法。
  2. Java代码“Producer”,生成器线程来生成数据。
  3. Java代码“消费者”,消费者线程消费所产生的数据。
  4. Java代码“ProducerConsumer_Main”,主要function是启动生产者和消费者线程。

BlockingQueue.java

 public class BlockingQueue { int item; boolean available = false; public synchronized void put(int value) { while (available == true) { try { wait(); } catch (InterruptedException e) { } } item = value; available = true; notifyAll(); } public synchronized int get() { while(available == false) { try { wait(); } catch(InterruptedException e){ } } available = false; notifyAll(); return item; } } 

Consumer.java

 package com.sukanya.producer_Consumer; public class Consumer extends Thread { blockingQueue queue; private int number; Consumer(BlockingQueue queue,int number) { this.queue = queue; this.number = number; } public void run() { int value = 0; for (int i = 0; i < 10; i++) { value = queue.get(); System.out.println("Consumer #" + this.number+ " got: " + value); } } } 

ProducerConsumer_Main.java

 package com.sukanya.producer_Consumer; public class ProducerConsumer_Main { public static void main(String args[]) { BlockingQueue queue = new BlockingQueue(); Producer producer1 = new Producer(queue,1); Consumer consumer1 = new Consumer(queue,1); producer1.start(); consumer1.start(); } } 

这是一个非常简单的代码。

 import java.util.*; // @author : rootTraveller, June 2017 class ProducerConsumer { public static void main(String[] args) throws Exception { Queue<Integer> queue = new LinkedList<>(); Integer buffer = new Integer(10); //Important buffer or queue size, change as per need. Producer producerThread = new Producer(queue, buffer, "PRODUCER"); Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER"); producerThread.start(); consumerThread.start(); } } class Producer extends Thread { private Queue<Integer> queue; private int queueSize ; public Producer (Queue<Integer> queueIn, int queueSizeIn, String ThreadName){ super(ThreadName); this.queue = queueIn; this.queueSize = queueSizeIn; } public void run() { while(true){ synchronized (queue) { while(queue.size() == queueSize){ System.out.println(Thread.currentThread().getName() + " FULL : waiting...\n"); try{ queue.wait(); //Important } catch (Exception ex) { ex.printStackTrace(); } } //queue empty then produce one, add and notify int randomInt = new Random().nextInt(); System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt); queue.add(randomInt); queue.notifyAll(); //Important } //synchronized ends here : NOTE } } } class Consumer extends Thread { private Queue<Integer> queue; private int queueSize; public Consumer(Queue<Integer> queueIn, int queueSizeIn, String ThreadName){ super (ThreadName); this.queue = queueIn; this.queueSize = queueSizeIn; } public void run() { while(true){ synchronized (queue) { while(queue.isEmpty()){ System.out.println(Thread.currentThread().getName() + " Empty : waiting...\n"); try { queue.wait(); //Important } catch (Exception ex) { ex.printStackTrace(); } } //queue empty then consume one and notify System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove()); queue.notifyAll(); } //synchronized ends here : NOTE } } } 
Interesting Posts