为什么在.NET Reactive Extensions中不推荐使用主题?
我目前正在掌握.NET的Reactive Extensions框架,我正在通过我find的各种介绍资源(主要是http://www.introtorx.com )
我们的应用程序涉及许多检测networking帧的硬件接口,这些接口将是我的IObservables,然后我将拥有各种组件,这些组件将消耗这些帧或对数据执行某种方式的转换,并生成新的帧types。 例如,还会有其他组件需要显示每个第n帧。 我确信Rx对于我们的应用程序来说是非常有用的,但是我正在努力处理IObserver接口的实现细节。
大部分(如果不是全部)我一直在阅读的资源都说我不应该自己实现IObservable接口,而是使用提供的函数或类之一。 从我的研究看来,创build一个Subject<IBaseFrame>
将为我提供我所需的,我将有我的单线程从硬件接口读取数据,然后调用我的Subject<IBaseFrame>
实例的OnNext函数。 不同的IObserver组件将从该主题接收通知。
我的困惑来自于本教程附录中给出的build议:
避免使用主题types。 Rx实际上是一个函数式编程范例。 使用科目意味着我们现在正在pipe理状态,这可能会发生变化。 同时处理变异状态和asynchronous编程是非常困难的。 此外,许多运营商(扩展方法)都经过精心编写,以确保订阅和序列的正确和一致的使用期限得以维持; 当你介绍科目时,你可以打破这个。 如果明确使用主题,未来版本也可能会出现明显的性能下降。
我的应用程序是相当性能的关键,我很明显要testing使用Rx模式的性能之前,它进入生产代码; 但是我担心我正在使用Subject类来做一些违背Rx框架的精神,而且未来版本的框架将会损害性能。
有没有更好的方式做我想要的? 无论是否有观察者,硬件轮询线程都将持续运行(否则硬件缓冲区将会备份),所以这是一个非常热门的程序。 我需要将接收到的帧传给多个观察者。
任何build议将不胜感激。
好吧,如果我们忽视了我的教条方式,而忽略了“科目好坏”。 让我们看看问题空间。
我敢打赌,你要么有2种系统中的1种需要join。
- 消息到达时,系统会发起事件或回叫
- 您需要轮询系统以查看是否有任何消息要处理
对于选项1,容易,我们只是用适当的FromEvent方法包装它,我们就完成了。 去酒吧!
对于选项2,我们现在需要考虑我们如何进行调查以及如何有效地做到这一点。 当我们获得价值时,我们该如何发表呢?
我会想象你会想要一个专用的线程进行轮询。 你不会想要其他编码器锤击ThreadPool / TaskPool,并让你处于ThreadPool饥饿状态。 另外,你不希望上下文切换的麻烦(我猜)。 所以假设我们有自己的线程,我们可能会有某种我们坐在轮询的While / Sleep循环。 当检查发现一些消息,我们发布他们。 那么所有这一切听起来完美的Observable.Create。 现在我们可能不能使用While循环,因为这不会让我们返回一次性使用,以允许取消。 幸运的是,你已经阅读了整本书,所以精通recursion调度!
我想像这样的东西可以工作。 #NotTested
public class MessageListener { private readonly IObservable<IMessage> _messages; private readonly IScheduler _scheduler; public MessageListener() { _scheduler = new EventLoopScheduler(); var messages = ListenToMessages() .SubscribeOn(_scheduler) .Publish(); _messages = messages; messages.Connect(); } public IObservable<IMessage> Messages { get {return _messages;} } private IObservable<IMessage> ListenToMessages() { return Observable.Create<IMessage>(o=> { return _scheduler.Schedule(recurse=> { try { var messages = GetMessages(); foreach (var msg in messages) { o.OnNext(msg); } recurse(); } catch (Exception ex) { o.OnError(ex); } }); }); } private IEnumerable<IMessage> GetMessages() { //Do some work here that gets messages from a queue, // file system, database or other system that cant push // new data at us. // //This may return an empty result when no new data is found. } }
我真的不喜欢Subjects的原因是,这通常是一个开发者没有真正清楚devise问题的例子。 劈开一个主题,把它戳在这里,到处都是,然后让蹩脚的支持开发者猜测跆拳道是否正在进行。 当你使用创build/生成等方法,你正在本地化的影响序列。 你可以用一种方法看到这一切,你知道没有其他人正在抛出一个讨厌的副作用。 如果我看到一个主题领域,我现在必须去找一个课程中的所有地方。 如果某个MFer公开曝光,那么所有投注都closures,谁知道这个序列是如何使用的! asynchronous/并发/ Rx很难。 你不需要让副作用和因果关系编程更加困难。
一般来说,你应该避免使用Subject
,但是对于你在这里做的事情,我认为他们工作得很好。 当我在Rx教程中遇到“避免主题”的消息时,我也提出了类似的问题 。
引用Dave Sexton (Rxx的)
“主题是Rx的有状态组件,当你需要创build一个类似于事件的可观察对象作为一个字段或一个局部variables时,它们非常有用。
我倾向于将它们用作Rx的入口点。 所以,如果我有一些代码需要说'发生了什么'(就像你有),我会使用一个Subject
并调用OnNext
。 然后将这个IObservable
暴露给其他人订阅(你可以在你的主题上使用AsObservable()
来确保没有人能够投射到一个主题并搞砸了)。
你也可以通过一个.NET事件来实现这一点,并使用FromEventPattern
,但是如果我只是要把事件变成一个IObservable
,我不会看到有事件而不是Subject
的好处(这可能意味着我在这里错过了一些东西)
然而,你应该非常强烈地避免使用一个Subject
来订阅一个IObservable
,即不要把一个Subject
传递给IObservable.Subscribe
方法。
通常当你pipe理一个Subject时,你实际上只是重新实现了Rx中已经存在的function,而且可能不是一个健壮,简单和可扩展的方式。
当您尝试将某些asynchronous数据stream调整为Rx(或者从当前不是asynchronous的数据stream创buildasynchronous数据stream)时,最常见的情况通常是:
-
数据的来源是一个事件 :正如Lee所说,这是最简单的情况:使用FromEvent并且前往酒吧。
-
数据的来源是来自同步操作,你想要轮询更新 (例如web服务或数据库调用):在这种情况下,你可以使用Lee的build议方法,或者对于简单的情况,你可以使用像
Observable.Interval.Select(_ => <db fetch>)
。 当源数据中没有任何更改时,您可能需要使用DistinctUntilChanged()来防止发布更新。 -
数据源是某种调用callback的asynchronousAPI :在这种情况下,使用Observable.Create挂接callback函数,以便在观察者上调用OnNext / OnError / OnComplete。
-
数据源是一个阻塞直到有新数据可用的调用 (例如一些同步套接字读取操作):在这种情况下,您可以使用Observable.Create来封装从套接字读取并发布到Observer.OnNext当数据被读取时。 这可能与您在主题中所做的相似。
使用Observable.Create与创build一个pipe理Subject的类相当于使用yield关键字vs创build实现IEnumerator的整个类。 当然,您可以编写一个IEnumerator,使其像收益率代码一样清洁,公民,但哪一个封装得更好,感觉是一个整洁的devise? Observable.Create和pipe理主题也是如此。
Observable.Create为懒惰设置和清除拆卸提供了一个干净的模式。 你如何用一个包装一个主题的类来实现呢? 你需要某种Start方法…你怎么知道什么时候打电话? 或者,即使没有人在听,你是否总是开始呢? 当你完成后,你如何得到它停止读取套接字/轮询数据库等? 您必须具有某种停止方法,而且您不得不访问您订阅的IObservable,而是首先创build主题的类。
随着Observable.Create,它都包裹在一个地方。 Observable.Create的主体不会运行,直到有人订阅,所以如果没有人订阅,你永远不会使用你的资源。 而Observable.Create返回一个Disposable,可以干净地closures你的资源/callback等 – 这是Observer退订时调用的。 您用来生成Observable的资源的生命周期与Observable本身的生命周期完美地绑定在一起。
被引用的块文本很好的解释了为什么你不应该使用Subject<T>
,但是为了简单起见,你将observer和observable的function结合起来,同时在它们之间注入某种状态(无论是封装还是封装扩展)。
这是你遇到麻烦的地方; 这些责任应该是分开的,彼此不同的。
这就是说,在你的具体情况下,我build议你把你的顾虑分解成更小的部分。
首先,你有你的线程是热的,并始终监视硬件的信号,以提高通知。 你通常如何做到这一点? 事件 。 所以我们先从这个开始。
我们来定义事件触发的EventArgs
。
// The event args that has the information. public class BaseFrameEventArgs : EventArgs { public BaseFrameEventArgs(IBaseFrame baseFrame) { // Validate parameters. if (baseFrame == null) throw new ArgumentNullException("IBaseFrame"); // Set values. BaseFrame = baseFrame; } // Poor man's immutability. public IBaseFrame BaseFrame { get; private set; } }
现在,这个class级将会启动这个活动。 请注意,这可能是一个静态类(因为您总是有一个线程运行监视硬件缓冲区),或者您按需要调用的某个订阅该类的东西。 您必须根据需要进行修改。
public class BaseFrameMonitor { // You want to make this access thread safe public event EventHandler<BaseFrameEventArgs> HardwareEvent; public BaseFrameMonitor() { // Create/subscribe to your thread that // drains hardware signals. } }
所以现在你有一个暴露事件的类。 观察对象适合于事件。 所以如果遵循标准事件模式,通过Observable
类的静态FromEventPattern
方法 ,将事件stream(将事件stream视为事件的多个事件)转换为IObservable<T>
实现, 。
通过事件源和FromEventPattern
方法,我们可以很容易地创build一个IObservable<EventPattern<BaseFrameEventArgs>>
( EventPattern<TEventArgs>
类体现了.NET事件中可以看到的东西,特别是从EventArgs
派生的实例和代表发件人的对象),如下所示:
// The event source. // Or you might not need this if your class is static and exposes // the event as a static event. var source = new BaseFrameMonitor(); // Create the observable. It's going to be hot // as the events are hot. IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable. FromEventPattern<BaseFrameEventArgs>( h => source.HardwareEvent += h, h => source.HardwareEvent -= h);
当然,你需要一个IObservable<IBaseFrame>
,但这很容易,使用Observable
类的Select
扩展方法来创build一个投影(就像你在LINQ中一样),我们可以把所有这些都放在一个易于理解的地方,使用方法):
public IObservable<IBaseFrame> CreateHardwareObservable() { // The event source. // Or you might not need this if your class is static and exposes // the event as a static event. var source = new BaseFrameMonitor(); // Create the observable. It's going to be hot // as the events are hot. IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable. FromEventPattern<BaseFrameEventArgs>( h => source.HardwareEvent += h, h => source.HardwareEvent -= h); // Return the observable, but projected. return observable.Select(i => i.EventArgs.BaseFrame); }