使用Scalaz Streamparsing任务(replaceScalaz迭代器)

介绍

我在一些项目中使用了Scalaz 7的迭代,主要用于处理大文件。 我想开始转换到斯卡拉斯stream ,这是旨在取代iteratee包(坦率地说是缺less了很多件,是一种痛苦的使用)。

stream是基于机器 (iteratee思想的另一个变种),这也已经在Haskell中实现 。 我已经使用了一些Haskell机器库,但是机器和stream之间的关系并不是完全明显的(至less对我来说),stream库的文档还是有点稀疏 。

这个问题是关于一个简单的parsing任务,我希望看到实现使用stream而不是iteratees。 如果没有人能够让我知道,我会自己回答这个问题,但是我相信我并不是唯一一个正在(或者至less考虑)这个过渡的人,因为无论如何我都需要完成这个练习,我觉得我不妨公开做。

任务

假设我有一个包含句子的文件,这些句子已经被标记并且标记了词性:

no UH , , it PRP was VBD n't RB monday NNP . . the DT equity NN market NN was VBD illiquid JJ . . 

每行有一个令牌,单词和词性由一个空格分隔,空行代表句子的边界。 我想parsing这个文件并返回一个句子列表,我们可以将它们表示为string元组列表:

 List((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.)) List((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.) 

像往常一样,如果我们打到无效input或文件读取exception,我们希望优雅地失败,我们不希望手动closures资源。

迭代解决scheme

首先对于一些通用的文件阅读材料(这实际上应该是iteratee软件包的一部分,它目前不提供任何远程的高级function):

 import java.io.{ BufferedReader, File, FileReader } import scalaz._, Scalaz._, effect.IO import iteratee.{ Iteratee => I, _ } type ErrorOr[A] = EitherT[IO, Throwable, A] def tryIO[A, B](action: IO[B]) = I.iterateeT[A, ErrorOr, B]( EitherT(action.catchLeft).map(I.sdone(_, I.emptyInput)) ) def enumBuffered(r: => BufferedReader) = new EnumeratorT[String, ErrorOr] { lazy val reader = r def apply[A] = (s: StepT[String, ErrorOr, A]) => s.mapCont(k => tryIO(IO(Option(reader.readLine))).flatMap { case None => s.pointI case Some(line) => k(I.elInput(line)) >>== apply[A] } ) } def enumFile(f: File) = new EnumeratorT[String, ErrorOr] { def apply[A] = (s: StepT[String, ErrorOr, A]) => tryIO( IO(new BufferedReader(new FileReader(f))) ).flatMap(reader => I.iterateeT[String, ErrorOr, A]( EitherT( enumBuffered(reader).apply(s).value.run.ensuring(IO(reader.close())) ) )) } 

然后我们的句子读者:

 def sentence: IterateeT[String, ErrorOr, List[(String, String)]] = { import I._ def loop(acc: List[(String, String)])(s: Input[String]): IterateeT[String, ErrorOr, List[(String, String)]] = s( el = _.trim.split(" ") match { case Array(form, pos) => cont(loop(acc :+ (form, pos))) case Array("") => cont(done(acc, _)) case pieces => val throwable: Throwable = new Exception( "Invalid line: %s!".format(pieces.mkString(" ")) ) val error: ErrorOr[List[(String, String)]] = EitherT.left( throwable.point[IO] ) IterateeT.IterateeTMonadTrans[String].liftM(error) }, empty = cont(loop(acc)), eof = done(acc, eofInput) ) cont(loop(Nil)) } 

最后我们的parsing操作:

 val action = I.consume[List[(String, String)], ErrorOr, List] %= sentence.sequenceI &= enumFile(new File("example.txt")) 

我们可以certificate它的工作原理:

 scala> action.run.run.unsafePerformIO().foreach(_.foreach(println)) List((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.)) List((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.)) 

我们完成了。

我想要的是

或多或less使用Scalazstream而不是迭代实现相同的程序。

斯卡拉stream解决scheme:

 import scalaz.std.vector._ import scalaz.syntax.traverse._ import scalaz.std.string._ val action = linesR("example.txt").map(_.trim). splitOn("").flatMap(_.traverseU { s => s.split(" ") match { case Array(form, pos) => emit(form -> pos) case _ => fail(new Exception(s"Invalid input $s")) }}) 

我们可以certificate它的工作原理:

 scala> action.collect.attempt.run.foreach(_.foreach(println)) Vector((no,UH), (,,,), (it,PRP), (was,VBD), (n't,RB), (monday,NNP), (.,.)) Vector((the,DT), (equity,NN), (market,NN), (was,VBD), (illiquid,JJ), (.,.)) 

我们完成了。

traverseU函数是一个普通的Scalaz组合器。 在这种情况下,它被用来遍历Process monad中由splitOn生成的语句Vector 。 它相当于map然后是sequence

Interesting Posts