ObserveOn和SubscribeOn – 工作正在完成

基于阅读这个问题: SubscribeOn和ObserveOn有什么区别

ObserveOn设置Subscribe处理程序中代码的执行位置:

stream.Subscribe(_ => { // this code here });

SubscribeOn方法设置stream的设置在哪个线程上完成。

我明白,如果这些没有明确设置,那么使用TaskPool。

现在我的问题是,可以说我做了这样的事情:

Observable.Interval(new Timespan(0, 0, 1)).Where(t => predicate(t)).SelectMany(t => lots_of(t)).ObserveOnDispatcher().Subscribe(t => some_action(t));

where predicateSelectMany lots_of在被执行,假设在调度器上正在执行some_action

关于SubscribeOnObserveOn有很多误导性的信息。

概要

  • SubscribeOn截取对SubscribeOnIObservable<T>的单一方法的调用,并在Subscribe返回的IDisposable句柄上调用Dispose
  • ObserveOn截获对IObserver<T>方法的调用,这些调用是OnNextOnCompletedOnError
  • 这两种方法都会导致在指定的调度程序上进行相应的调用。

分析与示范

该声明

ObserveOn设置订阅处理程序中代码的执行位置:

比帮助更困惑。 你所说的“订阅处理程序”实际上是一个OnNext处理程序。 请记住, IObservableSubscribe方法接受一个具有OnNextOnCompletedOnError方法的IObserver ,但它是扩展方法,提供接受lambdaexpression式的便利重载并为您构build一个IObserver实现。

让我适当的这个词虽然; 我认为“订阅处理程序”是调用Subscribe时调用的observable中的代码。 以这种方式,上面的描述更类似于SubscribeOn的目的。

SubscribeOn

SubscribeOn使得observable的Subscribe方法在指定的调度器或上下文上asynchronous执行。 当你不想在你运行的任何线程的observable上调用Subscribe方法时,你可以使用它 – 通常是因为它可以长时间运行,而且你不想阻塞调用线程。

当你打电话给Subscribe ,你正在呼叫一个观察者,这个观察者可能是可观察的长链的一部分。 只有SubscribeOn被应用于它的效果。 现在可能是这样的情况,即所有链中的观察者都将立即在同一个线程上订阅 – 但事实并非如此。 以Concat为例 – 只有在前一个stream完成后才会订阅每个后续stream,而且通常会在前一个stream称为OnCompleted任何线程上进行。

所以SubscribeOn位于你的Subscribe调用和Subscribe的observable之间,拦截这个调用并使之asynchronous。

这也影响处理订阅。 Subscribe将返回一个用于取消SubscribeIDisposable句柄。 SubscribeOn确保在提供的调度程序上调度Dispose

当试图了解SubscribeOn做什么的时候,一个常见的困惑就是observable的Subscribe处理程序可能在同一个线程上调用OnNextOnCompletedOnError 。 但是,其目的不是影响这些电话。 在Subscribe方法返回之前stream完成并不罕见。 例如, Observable.Return这样做的。 让我们来看看。

如果您使用我写的Spy方法,并运行以下代码:

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.Subscribe(); Console.WriteLine("Subscribe returned"); 

你得到这个输出(线程ID可能会有所不同):

 Calling from Thread: 1 Return: Observable obtained on Thread: 1 Return: Subscribed to on Thread: 1 Return: OnNext(1) on Thread: 1 Return: OnCompleted() on Thread: 1 Return: Subscription completed. Subscribe returned 

您可以看到整个订阅处理程序在同一个线程上运行,并在返回之前完成。

让我们使用SubscribeOnasynchronous运行。 我们会对可观察到的ReturnSubscribeOn可观察到的间谍:

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe(); Console.WriteLine("Subscribe returned"); 

这个输出(由我添加的行号):

 01 Calling from Thread: 1 02 Return: Observable obtained on Thread: 1 03 SubscribeOn: Observable obtained on Thread: 1 04 SubscribeOn: Subscribed to on Thread: 1 05 SubscribeOn: Subscription completed. 06 Subscribe returned 07 Return: Subscribed to on Thread: 2 08 Return: OnNext(1) on Thread: 2 09 SubscribeOn: OnNext(1) on Thread: 2 10 Return: OnCompleted() on Thread: 2 11 SubscribeOn: OnCompleted() on Thread: 2 12 Return: Subscription completed. 

01 – 主线程在线程1上运行。

02 – 在调用线程上评估Return observable。 我们刚刚获得了IObservable ,现在还没有订阅。

03 – 在调用线程上评估SubscribeOn observable。

04 – 现在最后我们调用Subscribe方法。

05 – Subscribe方法asynchronous完成…

06 – …并且线程1返回到主要方法。 这是SubscribeOn在行动中的效果!

07 – 与此同时, SubscribeOn在默认调度程序上调用Return来调用Return 。 这是在线程2上收到的。

08 – 和Return一样,它在Subscribe线程上调用OnNext

09和SubscribeOn只是一个通过现在。

10,11 – 相同的OnCompleted

12 – 所有Return订阅处理程序的最后一个完成。

希望能够澄清SubscribeOn的目的和效果!

ObserveOn

如果您将SubscribeOn作为将此调用传递到另一个线程的Subscribe方法的拦截器,那么ObserveOn执行相同的工作,但对于OnNextOnCompletedOnError调用。

回想一下我们最初的例子:

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.Subscribe(); Console.WriteLine("Subscribe returned"); 

这给了这个输出:

 Calling from Thread: 1 Return: Observable obtained on Thread: 1 Return: Subscribed to on Thread: 1 Return: OnNext(1) on Thread: 1 Return: OnCompleted() on Thread: 1 Return: Subscription completed. Subscribe returned 

现在让我们改变这个使用ObserveOn

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned"); 

我们得到以下输出:

 01 Calling from Thread: 1 02 Return: Observable obtained on Thread: 1 03 ObserveOn: Observable obtained on Thread: 1 04 ObserveOn: Subscribed to on Thread: 1 05 Return: Subscribed to on Thread: 1 06 Return: OnNext(1) on Thread: 1 07 ObserveOn: OnNext(1) on Thread: 2 08 Return: OnCompleted() on Thread: 1 09 Return: Subscription completed. 10 ObserveOn: Subscription completed. 11 Subscribe returned 12 ObserveOn: OnCompleted() on Thread: 2 

01 – 主线程在线程1上运行。

02 – 如前所述,在调用线程上评估Return可观察值。 我们刚刚获得了IObservable ,现在还没有订阅。

03 – ObserveOn observable也在调用线程上进行评估。

04 – 现在我们再次在调用线程上订阅ObserveOn observable …

05 – …然后将呼叫传递给Return可观测值。

06 – 现在在其Subscribe处理程序中Return OnNext

07 – 这是ObserveOn的效果。 我们可以看到OnNext是在线程2上asynchronous调度的。

08 – 同时Return调用线程1上完成…

09 – 和Return的订阅处理程序完成…

10 – ObserveOn的订阅处理程序也是如此…

11 – 所以控制返回到主要方法

12 – 与此同时, ObserveOn已经将ReturnOnCompleted调用到了Thread 2.这可能是在09-11的任何时候发生的,因为它是asynchronous运行的。 恰好如此,现在终于被调用了。

什么是典型的用例?

当你需要Subscribe一个长时间运行的observable,并希望尽快离开调度器线程时,你会经常看到在GUI中使用的Subscribe – 也许是因为你知道这是其中的一个可观察的事情,处理程序。 将其应用于可观察链的末端,因为这是您订阅时调用的第一个可观察对象。

当你想确保OnNextOnCompletedOnError调用被编组callback度器线程时,你会经常看到ObserveOn在GUI中使用。 将其应用于可观察链的末端,尽可能晚地转换回去。

希望你可以看到你的问题的答案是ObserveOnDispatcher将不会对WhereSelectMany执行的线程有任何改变 – 这一切都取决于线程stream从哪个线程调用它们! stream的订阅处理程序将在调用线程上调用,但是不可能说WhereSelectMany将在不知道stream如何实现的情况下运行。

观看者的生活时间超过了订阅电话

到目前为止,我们一直在专注于Observable.ReturnReturnSubscribe处理程序中完成它的stream。 这不是非典型的,但是stream式传输比Subscribe处理器更为常见。 看Observable.Timer例如:

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.Subscribe(); Console.WriteLine("Subscribe returned"); 

这返回以下内容:

 Calling from Thread: 1 Timer: Observable obtained on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. Subscribe returned Timer: OnNext(0) on Thread: 2 Timer: OnCompleted() on Thread: 2 

您可以清楚地看到订阅完成,然后OnNextOnCompleted稍后调用另一个线程。

请注意, SubscribeOnObserveOn组合不会影响任何线程或调度程序Timerselect调用OnNextOnCompleted

当然,您可以使用SubscribeOn来确定Subscribe线程:

 Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe(); Console.WriteLine("Subscribe returned"); 

(我故意改变到这里的NewThreadScheduler ,以防止在Timer发生的情况下混淆得到与SubscribeOn相同的线程池线程)

赠送:

 Calling from Thread: 1 Timer: Observable obtained on Thread: 1 SubscribeOn: Observable obtained on Thread: 1 SubscribeOn: Subscribed to on Thread: 1 SubscribeOn: Subscription completed. Subscribe returned Timer: Subscribed to on Thread: 2 Timer: Subscription completed. Timer: OnNext(0) on Thread: 3 SubscribeOn: OnNext(0) on Thread: 3 Timer: OnCompleted() on Thread: 3 SubscribeOn: OnCompleted() on Thread: 3 

在这里你可以清楚地看到线程(1)在它的Subscribe调用之后返回的主线程,但是Timer订阅获得它自己的线程(2),但是在线程(3)上运行的OnNextOnCompleted调用。

现在对于ObserveOn ,让我们将代码更改为(对于代码中的代码,使用nuget包rx-wpf):

 var dispatcher = Dispatcher.CurrentDispatcher; Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned"); 

这个代码有点不同。 第一行确保我们有一个调度程序,并且还引入了ObserveOnDispatcher – 这就像ObserveOn一样,除了它指定我们应该使用ObserveOnDispatcher评估的任何线程DispatcherScheduler

这段代码给出了以下输出:

 Calling from Thread: 1 Timer: Observable obtained on Thread: 1 ObserveOn: Observable obtained on Thread: 1 ObserveOn: Subscribed to on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. ObserveOn: Subscription completed. Subscribe returned Timer: OnNext(0) on Thread: 2 ObserveOn: OnNext(0) on Thread: 1 Timer: OnCompleted() on Thread: 2 ObserveOn: OnCompleted() on Thread: 1 

注意调度程序(和主线程)是线程1. Timer仍然在其select的线程(2)上调用OnNextOnCompleted ,但ObserveOnDispatcher正在调用callback度器线程thread(1)。

另外请注意,如果我们要阻塞调度程序线程(比如Thread.Sleep ),您将看到ObserveOnDispatcher会阻塞(这个代码在LINQPad主要方法中最好):

 var dispatcher = Dispatcher.CurrentDispatcher; Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned"); Console.WriteLine("Blocking the dispatcher"); Thread.Sleep(2000); Console.WriteLine("Unblocked"); 

你会看到这样的输出:

 Calling from Thread: 1 Timer: Observable obtained on Thread: 1 ObserveOn: Observable obtained on Thread: 1 ObserveOn: Subscribed to on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. ObserveOn: Subscription completed. Subscribe returned Blocking the dispatcher Timer: OnNext(0) on Thread: 2 Timer: OnCompleted() on Thread: 2 Unblocked ObserveOn: OnNext(0) on Thread: 1 ObserveOn: OnCompleted() on Thread: 1 

通过ObserveOnDispatcher的呼叫只有在Sleep运行后才能离开。

关键点

记住Reactive Extensions本质上是一个自由线程库,并且试图尽可能地使用它运行的线程 – 你必须故意干预ObserveOnSubscribeOn并将特定的调度程序传递给接受它们的操作符改变这一点。

ObserveOnSubscribeOn是装饰器 ,它们包装观察者和观察者的表面区域,以便跨线程编组呼叫,观察者无法控制内部所做的事情。 希望这些例子已经说清楚了。

我发现詹姆斯的答案非常清楚和全面。 但是,尽pipe如此,我仍然不得不解释这些差异。

因此,我创build了一个非常简单/愚蠢的例子,让我以graphics的方式展示调度器正在调用的东西。 我已经创build了一个MyScheduler类来立即执行动作,但会改变控制台的颜色。

SubscribeOn调度程序输出的文本以红色输出, ObserveOn调度程序以蓝色输出。

 using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; namespace SchedulerExample { class Program { static void Main(string[] args) { var mydata = new[] {"A", "B", "C", "D", "E"}; var observable = Observable.Create<string>(observer => { Console.WriteLine("Observable.Create"); return mydata.ToObservable(). Subscribe(observer); }); observable. SubscribeOn(new MyScheduler(ConsoleColor.Red)). ObserveOn(new MyScheduler(ConsoleColor.Blue)). Subscribe(s => Console.WriteLine("OnNext {0}", s)); Console.ReadKey(); } } } 

这输出:

调度

并参考MyScheduler(不适合实际使用):

 using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; namespace SchedulerExample { class MyScheduler : IScheduler { private readonly ConsoleColor _colour; public MyScheduler(ConsoleColor colour) { _colour = colour; } public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) { return Execute(state, action); } private IDisposable Execute<TState>(TState state, Func<IScheduler, TState, IDisposable> action) { var tmp = Console.ForegroundColor; Console.ForegroundColor = _colour; action(this, state); Console.ForegroundColor = tmp; return Disposable.Empty; } public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) { throw new NotImplementedException(); } public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action) { throw new NotImplementedException(); } public DateTimeOffset Now { get { return DateTime.UtcNow; } } } } 

我经常会误认为.SubcribeOn是用来设置线程里面.Subscribe正在执行的代码。 但要记住,只要认为发布和订阅必须像阴阳一样配对。 要设置正在执行的Subscribe's code使用ObserveOn 。 设置Observable's code执行使用SubscribeOn 。 或摘要口头禅: where-whatSubscribe-ObserveObserve-Subscribe