列出<未来>到未来<列表>序列

我正在尝试将List<CompletableFuture<X>>转换为CompletableFuture<List<T>> 。 这是非常有用的,因为当你有很多的asynchronous任务,你需要得到所有的结果。

如果其中任何一个失败,那么最后的未来将失败。 这是我已经实现的:

  public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) { if(com.isEmpty()){ throw new IllegalArgumentException(); } Stream<? extends CompletableFuture<T>> stream = com.stream(); CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>()); return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> { x.add(y); return x; },exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> { ls1.addAll(ls2); return ls1; },exec)); } 

运行它:

 ExecutorService executorService = Executors.newCachedThreadPool(); Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> { try { Thread.sleep((long) (Math.random() * 10)); } catch (InterruptedException e) { e.printStackTrace(); } return x; }, executorService)); CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService); 

如果其中任何一个失败,那么它失败。 即使有一百万期货,它也会如预期产出。 我的问题是:说如果有超过5000期货,如果他们中的任何一个失败,我得到一个StackOverflowError

线程“pool-1-thread-2611”中的exceptionjava.util.concurrent.CompletableFuture.internalComplete上的java.lang.StackOverflowError(CompletableFuture.java:210)at java.util.concurrent.CompletableFuture $ ThenCompose.run(CompletableFuture.java :1487)在java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)在java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)在java.util.concurrent.CompletableFuture $ ThenCompose.run( CompletableFuture.java:1487)

我做错了什么?

注意:当未来的任何一个失败时,上述返回的将来会失败。 接受的答案也应该采取这一点。

使用CompletableFuture.allOf(...)

 static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) { return CompletableFuture.allOf(com.toArray(new CompletableFuture[com.size()])) .thenApply(v -> com.stream() .map(CompletableFuture::join) .collect(toList()) ); } 

关于您的实施的几点意见:

您使用.thenComposeAsync.thenApplyAsync.thenCombineAsync可能不会达到您的期望。 这些...Async方法运行在一个单独的线程提供给他们的function。 所以,在你的情况下,你正在添加新的项目到列表中运行提供的执行器。 无需将轻量级操作填充到caching线程执行程序中。 没有很好的理由,请不要使用thenXXXXAsync方法。

此外, reduce不应该被用来积累到可变容器中。 即使当stream是连续的,它可能正常工作,但如果stream是并行的,它将会失败。 要执行可变缩减,请改用.collect

如果要在第一次故障后立即完成整个计算,请在sequence方法中执行以下操作:

 CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture[com.size()])) .thenApply(v -> com.stream() .map(CompletableFuture::join) .collect(toList()) ); com.forEach(f -> f.whenComplete((t, ex) -> { if (ex != null) { result.completeExceptionally(ex); } })); return result; 

另外,如果您想取消第一次失败时的剩余操作,请添加exec.shutdownNow();result.completeExceptionally(ex);之后 。 当然,这个假定exec只存在于这一个计算中。 如果没有,你将不得不循环并取消每个剩余的Future

正如Misha指出的 ,你正在过度使用…Async操作。 而且,你正在编写一个复杂的操作链,它build模一个不依赖你的程序逻辑的依赖项:

  • 你创build一个工作x取决于你的列表的第一个和第二个工作
  • 你创build一个工作x + 1,这取决于工作x和你的列表的第三个工作
  • 你创build一个工作x + 2,这取决于工作x + 1和你的列表的第四个工作
  • 你创build一个工作x + 5000,这取决于工作x + 4999和你的清单的最后一份工作

然后,取消(显式或由于例外)这个recursion组合的作业可能会recursion执行,并可能失败,并带有StackOverflowError 。 这是依赖于实现的。

正如米沙已经表明的那样 ,有一种方法可以让你模仿你的初衷,定义一份依赖你所有工作的工作。

但值得注意的是,即使这样也没有必要。 由于您使用的是无限的线程池执行程序,因此只需将收集结果的asynchronous作业发布到列表中即可完成。 无论如何,要求完成每个工作的结果都意味着等待完成。

 ExecutorService executorService = Executors.newCachedThreadPool(); List<CompletableFuture<Integer>> que = IntStream.range(0, 100000) .mapToObj(x -> CompletableFuture.supplyAsync(() -> { LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10))); return x; }, executorService)).collect(Collectors.toList()); CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync( () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()), executorService); 

使用合成依赖操作的方法非常重要,当线程数量有限且作业可能会产生额外的asynchronous作业时,以避免等待作业从必须首先完成的作业中窃取线程,但这两种情况都不是这样。

在这个特定的情况下,一个工作只是迭代这个大量的必备工作,并在必要时等待,可能比build模这个大量的依赖关系更有效率,并且每个工作都要通知从属工作完成。

您可以获取Spotify的CompletableFutures库并使用allAsList方法。 我认为这是从番石榴的Futures.allAsList方法启发。

 public static <T> CompletableFuture<List<T>> allAsList( List<? extends CompletionStage<? extends T>> stages) { 

如果你不想使用一个库,这里是一个简单的实现:

 public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) { return CompletableFuture.allOf( futures.toArray(new CompletableFuture[futures.size()]) ).thenApply(ignored -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()) ); } 

除了Spotify Futures图书馆,你可以试试我的代码在这里find: https : //github.com/vsilaev/java-async-await/blob/master/net.tascalate.async.examples/src/main/java/net/ tascalate / concurrent / CompletionStages.java (对同一个包中的其他类有依赖关系)

它实现了一个逻辑来返回“至lessN个M”CompletionStage-s策略允许容忍多less错误。 所有/任何情况下都有方便的方法,加上剩余期货的取消政策,加上代码处理CompletionStage-s(接口)而不是CompletableFuture(具体的类)。

Javaslang拥有非常方便的Future API 。 它也允许从未来的收集中做出收集的未来。

 List<Future<String>> listOfFutures = ... Future<Seq<String>> futureOfList = Future.sequence(listOfFutures); 

请参阅http://static.javadoc.io/io.javaslang/javaslang/2.0.5/javaslang/concurrent/Future.html#sequence-java.lang.Iterable-

为了join@Misha所接受的答案,可以进一步扩展为collections者:

  public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() { return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com)); } 

现在你可以:

 Stream<CompletableFuture<Integer>> stream = Stream.of( CompletableFuture.completedFuture(1), CompletableFuture.completedFuture(2), CompletableFuture.completedFuture(3) ); CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector()); 

在CompletableFuture上使用thenCombine的示例序列操作

 public<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com){ CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>()); BiFunction<CompletableFuture<List<T>>,CompletableFuture<T>,CompletableFuture<List<T>>> combineToList = (acc,next) -> acc.thenCombine(next,(a,b) -> { a.add(b); return a;}); BinaryOperator<CompletableFuture<List<T>>> combineLists = (a,b)-> a.thenCombine(b,(l1,l2)-> { l1.addAll(l2); return l1;}) ; return com.stream() .reduce(identity, combineToList, combineLists); } } 

如果你不介意使用第三方库, cyclops-react (我是作者)有一套CompletableFutures(和Optionals,Streams等)的工具方法,

  List<CompletableFuture<String>> listOfFutures; CompletableFuture<ListX<String>> sequence =CompletableFutures.sequence(listOfFutures);