如何等待所有线程完成,使用ExecutorService?

我需要一次执行一些任务4,如下所示:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } //...wait for completion somehow 

一旦所有这些都完成,我怎么能得到通知? 现在我想不出比设置一些全局任务计数器好多less,并在每个任务结束时减less它,然后在无限循环监视这个计数器变成0; 或者得到一份期货清单,并在无限循环监测isDone所有这些。 什么是不涉及无限循环的更好的解决scheme?

谢谢。

基本上在一个ExecutorService你调用shutdown() ,然后awaitTermination()

 ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } taskExecutor.shutdown(); try { taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { ... } 

使用CountDownLatch :

 CountDownLatch latch = new CountDownLatch(totalNumberOfTasks); ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } try { latch.await(); } catch (InterruptedException E) { // handle } 

并在你的任务(附上试试/最后)

 latch.countDown(); 

ExecutorService.invokeAll()为你做。

 ExecutorService taskExecutor = Executors.newFixedThreadPool(4); List<Callable<?>> tasks; // your tasks // invokeAll() returns when all tasks are complete List<Future<?>> futures = taskExecutor.invokeAll(tasks); 

您也可以使用期货列表:

 List<Future> futures = new ArrayList<Future>(); // now add to it: futures.add(executorInstance.submit(new Callable<Void>() { public Void call() throws IOException { // do something return null; } })); 

那么当你想join所有这些,它基本上相当于join每一个,(带来额外的好处,它重新引发从子线程的exception到主):

 for(Future f: this.futures) { f.get(); } 

基本上诀窍是在每个未来的一个调用.get(),而不是无限循环调用(所有或每个)isDone()。 所以,只要最后一个线程结束,你就可以保证“继续前进”。 需要注意的是,由于.get()调用会重新引发exception,如果其中一个线程死了,您可能会在其他线程完成完成之前从此引发exception[为了避免这种情况,您可以在打电话]。 另一个需要注意的是它保留对所有线程的引用,所以如果他们有线程局部variables,它们将不会被收集,直到你通过这个块之后(尽pipe你可能能够解决这个问题,如果它成为一个问题,未来的ArrayList)。 如果你想知道哪个未来“先完成”,你可以使用一些像https://stackoverflow.com/a/31885029/32453

只是我的两分钱。 为了克服CountDownLatch事先知道任务数量的需求,可以用一个简单的Semaphore来做旧式的方法。

 ExecutorService taskExecutor = Executors.newFixedThreadPool(4); int numberOfTasks=0; Semaphore s=new Semaphore(0); while(...) { taskExecutor.execute(new MyTask()); numberOfTasks++; } try { s.aquire(numberOfTasks); ... 

在你的任务中,像调用s.release()一样调用latch.countDown();

在Java8中,你可以使用CompletableFuture来完成它:

 ExecutorService es = Executors.newFixedThreadPool(4); List<Runnable> tasks = getTasks(); CompletableFuture<?>[] futures = tasks.stream() .map(task -> CompletableFuture.runAsync(task, es)) .toArray(CompletableFuture[]::new); CompletableFuture.allOf(futures).join(); es.shutdown(); 

Java 5及更高版本中的CyclicBarrier类是为这类事情devise的。

有点迟到的游戏,但为了完成…

完成所有任务,而不是“等待”所有任务完成,你可以根据好莱坞原则思考“不要打电话给我,我会打电话给你”。 我认为由此产生的代码更优雅…

番石榴提供了一些有趣的工具来完成这一点。

一个例子 ::

将ExecutorService包装到ListeningExecutorService ::

 ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); 

提交可执行文件的集合::

 for (Callable<Integer> callable : callables) { ListenableFuture<Integer> lf = service.submit(callable); // listenableFutures is a collection listenableFutures.add(lf) }); 

现在的关键部分:

 ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures); 

附加一个callbackListenableFuture,你可以使用通知所有期货完成::

  Futures.addCallback(lf, new FutureCallback<List<Integer>>() { @Override public void onSuccess(List<Integer> result) { log.info("@@ finished processing {} elements", Iterables.size(result)); // do something with all the results } @Override public void onFailure(Throwable t) { log.info("@@ failed because of :: {}", t); } }); 

这也提供了一个好处,你可以收集所有的结果在一个地方,一旦处理完成…

更多信息在这里

你可以将你的任务包装在另一个可运行的,这将发送通知:

 taskExecutor.execute(new Runnable() { public void run() { taskStartedNotification(); new MyTask().run(); taskFinishedNotification(); } }); 

我刚写了一个解决你的问题的示例程序。 没有给出简洁的实现,所以我会添加一个。 虽然你可以使用executor.shutdown()executor.awaitTermination() ,但不是最好的做法,因为不同线程所花费的时间是不可预知的。

 ExecutorService es = Executors.newCachedThreadPool(); List<Callable<Integer>> tasks = new ArrayList<>(); for (int j = 1; j <= 10; j++) { tasks.add(new Callable<Integer>() { @Override public Integer call() throws Exception { int sum = 0; System.out.println("Starting Thread " + Thread.currentThread().getId()); for (int i = 0; i < 1000000; i++) { sum += i; } System.out.println("Stopping Thread " + Thread.currentThread().getId()); return sum; } }); } try { List<Future<Integer>> futures = es.invokeAll(tasks); int flag = 0; for (Future<Integer> f : futures) { Integer res = f.get(); System.out.println("Sum: " + res); if (!f.isDone()) flag = 1; } if (flag == 0) System.out.println("SUCCESS"); else System.out.println("FAILED"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } 

只是为了提供更多的select这里不同的使用闩锁/障碍。 您也可以获得部分结果,直到所有人都完成使用CompletionService 。

从实践中的Java Concurrency开始:“如果您有一批计算提交给Executor,并且您希望在可用时检索它们的结果,则可以保留与每个任务关联的Future,并通过调用get超时时间为零,这是可能的,但是很乏味 ,幸运的是有一个更好的方法 :完成服务。

这里的实现

 public class TaskSubmiter { private final ExecutorService executor; TaskSubmiter(ExecutorService executor) { this.executor = executor; } void doSomethingLarge(AnySourceClass source) { final List<InterestedResult> info = doPartialAsyncProcess(source); CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor); for (final InterestedResult interestedResultItem : info) completionService.submit(new Callable<PartialResult>() { public PartialResult call() { return InterestedResult.doAnOperationToGetPartialResult(); } }); try { for (int t = 0, n = info.size(); t < n; t++) { Future<PartialResult> f = completionService.take(); PartialResult PartialResult = f.get(); processThisSegment(PartialResult); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { throw somethinghrowable(e.getCause()); } } } 

遵循以下方法之一。

  1. 遍历所有将来的任务,从ExecutorService submit返回,并按照Kiranbuild议,通过在Future对象上阻塞调用get()来检查状态
  2. 在ExecutorService上使用invokeAll()
  3. CountDownLatch
  4. ForkJoinPool或者Executors.html#newWorkStealingPool
  5. 使用shutdown, awaitTermination, shutdownNow按正确的顺序shutdown, awaitTermination, shutdownNow ThreadPoolExecutor的API

相关的SE问题:

Javamultithreading中如何使用CountDownLatch?

如何正确closuresjava的ExecutorService

您可以使用您自己的ExecutorCompletionService子类来包装taskExecutor和您自己的BlockingQueue实现,以便在每个任务完成时获得通知,并在完成的任务数达到您的期望目标时执行任何您希望的callback或其他操作。

你应该使用executorService.shutdown()executorService.awaitTermination方法。

举例如下:

 public class ScheduledThreadPoolExample { public static void main(String[] args) throws InterruptedException { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); executorService.scheduleAtFixedRate(() -> System.out.println("process task."), 0, 1, TimeUnit.SECONDS); TimeUnit.SECONDS.sleep(10); executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); } } 

Java 8 – 我们可以使用streamAPI来处理stream。 请参阅下面的代码片段

 final List<Runnable> tasks = ...; //or any other functional interface tasks.stream().parallel().forEach(Runnable::run) // Uses default pool //alternatively to specify parallelism new ForkJoinPool(15).submit( () -> tasks.stream().parallel().forEach(Runnable::run) ).get(); 

你可以使用这个代码:

 public class MyTask implements Runnable { private CountDownLatch countDownLatch; public MyTask(CountDownLatch countDownLatch { this.countDownLatch = countDownLatch; } @Override public void run() { try { //Do somethings // this.countDownLatch.countDown();//important } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } } CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS); ExecutorService taskExecutor = Executors.newFixedThreadPool(4); for (int i = 0; i < NUMBER_OF_TASKS; i++){ taskExecutor.execute(new MyTask(countDownLatch)); } countDownLatch.await(); System.out.println("Finish tasks"); 

这可能有帮助

 Log.i(LOG_TAG, "shutting down executor..."); executor.shutdown(); while (true) { try { Log.i(LOG_TAG, "Waiting for executor to terminate..."); if (executor.isTerminated()) break; if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { break; } } catch (InterruptedException ignored) {} } 

你可以在这个Runner类上调用waitTillDone()

 Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool while(...) { runner.run(new MyTask()); // here you submit your task } runner.waitTillDone(); // and this blocks until all tasks are finished (or failed) runner.shutdown(); // once you done you can shutdown the runner 

在调用shutdown()之前,可以重复使用这个类并调用waitTillDone()多次,再加上代码非常简单 。 你也不需要预先知道 任务数量

要使用它只需添加这个gradle / maven compile 'com.github.matejtymes:javafixes:1.1.1'依赖项到您的项目。

更多细节可以在这里find:

https://github.com/MatejTymes/JavaFixes

http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

执行器getActiveCount()有一个方法 – 给出活动线程的计数。

跨越线程之后,我们可以检查activeCount()值是否为0 。 一旦该值为零,意味着没有活动线程正在运行,这意味着任务完成:

 while (true) { if (executor.getActiveCount() == 0) { //ur own piece of code break; } }