实现可重试块的正确完成
Teaser :伙计们,这个问题不是关于如何实施重试策略的。 这是关于正确完成TPL数据stream块。
这个问题主要是我以前的问题在ITargetBlock内重试策略的延续 。 这个问题的答案是@ svick利用TransformBlock
(source)和TransformManyBlock
(target)的智能解决scheme。 剩下的唯一问题就是以正确的方式完成这个块:等待所有的重试先完成,然后完成目标块。 这就是我最终的结果(这只是一个片段,不要太注意非线程安全的retries
集):
var retries = new HashSet<RetryingMessage<TInput>>(); TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null; target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>( async message => { try { var result = new[] { await transform(message.Data) }; retries.Remove(message); return result; } catch (Exception ex) { message.Exceptions.Add(ex); if (message.RetriesRemaining == 0) { if (failureHandler != null) failureHandler(message.Exceptions); retries.Remove(message); } else { retries.Add(message); message.RetriesRemaining--; Task.Delay(retryDelay) .ContinueWith(_ => target.Post(message)); } return null; } }, dataflowBlockOptions); source.LinkTo(target); source.Completion.ContinueWith(async _ => { while (target.InputCount > 0 || retries.Any()) await Task.Delay(100); target.Complete(); });
这个想法是执行某种轮询,并validation是否仍有消息等待处理,并且没有消息需要重试。 但在这个解决scheme,我不喜欢投票的想法。
是的,我可以将增加/删除重试的逻辑封装到一个单独的类中,甚至当重试集变空时执行一些操作,但是如何处理target.InputCount > 0
条件? 当没有挂起的消息时,没有这样的callback被调用,所以似乎validation一个具有小延迟的循环中的target.ItemCount
是唯一的select。
有没有人知道一个更聪明的方法来实现这一目标?
结合hwcverwe答案和JamieSee评论可能是理想的解决scheme。
首先,您需要创build多个事件:
var signal = new ManualResetEvent(false); var completedEvent = new ManualResetEvent(false);
然后,您必须创build一个观察者,并订阅TransformManyBlock
,以便在相关事件发生时通知您:
var observer = new RetryingBlockObserver<TOutput>(completedEvent); var observable = target.AsObservable(); observable.Subscribe(observer);
可观察性可能相当简单:
private class RetryingBlockObserver<T> : IObserver<T> { private ManualResetEvent completedEvent; public RetryingBlockObserver(ManualResetEvent completedEvent) { this.completedEvent = completedEvent; } public void OnCompleted() { completedEvent.Set(); } public void OnError(Exception error) { //TODO } public void OnNext(T value) { //TODO } }
你可以等待信号,或完成(所有源项目耗尽),或两者兼而有之
source.Completion.ContinueWith(async _ => { WaitHandle.WaitAll(completedEvent, signal); // Or WaitHandle.WaitAny, depending on your needs! target.Complete(); });
您可以检查WaitAll的结果值以了解设置了哪个事件,并作出相应的反应。 您也可以将其他事件添加到代码中,将它们传递给观察者,以便在需要时设置它们。 例如,您可以区分您的行为,并在发生错误时作出不同的响应
也许一个ManualResetEvent可以为你做的伎俩。
将公共属性添加到TransformManyBlock
private ManualResetEvent _signal = new ManualResetEvent(false); public ManualResetEvent Signal { get { return _signal; } }
在这里你去:
var retries = new HashSet<RetryingMessage<TInput>>(); TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null; target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>( async message => { try { var result = new[] { await transform(message.Data) }; retries.Remove(message); // Sets the state of the event to signaled, allowing one or more waiting threads to proceed if(!retries.Any()) Signal.Set(); return result; } catch (Exception ex) { message.Exceptions.Add(ex); if (message.RetriesRemaining == 0) { if (failureHandler != null) failureHandler(message.Exceptions); retries.Remove(message); // Sets the state of the event to signaled, allowing one or more waiting threads to proceed if(!retries.Any()) Signal.Set(); } else { retries.Add(message); message.RetriesRemaining--; Task.Delay(retryDelay) .ContinueWith(_ => target.Post(message)); } return null; } }, dataflowBlockOptions); source.LinkTo(target); source.Completion.ContinueWith(async _ => { //Blocks the current thread until the current WaitHandle receives a signal. target.Signal.WaitOne(); target.Complete(); });
我不知道你的target.InputCount
设置。 所以在你改变target.InputCount
的地方你可以添加下面的代码:
if(InputCount == 0) Signal.Set();