Java 8的stream:为什么并行stream更慢?

我正在玩Java 8的stream,不能理解我得到的性能结果。

我有2核心CPU(英特尔i73520M),Windows 8的64位和64位Java 8更新5.我正在做简单的地图串stream/并行串stream,发现并行版本有点慢。

当我运行这个代码:

String[] array = new String[1000000]; Arrays.fill(array, "AbabagalamagA"); Stream<String> stream = Arrays.stream(array); long time1 = System.nanoTime(); List<String> list = stream.map((x) -> x.toLowerCase()).collect(Collectors.toList()); long time2 = System.nanoTime(); System.out.println((time2 - time1) / 1000000f); 

…我得到了600左右的结果。这个版本,使用并行stream:

 String[] array = new String[1000000]; Arrays.fill(array, "AbabagalamagA"); Stream<String> stream = Arrays.stream(array).parallel(); long time1 = System.nanoTime(); List<String> list = stream.map((x) -> x.toLowerCase()).collect(Collectors.toList()); long time2 = System.nanoTime(); System.out.println((time2 - time1) / 1000000f); 

给我900左右的东西

考虑到我有2个CPU核心的事实,平行版本不应该更快吗? 有人可以给我一个暗示,为什么水货版本更慢?

这里有几个问题同时进行,因为它是。

首先是并行解决问题总是涉及到比实际操作更多的实际工作。 开销涉及在多个线程之间分割工作并join或合并结果。 诸如将短string转换为小写字母这样的问题足够小,以至于它们有可能被平行分离开销所淹没。

第二个问题是Java程序的基准testing是非常微妙的,并且很容易得到令人困惑的结果。 两个常见的问题是JIT编译和死代码消除。 简短的基准testing通常在JIT编译之前或者之中完成,所以它们不是测量峰值吞吐量,事实上它们可能是测量JIT本身。 编译发生时有些不确定,所以可能会导致结果变化很大。

对于小的综合性基准testing,工作量往往会计算出被丢弃的结果。 JIT编译器相当擅长检测这种情况,消除了不会产生任何地方使用的结果的代码。 在这种情况下,这可能不会发生,但是如果您用其他合成工作负载进行debugging,则肯定会发生。 当然,如果JIT消除了基准testing的工作量,那么它就会使基准testing失效。

我强烈build议使用一个完善的基准testing框架,如JMH,而不是手动滚动你自己的。 JMH有助于避免常见的基准缺陷,包括这些缺陷,而且设置和运行起来相当简单。 这是你的基准转换为使用JMH:

 package com.stackoverflow.questions; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.*; public class SO23170832 { @State(Scope.Benchmark) public static class BenchmarkState { static String[] array; static { array = new String[1000000]; Arrays.fill(array, "AbabagalamagA"); } } @GenerateMicroBenchmark @OutputTimeUnit(TimeUnit.SECONDS) public List<String> sequential(BenchmarkState state) { return Arrays.stream(state.array) .map(x -> x.toLowerCase()) .collect(Collectors.toList()); } @GenerateMicroBenchmark @OutputTimeUnit(TimeUnit.SECONDS) public List<String> parallel(BenchmarkState state) { return Arrays.stream(state.array) .parallel() .map(x -> x.toLowerCase()) .collect(Collectors.toList()); } } 

我用这个命令运行这个:

 java -jar dist/microbenchmarks.jar ".*SO23170832.*" -wi 5 -i 5 -f 1 

(这些选项表示五次热身迭代,五次基准迭代,以及一次分叉的JVM。)在运行期间,JMH发出大量冗长的消息,这些消息我已经消失了。 总结结果如下。

 Benchmark Mode Samples Mean Mean error Units csqSO23170832.parallel thrpt 5 4.600 5.995 ops/s csqSO23170832.sequential thrpt 5 1.500 1.727 ops/s 

请注意,结果是每秒运算,因此看起来并行运行比顺序运行快三倍。 但是我的机器只有两个内核。 嗯。 每次运行的平均误差实际上大于平均运行时间! WAT? 有什么可疑的事情在这里发生。

这给我们带来了第三个问题。 仔细查看工作负载,我们可以看到它为每个input分配一个新的String对象,并且将结果收集到一个列表中,这涉及到大量的重新分配和复制。 我猜这会导致相当数量的垃圾收集。 我们可以通过启用GC消息重新运行基准来看到这一点:

 java -verbose:gc -jar dist/microbenchmarks.jar ".*SO23170832.*" -wi 5 -i 5 -f 1 

这给出如下结果:

 [GC (Allocation Failure) 512K->432K(130560K), 0.0024130 secs] [GC (Allocation Failure) 944K->520K(131072K), 0.0015740 secs] [GC (Allocation Failure) 1544K->777K(131072K), 0.0032490 secs] [GC (Allocation Failure) 1801K->1027K(132096K), 0.0023940 secs] # Run progress: 0.00% complete, ETA 00:00:20 # VM invoker: /Users/src/jdk/jdk8-b132.jdk/Contents/Home/jre/bin/java # VM options: -verbose:gc # Fork: 1 of 1 [GC (Allocation Failure) 512K->424K(130560K), 0.0015460 secs] [GC (Allocation Failure) 933K->552K(131072K), 0.0014050 secs] [GC (Allocation Failure) 1576K->850K(131072K), 0.0023050 secs] [GC (Allocation Failure) 3075K->1561K(132096K), 0.0045140 secs] [GC (Allocation Failure) 1874K->1059K(132096K), 0.0062330 secs] # Warmup: 5 iterations, 1 s each # Measurement: 5 iterations, 1 s each # Threads: 1 thread, will synchronize iterations # Benchmark mode: Throughput, ops/time # Benchmark: com.stackoverflow.questions.SO23170832.parallel # Warmup Iteration 1: [GC (Allocation Failure) 7014K->5445K(132096K), 0.0184680 secs] [GC (Allocation Failure) 7493K->6346K(135168K), 0.0068380 secs] [GC (Allocation Failure) 10442K->8663K(135168K), 0.0155600 secs] [GC (Allocation Failure) 12759K->11051K(139776K), 0.0148190 secs] [GC (Allocation Failure) 18219K->15067K(140800K), 0.0241780 secs] [GC (Allocation Failure) 22167K->19214K(145920K), 0.0208510 secs] [GC (Allocation Failure) 29454K->25065K(147456K), 0.0333080 secs] [GC (Allocation Failure) 35305K->30729K(153600K), 0.0376610 secs] [GC (Allocation Failure) 46089K->39406K(154624K), 0.0406060 secs] [GC (Allocation Failure) 54766K->48299K(164352K), 0.0550140 secs] [GC (Allocation Failure) 71851K->62725K(165376K), 0.0612780 secs] [GC (Allocation Failure) 86277K->74864K(184320K), 0.0649210 secs] [GC (Allocation Failure) 111216K->94203K(185856K), 0.0875710 secs] [GC (Allocation Failure) 130555K->114932K(199680K), 0.1030540 secs] [GC (Allocation Failure) 162548K->141952K(203264K), 0.1315720 secs] [Full GC (Ergonomics) 141952K->59696K(159232K), 0.5150890 secs] [GC (Allocation Failure) 105613K->85547K(184832K), 0.0738530 secs] 1.183 ops/s 

注意:以#开始的行是普通的JMH输出行。 其余的都是GC消息。 这只是五次热身迭代中的第一次,这是五次基准迭代之前的事情。 在剩下的迭代期间,GC消息继续保持一致。 我认为可以肯定地说,衡量的业绩是由GC开销主导的,所报告的结果是不应该相信的。

目前还不清楚该怎么做。 这纯粹是一个综合的工作量。 与分配和复制相比,这显然涉及执行实际工作的CPU时间很less。 很难说你在这里测量的是什么。 一种方法是提出一个不同的工作量,在某种意义上说,这个工作量更“真实”一些。 另一种方法是在基准testing期间更改堆和GC参数以避免GC。

在做基准testing的时候,应该注意JIT编译器,当JIT开始时,时序行为会改变。如果我在testing程序中join一个预热阶段,那么并行版本比顺序版本要快一点。 结果如下:

 Warmup... Benchmark... Run 0: sequential 0.12s - parallel 0.11s Run 1: sequential 0.13s - parallel 0.08s Run 2: sequential 0.15s - parallel 0.08s Run 3: sequential 0.12s - parallel 0.11s Run 4: sequential 0.13s - parallel 0.08s 

以下是我用于此testing的完整源代码。

 public static void main(String... args) { String[] array = new String[1000000]; Arrays.fill(array, "AbabagalamagA"); System.out.println("Warmup..."); for (int i = 0; i < 100; ++i) { sequential(array); parallel(array); } System.out.println("Benchmark..."); for (int i = 0; i < 5; ++i) { System.out.printf("Run %d: sequential %s - parallel %s\n", i, test(() -> sequential(array)), test(() -> parallel(array))); } } private static void sequential(String[] array) { Arrays.stream(array).map(String::toLowerCase).collect(Collectors.toList()); } private static void parallel(String[] array) { Arrays.stream(array).parallel().map(String::toLowerCase).collect(Collectors.toList()); } private static String test(Runnable runnable) { long start = System.currentTimeMillis(); runnable.run(); long elapsed = System.currentTimeMillis() - start; return String.format("%4.2fs", elapsed / 1000.0); } 

使用多个线程来处理您的数据有一些初始设置成本,例如初始化线程池。 这些成本可能超过使用这些线程的收益,特别是在运行时已经非常低的情况下。 另外,如果存在争用,例如其他正在运行的线程,后台进程等,则并行处理的性能会进一步下降。

这个问题并不是新的并行处理。 本文根据Java 8 parallel()以及其他一些需要考虑的内容提供了一些细节: http : //java.dzone.com/articles/think-twice-using-java-8