ExecutorService,如何等待所有任务完成
等待ExecutorService
所有任务完成的最简单方法是什么? 我的任务主要是计算,所以我只想运行大量的工作 – 每个核心都有一个。 现在我的设置看起来像这样:
ExecutorService es = Executors.newFixedThreadPool(2); for (DataTable singleTable : uniquePhrases) { es.execute(new ComputeDTask(singleTable)); } try{ es.wait(); } catch (InterruptedException e){ e.printStackTrace(); }
ComputeDTask
实现可运行。 这似乎正确地执行任务,但代码崩溃在wait()
IllegalMonitorStateException
。 这很奇怪,因为我玩了一些玩具的例子,似乎工作。
uniquePhrases
包含数以万计的元素。 我应该使用另一种方法吗? 我正在寻找尽可能简单的事情
最简单的方法是使用ExecutorService.invokeAll()
,它可以在一行内完成所需的工作。 按照你的说法,你需要修改或包装ComputeDTask
来实现Callable<>
,这可以给你更多的灵活性。 可能在你的应用程序中有一个Callable.call()
的有意义的实现,但是如果不使用Executors.callable()
,这里有一个方法来包装它。
ExecutorService es = Executors.newFixedThreadPool(2); List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size()); for (DataTable singleTable: uniquePhrases) { todo.add(Executors.callable(new ComputeDTask(singleTable))); } List<Future<Object>> answers = es.invokeAll(todo);
正如其他人所指出的那样,如果合适,您可以使用invokeAll()
的超时版本。 在这个例子中, answers
将包含一堆将返回空值的Future
(参见Executors.callable()
定义Executors.callable()
。可能你想要做的只是轻微的重构,所以你可以得到一个有用的回答,或者一个引用到底层的ComputeDTask
,但是我不能从你的例子中知道。
如果不清楚,请注意,在所有任务完成之前, invokeAll()
将不会返回。 (也就是说,你的answers
集合中的所有Future
都会报告.isDone()
如果问的话)。这样可以避免所有的手动closures,awaitTermination等等,并且允许你重复使用这个ExecutorService
多个周期。
SO有几个相关的问题:
-
如何等待所有线程完成
-
从java线程返回值
-
invokeAll()不愿意接受一个Collection <Callable <t >>
-
我需要同步吗?
这些都不是严格意义上的问题,但它们确实提供了一些有关人们如何认为Executor
/ ExecutorService
应该被使用的颜色。
如果要等待所有任务完成,请使用shutdown
方法而不是wait
。 然后按照awaitTermination
。
此外,您可以使用Runtime.availableProcessors
获取硬件线程数,以便您可以正确初始化线程池。
如果等待ExecutorService
所有任务完成并不完全是您的目标,而是等到特定的一批任务完成后,您可以使用CompletionService
– 具体来说,就是一个ExecutorCompletionService
。
这个想法是创build一个包装Executor
,通过CompletionService
提交一些已知数量的任务,然后使用take()
(which blocks)或poll()
(不这样做)从完成队列中take()
相同数量的结果。 。 一旦你得出了你所提交任务的所有预期结果,你就知道他们都完成了。
让我再说一次,因为从界面上不是很明显:你必须知道你在CompletionService
中放入了多less东西,以便知道有多less东西要绘制出来。 这尤其与take()
方法有关:调用它太多了,它会阻塞你的调用线程,直到其他线程提交另一个工作到同一个CompletionService
。
有一些例子展示了如何在Java实践中 使用CompletionService
。
如果要等待执行程序服务完成执行,请调用shutdown()
,然后调用awaitTermination(units,unitType) ,例如awaitTermination(1, MINUTE)
。 ExecutorService不会阻止它自己的显示器,所以你不能使用wait
等。
你可以等待一段时间完成工作:
int maxSecondsPerComputeDTask = 20; try { while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) { // consider giving up with a 'break' statement under certain conditions } } catch (InterruptedException e) { throw new RuntimeException(e); }
或者你可以使用ExecutorService 。 提交 ( Runnable )并收集它返回的Future对象,并依次调用get()来等待它们完成。
ExecutorService es = Executors.newFixedThreadPool(2); Collection<Future<?>> futures = new LinkedList<<Future<?>>(); for (DataTable singleTable : uniquePhrases) { futures.add(es.submit(new ComputeDTask(singleTable))); } for (Future<?> future : futures) { try { future.get(); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } }
InterruptedException对于正确处理非常重要。 这是什么让你或你的图书馆的用户安全终止一个漫长的过程。
我也有这样的情况,我有一套文件要抓取。 我从一个应该被处理的初始的“种子”文件开始,该文件包含应该被处理的其他文件的链接,等等。
在我的主程序中,我只想写下如下的代码, Crawler
控制了一堆线程。
Crawler c = new Crawler(); c.schedule(seedDocument); c.waitUntilCompletion()
如果我想要导航一棵树,也会发生同样的情况。 我会popup根节点,每个节点的处理器会根据需要将子节点添加到队列中,一堆线程将处理树中的所有节点,直到没有更多的节点。
我在JVM中找不到任何我认为有点令人惊讶的东西。 所以我写了一个AutoStopThreadPool
类,它可以直接使用,也可以使用子类来添加适合域的方法,如schedule(Document)
。 希望它有帮助!
AutoStopThreadPool Javadoc | 下载
只是使用
latch = new CountDownLatch(noThreads)
在每个线程中
latch.countDown();
并作为障碍
latch.await();
IllegalMonitorStateException的根本原因:
抛出以指示线程试图等待对象的监视器或通知其他线程等待对象的监视器,而不拥有指定的监视器。
从您的代码中,您只需在ExecutorService上调用wait(),而不必拥有该锁。
下面的代码将修复IllegalMonitorStateException
try { synchronized(es){ es.wait(); // Add some condition before you call wait() } }
按照以下方法之一等待所有已提交给ExecutorService
任务的完成。
-
在
ExecutorService
上迭代所有Future
任务,并通过在Future
对象上阻塞调用get()
来检查状态 -
在
ExecutorService
上使用invokeAll -
使用CountDownLatch
-
使用ForkJoinPool或newWorkStealingPool的
Executors
(自java 8以来) -
按照oracle文档页面中的build议closures池
void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); }
如果要在使用选项5而不是选项1至4时正常等待所有任务完成,请更改
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
至
每隔1分钟检查一次
while(condition)
。
添加集合中的所有线程并使用invokeAll
提交。 如果您可以使用ExecutorService
invokeAll
方法,那么在所有线程完成之前,JVM不会继续下一行。
这里有一个很好的例子: 通过ExecutorService调用全部
一个简单的替代方法是使用线程和连接。 请参阅: 连接线程
你可以使用一个ThreadPoolExecutor ,它的池大小设置为Runtime.getRuntime()。availableProcessors()和.execute()
它是你想要执行的所有任务,然后调用tpe.shutdown()
,然后等待while(!tpe.terminated()) { /* waiting for all tasks to complete */}
所有提交的任务的块。 其中tpe
是对您的ThreadPoolExecutor
实例的引用。
或者,如果更合适的话使用ExecutorCompletionService一个CompletionService ,它使用提供的Executor来执行任务。 这个课程安排提交的任务完成后,放在一个队列可以使用take。 这个类是足够轻量级的,以适合处理任务组时的瞬态使用。
注意:在等待所有作业完成时,此解决scheme不会像ExecutorService.invokeAll()
那样阻止。 所以虽然最简单的也是最有限的,因为它会阻塞应用程序的主线程。 如果您想知道发生了什么事情,或者在这些工作正在运行时做其他事情,比如后处理结果或生成以及增量汇总结果,那就不太好。
将你的任务提交到Runner ,然后等待像这样调用方法waitTillDone() :
Runner runner = Runner.runner(2); for (DataTable singleTable : uniquePhrases) { runner.run(new ComputeDTask(singleTable)); } // blocks until all tasks are finished (or failed) runner.waitTillDone(); runner.shutdown();
要使用它,添加这个gradle / maven依赖: 'com.github.matejtymes:javafixes:1.0'
欲了解更多详情,请看这里: https : //github.com/MatejTymes/JavaFixes或在这里: http : //matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html
我将等待执行程序以指定的超时结束,并认为它适合于完成任务。
try { //do stuff here exe.execute(thread); } finally { exe.shutdown(); } boolean result = exe.awaitTermination(4, TimeUnit.HOURS); if (!result) { LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour."); }
听起来像你需要ForkJoinPool
并使用全局池来执行任务。
public static void main(String[] args) { // the default `commonPool` should be sufficient for many cases. ForkJoinPool pool = ForkJoinPool.commonPool(); // The root of your task that may spawn other tasks. // Make sure it submits the additional tasks to the same executor that it is in. Runnable rootTask = new YourTask(pool); pool.execute(rootTask); pool.awaitQuiescence(...); // that's it. }
漂亮是在pool.awaitQuiescence
中,方法将阻止使用调用者的线程执行它的任务,然后返回时它真的是空的。
对不起,迟到的答案,但你可以使用以下强制threadPoolTaskExecutor等待,直到所有活动和挂起的任务已经处理。
ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) AppContext.getBean("threadPoolTaskExecutor");
//代码在这里执行可运行的例子:
while(iterator.hasNext()) { threadPoolTaskExecutor.execute(new Runnable() ... ); } while(threadPoolTaskExecutor.getActiveCount() > 0) { //Hold until finish } threadPoolTaskExecutor.shutdown();
确保ThreadPoolTaskExecutor bean是使用scope =“Prototype”定义的。 重要!