Tag: rx java

为什么RxJava经常与改造一起使用?

与Rxjava结合使用Retrofit有什么好处?

将Observable <List <Car >>转换为RxJava中的Observable <Car>序列

给定的汽车列表(汽车List<Car> cars ),我可以这样做: Observable.just(cars); //returns an Observable that emits one List<Car> Observable.from(cars); //returns an Observable that emits a squence of Car 有没有办法从一个List<Car>的Observable<Car>到一个Observable<Car>的序列? 像没有参数的东西 Obserable.just(cars).from()

合并Observables列表,并等到所有完成

TL; DR如何将Task.whenAll(List<Task>)转换为RxJava ? 我现有的代码使用Bolts来build立一个asynchronous任务的列表,并等到所有这些任务完成之后再执行其他步骤。 本质上,它build立一个List<Task>并返回一个单独的Task ,当完成列表中的所有任务时,按照Bolts站点上的例子,这个 Task被标记为已完成。 我期待用RxJava取代Bolts ,我假设这种方法build立一个asynchronous任务列表(大小不是事先知道的),并将它们全部包装成一个单一的Observable是可能的,但我不知道如何。 我试着看着merge , zip , concat等等……但不能在List<Observable> ,我会build立起来,因为他们似乎一度只能在两个Observables上工作,如果我正确理解文档。 我正在努力学习RxJava并且RxJava还是很新的,所以请原谅我,如果这是一个明显的问题或者在文档中解释的话; 我试过search。 任何帮助将非常感激。

在RxJava中,如何在链接可观察的时候传递一个variables?

我使用RxJava链接asynchronous操作,并且想要向下游传递一些variables: Observable .from(modifications) .flatmap( (data1) -> { return op1(data1); }) … .flatmap( (data2) -> { // How to access data1 here ? return op2(data2); }) 这似乎是一个普遍的模式,但我无法find有关它的信息。

如果对RxJava主题进行onComplete调用,是否需要再次手动取消订阅?

我在我的片段中使用RxJava ReplaySubject 。 我试图以某种方式使用ReplaySubject,在那里我希望主体执行一个进程直到完成(可能超出片段的生命期)。 在完成这个过程之后,我想释放资源,据我所知,在注册观察者(在我的情况下,是主题本身)时,退订了这个资源。 在这个github的问题线程 @benjchristensen说: 如果它是一个Observable,那么它应该发出一个onCompleted并完成。 如果它是一个观察者,那么当它调用Observable.subscribe时,它应该取消订阅它接收到的订阅,它将使Observable有机会closures和清理。 如果它是一个主体 – 既是观察者又是观察者 – 这是什么行为? 如果我打电话onComplete关于这个问题,这基本上是否意味着订阅被停止,我因此无需手动取消订阅我通过注册观察员得到的订阅?

可观察vs可stream动rxJava2

我一直在寻找新的Rx的Java 2,我不太确定我理解backpressure的想法了… 我知道,我们有Observable没有backpressure支持和Flowable有它。 所以基于实例,可以说我有flowable与interval : Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { // do smth } }); 这将会在128个值后崩溃,这很明显,我比消费项目慢。 但是我们跟Observable Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { // do smth } }); 即使我在延迟使用它仍然有效时,这一切都不会崩溃。 为了使onBackpressureDrop工作可以说我把onBackpressureDrop操作符,崩溃消失,但并不是所有的价值都发出。 所以我现在头脑中找不到答案的基本问题是为什么我应该关心backpressure时候我可以使用普通的Observable仍然接收所有的值而不pipe理buffer ? 或者从另一方面来说, backpressure什么好处可以帮助我pipe理和处理消费?

使用Retrofit 2.0和RxJava获取响应状态代码

我正在尝试升级到Retrofit 2.0,并在我的Android项目中添加RxJava。 我正在做一个API调用,并希望检索错误代码的情况下,从服务器的错误响应。 Observable<MyResponseObject> apiCall(@Body body); 在RxJava调用中: myRetrofitObject.apiCall(body).subscribe(new Subscriber<MyResponseObject>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(MyResponseObject myResponseObject) { //On response from server } }); 在1.9版本中,RetrofitError依然存在,我们可以通过以下方式获得状态: error.getResponse().getStatus() 如何使用RxJava进行Retrofit 2.0操作?

观察者和订阅者有什么区别?

我试图破译以下function: Subscription getCar(id, Observer<Car> observer) { return getCarDetails(id, new Observer<CarDetails> { @Override onNext(CarDetails details) { observer.onNext(details.getCar()); } }); } 我从http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/获得了一个很好的rxjava入门介绍,但是它只是在提到Observer时说过,你将会使用Subscriber的大部分时间消费者从Observable发射的物品。 有人可以向我解释 什么是观察员? 观察者与订阅者有什么不同? 上面的代码片断是做什么的? Javadoc使它看起来就像一个用户。 用户的javadoc表示它实现观察者和订阅。 我很困扰。

RxJava并行获取可观测数据

在RxJava中实现并行asynchronous调用需要一些帮助。 我select了一个简单的用例,其中FIRST调用获取(而不是search)要显示的产品列表(Tile)。 随后的呼叫出去并取(A)REVIEWS和(B)产品图像 经过几次尝试,我到了这个地方。 1 Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm); 2 List<Tile> allTiles = new ArrayList<Tile>(); 3 ClientResponse response = new ClientResponse(); 4 searchTile.parallel(oTile -> { 5 return oTile.flatMap(t -> { 6 Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId()); 7 Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId()); 8 return Observable.zip(reviews, imageUrl, (r, u) -> { 9 t.setReviews(r); 10 t.setImageUrl(u); 11 return t; […]

CompletableFuture,Future和RxJava的Observable之间的区别

我想知道CompletableFuture , Future和Observable RxJava之间的区别。 我所知道的都是asynchronous的,但是 Future.get()阻塞线程 CompletableFuture给出了callback方法 RxJava Observable —与CompletableFuture类似,具有其他好处(不确定) 例如:如果客户端需要进行多个服务调用,并且当我们使用Futures (Java)时, Future.get()将会按顺序执行…想知道RxJava如何更好。 而文档http://reactivex.io/intro.html说 使用Futures来优化组合有条件的asynchronous执行stream是困难的(或者不可能,因为每个请求的延迟在运行时间上是不同的)。 这当然可以做到,但是它很快变得复杂(并且因此容易出错),或者它过早地阻塞了Future.get(),这消除了asynchronous执行的好处。 真的有兴趣知道如何RxJava解决这个问题。我发现很难从文档中理解。请帮助!