我可以在Java 8中复制一个stream吗?

有时我想在一个stream上执行一组操作,然后用其他操作以两种不同的方式处理结果stream。

我能做到这一点,而不必指定两次通用的初始操作?

例如,我希望有一个如下所示的dup()方法:

 Stream [] desired_streams = IntStream.range(1, 100).filter(n -> n % 2 == 0).dup(); Stream stream14 = desired_streams[0].filter(n -> n % 7 == 0); // multiples of 14 Stream stream10 = desired_streams[1].filter(n -> n % 5 == 0); // multiples of 10 

一般情况下是不可能的。

如果你想复制一个inputstream或input迭代器,你有两个select:

A.把所有东西放在一个集合中,比如一个List<>

假设您将一个stream复制到两个streams1s2 。 如果你在s1n2有先进的n1元素,那么你必须保留|n2 - n1| 记忆中的元素,只是为了跟上步伐。 如果您的stream是无限的,那么所需的存储空间可能没有上限。

看看Python的tee()来看看它是什么:

这个itertool可能需要大量的辅助存储(取决于需要存储多less临时数据)。 通常,如果一个迭代器在另一个迭代器启动之前使用大部分或全部数据,则使用list()而不是tee()会更快。

B.尽可能:复制创build元素的生成器的状态

要使此选项有效,您可能需要访问stream的内部工作。 换句话说,生成器 – 创build元素的部分 – 应该首先支持复制。 [OP:看到这个很好的答案 ,作为一个例子来说明在问题中如何做这个例子]

它不会在用户的input上工作,因为你必须复制整个“外部世界”的状态。 Java的Stream不支持复制,因为它被devise为尽可能通用,特别是与文件,networking,键盘,传感器,随机等等一起工作。[OP:另一个例子是根据需要读取温度传感器的stream。 没有存储读数的副本,它不能被复制]

这不仅在Java的情况下, 这是一个通用规则。 你可以看到,C ++中的std::istream只支持移动语义,而不是复制语义(“copy constructor(deleted)”),出于这个原因(和其他)。

以这种方式复制stream是不可能的。 但是,通过将通用部分移动到方法或lambdaexpression式中,可以避免代码重复。

 Supplier<IntStream> supplier = () -> IntStream.range(1, 100).filter(n -> n % 2 == 0); supplier.get().filter(...); supplier.get().filter(...); 

如果你在一个副本中缓冲了你已经消耗的元素,而另一个却没有。

我们为jOOλ中的stream实现了一个duplicate()方法,我们创build了一个开源库来改进jOOQ的集成testing。 本质上,你可以写:

 Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq( IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed() ).duplicate(); 

(注意:我们目前需要IntSeqstream,因为我们还没有实现一个IntSeq

在内部,有一个LinkedList缓冲区,用于存储从一个stream中消耗的所有值,而不是从另一个stream中消耗的值。 如果您的两个stream以相同的速度消耗,那么这可能是高效的。

algorithm的工作原理如下:

 static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) { final LinkedList<T> gap = new LinkedList<>(); final Iterator<T> it = stream.iterator(); @SuppressWarnings("unchecked") final Iterator<T>[] ahead = new Iterator[] { null }; class Duplicate implements Iterator<T> { @Override public boolean hasNext() { if (ahead[0] == null || ahead[0] == this) return it.hasNext(); return !gap.isEmpty(); } @Override public T next() { if (ahead[0] == null) ahead[0] = this; if (ahead[0] == this) { T value = it.next(); gap.offer(value); return value; } return gap.poll(); } } return tuple(seq(new Duplicate()), seq(new Duplicate())); } 

更多源代码在这里

事实上,使用jOOλ ,你将能够写出一个完整的一行:

 Tuple2<Seq<Integer>, Seq<Integer>> desired_streams = Seq.seq( IntStream.range(1, 100).filter(n -> n % 2 == 0).boxed() ).duplicate() .map1(s -> s.filter(n -> n % 7 == 0)) .map2(s -> s.filter(n -> n % 5 == 0)); // This will yield 14, 28, 42, 56... desired_streams.v1.forEach(System.out::println) // This will yield 10, 20, 30, 40... desired_streams.v2.forEach(System.out::println); 

要么,

  • 将初始化移动到一个方法中,只需再次调用该方法即可

这有一个明确的优点,你在做什么,也适用于无限的stream。

  • 收集stream,然后重新stream

在你的例子中:

 final int[] arr = IntStream.range(1, 100).filter(n -> n % 2 == 0).toArray(); 

然后

 final IntStream s = IntStream.of(arr); 

更新:不起作用 。 在原答案的文字后面看下面的解释。

我多么傻。 所有我需要做的是:

 Stream desired_stream = IntStream.range(1, 100).filter(n -> n % 2 == 0); Stream stream14 = desired_stream.filter(n -> n % 7 == 0); // multiples of 14 Stream stream10 = desired_stream.filter(n -> n % 5 == 0); // multiples of 10 

解释为什么这不起作用:

如果你编码并尝试收集这两个stream,第一个将收集罚款,但试图stream第二个将抛出exception: java.lang.IllegalStateException: stream has already been operated upon or closed

详细来说,stream是有状态的对象(顺便说一句,不能被重置或倒带)。 你可以把它们看作迭代器,而这又是指针。 所以stream14stream10可以被认为是对同一个指针的引用。 首先消耗第一个stream将导致指针“走到尽头”。 试图消耗第二个数据stream就像试图访问一个已经“超过终点”的指针,这自然是非法操作。

正如接受的答案所示,创buildstream的代码必须执行两次,但可以划分为Supplier lambda或类似的结构。

完整的testing代码:保存到Foo.java ,然后保存到javac Foo.java ,然后是java Foo

 import java.util.stream.IntStream; public class Foo { public static void main (String [] args) { IntStream s = IntStream.range(0, 100).filter(n -> n % 2 == 0); IntStream s1 = s.filter(n -> n % 5 == 0); s1.forEach(n -> System.out.println(n)); IntStream s2 = s.filter(n -> n % 7 == 0); s2.forEach(n -> System.out.println(n)); } } 

输出:

 $ javac Foo.java $ java Foo 0 10 20 30 40 50 60 70 80 90 Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.<init>(AbstractPipeline.java:203) at java.util.stream.IntPipeline.<init>(IntPipeline.java:91) at java.util.stream.IntPipeline$StatelessOp.<init>(IntPipeline.java:592) at java.util.stream.IntPipeline$9.<init>(IntPipeline.java:332) at java.util.stream.IntPipeline.filter(IntPipeline.java:331) at Foo.main(Foo.java:8) 

您也可以将stream生成移动到单独的方法/函数,返回此stream并调用它两次。