从头开始实现IObservable <T>
Reactive Extensions提供了很多帮助方法来将现有事件和asynchronous操作转化为observable,但是如何从头开始实现一个IObservable <T>?
IEnumerable具有可爱的yield关键字,使其实现起来非常简单。
实现IObservable <T>的正确方法是什么?
我需要担心线程安全吗?
我知道有支持在特定的同步上下文callback,但这是我作为一个IObservable <T>作者需要担心或这种内置?
更新:
这是我的C#版本的布赖恩的F#解决scheme
using System; using System.Linq; using Microsoft.FSharp.Collections; namespace Jesperll { class Observable<T> : IObservable<T>, IDisposable where T : EventArgs { private FSharpMap<int, IObserver<T>> subscribers = FSharpMap<int, IObserver<T>>.Empty; private readonly object thisLock = new object(); private int key; private bool isDisposed; public void Dispose() { Dispose(true); } protected virtual void Dispose(bool disposing) { if (disposing && !isDisposed) { OnCompleted(); isDisposed = true; } } protected void OnNext(T value) { if (isDisposed) { throw new ObjectDisposedException("Observable<T>"); } foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value)) { observer.OnNext(value); } } protected void OnError(Exception exception) { if (isDisposed) { throw new ObjectDisposedException("Observable<T>"); } if (exception == null) { throw new ArgumentNullException("exception"); } foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value)) { observer.OnError(exception); } } protected void OnCompleted() { if (isDisposed) { throw new ObjectDisposedException("Observable<T>"); } foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value)) { observer.OnCompleted(); } } public IDisposable Subscribe(IObserver<T> observer) { if (observer == null) { throw new ArgumentNullException("observer"); } lock (thisLock) { int k = key++; subscribers = subscribers.Add(k, observer); return new AnonymousDisposable(() => { lock (thisLock) { subscribers = subscribers.Remove(k); } }); } } } class AnonymousDisposable : IDisposable { Action dispose; public AnonymousDisposable(Action dispose) { this.dispose = dispose; } public void Dispose() { dispose(); } } }
编辑:如果Dispose被调用两次,不要抛出ObjectDisposedException
老实说,我不确定这是多么的“正确”,但是根据我的经验,如果感觉还不错的话。 这是F#代码,但希望你有一个风味的感觉。 它可以让你“新build”一个源对象,然后你可以调用Next / Completed / Error,它pipe理订阅,并试图在源或客户端做坏事时断言。
type ObservableSource<'T>() = // ' let protect f = let mutable ok = false try f() ok <- true finally Debug.Assert(ok, "IObserver methods must not throw!") // TODO crash? let mutable key = 0 // Why a Map and not a Dictionary? Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over let mutable subscriptions = Map.empty : Map<int,IObserver<'T>> // ' let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x))) let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted())) let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e))) let thisLock = new obj() let obs = { new IObservable<'T> with // ' member this.Subscribe(o) = let k = lock thisLock (fun () -> let k = key key <- key + 1 subscriptions <- subscriptions.Add(k, o) k) { new IDisposable with member this.Dispose() = lock thisLock (fun () -> subscriptions <- subscriptions.Remove(k)) } } let mutable finished = false // The methods below are not thread-safe; the source ought not call these methods concurrently member this.Next(x) = Debug.Assert(not finished, "IObserver is already finished") next x member this.Completed() = Debug.Assert(not finished, "IObserver is already finished") finished <- true completed() member this.Error(e) = Debug.Assert(not finished, "IObserver is already finished") finished <- true error e // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads member this.Value = obs
我会对这里有什么好或坏的想法感兴趣。 我还没有机会从devlabs看到所有新的Rx的东西,但…
我自己的经验表明:
- 那些赞成观察的人不应该从订阅中抛出。 当用户抛出时,观察者可以做的没有什么合理的。 (这与事件类似)。最有可能的exception只会冒泡到顶级的catch-all处理程序或崩溃应用程序。
- 来源可能应该是“逻辑上单线程”。 我认为编写能够对并发OnNext调用作出反应的客户端可能会比较困难; 即使每个单独的呼叫来自不同的线程,避免并发呼叫也是有帮助的。
- 有一个强制执行一些“契约”的基类/助手类是非常有用的。
我很好奇,如果人们能够根据这些方面提出更具体的build议。
官方文档弃用自己实现IObservable的用户。 相反,用户需要使用工厂方法Observable.Create
如有可能,通过组合现有的操作员来实现新的操作员。 否则使用Observable.Create实现自定义运算符
碰巧的是,Observable.Create是Reactive的内部类AnonymousObservable
一个简单的包装:
public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe) { if (subscribe == null) { throw new ArgumentNullException("subscribe"); } return new AnonymousObservable<TSource>(subscribe); }
我不知道为什么他们没有公开执行,但是,嘿,不pipe。
是的,yield关键字是可爱的; 也许会有类似的IObservable(OfT)? 在埃里克·梅杰(Eric Meijer)的PDC '09谈话中,他说:“是的,看这个空间”是为了产生可观察的声明性收益。
对于接近的东西(而不是自己翻译),请查看“ (尚未)101 Rx Samples ”wiki 的底部 ,团队build议使用Subject(T)类作为“后端”来实现IObservable OFT)。 这是他们的例子:
public class Order { private DateTime? _paidDate; private readonly Subject<Order> _paidSubj = new Subject<Order>(); public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } } public void MarkPaid(DateTime paidDate) { _paidDate = paidDate; _paidSubj.OnNext(this); // Raise PAID event } } private static void Main() { var order = new Order(); order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe order.MarkPaid(DateTime.Now); }
-
打开反光板,看看。
-
观看一些C9video – 这一个显示如何“派生”select“组合器”
-
秘密是创buildAnonymousObservable,AnonymousObserver和AnonymousDisposable类,(这只是为了避免实例化接口的事实)。 他们包含零实现,因为你通过Actions和Funcs传入。
例如:
public class AnonymousObservable<T> : IObservable<T> { private Func<IObserver<T>, IDisposable> _subscribe; public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe) { _subscribe = subscribe; } public IDisposable Subscribe(IObserver<T> observer) { return _subscribe(observer); } }
我会让你解决其余的问题……这是一个非常好的理解练习。
有一个很好的小线程在这里增长与相关的问题。
关于这个实现只是一个评论:
在.net fw 4中引入并发集合之后,最好使用ConcurrentDictioary而不是简单的字典。
它节省了对集合的处理locking。
ADI公司。