Java 8stream与批处理

我有一个包含项目列表的大文件。

我想创build一批物品,用这个批处理做一个HTTP请求(在HTTP请求中所有这些项都需要作为参数)。 我可以用for循环很容易地做到这一点,但是作为Java 8的爱好者,我想尝试用Java 8的Stream框架来编写这个代码(并获得延迟处理的好处)。

例:

 List<String> batch = new ArrayList<>(BATCH_SIZE); for (int i = 0; i < data.size(); i++) { batch.add(data.get(i)); if (batch.size() == BATCH_SIZE) process(batch); } if (batch.size() > 0) process(batch); 

我想做一些lazyFileStream.group(500).map(processBatch).collect(toList())

什么是最好的方法来做到这一点?

您可以使用jOOλ来实现 ,这是一个为单线程,顺序stream使用情况扩展Java 8stream的库:

 Seq.seq(lazyFileStream) // Seq<String> .zipWithIndex() // Seq<Tuple2<String, Long>> .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>> .forEach((index, batch) -> { process(batch); }); 

在幕后, zipWithIndex()只是:

 static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) { final Iterator<T> it = stream.iterator(); class ZipWithIndex implements Iterator<Tuple2<T, Long>> { long index; @Override public boolean hasNext() { return it.hasNext(); } @Override public Tuple2<T, Long> next() { return tuple(it.next(), index++); } } return seq(new ZipWithIndex()); } 

…而groupBy()是API的便利:

 default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) { return collect(Collectors.groupingBy(classifier)); } 

(免责声明:我为jOOλ背后的公司工作)

为了完整,这是一个番石榴的解决scheme。

 Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process); 

在这个问题集合是可用的,所以不需要stream,它可以被写成,

 Iterables.partition(data, batchSize).forEach(this::process); 

纯Java-8实现也是可能的:

 int BATCH = 500; IntStream.range(0, (data.size()+BATCH-1)/BATCH) .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH))) .forEach(batch -> process(batch)); 

请注意,与JOOl不同,它可以并行工作(假设您的data是随机访问列表)。

纯Java 8解决scheme

我们可以创build一个自定义收集器来优雅地完成这个工作,这个过程需要batch size处理和一个Consumer来处理每个批处理:

 import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.function.*; import java.util.stream.Collector; import static java.util.Objects.requireNonNull; /** * Collects elements in the stream and calls the supplied batch processor * after the configured batch size is reached. * * In case of a parallel stream, the batch processor may be called with * elements less than the batch size. * * The elements are not kept in memory, and the final result will be an * empty list. * * @param <T> Type of the elements being collected */ class BatchCollector<T> implements Collector<T, List<T>, List<T>> { private final int batchSize; private final Consumer<List<T>> batchProcessor; /** * Constructs the batch collector * * @param batchSize the batch size after which the batchProcessor should be called * @param batchProcessor the batch processor which accepts batches of records to process */ BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) { batchProcessor = requireNonNull(batchProcessor); this.batchSize = batchSize; this.batchProcessor = batchProcessor; } public Supplier<List<T>> supplier() { return ArrayList::new; } public BiConsumer<List<T>, T> accumulator() { return (ts, t) -> { ts.add(t); if (ts.size() >= batchSize) { batchProcessor.accept(ts); ts.clear(); } }; } public BinaryOperator<List<T>> combiner() { return (ts, ots) -> { // process each parallel list without checking for batch size // avoids adding all elements of one to another // can be modified if a strict batching mode is required batchProcessor.accept(ts); batchProcessor.accept(ots); return Collections.emptyList(); }; } public Function<List<T>, List<T>> finisher() { return ts -> { batchProcessor.accept(ts); return Collections.emptyList(); }; } public Set<Characteristics> characteristics() { return Collections.emptySet(); } } 

然后可以创build一个辅助工具类:

 import java.util.List; import java.util.function.Consumer; import java.util.stream.Collector; public class StreamUtils { /** * Creates a new batch collector * @param batchSize the batch size after which the batchProcessor should be called * @param batchProcessor the batch processor which accepts batches of records to process * @param <T> the type of elements being processed * @return a batch collector instance */ public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) { return new BatchCollector<T>(batchSize, batchProcessor); } } 

用法示例:

 List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); List<Integer> output = new ArrayList<>(); int batchSize = 3; Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs); input.stream() .collect(StreamUtils.batchCollector(batchSize, batchProcessor)); 

我已经在GitHub上发布了我的代码,如果有人想看:

链接到Github

你也可以使用RxJava :

 Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch)); 

要么

 Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList(); 

要么

 Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList(); 

你也可以看看独眼巨人的反应 ,我是这个图书馆的作者。 它实现了jOOλ接口(并通过扩展JDK 8 Streams),但与JDK 8 Parallel Streams不同,它着重于Asyncrhonous操作(如潜在阻塞Async I / O调用)。 相比之下,JDK并行数据stream专注于CPU绑定操作的数据并行性。 它通过pipe理基于Future的任务聚合来工作,但向最终用户呈现标准的扩展streamAPI。

此示例代码可以帮助您开始

 LazyFutureStream.parallelCommonBuilder() .react(data) .grouped(BATCH_SIZE) .map(this::process) .run(); 

这里有一个关于批处理的教程

还有一个更一般的教程

要使用自己的线程池(这可能更适合阻塞I / O),可以开始处理

  LazyReact reactor = new LazyReact(40); reactor.react(data) .grouped(BATCH_SIZE) .map(this::process) .run(); 

我为这样的场景写了一个自定义的Spliterator。 它将填充inputstream的给定大小的列表。 这种方法的优点是它会执行惰性处理,并且可以和其他streamfunction一起工作。

 public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) { return batchSize <= 0 ? Stream.of(stream.collect(Collectors.toList())) : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel()); } private static class BatchSpliterator<E> implements Spliterator<List<E>> { private final Spliterator<E> base; private final int batchSize; public BatchSpliterator(Spliterator<E> base, int batchSize) { this.base = base; this.batchSize = batchSize; } @Override public boolean tryAdvance(Consumer<? super List<E>> action) { final List<E> batch = new ArrayList<>(batchSize); for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++) ; if (batch.isEmpty()) return false; action.accept(batch); return true; } @Override public Spliterator<List<E>> trySplit() { if (base.estimateSize() <= batchSize) return null; final Spliterator<E> splitBase = this.base.trySplit(); return splitBase == null ? null : new BatchSpliterator<>(splitBase, batchSize); } @Override public long estimateSize() { final double baseSize = base.estimateSize(); return baseSize == 0 ? 0 : (long) Math.ceil(baseSize / (double) batchSize); } @Override public int characteristics() { return base.characteristics(); } } 

我们有类似的问题要解决。 我们想要一个比系统内存大的数据stream(遍历数据库中的所有对象)并尽可能最好地sorting – 我们认为可以caching10,000个条目并对它们进行随机化。

目标是一个function,它采取了一个stream。

在这里提出的解决scheme中,似乎有一系列的select:

  • 使用各种非java 8附加库
  • 从不是stream的东西开始 – 例如随机访问列表
  • 有一个可以在拆分器中轻松拆分的stream

我们的本能最初是使用自定义收集器,但这意味着退出stream式传输。 上面的自定义收集器解决scheme非常好,我们几乎使用它。

这里有一个解决scheme,通过使用Stream可以给你一个Iterator的事实,你可以使用它作为一个逃生孵化器 ,让你做一些额外的stream不支持。 Iterator被转换回使用另一个Java 8 StreamSupport巫术的位stream。

 /** * An iterator which returns batches of items taken from another iterator */ public class BatchingIterator<T> implements Iterator<List<T>> { /** * Given a stream, convert it to a stream of batches no greater than the * batchSize. * @param originalStream to convert * @param batchSize maximum size of a batch * @param <T> type of items in the stream * @return a stream of batches taken sequentially from the original stream */ public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) { return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize)); } private static <T> Stream<T> asStream(Iterator<T> iterator) { return StreamSupport.stream( Spliterators.spliteratorUnknownSize(iterator,ORDERED), false); } private int batchSize; private List<T> currentBatch; private Iterator<T> sourceIterator; public BatchingIterator(Iterator<T> sourceIterator, int batchSize) { this.batchSize = batchSize; this.sourceIterator = sourceIterator; } @Override public boolean hasNext() { prepareNextBatch(); return currentBatch!=null && !currentBatch.isEmpty(); } @Override public List<T> next() { return currentBatch; } private void prepareNextBatch() { currentBatch = new ArrayList<>(batchSize); while (sourceIterator.hasNext() && currentBatch.size() < batchSize) { currentBatch.add(sourceIterator.next()); } } } 

使用这个简单的例子看起来像这样:

 @Test public void getsBatches() { BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) .forEach(System.out::println); } 

上面的打印

 [A, B, C] [D, E, F] 

对于我们的用例,我们想把这些批次洗牌,然后把它们保存成一个stream – 看起来像这样:

 @Test public void howScramblingCouldBeDone() { BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one .map(list -> { Collections.shuffle(list); return list; }) .flatMap(List::stream) .forEach(System.out::println); } 

这输出的东西像(它是随机的,每次都不一样)

 A C B E D F 

这里的秘诀就是始终有一个stream,所以你可以在批处理stream上操作,或者对每个批处理做一些事情,然后将其flatMap回stream。 更好的是,以上所有内容仅作为最终forEachcollect或其他终止expression式通过stream数据。

事实certificate, iterator是一种特殊types的stream终止操作 ,并不会导致整个stream运行并进入内存! 感谢Java 8家伙的精彩devise!

简单的例子使用Spliterator

  // read file into stream, try-with-resources try (Stream<String> stream = Files.lines(Paths.get(fileName))) { //skip header Spliterator<String> split = stream.skip(1).spliterator(); Chunker<String> chunker = new Chunker<String>(); while(true) { boolean more = split.tryAdvance(chunker::doSomething); if (!more) { break; } } } catch (IOException e) { e.printStackTrace(); } } static class Chunker<T> { int ct = 0; public void doSomething(T line) { System.out.println(ct++ + " " + line.toString()); if (ct % 100 == 0) { System.out.println("====================chunk====================="); } } } 

}

布鲁斯的回答更全面,但我正在寻找一些快速和肮脏的东西来处理一堆文件。