斯卡拉:列出和未来无视未来的未来

我正在寻找一种方法将任意长度的期货列表转换为列表的未来。 我正在使用Playframework,所以最终,我真正想要的是一个Future[Result] ,但为了简单起见,让我们只是说Future[List[Int]]正常的方法是使用Future.sequence(...)但有一个转折…我给出的名单通常有大约10-20期货,其中一个期货失败(他们正在提出外部networking服务请求)并不罕见。 如果其中一个失败,我们不必重试所有这些,我希望能够得到那些成功并返回的。

例如,执行以下操作不起作用

 import scala.concurrent._ import scala.concurrent.ExecutionContext.Implicits.global import scala.util.Success import scala.util.Failure val listOfFutures = Future.successful(1) :: Future.failed(new Exception("Failure")) :: Future.successful(3) :: Nil val futureOfList = Future.sequence(listOfFutures) futureOfList onComplete { case Success(x) => println("Success!!! " + x) case Failure(ex) => println("Failed !!! " + ex) } scala> Failed !!! java.lang.Exception: Failure 

我不希望得到唯一的例外,而是想把1和3拉出来。 我尝试使用Future.fold ,但显然只是在幕后调用Future.sequence

先谢谢您的帮助!

诀窍是首先确保没有任何一个期货失败。 .recover在这里是你的朋友,你可以把它和map结合起来,把所有Future[T]结果转换成Future[Try[T]]]实例,所有这些都是一定成功的。

注意:您也可以在这里使用OptionEither ,但是如果您特别想要捕获exception, Try是最干净的方法

 def futureToFutureTry[T](f: Future[T]): Future[Try[T]] = f.map(Success(_)).recover(x => Failure(x)) val listOfFutures = ... val listOfFutureTrys = listOfFutures.map(futureToFutureTry(_)) 

然后像以前一样使用Future.sequence ,给你一个Future[List[Try[T]]]

 val futureListOfTrys = Future.sequence(listOfFutureTrys) 

然后过滤:

 val futureListOfSuccesses = futureListOfTrys.map(_.filter(_.isSuccess)) 

如果你需要的话,你甚至可以提出具体的失败:

 val futureListOfFailures = futureListOfTrys.map(_.filter(_.isFailure)) 

我试着凯文的答案,我遇到了我的版本的斯卡拉(2.11.5)的一个小故障…我纠正了这一点,并写了一些额外的testing,如果有人感兴趣…这是我的版本>

 implicit class FutureCompanionOps(val f: Future.type) extends AnyVal { /** Given a list of futures `fs`, returns the future holding the list of Try's of the futures from `fs`. * The returned future is completed only once all of the futures in `fs` have been completed. */ def allAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = { val listOfFutureTrys: List[Future[Try[T]]] = fItems.map(futureToFutureTry) Future.sequence(listOfFutureTrys) } def futureToFutureTry[T](f: Future[T]): Future[Try[T]] = { f.map(Success(_)) .recover({case x => Failure(x)}) } def allFailedAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = { allAsTrys(fItems).map(_.filter(_.isFailure)) } def allSucceededAsTrys[T](fItems: /* future items */ List[Future[T]]): Future[List[Try[T]]] = { allAsTrys(fItems).map(_.filter(_.isSuccess)) } } // Tests... // allAsTrys tests // test("futureToFutureTry returns Success if no exception") { val future = Future.futureToFutureTry(Future{"mouse"}) Thread.sleep(0, 100) val futureValue = future.value assert(futureValue == Some(Success(Success("mouse")))) } test("futureToFutureTry returns Failure if exception thrown") { val future = Future.futureToFutureTry(Future{throw new IllegalStateException("bad news")}) Thread.sleep(5) // need to sleep a LOT longer to get Exception from failure case... interesting..... val futureValue = future.value assertResult(true) { futureValue match { case Some(Success(Failure(error: IllegalStateException))) => true } } } test("Future.allAsTrys returns Nil given Nil list as input") { val future = Future.allAsTrys(Nil) assert ( Await.result(future, 100 nanosecond).isEmpty ) } test("Future.allAsTrys returns successful item even if preceded by failing item") { val future1 = Future{throw new IllegalStateException("bad news")} var future2 = Future{"dog"} val futureListOfTrys = Future.allAsTrys(List(future1,future2)) val listOfTrys = Await.result(futureListOfTrys, 10 milli) System.out.println("successItem:" + listOfTrys); assert(listOfTrys(0).failed.get.getMessage.contains("bad news")) assert(listOfTrys(1) == Success("dog")) } test("Future.allAsTrys returns successful item even if followed by failing item") { var future1 = Future{"dog"} val future2 = Future{throw new IllegalStateException("bad news")} val futureListOfTrys = Future.allAsTrys(List(future1,future2)) val listOfTrys = Await.result(futureListOfTrys, 10 milli) System.out.println("successItem:" + listOfTrys); assert(listOfTrys(1).failed.get.getMessage.contains("bad news")) assert(listOfTrys(0) == Success("dog")) } test("Future.allFailedAsTrys returns the failed item and only that item") { var future1 = Future{"dog"} val future2 = Future{throw new IllegalStateException("bad news")} val futureListOfTrys = Future.allFailedAsTrys(List(future1,future2)) val listOfTrys = Await.result(futureListOfTrys, 10 milli) assert(listOfTrys(0).failed.get.getMessage.contains("bad news")) assert(listOfTrys.size == 1) } test("Future.allSucceededAsTrys returns the succeeded item and only that item") { var future1 = Future{"dog"} val future2 = Future{throw new IllegalStateException("bad news")} val futureListOfTrys = Future.allSucceededAsTrys(List(future1,future2)) val listOfTrys = Await.result(futureListOfTrys, 10 milli) assert(listOfTrys(0) == Success("dog")) assert(listOfTrys.size == 1) } 

我刚刚遇到了这个问题,并有另一个解决scheme提供:

 def allSuccessful[A, M[X] <: TraversableOnce[X]](in: M[Future[A]]) (implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = { in.foldLeft(Future.successful(cbf(in))) { (fr, fa) ⇒ (for (r ← fr; a ← fa) yield r += a) fallbackTo fr } map (_.result()) } 

这里的想法是,你在等待下一个元素在列表中完成(使用for-comprehension语法),如果下一个失败,你只是回退到你已经有的东西。