并行处理文件中的行(Scala)?

我知道斯卡拉平行集合。 他们很方便! 但是,我想遍历一个太大的文件的行并行内存。 例如,我可以创build线程并设置对扫描器的locking,但如果我可以运行如下代码将会很好:

Source.fromFile(path).getLines.par foreach { line => 

不幸的是,

 error: value par is not a member of Iterator[String] 

在这里完成一些平行的最简单的方法是什么? 就目前而言,我会用一些线条来解读并行处理。

您可以使用分组来轻松地将迭代器分割成可加载到内存中的块,然后并行处理。

 val chunkSize = 128 * 1024 val iterator = Source.fromFile(path).getLines.grouped(chunkSize) iterator.foreach { lines => lines.par.foreach { line => process(line) } } 

在我看来,像这样的事情是最简单的方法。

我将这个作为一个单独的答案,因为它是从根本上不同于我的最后一个(它实际上工作)

下面是使用演员的解决scheme的大纲,这基本上是金·史蒂贝尔的评论所描述的。 有两个actor类,一个FileReader actor和一个Worker actor。 工作人员都向读写器发送线路请求,并在从文件读取时并行处理线路。

我在这里使用阿卡演员,但使用另一个实现基本上是相同的想法。

 case object LineRequest case object BeginProcessing class FileReader extends Actor { //reads a single line from the file or returns None if EOF def getLine:Option[String] = ... def receive = { case LineRequest => self.sender.foreach{_ ! getLine} //sender is an Option[ActorRef] } } class Worker(reader: ActorRef) extends Actor { def process(line:String) ... def receive = { case BeginProcessing => reader ! LineRequest case Some(line) => { process(line) reader ! LineRequest } case None => self.stop } } val reader = actorOf[FileReader].start val workers = Vector.fill(4)(actorOf(new Worker(reader)).start) workers.foreach{_ ! BeginProcessing} //wait for the workers to stop... 

这样,一次不会有超过4个(或者你有多less工人)未处理的行。

丹·西蒙的回答评论让我思考。 为什么我们不尝试在stream中包装Source:

 def src(source: Source) = Stream[String] = { if (source.hasNext) Stream.cons(source.takeWhile( _ != '\n' ).mkString) else Stream.empty } 

那么你可以像这样并行地使用它:

 src(Source.fromFile(path)).par foreach process 

我试了一下,它编译和运行在任何速度。 我不确定是否将整个文件加载到内存中,但我不认为它是。

我意识到这是一个老问题,但是您可能会发现在iterata库中的ParIterator实现是一个有用的非程序集所需的实现:

 scala> import com.timgroup.iterata.ParIterator.Implicits._ scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId)) scala> it.map(_._2).toSet.size res2: Int = 8 // addition was distributed over 8 threads 

下面帮我实现了

 source.getLines.toStream.par.foreach( line => println(line)) 

我们最终在我们公司编写了一个定制的解决scheme ,所以我们可以完全理解并行性。