斯卡拉期货 – 内置超时?

从官方的教程参考文献中我并不完全了解未来的一个方面。 http://docs.scala-lang.org/overviews/core/futures.html

scala中的期货是否有某种内置的超时机制? 假设下面的例子是一个5千兆字节的文本文件…“Implicits.global”的隐含范围最终会导致onFailure以非阻塞方式触发,还是可以定义? 没有某种默认的暂停,这是不是意味着成功或失败都不可能发生?

import scala.concurrent._ import ExecutionContext.Implicits.global val firstOccurence: Future[Int] = future { val source = scala.io.Source.fromFile("myText.txt") source.toSeq.indexOfSlice("myKeyword") } firstOccurence onSuccess { case idx => println("The keyword first appears at position: " + idx) } firstOccurence onFailure { case t => println("Could not process file: " + t.getMessage) } 

当您使用阻塞来获取Future的结果时,您只会收到超时行为。 如果你想使用非阻塞callbackonCompleteonSuccessonFailure ,那么你将不得不推出自己的超时处理。 Akka已经为参与者之间的请求/响应( ? )消息传递build立了超时处理,但不确定是否要开始使用Akka。 在Akka的FWIW中,为了超时处理,他们通过Future.firstCompletedOf组成两个Futures ,一个代表实际的asynchronous任务,一个代表超时。 如果超时定时器(通过HashedWheelTimer )首先popup,则asynchronouscallback失败。

滚动你自己的一个非常简单的例子可能会像这样。 首先,调度超时的对象:

 import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout} import java.util.concurrent.TimeUnit import scala.concurrent.duration.Duration import scala.concurrent.Promise import java.util.concurrent.TimeoutException object TimeoutScheduler{ val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS) def scheduleTimeout(promise:Promise[_], after:Duration) = { timer.newTimeout(new TimerTask{ def run(timeout:Timeout){ promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis")) } }, after.toNanos, TimeUnit.NANOSECONDS) } } 

然后是一个函数来获取一个Future,并为其添加超时行为:

 import scala.concurrent.{Future, ExecutionContext, Promise} import scala.concurrent.duration.Duration def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = { val prom = Promise[T]() val timeout = TimeoutScheduler.scheduleTimeout(prom, after) val combinedFut = Future.firstCompletedOf(List(fut, prom.future)) fut onComplete{case result => timeout.cancel()} combinedFut } 

请注意,我在这里使用的HashedWheelTimer来自Netty。

我刚刚为同事创build了一个TimeoutFuture类:

TimeoutFuture

 package model import scala.concurrent._ import scala.concurrent.duration._ import play.libs.Akka import play.api.libs.concurrent.Execution.Implicits._ object TimeoutFuture { def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = { val prom = promise[A] // timeout logic Akka.system.scheduler.scheduleOnce(timeout) { prom tryFailure new java.util.concurrent.TimeoutException } // business logic Future { prom success block } prom.future } } 

用法

 val future = TimeoutFuture(10 seconds) { // do stuff here } future onComplete { case Success(stuff) => // use "stuff" case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block) } 

笔记:

  • 假设玩! 框架(但很容易适应)
  • 每一段代码都运行在相同的ExecutionContext ,这可能并不理想。

所有这些答案都需要额外的依赖关系。 我决定写一个使用java.util.Timer的版本,这是一个在将来运行一个函数的有效方法,在这种情况下会触发一个超时。

博客文章在这里有更多的细节

在Scala的Promise中使用这个,我们可以通过如下方式创build一个Future:

 package justinhj.concurrency import java.util.concurrent.TimeoutException import java.util.{Timer, TimerTask} import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future, Promise} import scala.language.postfixOps object FutureUtil { // All Future's that use futureWithTimeout will use the same Timer object // it is thread safe and scales to thousands of active timers // The true parameter ensures that timeout timers are daemon threads and do not stop // the program from shutting down val timer: Timer = new Timer(true) /** * Returns the result of the provided future within the given time or a timeout exception, whichever is first * This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a * Thread.sleep would * @param future Caller passes a future to execute * @param timeout Time before we return a Timeout exception instead of future's outcome * @return Future[T] */ def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = { // Promise will be fulfilled with either the callers Future or the timer task if it times out var p = Promise[T] // and a Timer task to handle timing out val timerTask = new TimerTask() { def run() : Unit = { p.tryFailure(new TimeoutException()) } } // Set the timeout to check in the future timer.schedule(timerTask, timeout.toMillis) future.map { a => if(p.trySuccess(a)) { timerTask.cancel() } } .recover { case e: Exception => if(p.tryFailure(e)) { timerTask.cancel() } } p.future } } 

Play框架包含Promise.timeout,因此您可以编写如下代码

 private def get(): Future[Option[Boolean]] = { val timeoutFuture = Promise.timeout(None, Duration("1s")) val mayBeHaveData = Future{ // do something Some(true) } // if timeout occurred then None will be result of method Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture)) } 

您可以在等待未来时指定超时时间:

对于scala.concurrent.Futureresult方法允许您指定超时。

对于scala.actors.FutureFutures.awaitAll让你指定一个超时。

我不认为在Future的执行中有一个超时。

我很惊讶这是不是在斯卡拉标准。 我的版本很短,没有依赖关系

 import scala.concurrent.Future sealed class TimeoutException extends RuntimeException object FutureTimeout { import scala.concurrent.ExecutionContext.Implicits.global implicit class FutureTimeoutLike[T](f: Future[T]) { def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future { Thread.sleep(ms) throw new TimeoutException })) lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout } } 

用法示例

 import FutureTimeout._ Future { /* do smth */ } withTimeout 

没有人提到akka-streams 。 这些stream程有一个简单的completionTimeout方法,并且将其应用于单一源码stream就像Future。

但是,阿卡stream也会取消,所以它实际上可以结束源运行,即它将超时信号发送给源。

如果您希望作者(promise holder)是控制超时逻辑的人,请使用akka.pattern.after ,方法如下:

 val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during..."))) Future.firstCompletedOf(Seq(promiseRef.future, timeout)) 

这样,如果您的承诺完成逻辑从不发生,您的呼叫者的未来仍然会在某个失败点完成。

  • 学习scala的小型和好的scala项目 – 尤其是函数式编程和types系统
  • 编辑器不包含主types
  • 通过按键Spark – 一个Spark作业写入多个输出
  • 在Play2 Scala模板中声明variables
  • 如何在Scala中创build和使用multidimensional array?
  • 阿卡在演员外面login
  • 当case类包含一个scala枚举时,如何使用带有MongoCaseClassField的Rogue更新mongologging
  • 函数式编程 – 很多重点recursion,为什么?
  • 斯卡拉咖喱与部分应用的function
  • 有固定容量和自定义比较器的PriorityQueue实现吗?
  • 计算列表的移动平均数