等待未来的名单

我有一个方法返回一个期货List

 List<Future<O>> futures = getFutures(); 

现在我要等到所有的期货都成功完成处理,或者任何由未来返回输出的任务都抛出exception。 即使有一个任务抛出exception,也没有必要等待另一个未来。

简单的方法将是

 wait() { For(Future f : futures) { try { f.get(); } catch(Exception e) { //TODO catch specific exception // this future threw exception , means somone could not do its task return; } } } 

但是这里的问题是,如果第四个未来抛出exception,那么我将不必要地等待前三个期货可用。

如何解决这个问题? 将以任何方式倒计时闩锁帮助? 我无法使用Future isDone因为java文档说

 boolean isDone() Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true. 

您可以使用CompletionService在准备就绪后立即收到期货,如果其中一个抛出exception,则取消处理。 像这样的东西:

 Executor executor = Executors.newFixedThreadPool(4); CompletionService<SomeResult> completionService = new ExecutorCompletionService<SomeResult>(executor); //4 tasks for(int i = 0; i < 4; i++) { completionService.submit(new Callable<SomeResult>() { public SomeResult call() { ... return result; } }); } int received = 0; boolean erros = false; while(received < 4 && !errors) { Future<SomeResult> resultFuture = completionService.take(); //blocks if none available try { SomeResult result = resultFuture.get(); received ++; ... // do something with the result } catch(Exception e) { //log errors = true; } } 

我认为你可以进一步改进,以取消任何仍在执行的任务,如果其中一个引发错误。

编辑:我在这里find了一个更全面的例子: http : //blog.teamlazerbeez.com/2009/04/29/java-completionservice/

如果您正在使用Java 8,那么您可以使用CompletableFuture和CompletableFuture.allOf来更轻松地完成这个任务,CompletableFuture和CompletableFuture.allOf仅在所有提供的CompletableFutures完成后才应用callback。

 // Waits for all futures to complete and returns a list of results. // If a future completes exceptionally then the resulting future will too. public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) { CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]); return CompletableFuture.allOf(cfs) .thenApply(() -> futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()) ); } 

你可以使用一个ExecutorCompletionService 。 这个文档甚至为你的确切用例提供了一个例子:

相反,假设你想使用任务集合中的第一个非空结果,忽略遇到exception的任何情况,并在第一个任务准备好时取消所有其他任务:

 void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); int n = solvers.size(); List<Future<Result>> futures = new ArrayList<Future<Result>>(n); Result result = null; try { for (Callable<Result> s : solvers) futures.add(ecs.submit(s)); for (int i = 0; i < n; ++i) { try { Result r = ecs.take().get(); if (r != null) { result = r; break; } } catch (ExecutionException ignore) { } } } finally { for (Future<Result> f : futures) f.cancel(true); } if (result != null) use(result); } 

这里要注意的重要的事情是ecs.take()将获得第一个完成的任务,而不仅仅是第一个提交的任务。 因此,你应该按照完成执行的顺序(或抛出exception)来获取它们。

也许这会有所帮助(没有什么会replace原始线程,是的!)我build议运行每个Future人与分离的线程(他们并行),然后当有一个错误,它只是信号的经理( Handler类)。

 class Handler{ //... private Thread thisThread; private boolean failed=false; private Thread[] trds; public void waitFor(){ thisThread=Thread.currentThread(); List<Future<Object>> futures = getFutures(); trds=new Thread[futures.size()]; for (int i = 0; i < trds.length; i++) { RunTask rt=new RunTask(futures.get(i), this); trds[i]=new Thread(rt); } synchronized (this) { for(Thread tx:trds){ tx.start(); } } for(Thread tx:trds){ try {tx.join(); } catch (InterruptedException e) { System.out.println("Job failed!");break; } }if(!failed){System.out.println("Job Done");} } private List<Future<Object>> getFutures() { return null; } public synchronized void cancelOther(){if(failed){return;} failed=true; for(Thread tx:trds){ tx.stop();//Deprecated but works here like a boss }thisThread.interrupt(); } //... } class RunTask implements Runnable{ private Future f;private Handler h; public RunTask(Future f,Handler h){this.f=f;this.h=h;} public void run(){ try{ f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation) }catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();} catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");} } } 

我不得不说上面的代码会出错(没有检查),但我希望我能解释一下这个解决scheme。 请尝试一下。

CompletionService将使用.submit()方法将您的Callables带入,您可以使用.take()方法检索计算的期货。

有一点你不能忘记的是通过调用.shutdown()方法来终止ExecutorService。 另外,只有在保存对执行程序服务的引用时才能调用此方法,因此请确保保留一个。

示例代码:

 ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); CompletionService<YourCallableImplementor> completionService = new ExecutorCompletionService<YourCallableImplementor>(service); ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>(); for (String computeMe : elementsToCompute) { futures.add(completionService.submit(new YourCallableImplementor(computeMe))); } //now retrieve the futures after computation (auto wait for it) int received = 0; while(received < elementsToCompute.size()) { Future<YourCallableImplementor> resultFuture = completionService.take(); YourCallableImplementor result = resultFuture.get(); received ++; } //important: shutdown your ExecutorService service.shutdown(); 

如果您正在使用Java 8并且不想操作CompletableFuture ,我已经编写了一个工具来使用stream检索List<Future<T>>结果。 关键在于你被禁止map(Future::get)

 public final class Futures { private Futures() {} public static <E> Collector<Future<E>, Collection<E>, List<E>> present() { return new FutureCollector<>(); } private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>> { private final List<Throwable> exceptions = new LinkedList<>(); @Override public Supplier<Collection<T>> supplier() { return LinkedList::new; } @Override public BiConsumer<Collection<T>, Future<T>> accumulator() { return (r, f) -> { try { r.add(f.get()); } catch (InterruptedException e) {} catch (ExecutionException e) { exceptions.add(e.getCause()); } }; } @Override public BinaryOperator<Collection<T>> combiner() { return (l1, l2) -> { l1.addAll(l2); return l1; }; } @Override public Function<Collection<T>, List<T>> finisher() { return l -> { List<T> ret = new ArrayList<>(l); if (!exceptions.isEmpty()) throw new AggregateException(exceptions, ret); return ret; }; } @Override public Set<java.util.stream.Collector.Characteristics> characteristics() { return java.util.Collections.emptySet(); } } 

这需要一个像C#一样工作的AggregateException

 public class AggregateException extends RuntimeException { /** * */ private static final long serialVersionUID = -4477649337710077094L; private final List<Throwable> causes; private List<?> successfulElements; public AggregateException(List<Throwable> causes, List<?> l) { this.causes = causes; successfulElements = l; } public AggregateException(List<Throwable> causes) { this.causes = causes; } @Override public synchronized Throwable getCause() { return this; } public List<Throwable> getCauses() { return causes; } public List<?> getSuccessfulElements() { return successfulElements; } public void setSuccessfulElements(List<?> successfulElements) { this.successfulElements = successfulElements; } } 

这个组件完全像C#的Task.WaitAll一样 。 我正在处理与CompletableFuture.allOf相同的变体(等同于Task.WhenAll

我这样做的原因是我使用Spring的ListenableFuture ,不想移植到CompletableFuture尽pipe它是一个更标准的方法