如何等待ThreadPoolExecutor中的所有任务完成而不closuresExecutor?

我不能使用shutdown()awaitTermination()因为在等待的时候可能会有新的任务被添加到ThreadPoolExecutor中。

所以我正在寻找一种方法来等待,直到ThreadPoolExecutor已经清空它的队列,并完成了它的所有任务,而不停止在这个点之前添加新的任务。

如果它有什么区别,这是Android的。

谢谢

更新 :几个星期后,重新审视这个,我发现一个修改后的CountDownLatch在这种情况下对我更好。 我会保留答案,因为它更多地适用于我所问的问题。

如果您有兴趣了解特定任务何时完成或某批任务,则可以使用ExecutorService.submit(Runnable) 。 调用这个方法返回一个Future对象,它可以被放置到一个Collection ,然后你的主线程将遍历每个对象的Future.get() 。 这将导致您的主线程停止执行,直到ExecutorService处理完所有Runnable任务。

 Collection<Future<?>> futures = new LinkedList<Future<?>>(); futures.add(executorService.submit(myRunnable)); for (Future<?> future:futures) { future.get(); } 

我的情景是一个networking抓取工具,从网站获取一些信息,然后处理它们。 一个ThreadPoolExecutor用来加速进程,因为可以在这个时间加载很多页面。 因此,在现有任务中将创build新的任务,因为爬虫将在每个页面中遵循超链接。 问题是一样的:主线程不知道什么时候所有的任务都完成了,并且可以开始处理结果。 我用一个简单的方法来确定这一点。 这不是很优雅,但在我的情况下工作:

 while (executor.getTaskCount()!=executor.getCompletedTaskCount()){ System.err.println("count="+executor.getTaskCount()+","+executor.getCompletedTaskCount()); Thread.sleep(5000); } executor.shutdown(); executor.awaitTermination(60, TimeUnit.SECONDS); 

也许你正在寻找一个CompletionService来pipe理任务的批处理,另请参阅这个答案 。

(这是一个尝试重现Thilo早先删除的答案,我自己的调整。)

我认为你可能需要澄清你的问题,因为有一个隐含的无限的条件…在某些时候,你必须决定closures你的执行者,在这一点上,它不会接受任何更多的任务。 你的问题似乎意味着你要等到你知道没有进一步的任务将被提交,你只能在自己的应用程序代码中知道。

下面的答案可以让你顺利地转换到一个新的TPE(无论什么原因),完成所有当前提交的任务,而不是拒绝新的TPE的任务。 它可能会回答你的问题。 @ Thilo也可能。

假设你在某个地方定义了一个可用的TPE,

 AtomicReference<ThreadPoolExecutor> publiclyAvailableTPE = ...; 

然后您可以编写TPE交换例程。 它也可以使用同步方法编写,但我认为这更简单:

 void rotateTPE() { ThreadPoolExecutor newTPE = createNewTPE(); // atomic swap with publicly-visible TPE ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(newTPE); oldTPE.shutdown(); // and if you want this method to block awaiting completion of old tasks in // the previously visible TPE oldTPE.awaitTermination(); } 

另外,如果你真的没有开玩笑想要杀死线程池,那么你的提交者端将需要应付被拒绝的任务,你可以使用null作为新的TPE:

 void killTPE() { ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(null); oldTPE.shutdown(); // and if you want this method to block awaiting completion of old tasks in // the previously visible TPE oldTPE.awaitTermination(); } 

这可能会导致上游问题,调用者需要知道如何处理null

你也可以用一个虚拟的TPE换掉,这个TPE只是拒绝每一个新的执行,但是这相当于在TPE上调用shutdown()会发生什么。

如果你不想使用shutdown ,请按照下面的方法:

  1. 遍历所有Future任务从ExecutorService提交,并检查状态阻塞调用Future对象的get()Tim Benderbuild议

  2. 使用其中之一

    1. ExecutorService上使用invokeAll
    2. 使用CountDownLatch
    3. 使用ForkJoinPool或newWorkStealingPool的Executors (自java 8以来)

executor服务上的invokeAll()也达到了CountDownLatch的相同目的

相关的SE问题:

如何等待多个线程来完成?

你可以在Runner类中调用waitTillDone()

 Runner runner = Runner.runner(10); runner.runIn(2, SECONDS, runnable); runner.run(runnable); // each of this runnables could submit more tasks runner.waitTillDone(); // blocks until all tasks are finished (or failed) // and now reuse it runner.runIn(500, MILLISECONDS, callable); runner.waitTillDone(); runner.shutdown(); 

要使用它,将这个gradle / maven依赖项添加到您的项目中: 'com.github.matejtymes:javafixes:1.0'

欲了解更多详情,请看这里: https : //github.com/MatejTymes/JavaFixes或在这里: http : //matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html