斯卡拉期货 – 内置超时?
从官方的教程参考文献中我并不完全了解未来的一个方面。 http://docs.scala-lang.org/overviews/core/futures.html
scala中的期货是否有某种内置的超时机制? 假设下面的例子是一个5千兆字节的文本文件…“Implicits.global”的隐含范围最终会导致onFailure以非阻塞方式触发,还是可以定义? 没有某种默认的暂停,这是不是意味着成功或失败都不可能发生?
import scala.concurrent._ import ExecutionContext.Implicits.global val firstOccurence: Future[Int] = future { val source = scala.io.Source.fromFile("myText.txt") source.toSeq.indexOfSlice("myKeyword") } firstOccurence onSuccess { case idx => println("The keyword first appears at position: " + idx) } firstOccurence onFailure { case t => println("Could not process file: " + t.getMessage) }
当您使用阻塞来获取Future
的结果时,您只会收到超时行为。 如果你想使用非阻塞callbackonComplete
, onSuccess
或onFailure
,那么你将不得不推出自己的超时处理。 Akka已经为参与者之间的请求/响应( ?
)消息传递build立了超时处理,但不确定是否要开始使用Akka。 在Akka的FWIW中,为了超时处理,他们通过Future.firstCompletedOf
组成两个Futures
,一个代表实际的asynchronous任务,一个代表超时。 如果超时定时器(通过HashedWheelTimer
)首先popup,则asynchronouscallback失败。
滚动你自己的一个非常简单的例子可能会像这样。 首先,调度超时的对象:
import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout} import java.util.concurrent.TimeUnit import scala.concurrent.duration.Duration import scala.concurrent.Promise import java.util.concurrent.TimeoutException object TimeoutScheduler{ val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS) def scheduleTimeout(promise:Promise[_], after:Duration) = { timer.newTimeout(new TimerTask{ def run(timeout:Timeout){ promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis")) } }, after.toNanos, TimeUnit.NANOSECONDS) } }
然后是一个函数来获取一个Future,并为其添加超时行为:
import scala.concurrent.{Future, ExecutionContext, Promise} import scala.concurrent.duration.Duration def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = { val prom = Promise[T]() val timeout = TimeoutScheduler.scheduleTimeout(prom, after) val combinedFut = Future.firstCompletedOf(List(fut, prom.future)) fut onComplete{case result => timeout.cancel()} combinedFut }
请注意,我在这里使用的HashedWheelTimer
来自Netty。
我刚刚为同事创build了一个TimeoutFuture
类:
TimeoutFuture
package model import scala.concurrent._ import scala.concurrent.duration._ import play.libs.Akka import play.api.libs.concurrent.Execution.Implicits._ object TimeoutFuture { def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = { val prom = promise[A] // timeout logic Akka.system.scheduler.scheduleOnce(timeout) { prom tryFailure new java.util.concurrent.TimeoutException } // business logic Future { prom success block } prom.future } }
用法
val future = TimeoutFuture(10 seconds) { // do stuff here } future onComplete { case Success(stuff) => // use "stuff" case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block) }
笔记:
- 假设玩! 框架(但很容易适应)
- 每一段代码都运行在相同的
ExecutionContext
,这可能并不理想。
所有这些答案都需要额外的依赖关系。 我决定写一个使用java.util.Timer的版本,这是一个在将来运行一个函数的有效方法,在这种情况下会触发一个超时。
博客文章在这里有更多的细节
在Scala的Promise中使用这个,我们可以通过如下方式创build一个Future:
package justinhj.concurrency import java.util.concurrent.TimeoutException import java.util.{Timer, TimerTask} import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future, Promise} import scala.language.postfixOps object FutureUtil { // All Future's that use futureWithTimeout will use the same Timer object // it is thread safe and scales to thousands of active timers // The true parameter ensures that timeout timers are daemon threads and do not stop // the program from shutting down val timer: Timer = new Timer(true) /** * Returns the result of the provided future within the given time or a timeout exception, whichever is first * This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a * Thread.sleep would * @param future Caller passes a future to execute * @param timeout Time before we return a Timeout exception instead of future's outcome * @return Future[T] */ def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = { // Promise will be fulfilled with either the callers Future or the timer task if it times out var p = Promise[T] // and a Timer task to handle timing out val timerTask = new TimerTask() { def run() : Unit = { p.tryFailure(new TimeoutException()) } } // Set the timeout to check in the future timer.schedule(timerTask, timeout.toMillis) future.map { a => if(p.trySuccess(a)) { timerTask.cancel() } } .recover { case e: Exception => if(p.tryFailure(e)) { timerTask.cancel() } } p.future } }
Play框架包含Promise.timeout,因此您可以编写如下代码
private def get(): Future[Option[Boolean]] = { val timeoutFuture = Promise.timeout(None, Duration("1s")) val mayBeHaveData = Future{ // do something Some(true) } // if timeout occurred then None will be result of method Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture)) }
您可以在等待未来时指定超时时间:
对于scala.concurrent.Future
, result
方法允许您指定超时。
对于scala.actors.Future
, Futures.awaitAll
让你指定一个超时。
我不认为在Future的执行中有一个超时。
我很惊讶这是不是在斯卡拉标准。 我的版本很短,没有依赖关系
import scala.concurrent.Future sealed class TimeoutException extends RuntimeException object FutureTimeout { import scala.concurrent.ExecutionContext.Implicits.global implicit class FutureTimeoutLike[T](f: Future[T]) { def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future { Thread.sleep(ms) throw new TimeoutException })) lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout } }
用法示例
import FutureTimeout._ Future { /* do smth */ } withTimeout
没有人提到akka-streams
。 这些stream程有一个简单的completionTimeout
方法,并且将其应用于单一源码stream就像Future。
但是,阿卡stream也会取消,所以它实际上可以结束源运行,即它将超时信号发送给源。
如果您希望作者(promise holder)是控制超时逻辑的人,请使用akka.pattern.after ,方法如下:
val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during..."))) Future.firstCompletedOf(Seq(promiseRef.future, timeout))
这样,如果您的承诺完成逻辑从不发生,您的呼叫者的未来仍然会在某个失败点完成。