ObserveOn和SubscribeOn – 工作正在完成
基于阅读这个问题: SubscribeOn和ObserveOn有什么区别
ObserveOn
设置Subscribe
处理程序中代码的执行位置:
stream.Subscribe(_ => { // this code here });
SubscribeOn
方法设置stream的设置在哪个线程上完成。
我明白,如果这些没有明确设置,那么使用TaskPool。
现在我的问题是,可以说我做了这样的事情:
Observable.Interval(new Timespan(0, 0, 1)).Where(t => predicate(t)).SelectMany(t => lots_of(t)).ObserveOnDispatcher().Subscribe(t => some_action(t));
where predicate
和SelectMany
lots_of
在被执行,假设在调度器上正在执行some_action
?
关于SubscribeOn
和ObserveOn
有很多误导性的信息。
概要
-
SubscribeOn
截取对SubscribeOn
的IObservable<T>
的单一方法的调用,并在Subscribe
返回的IDisposable
句柄上调用Dispose
。 -
ObserveOn
截获对IObserver<T>
方法的调用,这些调用是OnNext
,OnCompleted
和OnError
。 - 这两种方法都会导致在指定的调度程序上进行相应的调用。
分析与示范
该声明
ObserveOn设置订阅处理程序中代码的执行位置:
比帮助更困惑。 你所说的“订阅处理程序”实际上是一个OnNext
处理程序。 请记住, IObservable
的Subscribe
方法接受一个具有OnNext
, OnCompleted
和OnError
方法的IObserver
,但它是扩展方法,提供接受lambdaexpression式的便利重载并为您构build一个IObserver
实现。
让我适当的这个词虽然; 我认为“订阅处理程序”是调用Subscribe
时调用的observable中的代码。 以这种方式,上面的描述更类似于SubscribeOn
的目的。
SubscribeOn
SubscribeOn
使得observable的Subscribe
方法在指定的调度器或上下文上asynchronous执行。 当你不想在你运行的任何线程的observable上调用Subscribe
方法时,你可以使用它 – 通常是因为它可以长时间运行,而且你不想阻塞调用线程。
当你打电话给Subscribe
,你正在呼叫一个观察者,这个观察者可能是可观察的长链的一部分。 只有SubscribeOn
被应用于它的效果。 现在可能是这样的情况,即所有链中的观察者都将立即在同一个线程上订阅 – 但事实并非如此。 以Concat
为例 – 只有在前一个stream完成后才会订阅每个后续stream,而且通常会在前一个stream称为OnCompleted
任何线程上进行。
所以SubscribeOn
位于你的Subscribe
调用和Subscribe
的observable之间,拦截这个调用并使之asynchronous。
这也影响处理订阅。 Subscribe
将返回一个用于取消Subscribe
的IDisposable
句柄。 SubscribeOn
确保在提供的调度程序上调度Dispose
。
当试图了解SubscribeOn
做什么的时候,一个常见的困惑就是observable的Subscribe
处理程序可能在同一个线程上调用OnNext
, OnCompleted
或OnError
。 但是,其目的不是影响这些电话。 在Subscribe
方法返回之前stream完成并不罕见。 例如, Observable.Return
这样做的。 让我们来看看。
如果您使用我写的Spy方法,并运行以下代码:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.Subscribe(); Console.WriteLine("Subscribe returned");
你得到这个输出(线程ID可能会有所不同):
Calling from Thread: 1 Return: Observable obtained on Thread: 1 Return: Subscribed to on Thread: 1 Return: OnNext(1) on Thread: 1 Return: OnCompleted() on Thread: 1 Return: Subscription completed. Subscribe returned
您可以看到整个订阅处理程序在同一个线程上运行,并在返回之前完成。
让我们使用SubscribeOn
asynchronous运行。 我们会对可观察到的Return
和SubscribeOn
可观察到的间谍:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe(); Console.WriteLine("Subscribe returned");
这个输出(由我添加的行号):
01 Calling from Thread: 1 02 Return: Observable obtained on Thread: 1 03 SubscribeOn: Observable obtained on Thread: 1 04 SubscribeOn: Subscribed to on Thread: 1 05 SubscribeOn: Subscription completed. 06 Subscribe returned 07 Return: Subscribed to on Thread: 2 08 Return: OnNext(1) on Thread: 2 09 SubscribeOn: OnNext(1) on Thread: 2 10 Return: OnCompleted() on Thread: 2 11 SubscribeOn: OnCompleted() on Thread: 2 12 Return: Subscription completed.
01 – 主线程在线程1上运行。
02 – 在调用线程上评估Return
observable。 我们刚刚获得了IObservable
,现在还没有订阅。
03 – 在调用线程上评估SubscribeOn
observable。
04 – 现在最后我们调用Subscribe
方法。
05 – Subscribe
方法asynchronous完成…
06 – …并且线程1返回到主要方法。 这是SubscribeOn在行动中的效果!
07 – 与此同时, SubscribeOn
在默认调度程序上调用Return
来调用Return
。 这是在线程2上收到的。
08 – 和Return
一样,它在Subscribe
线程上调用OnNext
…
09和SubscribeOn
只是一个通过现在。
10,11 – 相同的OnCompleted
12 – 所有Return
订阅处理程序的最后一个完成。
希望能够澄清SubscribeOn
的目的和效果!
ObserveOn
如果您将SubscribeOn
作为将此调用传递到另一个线程的Subscribe
方法的拦截器,那么ObserveOn
执行相同的工作,但对于OnNext
, OnCompleted
和OnError
调用。
回想一下我们最初的例子:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.Subscribe(); Console.WriteLine("Subscribe returned");
这给了这个输出:
Calling from Thread: 1 Return: Observable obtained on Thread: 1 Return: Subscribed to on Thread: 1 Return: OnNext(1) on Thread: 1 Return: OnCompleted() on Thread: 1 Return: Subscription completed. Subscribe returned
现在让我们改变这个使用ObserveOn
:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Return(1).Spy("Return"); source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned");
我们得到以下输出:
01 Calling from Thread: 1 02 Return: Observable obtained on Thread: 1 03 ObserveOn: Observable obtained on Thread: 1 04 ObserveOn: Subscribed to on Thread: 1 05 Return: Subscribed to on Thread: 1 06 Return: OnNext(1) on Thread: 1 07 ObserveOn: OnNext(1) on Thread: 2 08 Return: OnCompleted() on Thread: 1 09 Return: Subscription completed. 10 ObserveOn: Subscription completed. 11 Subscribe returned 12 ObserveOn: OnCompleted() on Thread: 2
01 – 主线程在线程1上运行。
02 – 如前所述,在调用线程上评估Return
可观察值。 我们刚刚获得了IObservable
,现在还没有订阅。
03 – ObserveOn
observable也在调用线程上进行评估。
04 – 现在我们再次在调用线程上订阅ObserveOn
observable …
05 – …然后将呼叫传递给Return
可观测值。
06 – 现在在其Subscribe
处理程序中Return
OnNext
。
07 – 这是ObserveOn
的效果。 我们可以看到OnNext
是在线程2上asynchronous调度的。
08 – 同时Return
调用线程1上完成…
09 – 和Return
的订阅处理程序完成…
10 – ObserveOn
的订阅处理程序也是如此…
11 – 所以控制返回到主要方法
12 – 与此同时, ObserveOn
已经将Return
的OnCompleted
调用到了Thread 2.这可能是在09-11的任何时候发生的,因为它是asynchronous运行的。 恰好如此,现在终于被调用了。
什么是典型的用例?
当你需要Subscribe
一个长时间运行的observable,并希望尽快离开调度器线程时,你会经常看到在GUI中使用的Subscribe
– 也许是因为你知道这是其中的一个可观察的事情,处理程序。 将其应用于可观察链的末端,因为这是您订阅时调用的第一个可观察对象。
当你想确保OnNext
, OnCompleted
和OnError
调用被编组callback度器线程时,你会经常看到ObserveOn
在GUI中使用。 将其应用于可观察链的末端,尽可能晚地转换回去。
希望你可以看到你的问题的答案是ObserveOnDispatcher
将不会对Where
和SelectMany
执行的线程有任何改变 – 这一切都取决于线程stream从哪个线程调用它们! stream的订阅处理程序将在调用线程上调用,但是不可能说Where
和SelectMany
将在不知道stream
如何实现的情况下运行。
观看者的生活时间超过了订阅电话
到目前为止,我们一直在专注于Observable.Return
。 Return
在Subscribe
处理程序中完成它的stream。 这不是非典型的,但是stream式传输比Subscribe
处理器更为常见。 看Observable.Timer
例如:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.Subscribe(); Console.WriteLine("Subscribe returned");
这返回以下内容:
Calling from Thread: 1 Timer: Observable obtained on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. Subscribe returned Timer: OnNext(0) on Thread: 2 Timer: OnCompleted() on Thread: 2
您可以清楚地看到订阅完成,然后OnNext
和OnCompleted
稍后调用另一个线程。
请注意, SubscribeOn
或ObserveOn
组合不会影响任何线程或调度程序Timer
select调用OnNext
和OnCompleted
。
当然,您可以使用SubscribeOn
来确定Subscribe
线程:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe(); Console.WriteLine("Subscribe returned");
(我故意改变到这里的NewThreadScheduler
,以防止在Timer
发生的情况下混淆得到与SubscribeOn
相同的线程池线程)
赠送:
Calling from Thread: 1 Timer: Observable obtained on Thread: 1 SubscribeOn: Observable obtained on Thread: 1 SubscribeOn: Subscribed to on Thread: 1 SubscribeOn: Subscription completed. Subscribe returned Timer: Subscribed to on Thread: 2 Timer: Subscription completed. Timer: OnNext(0) on Thread: 3 SubscribeOn: OnNext(0) on Thread: 3 Timer: OnCompleted() on Thread: 3 SubscribeOn: OnCompleted() on Thread: 3
在这里你可以清楚地看到线程(1)在它的Subscribe
调用之后返回的主线程,但是Timer
订阅获得它自己的线程(2),但是在线程(3)上运行的OnNext
和OnCompleted
调用。
现在对于ObserveOn
,让我们将代码更改为(对于代码中的代码,使用nuget包rx-wpf):
var dispatcher = Dispatcher.CurrentDispatcher; Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned");
这个代码有点不同。 第一行确保我们有一个调度程序,并且还引入了ObserveOnDispatcher
– 这就像ObserveOn
一样,除了它指定我们应该使用ObserveOnDispatcher
评估的任何线程的DispatcherScheduler
。
这段代码给出了以下输出:
Calling from Thread: 1 Timer: Observable obtained on Thread: 1 ObserveOn: Observable obtained on Thread: 1 ObserveOn: Subscribed to on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. ObserveOn: Subscription completed. Subscribe returned Timer: OnNext(0) on Thread: 2 ObserveOn: OnNext(0) on Thread: 1 Timer: OnCompleted() on Thread: 2 ObserveOn: OnCompleted() on Thread: 1
注意调度程序(和主线程)是线程1. Timer
仍然在其select的线程(2)上调用OnNext
和OnCompleted
,但ObserveOnDispatcher
正在调用callback度器线程thread(1)。
另外请注意,如果我们要阻塞调度程序线程(比如Thread.Sleep
),您将看到ObserveOnDispatcher
会阻塞(这个代码在LINQPad主要方法中最好):
var dispatcher = Dispatcher.CurrentDispatcher; Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId); var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer"); source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe(); Console.WriteLine("Subscribe returned"); Console.WriteLine("Blocking the dispatcher"); Thread.Sleep(2000); Console.WriteLine("Unblocked");
你会看到这样的输出:
Calling from Thread: 1 Timer: Observable obtained on Thread: 1 ObserveOn: Observable obtained on Thread: 1 ObserveOn: Subscribed to on Thread: 1 Timer: Subscribed to on Thread: 1 Timer: Subscription completed. ObserveOn: Subscription completed. Subscribe returned Blocking the dispatcher Timer: OnNext(0) on Thread: 2 Timer: OnCompleted() on Thread: 2 Unblocked ObserveOn: OnNext(0) on Thread: 1 ObserveOn: OnCompleted() on Thread: 1
通过ObserveOnDispatcher
的呼叫只有在Sleep
运行后才能离开。
关键点
记住Reactive Extensions本质上是一个自由线程库,并且试图尽可能地使用它运行的线程 – 你必须故意干预ObserveOn
, SubscribeOn
并将特定的调度程序传递给接受它们的操作符改变这一点。
ObserveOn
和SubscribeOn
是装饰器 ,它们包装观察者和观察者的表面区域,以便跨线程编组呼叫,观察者无法控制内部所做的事情。 希望这些例子已经说清楚了。
我发现詹姆斯的答案非常清楚和全面。 但是,尽pipe如此,我仍然不得不解释这些差异。
因此,我创build了一个非常简单/愚蠢的例子,让我以graphics的方式展示调度器正在调用的东西。 我已经创build了一个MyScheduler
类来立即执行动作,但会改变控制台的颜色。
SubscribeOn
调度程序输出的文本以红色输出, ObserveOn
调度程序以蓝色输出。
using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; namespace SchedulerExample { class Program { static void Main(string[] args) { var mydata = new[] {"A", "B", "C", "D", "E"}; var observable = Observable.Create<string>(observer => { Console.WriteLine("Observable.Create"); return mydata.ToObservable(). Subscribe(observer); }); observable. SubscribeOn(new MyScheduler(ConsoleColor.Red)). ObserveOn(new MyScheduler(ConsoleColor.Blue)). Subscribe(s => Console.WriteLine("OnNext {0}", s)); Console.ReadKey(); } } }
这输出:
并参考MyScheduler(不适合实际使用):
using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; namespace SchedulerExample { class MyScheduler : IScheduler { private readonly ConsoleColor _colour; public MyScheduler(ConsoleColor colour) { _colour = colour; } public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action) { return Execute(state, action); } private IDisposable Execute<TState>(TState state, Func<IScheduler, TState, IDisposable> action) { var tmp = Console.ForegroundColor; Console.ForegroundColor = _colour; action(this, state); Console.ForegroundColor = tmp; return Disposable.Empty; } public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action) { throw new NotImplementedException(); } public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action) { throw new NotImplementedException(); } public DateTimeOffset Now { get { return DateTime.UtcNow; } } } }
我经常会误认为.SubcribeOn
是用来设置线程里面.Subscribe
正在执行的代码。 但要记住,只要认为发布和订阅必须像阴阳一样配对。 要设置正在执行的Subscribe's code
使用ObserveOn
。 设置Observable's code
执行使用SubscribeOn
。 或摘要口头禅: where-what
, Subscribe-Observe
, Observe-Subscribe
。
- 比.NET Reflector更好的东西?
- 允许使用HttpClient的不可信SSL证书
- 将logging仪作为单件使用是一种好的做法吗?
- 自MS14-059以来,System.Web.MVC不会复制到bin文件夹中。 如何防止由于Windows更新而创build缺lessDLL的生成?
- 正则expression式,用大写字母拆分string,但忽略TLA
- 使用Case / Switch和GetType来确定对象
- “该操作对于事务状态”错误和事务范围无效
- 在WPF窗口中获取当前的焦点元素/控件
- 使用ASP.NET Core Web应用程序(.NET Core)与net461设置为唯一框架并使用(.NET Framework)模板之间的区别