如何在Scala中取消未来?
Java Future有cancel
方法,它可以中断运行Future
任务的线程。 例如,如果我在Java Future
封装一个可中断的阻塞调用,我可以稍后中断它。
Scala Future未提供cancel
方法。 假设我在Scala Future
封装了一个可中断的阻塞调用。 我怎么能打断它?
这还不是Future
的API的一部分,但是可以在将来作为扩展添加。
作为一种解决方法,您可以使用firstCompletedOf
来包装2期货 – 您想要取消的期货以及来自定制Promise
的未来。 那么你可以通过不履行承诺来取消这样创造的未来:
def cancellable[T](f: Future[T])(customCode: => Unit): (() => Unit, Future[T]) = { val p = Promise[T] val first = Future firstCompletedOf Seq(p.future, f) val cancellation: () => Unit = { () => first onFailure { case e => customCode} p failure new Exception } (cancellation, first) }
现在你可以在任何未来调用这个函数来获得一个“可取消封装”。 示例用例:
val f = callReturningAFuture() val (cancel, f1) = cancellable(f) { cancelTheCallReturningAFuture() } // somewhere else in code if (condition) cancel() else println(Await.result(f1))
编辑:
有关取消的详细讨论,请参阅Scala书籍中的学习并发编程的第4章。
我没有testing过这个,但是这扩展了Pablo FranciscoPérezHidalgo的答案。 而不是阻止等待Java Future
,而是使用中间Promise
。
import java.util.concurrent.{Callable, FutureTask} import scala.concurrent.{ExecutionContext, Promise} import scala.util.Try class Cancellable[T](executionContext: ExecutionContext, todo: => T) { private val promise = Promise[T]() def future = promise.future private val jf: FutureTask[T] = new FutureTask[T]( new Callable[T] { override def call(): T = todo } ) { override def done() = promise.complete(Try(get())) } def cancel(): Unit = jf.cancel(true) executionContext.execute(jf) } object Cancellable { def apply[T](todo: => T)(implicit executionContext: ExecutionContext): Cancellable[T] = new Cancellable[T](executionContext, todo) }
通过取消,我想你想猛烈打断future
。
find这段代码: https : //gist.github.com/viktorklang/5409467
做了一些testing,似乎工作正常!
请享用 :)
我认为可以通过使用Java 7 Future
接口及其实现来降低实现的复杂性。
cancel
可以build立一个Java的未来,它是取消它的cancel
方法。 另一个未来可以等待它的完成,从而成为本身在状态中不变的可观察的接口:
class Cancellable[T](executionContext: ExecutionContext, todo: => T) { private val jf: FutureTask[T] = new FutureTask[T]( new Callable[T] { override def call(): T = todo } ) executionContext.execute(jf) implicit val _: ExecutionContext = executionContext val future: Future[T] = Future { jf.get } def cancel(): Unit = jf.cancel(true) } object Cancellable { def apply[T](todo: => T)(implicit executionContext: ExecutionContext): Cancellable[T] = new Cancellable[T](executionContext, todo) }