用Scalaz 7 zipWithIndex /组枚举避免内存泄漏

背景

正如在这个问题中指出的,我正在使用Scalaz 7迭代来处理恒定堆空间中的大量(即无界)数据stream。

我的代码如下所示:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A] type ErrorOr[A] = ErrorOrT[IO, A] def processChunk(c: Chunk, idx: Long): Result def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] = Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) => rs ++ vs map { case (c, i) => processChunk(c, i) } } &= (data.zipWithIndex mapE Iteratee.group(P)) 

问题

我似乎已经遇到了内存泄漏,但是我不太熟悉Scalaz / FP,知道这个bug是在Scalaz还是在我的代码中。 直观地说,我期望这个代码只需要(大约) P倍的Chunk空间。

注:我发现一个类似的问题 ,其中遇到OutOfMemoryError ,但我的代码不使用consume

testing

我跑了一些testing,试图隔离问题。 总而言之,泄漏只有在使用zipWithIndexgroup时才会出现。

 // no zipping/grouping scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO res47: Long = 4294967296 // grouping only scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO res49: Long = 4294967296 // zipping and grouping scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO java.lang.OutOfMemoryError: Java heap space // zipping only scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO res51: Long = 4294967296 // no zipping/grouping, larger arrays scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO res53: Long = 17179869184 // zipping only, larger arrays scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO res54: Long = 17179869184 

代码testing:

 import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._ // define an enumerator that produces a stream of new, zero-filled arrays def enumArrs(sz: Int, n: Int) = Iteratee.enumIterator[Array[Int], IO]( Iterator.continually(Array.fill(sz)(0)).take(n)) // define an iteratee that consumes a stream of arrays // and computes its length val i1 = Iteratee.fold[Array[Int], IO, Long](0) { (c, a) => c + a.length } // define an iteratee that consumes a grouped stream of arrays // and computes its length val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) { (c, as) => c + as.map(_.length).sum } // define an iteratee that consumes a grouped/zipped stream of arrays // and computes its length val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) { (c, vs) => c + vs.map(_._1.length).sum } // define an iteratee that consumes a zipped stream of arrays // and computes its length val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) { (c, v) => c + v._1.length } 

问题

  • 是我的代码中的错误?
  • 我怎样才能使这个工作在恒定的堆空间?

对于那些坚持使用旧版本的iteratee API的人来说,这是一个小小的安慰,但是我最近证实了一个相当于scalaz-stream API的testing 。 这是一个新的stream处理API,旨在替代iteratee

为了完整性,这里是testing代码:

 // create a stream containing `n` arrays with `sz` Ints in each one def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] = (Process emit Array.fill(sz)(0)).repeat take n (streamArrs(1 << 25, 1 << 14).zipWithIndex pipe process1.chunk(4) pipe process1.fold(0L) { (c, vs) => c + vs.map(_._1.length.toLong).sum }).runLast.run 

这应该与n参数的任何值一起工作(假设您愿意等待足够长的时间) – 我用2 ^ 14 32MiB数组(即总共分配了一半的TiB内存)进行testing。