如何等待ThreadPoolExecutor中的所有任务完成而不closuresExecutor?
我不能使用shutdown()
和awaitTermination()
因为在等待的时候可能会有新的任务被添加到ThreadPoolExecutor中。
所以我正在寻找一种方法来等待,直到ThreadPoolExecutor已经清空它的队列,并完成了它的所有任务,而不停止在这个点之前添加新的任务。
如果它有什么区别,这是Android的。
谢谢
更新 :几个星期后,重新审视这个,我发现一个修改后的CountDownLatch在这种情况下对我更好。 我会保留答案,因为它更多地适用于我所问的问题。
如果您有兴趣了解特定任务何时完成或某批任务,则可以使用ExecutorService.submit(Runnable)
。 调用这个方法返回一个Future
对象,它可以被放置到一个Collection
,然后你的主线程将遍历每个对象的Future.get()
。 这将导致您的主线程停止执行,直到ExecutorService
处理完所有Runnable
任务。
Collection<Future<?>> futures = new LinkedList<Future<?>>(); futures.add(executorService.submit(myRunnable)); for (Future<?> future:futures) { future.get(); }
我的情景是一个networking抓取工具,从网站获取一些信息,然后处理它们。 一个ThreadPoolExecutor用来加速进程,因为可以在这个时间加载很多页面。 因此,在现有任务中将创build新的任务,因为爬虫将在每个页面中遵循超链接。 问题是一样的:主线程不知道什么时候所有的任务都完成了,并且可以开始处理结果。 我用一个简单的方法来确定这一点。 这不是很优雅,但在我的情况下工作:
while (executor.getTaskCount()!=executor.getCompletedTaskCount()){ System.err.println("count="+executor.getTaskCount()+","+executor.getCompletedTaskCount()); Thread.sleep(5000); } executor.shutdown(); executor.awaitTermination(60, TimeUnit.SECONDS);
也许你正在寻找一个CompletionService来pipe理任务的批处理,另请参阅这个答案 。
(这是一个尝试重现Thilo早先删除的答案,我自己的调整。)
我认为你可能需要澄清你的问题,因为有一个隐含的无限的条件…在某些时候,你必须决定closures你的执行者,在这一点上,它不会接受任何更多的任务。 你的问题似乎意味着你要等到你知道没有进一步的任务将被提交,你只能在自己的应用程序代码中知道。
下面的答案可以让你顺利地转换到一个新的TPE(无论什么原因),完成所有当前提交的任务,而不是拒绝新的TPE的任务。 它可能会回答你的问题。 @ Thilo也可能。
假设你在某个地方定义了一个可用的TPE,
AtomicReference<ThreadPoolExecutor> publiclyAvailableTPE = ...;
然后您可以编写TPE交换例程。 它也可以使用同步方法编写,但我认为这更简单:
void rotateTPE() { ThreadPoolExecutor newTPE = createNewTPE(); // atomic swap with publicly-visible TPE ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(newTPE); oldTPE.shutdown(); // and if you want this method to block awaiting completion of old tasks in // the previously visible TPE oldTPE.awaitTermination(); }
另外,如果你真的没有开玩笑想要杀死线程池,那么你的提交者端将需要应付被拒绝的任务,你可以使用null
作为新的TPE:
void killTPE() { ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(null); oldTPE.shutdown(); // and if you want this method to block awaiting completion of old tasks in // the previously visible TPE oldTPE.awaitTermination(); }
这可能会导致上游问题,调用者需要知道如何处理null
。
你也可以用一个虚拟的TPE换掉,这个TPE只是拒绝每一个新的执行,但是这相当于在TPE上调用shutdown()
会发生什么。
如果你不想使用shutdown
,请按照下面的方法:
-
遍历所有
Future
任务从ExecutorService
提交,并检查状态阻塞调用Future
对象的get()
由Tim Bender
build议 -
使用其中之一
- 在
ExecutorService
上使用invokeAll - 使用CountDownLatch
- 使用ForkJoinPool或newWorkStealingPool的
Executors
(自java 8以来)
- 在
executor服务上的invokeAll()
也达到了CountDownLatch
的相同目的
相关的SE问题:
如何等待多个线程来完成?
你可以在Runner类中调用waitTillDone() :
Runner runner = Runner.runner(10); runner.runIn(2, SECONDS, runnable); runner.run(runnable); // each of this runnables could submit more tasks runner.waitTillDone(); // blocks until all tasks are finished (or failed) // and now reuse it runner.runIn(500, MILLISECONDS, callable); runner.waitTillDone(); runner.shutdown();
要使用它,将这个gradle / maven依赖项添加到您的项目中: 'com.github.matejtymes:javafixes:1.0'
欲了解更多详情,请看这里: https : //github.com/MatejTymes/JavaFixes或在这里: http : //matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html