如何等待所有线程完成,使用ExecutorService?
我需要一次执行一些任务4,如下所示:
ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } //...wait for completion somehow
一旦所有这些都完成,我怎么能得到通知? 现在我想不出比设置一些全局任务计数器好多less,并在每个任务结束时减less它,然后在无限循环监视这个计数器变成0; 或者得到一份期货清单,并在无限循环监测isDone所有这些。 什么是不涉及无限循环的更好的解决scheme?
谢谢。
基本上在一个ExecutorService
你调用shutdown()
,然后awaitTermination()
:
ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } taskExecutor.shutdown(); try { taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { ... }
使用CountDownLatch :
CountDownLatch latch = new CountDownLatch(totalNumberOfTasks); ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } try { latch.await(); } catch (InterruptedException E) { // handle }
并在你的任务(附上试试/最后)
latch.countDown();
ExecutorService.invokeAll()
为你做。
ExecutorService taskExecutor = Executors.newFixedThreadPool(4); List<Callable<?>> tasks; // your tasks // invokeAll() returns when all tasks are complete List<Future<?>> futures = taskExecutor.invokeAll(tasks);
您也可以使用期货列表:
List<Future> futures = new ArrayList<Future>(); // now add to it: futures.add(executorInstance.submit(new Callable<Void>() { public Void call() throws IOException { // do something return null; } }));
那么当你想join所有这些,它基本上相当于join每一个,(带来额外的好处,它重新引发从子线程的exception到主):
for(Future f: this.futures) { f.get(); }
基本上诀窍是在每个未来的一个调用.get(),而不是无限循环调用(所有或每个)isDone()。 所以,只要最后一个线程结束,你就可以保证“继续前进”。 需要注意的是,由于.get()调用会重新引发exception,如果其中一个线程死了,您可能会在其他线程完成完成之前从此引发exception[为了避免这种情况,您可以在打电话]。 另一个需要注意的是它保留对所有线程的引用,所以如果他们有线程局部variables,它们将不会被收集,直到你通过这个块之后(尽pipe你可能能够解决这个问题,如果它成为一个问题,未来的ArrayList)。 如果你想知道哪个未来“先完成”,你可以使用一些像https://stackoverflow.com/a/31885029/32453
只是我的两分钱。 为了克服CountDownLatch
事先知道任务数量的需求,可以用一个简单的Semaphore
来做旧式的方法。
ExecutorService taskExecutor = Executors.newFixedThreadPool(4); int numberOfTasks=0; Semaphore s=new Semaphore(0); while(...) { taskExecutor.execute(new MyTask()); numberOfTasks++; } try { s.aquire(numberOfTasks); ...
在你的任务中,像调用s.release()
一样调用latch.countDown();
在Java8中,你可以使用CompletableFuture来完成它:
ExecutorService es = Executors.newFixedThreadPool(4); List<Runnable> tasks = getTasks(); CompletableFuture<?>[] futures = tasks.stream() .map(task -> CompletableFuture.runAsync(task, es)) .toArray(CompletableFuture[]::new); CompletableFuture.allOf(futures).join(); es.shutdown();
Java 5及更高版本中的CyclicBarrier类是为这类事情devise的。
有点迟到的游戏,但为了完成…
完成所有任务,而不是“等待”所有任务完成,你可以根据好莱坞原则思考“不要打电话给我,我会打电话给你”。 我认为由此产生的代码更优雅…
番石榴提供了一些有趣的工具来完成这一点。
一个例子 ::
将ExecutorService包装到ListeningExecutorService ::
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
提交可执行文件的集合::
for (Callable<Integer> callable : callables) { ListenableFuture<Integer> lf = service.submit(callable); // listenableFutures is a collection listenableFutures.add(lf) });
现在的关键部分:
ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);
附加一个callbackListenableFuture,你可以使用通知所有期货完成::
Futures.addCallback(lf, new FutureCallback<List<Integer>>() { @Override public void onSuccess(List<Integer> result) { log.info("@@ finished processing {} elements", Iterables.size(result)); // do something with all the results } @Override public void onFailure(Throwable t) { log.info("@@ failed because of :: {}", t); } });
这也提供了一个好处,你可以收集所有的结果在一个地方,一旦处理完成…
更多信息在这里
你可以将你的任务包装在另一个可运行的,这将发送通知:
taskExecutor.execute(new Runnable() { public void run() { taskStartedNotification(); new MyTask().run(); taskFinishedNotification(); } });
我刚写了一个解决你的问题的示例程序。 没有给出简洁的实现,所以我会添加一个。 虽然你可以使用executor.shutdown()
和executor.awaitTermination()
,但不是最好的做法,因为不同线程所花费的时间是不可预知的。
ExecutorService es = Executors.newCachedThreadPool(); List<Callable<Integer>> tasks = new ArrayList<>(); for (int j = 1; j <= 10; j++) { tasks.add(new Callable<Integer>() { @Override public Integer call() throws Exception { int sum = 0; System.out.println("Starting Thread " + Thread.currentThread().getId()); for (int i = 0; i < 1000000; i++) { sum += i; } System.out.println("Stopping Thread " + Thread.currentThread().getId()); return sum; } }); } try { List<Future<Integer>> futures = es.invokeAll(tasks); int flag = 0; for (Future<Integer> f : futures) { Integer res = f.get(); System.out.println("Sum: " + res); if (!f.isDone()) flag = 1; } if (flag == 0) System.out.println("SUCCESS"); else System.out.println("FAILED"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
只是为了提供更多的select这里不同的使用闩锁/障碍。 您也可以获得部分结果,直到所有人都完成使用CompletionService 。
从实践中的Java Concurrency开始:“如果您有一批计算提交给Executor,并且您希望在可用时检索它们的结果,则可以保留与每个任务关联的Future,并通过调用get超时时间为零,这是可能的,但是很乏味 ,幸运的是有一个更好的方法 :完成服务。
这里的实现
public class TaskSubmiter { private final ExecutorService executor; TaskSubmiter(ExecutorService executor) { this.executor = executor; } void doSomethingLarge(AnySourceClass source) { final List<InterestedResult> info = doPartialAsyncProcess(source); CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor); for (final InterestedResult interestedResultItem : info) completionService.submit(new Callable<PartialResult>() { public PartialResult call() { return InterestedResult.doAnOperationToGetPartialResult(); } }); try { for (int t = 0, n = info.size(); t < n; t++) { Future<PartialResult> f = completionService.take(); PartialResult PartialResult = f.get(); processThisSegment(PartialResult); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { throw somethinghrowable(e.getCause()); } } }
遵循以下方法之一。
- 遍历所有将来的任务,从
ExecutorService
submit
返回,并按照Kiran
build议,通过在Future
对象上阻塞调用get()
来检查状态 - 在ExecutorService上使用
invokeAll()
- CountDownLatch
- ForkJoinPool或者Executors.html#newWorkStealingPool
- 使用
shutdown, awaitTermination, shutdownNow
按正确的顺序shutdown, awaitTermination, shutdownNow
ThreadPoolExecutor的API
相关的SE问题:
Javamultithreading中如何使用CountDownLatch?
如何正确closuresjava的ExecutorService
您可以使用您自己的ExecutorCompletionService子类来包装taskExecutor
和您自己的BlockingQueue实现,以便在每个任务完成时获得通知,并在完成的任务数达到您的期望目标时执行任何您希望的callback或其他操作。
你应该使用executorService.shutdown()
和executorService.awaitTermination
方法。
举例如下:
public class ScheduledThreadPoolExample { public static void main(String[] args) throws InterruptedException { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); executorService.scheduleAtFixedRate(() -> System.out.println("process task."), 0, 1, TimeUnit.SECONDS); TimeUnit.SECONDS.sleep(10); executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); } }
Java 8 – 我们可以使用streamAPI来处理stream。 请参阅下面的代码片段
final List<Runnable> tasks = ...; //or any other functional interface tasks.stream().parallel().forEach(Runnable::run) // Uses default pool //alternatively to specify parallelism new ForkJoinPool(15).submit( () -> tasks.stream().parallel().forEach(Runnable::run) ).get();
你可以使用这个代码:
public class MyTask implements Runnable { private CountDownLatch countDownLatch; public MyTask(CountDownLatch countDownLatch { this.countDownLatch = countDownLatch; } @Override public void run() { try { //Do somethings // this.countDownLatch.countDown();//important } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } } CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS); ExecutorService taskExecutor = Executors.newFixedThreadPool(4); for (int i = 0; i < NUMBER_OF_TASKS; i++){ taskExecutor.execute(new MyTask(countDownLatch)); } countDownLatch.await(); System.out.println("Finish tasks");
这可能有帮助
Log.i(LOG_TAG, "shutting down executor..."); executor.shutdown(); while (true) { try { Log.i(LOG_TAG, "Waiting for executor to terminate..."); if (executor.isTerminated()) break; if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { break; } } catch (InterruptedException ignored) {} }
你可以在这个Runner类上调用waitTillDone() :
Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool while(...) { runner.run(new MyTask()); // here you submit your task } runner.waitTillDone(); // and this blocks until all tasks are finished (or failed) runner.shutdown(); // once you done you can shutdown the runner
在调用shutdown()之前,可以重复使用这个类并调用waitTillDone()多次,再加上代码非常简单 。 你也不需要预先知道 任务的数量 。
要使用它只需添加这个gradle / maven compile 'com.github.matejtymes:javafixes:1.1.1'
依赖项到您的项目。
更多细节可以在这里find:
https://github.com/MatejTymes/JavaFixes
http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html
执行器getActiveCount()
有一个方法 – 给出活动线程的计数。
跨越线程之后,我们可以检查activeCount()
值是否为0
。 一旦该值为零,意味着没有活动线程正在运行,这意味着任务完成:
while (true) { if (executor.getActiveCount() == 0) { //ur own piece of code break; } }