合并Observables列表,并等到所有完成
TL; DR如何将Task.whenAll(List<Task>)
转换为RxJava
?
我现有的代码使用Bolts来build立一个asynchronous任务的列表,并等到所有这些任务完成之后再执行其他步骤。 本质上,它build立一个List<Task>
并返回一个单独的Task
,当完成列表中的所有任务时,按照Bolts站点上的例子,这个 Task
被标记为已完成。
我期待用RxJava
取代Bolts
,我假设这种方法build立一个asynchronous任务列表(大小不是事先知道的),并将它们全部包装成一个单一的Observable
是可能的,但我不知道如何。
我试着看着merge
, zip
, concat
等等……但不能在List<Observable>
,我会build立起来,因为他们似乎一度只能在两个Observables
上工作,如果我正确理解文档。
我正在努力学习RxJava
并且RxJava
还是很新的,所以请原谅我,如果这是一个明显的问题或者在文档中解释的话; 我试过search。 任何帮助将非常感激。
这听起来像你正在寻找Zip运算符 。
有几种不同的使用方法,让我们来看一个例子。 假设我们有一些简单的不同types的观测值:
Observable<Integer> obs1 = Observable.just(1); Observable<String> obs2 = Observable.just("Blah"); Observable<Boolean> obs3 = Observable.just(true);
等待它们的最简单的方法是这样的:
Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b) .subscribe(str -> System.out.println(str));
请注意,在zip函数中,这些参数具有与要压缩的观测值的types相对应的具体types。
压缩可观测量列表也是可能的,或者直接:
List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3); Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2]) .subscribe(str -> System.out.println(str));
…或通过将列表包装到Observable<Observable<?>>
:
Observable<Observable<?>> obsObs = Observable.from(obsList); Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2]) .subscribe(str -> System.out.println(str));
然而,在这两种情况下,zip函数只能接受一个Object[]
参数,因为列表中的可观察types不能预先知道它们的types以及它们的编号。 这意味着,zip函数将不得不检查参数的数量并相应地转换它们。
无论如何,上述所有例子最终都会打印出1 Blah true
编辑:当使用Zip,请确保被压缩的Observables
都发出相同数量的项目。 在上面的例子中,所有三个可观察的事件都发出了一个单独的项 如果我们要改变他们这样的事情:
Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items
然后1, Blah, True
和2, Hello, True
将是唯一传递给zip函数的项目。 因为其他可观测量已经完成,所以项目3
不会被压缩。
如果你有dynamic的任务组合,你可以使用flatMap
。 像这样的东西:
public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) { return Observable.from(tasks) //execute in parallel .flatMap(task -> task.observeOn(Schedulers.computation())) //wait, until all task are executed //be aware, all your observable should emit onComplemete event //otherwise you will wait forever .toList() //could implement more intelligent logic. eg. check that everything is successful .map(results -> true); }
并行执行的另一个很好的例子
注意:我不太了解error handling的要求。 例如,如果只有一个任务失败,该怎么办。 我认为你应该validation这种情况。
你可能看过与2个Observable一起使用的zip
操作符。
还有静态方法Observable.zip
。 它有一个应该对你有用的forms:
zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)
你可以查看更多的javadoc。
我正在使用JavaRx Observables和RxKotlin在Kotlin中编写一些计算heave代码。 我想观察一系列可观测的事件,同时给我一个最新的进展和最新的结果。 最后它返回最好的计算结果。 一个额外的要求是并行运行Observables使用我所有的CPU核心。 我结束了这个解决scheme:
@Volatile var results: MutableList<CalculationResult> = mutableListOf() fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> { return Observable.create { subscriber -> Observable.concatEager(listOfCalculations.map { calculation: Calculation -> doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result }).subscribeBy( onNext = { results.add(it) subscriber.onNext(Pair("A calculation is ready", it)) }, onComplete = { subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)) subscriber.onComplete() }, onError = { subscriber.onError(it) } ) } }