Java 8stream和RxJava可观察对象之间的区别

Java 8stream类似于RxJava observables?

Java 8stream的定义:

新的java.util.stream包中的类提供了一个Stream API来支持元素stream上的函数式操作。

TL; DR :所有的序列/stream处理库都提供了非常相似的API用于pipe道build设。 API的差异在于处理multithreading和pipe道组合。

RxJava与Stream完全不同。 在所有JDK中,最接近rx.Observable的可能是java.util.stream.Collector Stream + CompletableFuture组合(这是以处理额外monad层为代价的,也就是必须处理Stream<CompletableFuture<T>>CompletableFuture<Stream<T>> )。

Observable和Stream之间有显着的区别:

  • stream是基于拉的,观察者是基于推的。 这听起来可能太抽象了,但是具有非常具体的重大后果。
  • Stream只能使用一次,Observable可以多次订阅
  • Stream#parallel()将序列分割成多个分区, Observable#subscribeOn()Observable#observeOn()不要; 使用Observable模拟Stream#parallel()行为是非常棘手的,它曾经有过.parallel()方法,但是这个方法引起了如此多的混淆, .parallel()支持被移动到github上的独立版本库RxJavaParallel上。 更多细节在另一个答案 。
  • Stream#parallel()不允许指定要使用的线程池,这与大多数接受可选Scheduler的RxJava方法不同。 由于JVM中的所有stream实例使用相同的fork-join池,因此添加.parallel()会意外地影响程序中另一个模块的行为
  • Streams缺乏时间相关的操作,如Observable#interval()Observable#window()等等。 这主要是因为Streams是基于拉的
  • Streams缺less切断操作( takeWhile()takeUntil() ); 使用Stream#anyMatch()解决方法是有限的:它是terminal操作,所以你不能每个stream使用一次
  • 从JDK 8开始,不存在Stream#zip操作,有时这非常有用
  • stream是很难自己构build的,Observable可以通过多种方式构build 编辑:正如评论中指出的,有构buildstream的方法。 但是,由于不存在非terminal短路,所以不能轻易地在文件中生成行stream(JDK提供了Files#行和BufferedReader#行,而其他类似的情况可以通过构buildStream来pipe理来自迭代器)。
  • Observable提供资源pipe理function( Observable#using() ); 你可以包装IOstream或互斥体,并确保用户不会忘记释放资源 – 它将在订阅终止时自动处理; stream具有onClose(Runnable)方法,但您必须手动或通过尝试资源来调用它。 例如, 你必须记住,Files#lines() 必须包含在try-with-resources块中。
  • 观察对象同步通过(我没有真正检查Streams是否也是如此)。 这让您不用考虑基本操作是否是线程安全的(答案总是'是',除非有错误),但是并发相关的开销将会在那里,无论您的代码是否需要它。

综述:RxJava与Streams显着不同。 真正的RxJava替代品是ReactiveStreams的其他实现,例如Akka的相关部分。

更新 。 对Stream#parallel使用非默认的fork-join池有一些技巧,请参阅Java 8并行stream中的自定义线程池

更新 。 以上所有内容都是基于RxJava 1.x的经验。 现在RxJava 2.x在这里 ,这个答案可能是过时的。

Java 8 Stream和RxJava看起来非常相似。 他们看起来像运营商(filter,地图,flatMap …),但不是build立在相同的用法。

您可以使用RxJava执行asynchonus任务。

使用Java 8stream,您将遍历集合中的项目。

你可以在RxJava中执行几乎相同的操作(遍历集合中的项目),但是由于RxJava专注于并发任务,所以它使用同步,锁存,…所以使用RxJava的相同任务可能比与Java 8stream。

RxJava可以和CompletableFuture比较,但是可以计算多个值。

有一些技术上和概念上的差异,例如,Java 8stream是一次性的,基于拉的,同步的值序列,而RxJava Observables是可重新观察的,自适应推拉式的,可能是asynchronous的值序列。 RxJava的目标是Java 6+,也适用于Android。

Java 8 Streams是基于拉的。 您遍历消费每个项目的Java 8stream。 这可能是层出不穷的。

RXJava Observable默认是基于push的。 您订阅了一个Observable,当下一个项目到达( onNext )时,或者当stream完成( onCompleted )或发生错误( onError )时,您将得到通知。 因为使用Observable你可以获得onNextonCompletedonError事件,你可以做一些强大的function,比如把不同的Observable组合成一个新的( zipmergeconcat )。 你可以做的其他东西是caching,限制,…它使用或多或less相同的API在不同的语言(RxJava,RX在C#中,RxJS,…)

默认情况下,RxJava是单线程的。 除非你开始使用调度程序,否则一切都将在同一个线程上发生。

现有的答案是全面和正确的,但缺乏一个初学者的明显例子。 请允许我具体说一下“推/拉”和“重新观察”等术语。 :我讨厌Observable这个词(这是为了天堂的缘故),所以简单的说就是J8 vs RXstream。

考虑一个列表,如果整数,

 digits = [1,2,3,4,5] 

J8 Stream是一个修改集合的简单工具。 例如,甚至可以提取数字,

 evens = digits.stream().filter(x -> x%2) 

这基本上是Python的映射,过滤,减less ,Java的一个非常好的(并且长期)。 但是如果数字没有提前存储呢? 还能过滤偶数吗?

设想一个单独的线程进程是随机输出整数( ---表示时间)

 digits = 12345---6------7--8--9-10--------11--12 

在RX中, even可以每个新数字作出反应 ,并实时应用滤波器

 even = -2-4-5---6---------8----10------------12 

没有必要存储input和输出列表。 如果你想要一个输出列表,没有问题,也可以stream传。 事实上, 一切都是stream。

 evens_stored = even.collect() 

这就是为什么像“无状态”和“function”这样的术语更多地与RX相关联的原因

RxJava也与反应stream主动性密切相关,并将其视为反应streamAPI的简单实现(例如,与Akkastream实现相比)。 主要区别在于,反应stream被devise为能够处理背压,但是如果你看看反应stream页面,你会明白这个想法。 他们很好地描述了他们的目标,这些stream也与被动宣言密切相关。

Java 8stream几乎是一个无限集合的实现,非常类似于Scalastream或Clojure惰性 序列 。

Java 8 Streams能够有效处理真正的大集合,同时利用多核体系结构。 相比之下,RxJava默认是单线程的(没有Schedulers)。 所以RxJava不会利用多核机器,除非你自己编写这个逻辑。