你能把一个stream分成两个stream吗?

我有一个由Java 8stream表示的数据集:

Stream<T> stream = ...; 

我可以看到如何过滤它以获得一个随机子集 – 例如

 Random r = new Random(); PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator(); Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0)); 

我还可以看到我怎样才能减less这个数据stream,例如,得到两个表示数据集中两个随机数的列表,然后将这些数据集转换回stream。 但是,是否有一种直接的方式来从最初的一个生成两个stream? 就像是

 (heads, tails) = stream.[some kind of split based on filter] 

感谢您的任何见解。

不完全是。 你不能从一个Stream获得两个Stream ; 这没有任何意义 – 你将如何迭代一个,而不需要同时生成另一个? stream只能运行一次。

但是,如果你想把它们转储到列表或其他东西,你可以这样做

 stream.forEach((x) -> ((x == 0) ? heads : tails).add(x)); 

收集器可以用于此。

  • 对于两个类别,请使用Collectors.partitioningBy()工厂。

这将创build一个从BooleanListMap ,并根据Predicate把项目放在一个或另一个列表中。

注意:由于stream需要被全部使用,所以这无法在无限stream上工作。 因为无论如何这个stream都被消耗掉了,所以这个方法只是简单地将它们放在Lists中,而不是创build一个新的带有内存的stream。

另外,不需要迭代器,甚至不需要你提供的头像。

 Random r = new Random(); Map<Boolean, List<String>> groups = stream .collect(Collectors.partitioningBy(x -> r.nextBoolean())); System.out.println(groups.get(false).size()); System.out.println(groups.get(true).size()); 
  • 有关更多类别,请使用Collectors.groupingBy()工厂。
 Map<Object, List<String>> groups = stream .collect(Collectors.groupingBy(x -> r.nextInt(3))); System.out.println(groups.get(0).size()); System.out.println(groups.get(1).size()); System.out.println(groups.get(2).size()); 

如果stream不是Stream ,而是像IntStream这样的基本streamIntStream ,那么这个.collect(Collectors)方法是不可用的。 您必须在没有收集器工厂的情况下手动完成。 它的实现看起来像这样:

 IntStream intStream = IntStream.iterate(0, i -> i + 1).limit(1000000); Predicate<Integer> p = x -> r.nextBoolean(); Map<Boolean, List<Integer>> groups = intStream.collect(() -> { Map<Boolean, List<Integer>> map = new HashMap<>(); map.put(false, new ArrayList<>()); map.put(true, new ArrayList<>()); return map; }, (map, x) -> { boolean partition = p.test(x); List<Integer> list = map.get(partition); list.add(x); }, (map1, map2) -> { map1.get(false).addAll(map2.get(false)); map1.get(true).addAll(map2.get(true)); }); System.out.println(groups.get(false).size()); System.out.println(groups.get(true).size()); 

编辑

正如所指出的,上面的“解决方法”不是线程安全的。 在收集之前转换为正常的Stream是一种方式:

 Stream<Integer> stream = intStream.boxed(); 

不幸的是,你所要求的在Stream的JavaDoc中直接被忽略了 :

应该只对一个数据stream进行操作(调用中间或terminalstream操作)一次。 这排除了例如“分叉”stream,其中相同的源馈送两个或更多个pipe线,或者多个遍历同一个stream。

如果你真的渴望这种行为,你可以使用peek或其他方法来解决这个问题。 在这种情况下,你应该做的是不要试图从一个源代码filter中取回来自同一个原始stream源的两个stream,而是复制你的stream并适当地过滤每个重复。

但是,您可能希望重新考虑Stream是否适合您的用例。

这是违反Stream的一般机制。 假设你可以将Stream S0分解为Sa和Sb,就像你想的那样。 在Sa上执行任何terminal操作(例如count()将必然“消耗”S0中的所有元素。 所以Sb丢失了它的数据源。

以前,Stream有一个tee()方法,我认为它将一个stream复制到两个。 它现在被删除。

Stream虽然有一个peek()方法,但你也许可以用它来实现你的需求。

我偶然发现了这个问题我自己,我觉得叉stream有一些可以certificate是有效的用例。 我把下面的代码作为一个消费者来写,这样它就不会做任何事情,但是你可以将它应用到函数和其他你可能遇到的任何东西上。

 class PredicateSplitterConsumer<T> implements Consumer<T> { private Predicate<T> predicate; private Consumer<T> positiveConsumer; private Consumer<T> negativeConsumer; public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative) { this.predicate = predicate; this.positiveConsumer = positive; this.negativeConsumer = negative; } @Override public void accept(T t) { if (predicate.test(t)) { positiveConsumer.accept(t); } else { negativeConsumer.accept(t); } } } 

现在你的代码实现可能是这样的:

 personsArray.forEach( new PredicateSplitterConsumer<>( person -> person.getDateOfBirth().isPresent(), person -> System.out.println(person.getName()), person -> System.out.println(person.getName() + " does not have Date of birth"))); 

不完全是,但是你可以通过调用Collectors.groupingBy()来完成你所需要的。 您创build一个新的集合,然后可以在该新集合上实例化stream。

这是我能想出的最不好的答案。

 import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; public class Test { public static <T, L, R> Pair<L, R> splitStream(Stream<T> inputStream, Predicate<T> predicate, Function<Stream<T>, L> trueStreamProcessor, Function<Stream<T>, R> falseStreamProcessor) { Map<Boolean, List<T>> partitioned = inputStream.collect(Collectors.partitioningBy(predicate)); L trueResult = trueStreamProcessor.apply(partitioned.get(Boolean.TRUE).stream()); R falseResult = falseStreamProcessor.apply(partitioned.get(Boolean.FALSE).stream()); return new ImmutablePair<L, R>(trueResult, falseResult); } public static void main(String[] args) { Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10); Pair<List<Integer>, String> results = splitStream(stream, n -> n > 5, s -> s.filter(n -> n % 2 == 0).collect(Collectors.toList()), s -> s.map(n -> n.toString()).collect(Collectors.joining("|"))); System.out.println(results); } } 

这需要一个整数stream,并在5分裂。对于那些大于5它只过滤偶数,并把它们放在一个列表中。 剩下的就用|来连接它们。

输出:

  ([6, 8],0|1|2|3|4|5) 

它并不理想,因为它把所有事情都收集到中介集合中(这个集合有太多争论!)

我偶然发现这个问题,同时寻找一种方法来过滤stream中的某些元素,并将它们logging为错误。 所以我并不需要太多的分stream,而是用一种不显眼的语法把一个过早的终止动作附加到一个谓词上。 这就是我想到的:

 public class MyProcess { /* Return a Predicate that performs a bail-out action on non-matching items. */ private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) { return x -> { if (pred.test(x)) { return true; } altAction.accept(x); return false; }; /* Example usage in non-trivial pipeline */ public void processItems(Stream<Item> stream) { stream.filter(Objects::nonNull) .peek(this::logItem) .map(Item::getSubItems) .filter(withAltAction(SubItem::isValid, i -> logError(i, "Invalid"))) .peek(this::logSubItem) .filter(withAltAction(i -> i.size() > 10, i -> logError(i, "Too large"))) .map(SubItem::toDisplayItem) .forEach(this::display); } } 

怎么样:

 Supplier<Stream<Integer>> randomIntsStreamSupplier = () -> (new Random()).ints(0, 2).boxed(); Stream<Integer> tails = randomIntsStreamSupplier.get().filter(x->x.equals(0)); Stream<Integer> heads = randomIntsStreamSupplier.get().filter(x->x.equals(1));