阻塞队列和multithreading的消费者,如何知道何时停止
我有一个单线程生产者创build一些任务对象,然后将其添加到一个ArrayBlockingQueue
(这是固定的大小)。
我也开始一个multithreading的消费者。 这是build立一个固定的线程池( Executors.newFixedThreadPool(threadCount);
)。 然后,我将一些ConsumerWorker实例提交给此ThreadPool,每个ConsumerWorker都具有对上述ArrayBlockingQueue实例的引用。
每个这样的工作人员将在队列中做一个take()
处理任务。
我的问题是,有什么最好的方式让工人知道什么时候不会有任何工作要做。 换句话说,我怎么告诉工人,生产者已经join队列,从这一刻起,每个工人应该停下来,看看队列是空的。
我现在得到的是一个安装程序,我的Producer用一个callback来初始化,当他完成它的工作(添加队列的东西)时触发callback。 我还保留了我创build并提交给ThreadPool的所有ConsumerWorkers的列表。 当生产者callback告诉我生产者完成时,我可以告诉每个工人。 在这一点上,他们应该简单地继续检查队列是否为空,当它变为空时,它们应该停止,从而允许我正常closures下ExecutorService线程池。 这是这样的
public class ConsumerWorker implements Runnable{ private BlockingQueue<Produced> inputQueue; private volatile boolean isRunning = true; public ConsumerWorker(BlockingQueue<Produced> inputQueue) { this.inputQueue = inputQueue; } @Override public void run() { //worker loop keeps taking en element from the queue as long as the producer is still running or as //long as the queue is not empty: while(isRunning || !inputQueue.isEmpty()) { System.out.println("Consumer "+Thread.currentThread().getName()+" START"); try { Object queueElement = inputQueue.take(); //process queueElement } catch (Exception e) { e.printStackTrace(); } } } //this is used to signal from the main thread that he producer has finished adding stuff to the queue public void setRunning(boolean isRunning) { this.isRunning = isRunning; }
}
这里的问题是,我有一个显而易见的竞争条件,有时生产者会完成,发出信号,消费者工作人员将在消耗队列中的所有东西之前停止。
我的问题是什么是同步这个最好的方法,这一切工作正常? 我应该同步整个部分检查生产者是否正在运行,如果队列是空的,还要从队列中取一些东西(在队列对象上)? 我应该只是同步更新ConsumerWorker实例上的isRunning
布尔值? 任何其他build议?
更新,这是我最终使用的工作实现:
public class ConsumerWorker implements Runnable{ private BlockingQueue<Produced> inputQueue; private final static Produced POISON = new Produced(-1); public ConsumerWorker(BlockingQueue<Produced> inputQueue) { this.inputQueue = inputQueue; } @Override public void run() { //worker loop keeps taking en element from the queue as long as the producer is still running or as //long as the queue is not empty: while(true) { System.out.println("Consumer "+Thread.currentThread().getName()+" START"); try { Produced queueElement = inputQueue.take(); Thread.sleep(new Random().nextInt(100)); if(queueElement==POISON) { break; } //process queueElement } catch (Exception e) { e.printStackTrace(); } System.out.println("Consumer "+Thread.currentThread().getName()+" END"); } } //this is used to signal from the main thread that he producer has finished adding stuff to the queue public void stopRunning() { try { inputQueue.put(POISON); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
}
JohnVint的回答很大程度上启发了这一点,只做了一些小的修改。
===由于@ vendhan的评论更新。
谢谢你的观察。 你是对的,这个问题中的第一部分代码(其中包括while(isRunning || !inputQueue.isEmpty())
并没有什么意义。
在我的实际最终实施中,我做了一些更接近你的build议,即“||” (或)带有“&&”(和),因为每个工人(消费者)现在只检查他从列表中得到的元素是否是一个毒丸,如果这样就停止了(理论上我们可以说工人要运行,队列不能为空)。
你应该继续从队列中take()
。 你可以使用毒丸告诉工人停止。 例如:
private final Object POISON_PILL = new Object(); @Override public void run() { //worker loop keeps taking en element from the queue as long as the producer is still running or as //long as the queue is not empty: while(isRunning) { System.out.println("Consumer "+Thread.currentThread().getName()+" START"); try { Object queueElement = inputQueue.take(); if(queueElement == POISON_PILL) { inputQueue.add(POISON_PILL);//notify other threads to stop return; } //process queueElement } catch (Exception e) { e.printStackTrace(); } } } //this is used to signal from the main thread that he producer has finished adding stuff to the queue public void finish() { //you can also clear here if you wanted isRunning = false; inputQueue.add(POISON_PILL); }
我会派工人一个特别的工作包,表示他们应该closures:
public class ConsumerWorker implements Runnable{ private static final Produced DONE = new Produced(); private BlockingQueue<Produced> inputQueue; public ConsumerWorker(BlockingQueue<Produced> inputQueue) { this.inputQueue = inputQueue; } @Override public void run() { for (;;) { try { Produced item = inputQueue.take(); if (item == DONE) { inputQueue.add(item); // keep in the queue so all workers stop break; } // process `item` } catch (Exception e) { e.printStackTrace(); } } }
}
要停止工作,只需将ConsumerWorker.DONE
添加到队列即可。
在您尝试从队列中检索元素的代码块中,使用poll(time,unit)
而不是take()
。
try { Object queueElement = inputQueue.poll(timeout,unit); //process queueElement } catch (InterruptedException e) { if(!isRunning && queue.isEmpty()) return ; }
通过指定适当的超时值,确保线程不会阻塞,以防出现不幸的序列
-
isRunning
是真的 - 队列变空,所以线程进入阻塞的等待(如果使用
take()
-
isRunning
设置为false
有许多策略可以使用,但是一个简单的策略是有一个任务的子类来表示工作的结束。 制作者不直接发送这个信号。 相反,它排队这个任务子类的一个实例。 当你的一个消费者拉出这个任务并执行它时,就会导致信号被发送。
我不得不使用一个multithreading的生产者和一个multithreading的消费者。 我结束了一个Scheduler -- N Producers -- M Consumers
计划,每两个通过一个队列(总共两个队列)进行通信。 调度程序用请求产生数据填充第一个队列,然后用N个“毒丸”填充它。 有一个积极生产者的计数器(primefaces诠释),最后一个生产者收到最后一个毒药药丸M消毒丸发送到消费者队列。
我们不能使用CountDownLatch
,其中大小是生产者中的logging数。 并且每个用户都会logging下数据。 当所有任务完成时,它将会使用awaits()
方法。 然后停止所有的消费者。 所有logging都被处理。