Java 8并行stream中的自定义线程池

是否有可能为Java 8 并行stream指定自定义线程池? 我找不到任何地方。

想象一下,我有一个服务器应用程序,我想使用并行stream。 但是,应用程序是大型的,multithreading的,所以我想划分它。 我不想在另一个模块的应用程序块任务的一个模块中运行缓慢的任务。

如果我不能为不同的模块使用不同的线程池,这意味着我不能在大多数现实世界的情况下安全地使用并行stream。

试试下面的例子。 有一些CPU密集型任务在单独的线程中执行。 任务利用并行stream。 第一个任务是打破的,所以每一步需要1秒钟(由线程睡眠模拟)。 问题是其他线程卡住,等待中断的任务完成。 这是人为的例子,但想象一下,一个servlet应用程序和一个长期运行的任务提交给共享fork连接池的人。

public class ParallelTest { public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newCachedThreadPool(); es.execute(() -> runTask(1000)); //incorrect task es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.shutdown(); es.awaitTermination(60, TimeUnit.SECONDS); } private static void runTask(int delay) { range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max() .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max)); } public static boolean isPrime(long n) { return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0); } } 

实际上有一个技巧是如何在特定的fork-join池中执行并行操作的。 如果将它作为fork-join池中的任务执行,则它将停留在此处,而不使用常见的任务。

 ForkJoinPool forkJoinPool = new ForkJoinPool(2); forkJoinPool.submit(() -> //parallel task here, for example IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()) ).get(); 

诀窍基于ForkJoinTask.fork ,它指定:“如果适用,安排在池中运行当前任务的asynchronous执行任务;如果不是inForkJoinPool(),则使用ForkJoinPool.commonPool()

并行数据stream使用缺省的ForkJoinPool, 默认情况下,由于你有处理器 ,所以缺省的线程less一个 ,由Runtime.getRuntime().availableProcessors() (这意味着并行stream使用你所有的处理器,因为它们也使用主线程)。

对于需要单独或自定义池的应用程序,可以使用给定的目标并行性级别来构造ForkJoinPool; 默认情况下,等于可用处理器的数量。

要改变并行stream的执行方式,你也可以

  • 提交并行stream执行到你自己的ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)); 要么
  • 您可以使用系统属性更改公共池的大小: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")用于20个线程的目标并行度。

后者在我的机器上有8个处理器的例子。 如果我运行以下程序:

 long start = System.currentTimeMillis(); IntStream s = IntStream.range(0, 20); //System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); s.parallel().forEach(i -> { try { Thread.sleep(100); } catch (Exception ignore) {} System.out.print((System.currentTimeMillis() - start) + " "); }); 

输出是:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

所以你可以看到并行stream一次处理8个项目,即它使用8个线程。 但是,如果我取消对注释行的注释,则输出为:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216

这一次,并行stream使用了20个线程,并且stream中的所有20个元素都被同时处理。

除了在您自己的forkJoinPool中触发并行计算的技巧外,您还可以将该池传递给CompletableFuture.supplyAsync方法,如下所示:

 ForkJoinPool forkJoinPool = new ForkJoinPool(2); CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() -> //parallel task here, for example range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), forkJoinPool ); 

使用ForkJoinPool并提交并行stream不可靠地使用所有线程。 如果你看看这个( HashSet中 的并行stream并不是并行 运行的 ),这( 为什么并行stream不使用ForkJoinPool的所有线程? ),你会看到推理。

简短版本:如果ForkJoinPool /提交不适合你,使用

 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10"); 

要测量使用的线程的实际数量,可以检查Thread.activeCount()

  Runnable r = () -> IntStream .range(-42, +42) .parallel() .map(i -> Thread.activeCount()) .max() .ifPresent(System.out::println); ForkJoinPool.commonPool().submit(r).join(); new ForkJoinPool(42).submit(r).join(); 

这可以在4核CPU上产生如下输出:

 5 // common pool 23 // custom pool 

没有.parallel()它给:

 3 // common pool 4 // custom pool 

到现在为止,我使用了这个问题的答案中描述的解决scheme。 现在,我想出了一个名为Parallel Stream Support的小库:

 ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS); ParallelIntStreamSupport.range(1, 1_000_000, pool) .filter(PrimesPrint::isPrime) .collect(toList()) 

但正如@PabloMatiasGomez在评论中指出的那样,并行stream的分裂机制存在一些缺陷,这很大程度上取决于公共池的大小。 请参阅HashSet中的并行stream不会并行运行 。

我正在使用此解决scheme只有不同types的工作有单独的池,但我不能将公用池的大小设置为1,即使我不使用它。

去获得AbacusUtil 。 线程号可以通过为并行stream指定。 这里是示例代码:

 LongStream.range(4, 1_000_000).parallel(threadNum)... 

披露:我是AbacusUtil的开发者。

如果您不介意使用第三方库,使用独眼巨人反应,您可以在同一pipe道中混合使用顺序stream和并行stream,并提供自定义的ForkJoinPools。 例如

  ReactiveSeq.range(1, 1_000_000) .foldParallel(new ForkJoinPool(10), s->s.filter(i->true) .peek(i->System.out.println("Thread " + Thread.currentThread().getId())) .max(Comparator.naturalOrder())); 

或者,如果我们希望继续处理顺序stream

  ReactiveSeq.range(1, 1_000_000) .parallel(new ForkJoinPool(10), s->s.filter(i->true) .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))) .map(this::processSequentially) .forEach(System.out::println); 

[披露我是独眼巨人反应的主要开发者]