什么时候应该使用ExecutorService上的CompletionService?

我在这个博客文章中发现了CompletionService。 但是,这并没有真正展示CompletionService比标准ExecutorService更有优势。 相同的代码也可以写入。 那么,CompletionService何时有用呢?

你可以给一个简短的代码样本,使其透明? 例如,这个代码示例只显示不需要CompletionService(=相当于ExecutorService)

ExecutorService taskExecutor = Executors.newCachedThreadPool(); // CompletionService<Long> taskCompletionService = // new ExecutorCompletionService<Long>(taskExecutor); Callable<Long> callable = new Callable<Long>() { @Override public Long call() throws Exception { return 1L; } }; Future<Long> future = // taskCompletionService.submit(callable); taskExecutor.submit(callable); while (!future.isDone()) { // Do some work... System.out.println("Working on something..."); } try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } 

使用ExecutorService,一旦您提交了要运行的任务,您需要手动编写代码以便高效地获取已完成任务的结果。 使用CompletionService,这是非常自动的。 您提交的代码中的差异不是很明显,因为您只提交一项任务。 不过,想象一下你有一个要提交的任务列表。 在下面的例子中,多个任务被提交给CompletionService。 然后,而不是试图找出哪个任务已经完成(得到结果),它只是要求CompletionService实例返回分支,因为它们变得可用。

 public class CompletionServiceTest { class CalcResult { long result ; CalcResult(long l){ result = l; } } class CallableTask implements Callable<CalcResult> { String taskName ; long input1 ; int input2 ; CallableTask(String name , long v1 , int v2 ){ taskName = name; input1 = v1; input2 = v2 ; } public CalcResult call() throws Exception { System.out.println(" Task " + taskName + " Started -----"); for(int i=0;i<input2 ;i++){ try { Thread.sleep(200); } catch (InterruptedException e) { System.out.println(" Task " + taskName + " Interrupted !! "); e.printStackTrace(); } input1 += i; } System.out.println(" Task " + taskName + " Completed @@@@@@"); return new CalcResult(input1) ; } } public void test(){ ExecutorService taskExecutor = Executors.newFixedThreadPool(3); CompletionService<CalcResult> taskCompletionService = new ExecutorCompletionService<CalcResult>(taskExecutor); int submittedTasks = 5; for(int i=0;i< submittedTasks;i++){ taskCompletionService.submit(new CallableTask( String.valueOf(i), (i * 10), ((i * 10) + 10 ) )); System.out.println("Task " + String.valueOf(i) + "subitted"); } for(int tasksHandled=0;tasksHandled<submittedTasks;tasksHandled++){ try { System.out.println("trying to take from Completion service"); Future<CalcResult> result = taskCompletionService.take(); System.out.println("result for a task availble in queue.Trying to get()" ); // above call blocks till atleast one task is completed and results availble for it // but we dont have to worry which one // process the result here by doing result.get() CalcResult l = result.get(); System.out.println("Task " + String.valueOf(tasksHandled) + "Completed - results obtained : " + String.valueOf(l.result)); } catch (InterruptedException e) { // Something went wrong with a task submitted System.out.println("Error Interrupted exception"); e.printStackTrace(); } catch (ExecutionException e) { // Something went wrong with the result e.printStackTrace(); System.out.println("Error get() threw exception"); } } } } 

省略许多细节:

  • ExecutorService =传入队列+工作线程
  • CompletionService =传入队列+工作线程+输出队列

我认为javadoc最好的答案是什么时候CompletionServiceExecutorService不是有用的。

一种将新的asynchronous任务的生成与已完成任务的结果消耗分离的服务。

基本上,这个接口允许一个程序让生产者创build和提交任务(甚至检查这些提交的结果),而不需要知道任何其他消费者对这些任务的结果。 同时,意识到CompletionService消费者可以在不知道生产者提交任务的情况下进行polltake结果。

为了logging,我可能是错的,因为它是相当晚,但我相当确定在该博客文章中的示例代码导致内存泄漏。 没有一个积极的消费者从ExecutorCompletionService的内部队列中取出结果,我不确定博主是如何期待排队的。

基本上,如果您想要并行执行多个任务,然后使用它们的完成顺序,则使用CompletionService 。 所以,如果我执行5个工作, CompletionService将会给我第一个完成的工作。 只有一个任务的例子除了提交一个Callable的能力之外,没有比Executor更多的价值。

首先,如果我们不想浪费处理器的时间,我们不会使用

 while (!future.isDone()) { // Do some work... } 

我们必须使用

 service.shutdown(); service.awaitTermination(14, TimeUnit.DAYS); 

这个代码的坏处是会closuresExecutorService 。 如果我们想继续使用它(即我们有一些recursion的任务创build),我们有两个select:invokeAll或ExecutorService

invokeAll将等待,直到所有的任务将完成。 ExecutorService赋予我们一个接一个的调查结果的能力。

最后,recursion的例子:

 ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER); ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(executorService); while (Tasks.size() > 0) { for (final Task task : Tasks) { completionService.submit(new Callable<String>() { @Override public String call() throws Exception { return DoTask(task); } }); } try { int taskNum = Tasks.size(); Tasks.clear(); for (int i = 0; i < taskNum; ++i) { Result result = completionService.take().get(); if (result != null) Tasks.add(result.toTask()); } } catch (InterruptedException e) { // error :( } catch (ExecutionException e) { // error :( } } 

在运行时自己看看,试着实现这两个解决scheme(Executorservice和Completionservice),你会看到他们的行为有多不同,它将更清楚什么时候使用一个或另一个。 这里有一个例子,如果你想http://rdafbn.blogspot.co.uk/2013/01/executorservice-vs-completionservice-vs.html

假设您有5个长时间运行的任务(可调用的任务),并且您已经提交了这些任务来执行服务。 现在想象一下,如果任何一个任务完成,你不想等待所有5个任务竞争,而是想对这些任务进行一些处理。 现在可以通过编写未来对象的轮询逻辑来完成这个任务,或者使用这个API。

Interesting Posts