Java 8stream和RxJava可观察对象之间的区别
Java 8stream类似于RxJava observables?
Java 8stream的定义:
新的
java.util.stream
包中的类提供了一个Stream API来支持元素stream上的函数式操作。
TL; DR :所有的序列/stream处理库都提供了非常相似的API用于pipe道build设。 API的差异在于处理multithreading和pipe道组合。
RxJava与Stream完全不同。 在所有JDK中,最接近rx.Observable的可能是java.util.stream.Collector Stream + CompletableFuture组合(这是以处理额外monad层为代价的,也就是必须处理Stream<CompletableFuture<T>>
和CompletableFuture<Stream<T>>
)。
Observable和Stream之间有显着的区别:
- stream是基于拉的,观察者是基于推的。 这听起来可能太抽象了,但是具有非常具体的重大后果。
- Stream只能使用一次,Observable可以多次订阅
-
Stream#parallel()
将序列分割成多个分区,Observable#subscribeOn()
和Observable#observeOn()
不要; 使用Observable模拟Stream#parallel()
行为是非常棘手的,它曾经有过.parallel()
方法,但是这个方法引起了如此多的混淆,.parallel()
支持被移动到github上的独立版本库RxJavaParallel上。 更多细节在另一个答案 。 -
Stream#parallel()
不允许指定要使用的线程池,这与大多数接受可选Scheduler的RxJava方法不同。 由于JVM中的所有stream实例都使用相同的fork-join池,因此添加.parallel()
会意外地影响程序中另一个模块的行为 - Streams缺乏时间相关的操作,如
Observable#interval()
,Observable#window()
等等。 这主要是因为Streams是基于拉的 - Streams缺less切断操作(
takeWhile()
,takeUntil()
); 使用Stream#anyMatch()
解决方法是有限的:它是terminal操作,所以你不能每个stream使用一次 - 从JDK 8开始,不存在Stream#zip操作,有时这非常有用
-
stream是很难自己构build的,Observable可以通过多种方式构build编辑:正如评论中指出的,有构buildstream的方法。 但是,由于不存在非terminal短路,所以不能轻易地在文件中生成行stream(JDK提供了Files#行和BufferedReader#行,而其他类似的情况可以通过构buildStream来pipe理来自迭代器)。 - Observable提供资源pipe理function(
Observable#using()
); 你可以包装IOstream或互斥体,并确保用户不会忘记释放资源 – 它将在订阅终止时自动处理; stream具有onClose(Runnable)
方法,但您必须手动或通过尝试资源来调用它。 例如, 你必须记住,Files#lines() 必须包含在try-with-resources块中。 - 观察对象同步通过(我没有真正检查Streams是否也是如此)。 这让您不用考虑基本操作是否是线程安全的(答案总是'是',除非有错误),但是并发相关的开销将会在那里,无论您的代码是否需要它。
综述:RxJava与Streams显着不同。 真正的RxJava替代品是ReactiveStreams的其他实现,例如Akka的相关部分。
更新 。 对Stream#parallel
使用非默认的fork-join池有一些技巧,请参阅Java 8并行stream中的自定义线程池
更新 。 以上所有内容都是基于RxJava 1.x的经验。 现在RxJava 2.x在这里 ,这个答案可能是过时的。
Java 8 Stream和RxJava看起来非常相似。 他们看起来像运营商(filter,地图,flatMap …),但不是build立在相同的用法。
您可以使用RxJava执行asynchonus任务。
使用Java 8stream,您将遍历集合中的项目。
你可以在RxJava中执行几乎相同的操作(遍历集合中的项目),但是由于RxJava专注于并发任务,所以它使用同步,锁存,…所以使用RxJava的相同任务可能比与Java 8stream。
RxJava可以和CompletableFuture
比较,但是可以计算多个值。
有一些技术上和概念上的差异,例如,Java 8stream是一次性的,基于拉的,同步的值序列,而RxJava Observables是可重新观察的,自适应推拉式的,可能是asynchronous的值序列。 RxJava的目标是Java 6+,也适用于Android。
Java 8 Streams是基于拉的。 您遍历消费每个项目的Java 8stream。 这可能是层出不穷的。
RXJava Observable
默认是基于push的。 您订阅了一个Observable,当下一个项目到达( onNext
)时,或者当stream完成( onCompleted
)或发生错误( onError
)时,您将得到通知。 因为使用Observable
你可以获得onNext
, onCompleted
, onError
事件,你可以做一些强大的function,比如把不同的Observable
组合成一个新的( zip
, merge
, concat
)。 你可以做的其他东西是caching,限制,…它使用或多或less相同的API在不同的语言(RxJava,RX在C#中,RxJS,…)
默认情况下,RxJava是单线程的。 除非你开始使用调度程序,否则一切都将在同一个线程上发生。
现有的答案是全面和正确的,但缺乏一个初学者的明显例子。 请允许我具体说一下“推/拉”和“重新观察”等术语。 注 :我讨厌Observable
这个词(这是为了天堂的缘故),所以简单的说就是J8 vs RXstream。
考虑一个列表,如果整数,
digits = [1,2,3,4,5]
J8 Stream是一个修改集合的简单工具。 例如,甚至可以提取数字,
evens = digits.stream().filter(x -> x%2)
这基本上是Python的映射,过滤,减less ,Java的一个非常好的(并且长期)。 但是如果数字没有提前存储呢? 还能过滤偶数吗?
设想一个单独的线程进程是随机输出整数( ---
表示时间)
digits = 12345---6------7--8--9-10--------11--12
在RX中, even
可以对每个新数字作出反应 ,并实时应用滤波器
even = -2-4-5---6---------8----10------------12
没有必要存储input和输出列表。 如果你想要一个输出列表,没有问题,也可以stream传。 事实上, 一切都是stream。
evens_stored = even.collect()
这就是为什么像“无状态”和“function”这样的术语更多地与RX相关联的原因
RxJava也与反应stream主动性密切相关,并将其视为反应streamAPI的简单实现(例如,与Akkastream实现相比)。 主要区别在于,反应stream被devise为能够处理背压,但是如果你看看反应stream页面,你会明白这个想法。 他们很好地描述了他们的目标,这些stream也与被动宣言密切相关。
Java 8stream几乎是一个无限集合的实现,非常类似于Scalastream或Clojure惰性 序列 。
Java 8 Streams能够有效处理真正的大集合,同时利用多核体系结构。 相比之下,RxJava默认是单线程的(没有Schedulers)。 所以RxJava不会利用多核机器,除非你自己编写这个逻辑。
- 如何将Java 8stream转换为数组?
- Lambdaexpression式与方法参考
- 使用Lambdas将列表列表变成列表
- Lambdaexpression式和通用方法
- Arrays.stream()。map()。sum()
- java.time.Clock.systemDefaultZone()。getZone()与java.util.TimeZone.getDefault()。toZoneId()之间的任何区别?
- Javatypes推断:在Java 8中引用是不明确的,但不是Java 7
- Java 8date时间API(java.time)和Joda-Time之间的区别
- 我怎样才能收集一个Java 8stream到一个番石榴ImmutableCollection?