合并Observables列表,并等到所有完成

TL; DR如何将Task.whenAll(List<Task>)转换为RxJava

我现有的代码使用Bolts来build立一个asynchronous任务的列表,并等到所有这些任务完成之后再执行其他步骤。 本质上,它build立一个List<Task>并返回一个单独的Task ,当完成列表中的所有任务时,按照Bolts站点上的例子,这个 Task被标记为已完成。

我期待用RxJava取代Bolts ,我假设这种方法build立一个asynchronous任务列表(大小不是事先知道的),并将它们全部包装成一个单一的Observable是可能的,但我不知道如何。

我试着看着mergezipconcat等等……但不能在List<Observable> ,我会build立起来,因为他们似乎一度只能在两个Observables上工作,如果我正确理解文档。

我正在努力学习RxJava并且RxJava还是很新的,所以请原谅我,如果这是一个明显的问题或者在文档中解释的话; 我试过search。 任何帮助将非常感激。

这听起来像你正在寻找Zip运算符 。

有几种不同的使用方法,让我们来看一个例子。 假设我们有一些简单的不同types的观测值:

 Observable<Integer> obs1 = Observable.just(1); Observable<String> obs2 = Observable.just("Blah"); Observable<Boolean> obs3 = Observable.just(true); 

等待它们的最简单的方法是这样的:

 Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b) .subscribe(str -> System.out.println(str)); 

请注意,在zip函数中,这些参数具有与要压缩的观测值的types相对应的具体types。

压缩可观测量列表也是可能的,或者直接:

 List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3); Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2]) .subscribe(str -> System.out.println(str)); 

…或通过将列表包装到Observable<Observable<?>>

 Observable<Observable<?>> obsObs = Observable.from(obsList); Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2]) .subscribe(str -> System.out.println(str)); 

然而,在这两种情况下,zip函数只能接受一个Object[]参数,因为列表中的可观察types不能预先知道它们的types以及它们的编号。 这意味着,zip函数将不得不检查参数的数量并相应地转换它们。

无论如何,上述所有例子最终都会打印出1 Blah true

编辑:当使用Zip,请确保被压缩的Observables都发出相同数量的项目。 在上面的例子中,所有三个可观察的事件都发出了一个单独的项 如果我们要改变他们这样的事情:

 Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items 

然后1, Blah, True2, Hello, True将是唯一传递给zip函数的项目。 因为其他可观测量已经完成,所以项目3不会被压缩。

如果你有dynamic的任务组合,你可以使用flatMap 。 像这样的东西:

 public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) { return Observable.from(tasks) //execute in parallel .flatMap(task -> task.observeOn(Schedulers.computation())) //wait, until all task are executed //be aware, all your observable should emit onComplemete event //otherwise you will wait forever .toList() //could implement more intelligent logic. eg. check that everything is successful .map(results -> true); } 

并行执行的另一个很好的例子

注意:我不太了解error handling的要求。 例如,如果只有一个任务失败,该怎么办。 我认为你应该validation这种情况。

你可能看过与2个Observable一起使用的zip操作符。

还有静态方法Observable.zip 。 它有一个应该对你有用的forms:

 zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) 

你可以查看更多的javadoc。

我正在使用JavaRx Observables和RxKotlin在Kotlin中编写一些计算heave代码。 我想观察一系列可观测的事件,同时给我一个最新的进展和最新的结果。 最后它返回最好的计算结果。 一个额外的要求是并行运行Observables使用我所有的CPU核心。 我结束了这个解决scheme:

 @Volatile var results: MutableList<CalculationResult> = mutableListOf() fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> { return Observable.create { subscriber -> Observable.concatEager(listOfCalculations.map { calculation: Calculation -> doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result }).subscribeBy( onNext = { results.add(it) subscriber.onNext(Pair("A calculation is ready", it)) }, onComplete = { subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)) subscriber.onComplete() }, onError = { subscriber.onError(it) } ) } }