分区Java 8stream

如何在Java 8 Stream上实现“分区”操作? 通过分区我的意思是,将一个stream划分成给定大小的子stream。 不知何故,它将与Guava Iterators.partition()方法相同,只是希望分区是懒惰评估的Streams而不是List。

将任意源码stream分割成固定大小的批处理是不可能的,因为这会导致并行处理。 并行处理时,您可能不知道分割后的第一个子任务中有多less个元素,因此在第一个子任务完全处理之前,您无法为下一个子任务创build分区。

但是可以从随机访问List创build分区stream。 例如,我的StreamEx库中提供了这样的function:

 List<Type> input = Arrays.asList(...); Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize); 

或者如果你真的想要stream的stream:

 Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream); 

如果您不想依赖第三方库,可以手动实现这样的ofSubLists方法:

 public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) { if (length <= 0) throw new IllegalArgumentException("length = " + length); int size = source.size(); if (size <= 0) return Stream.empty(); int fullChunks = (size - 1) / length; return IntStream.range(0, fullChunks + 1).mapToObj( n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length)); } 

这个实现看起来有点长,但是它考虑了一些特殊情况,比如接近MAX_VALUE列表大小。


如果你想为无序stream提供并行友好的解决scheme(所以你不关心哪一个stream元素将被合并在一个批次中),你可以像这样使用收集器(感谢@sibnick的启发):

 public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize, Collector<List<T>, A, R> downstream) { class Acc { List<T> cur = new ArrayList<>(); A acc = downstream.supplier().get(); } BiConsumer<Acc, T> accumulator = (acc, t) -> { acc.cur.add(t); if(acc.cur.size() == batchSize) { downstream.accumulator().accept(acc.acc, acc.cur); acc.cur = new ArrayList<>(); } }; return Collector.of(Acc::new, accumulator, (acc1, acc2) -> { acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc); for(T t : acc2.cur) accumulator.accept(acc1, t); return acc1; }, acc -> { if(!acc.cur.isEmpty()) downstream.accumulator().accept(acc.acc, acc.cur); return downstream.finisher().apply(acc.acc); }, Collector.Characteristics.UNORDERED); } 

用法示例:

 List<List<Integer>> list = IntStream.range(0,20) .boxed().parallel() .collect(unorderedBatches(3, Collectors.toList())); 

结果:

 [[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]] 

这种收集器是完全线程安全的,并生成顺序stream的有序批次。

如果要为每个批次应用中间转换,则可以使用以下版本:

 public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize, Collector<T, AA, B> batchCollector, Collector<B, A, R> downstream) { return unorderedBatches(batchSize, Collectors.mapping(list -> list.stream().collect(batchCollector), downstream)); } 

例如,用这种方法,您可以在每个批次中对数字进行求和:

 List<Integer> list = IntStream.range(0,20) .boxed().parallel() .collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue), Collectors.toList())); 

如果您想要顺序使用Stream,可以对Stream进行分区(以及执行相关函数,如窗口 – 在这种情况下我认为是您真正想要的)。 两个支持分离标准stream的库是独眼巨人反应 (我是作者), 以及哪个独眼巨人反应延伸(以增加窗口等function)。

Cyclops- Stream有一个静态函数StreamUtils用于在Java Streams上操作的集合,以及一系列函数,如splitAt,headAndTail,splitBy,用于分区的分区。

要将一个Stream窗口变成一个大小为30的嵌套Streamstream,可以使用window方法。

对于OP来说,在Streaming术语中,将Stream拆分成给定大小的多个Stream是Windowing操作(而不是分区操作)。

  Stream<Streamable<Integer>> streamOfStreams = StreamUtils.window(stream,30); 

有一个名为ReactiveSeq的Stream扩展类,扩展了jool.Seq并添加了Windowingfunction,这可能会使代码变得更简洁 。

  ReactiveSeq<Integer> seq; ReactiveSeq<ListX<Integer>> streamOfLists = seq.grouped(30); 

正如Tagir指出的那样,这并不适合并行Streams。 如果您想要以multithreading的方式对希望执行的stream进行窗口化或批处理。 LazyFutureStream在独眼巨人反应可能是有用的(窗口是在待办事项列表,但普通的旧批处理现在可用)。

在这种情况下,数据将从执行Stream的多个线程传递到多生产者/单消费者等待空闲队列,并且来自该队列的顺序数据可以在被再次分发到线程之前被窗口化。

  Stream<List<Data>> batched = new LazyReact().range(0,1000) .grouped(30) .map(this::process); 

看起来像Jon Skeet在他的评论中所表明的那样,懒得分区是不可能的。 对于非懒的分区,我已经有了这样的代码:

 public static <T> Stream<Stream<T>> partition(Stream<T> source, int size) { final Iterator<T> it = source.iterator(); final Iterator<Stream<T>> partIt = Iterators.transform(Iterators.partition(it, size), List::stream); final Iterable<Stream<T>> iterable = () -> partIt; return StreamSupport.stream(iterable.spliterator(), false); } 

我认为这是可能的一些内部黑客:

为批处理创build实用程序类:

 public static class ConcurrentBatch { private AtomicLong id = new AtomicLong(); private int batchSize; public ConcurrentBatch(int batchSize) { this.batchSize = batchSize; } public long next() { return (id.getAndIncrement()) / batchSize; } public int getBatchSize() { return batchSize; } } 

和方法:

 public static <T> void applyConcurrentBatchToStream(Consumer<List<T>> batchFunc, Stream<T> stream, int batchSize){ ConcurrentBatch batch = new ConcurrentBatch(batchSize); //hack java map: extends and override computeIfAbsent Supplier<ConcurrentMap<Long, List<T>>> mapFactory = () -> new ConcurrentHashMap<Long, List<T>>() { @Override public List<T> computeIfAbsent(Long key, Function<? super Long, ? extends List<T>> mappingFunction) { List<T> rs = super.computeIfAbsent(key, mappingFunction); //apply batchFunc to old lists, when new batch list is created if(rs.isEmpty()){ for(Entry<Long, List<T>> e : entrySet()) { List<T> batchList = e.getValue(); //todo: need to improve synchronized (batchList) { if (batchList.size() == batch.getBatchSize()){ batchFunc.accept(batchList); remove(e.getKey()); batchList.clear(); } } } } return rs; } }; stream.map(s -> new AbstractMap.SimpleEntry<>(batch.next(), s)) .collect(groupingByConcurrent(AbstractMap.SimpleEntry::getKey, mapFactory, mapping(AbstractMap.SimpleEntry::getValue, toList()))) .entrySet() .stream() //map contains only unprocessed lists (size<batchSize) .forEach(e -> batchFunc.accept(e.getValue())); } 

这是AbacusUtil的快速解决scheme

 IntStream.range(0, Integer.MAX_VALUE).split(size).forEach(s -> N.println(s.toArray())); 

声明:我是AbacusUtil的开发人员。