任务sorting和re-entracy
我有以下情况,我认为这可能是相当普遍的:
-
有一个任务(一个UI命令处理程序)可以完成同步或asynchronous。
-
命令可能比他们正在处理更快到达。
-
如果某个命令已经有待处理的任务,那么新的命令处理程序任务应该排队并按顺序处理。
-
每个新任务的结果可能取决于前一个任务的结果。
取消应该遵守,但为了简单起见,我想把它放在这个问题的范围之外。 此外,线程安全性(并发性)不是必需的,但必须支持重入性。
下面是我想要实现的基本示例(作为控制台应用程序,为了简单起见):
using System; using System.Threading.Tasks; namespace ConsoleApp { class Program { static void Main(string[] args) { var asyncOp = new AsyncOp<int>(); Func<int, Task<int>> handleAsync = async (arg) => { Console.WriteLine("this task arg: " + arg); //await Task.Delay(arg); // make it async return await Task.FromResult(arg); // sync }; Console.WriteLine("Test #1..."); asyncOp.RunAsync(() => handleAsync(1000)); asyncOp.RunAsync(() => handleAsync(900)); asyncOp.RunAsync(() => handleAsync(800)); asyncOp.CurrentTask.Wait(); Console.WriteLine("\nPress any key to continue to test #2..."); Console.ReadLine(); asyncOp.RunAsync(() => { asyncOp.RunAsync(() => handleAsync(200)); return handleAsync(100); }); asyncOp.CurrentTask.Wait(); Console.WriteLine("\nPress any key to exit..."); Console.ReadLine(); } // AsyncOp class AsyncOp<T> { Task<T> _pending = Task.FromResult(default(T)); public Task<T> CurrentTask { get { return _pending; } } public Task<T> RunAsync(Func<Task<T>> handler) { var pending = _pending; Func<Task<T>> wrapper = async () => { // await the prev task var prevResult = await pending; Console.WriteLine("\nprev task result: " + prevResult); // start and await the handler return await handler(); }; _pending = wrapper(); return _pending; } } } }
输出:
testing#1 ... 上一个任务结果:0 这个任务arg:1000 上一个任务结果:1000 这个任务是900 上一个任务结果:900 这个任务arg:800 按任意键继续testing#2 ... 上一个任务结果:800 上一个任务结果:800 这个任务arg:200 这个任务arg:100 按任何一个键退出...
它按照要求工作,直到在testing#2中引入重入:
asyncOp.RunAsync(() => { asyncOp.RunAsync(() => handleAsync(200)); return handleAsync(100); });
期望的输出应该是200
,而不是200
,因为已经有一个等待100
的外挂任务了。 这显然是因为内部任务同步执行,打破了逻辑var pending = _pending; /* ... */ _pending = wrapper()
var pending = _pending; /* ... */ _pending = wrapper()
用于外部任务。
如何使它为testing#2工作呢?
一个解决scheme是通过Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext()
强制每个任务的asynchronous执行。但是,我不希望在内部同步的命令处理程序上实施asynchronous执行。 ,我不想依赖于任何特定的同步上下文的行为(即依赖于Task.Factory.StartNew
应该在创build的任务实际开始之前返回)。
在真实的项目中,我负责上面的AsyncOp
,但是不能控制命令处理程序(即handleAsync
任何handleAsync
)。
我几乎忘记了可以手动构build一个Task
,而无需启动或安排Task
。 然后, “Task.Factory.StartNew”vs“新任务(…)。开始”让我回到正轨。 我认为这是Task<TResult>
构造函数实际上可能有用的less数情况之一,嵌套任务( Task<Task<T>>
)和Task.Unwrap()
:
// AsyncOp class AsyncOp<T> { Task<T> _pending = Task.FromResult(default(T)); public Task<T> CurrentTask { get { return _pending; } } public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false) { var pending = _pending; Func<Task<T>> wrapper = async () => { // await the prev task var prevResult = await pending; Console.WriteLine("\nprev task result: " + prevResult); // start and await the handler return await handler(); }; var task = new Task<Task<T>>(wrapper); var inner = task.Unwrap(); _pending = inner; task.RunSynchronously(useSynchronizationContext ? TaskScheduler.FromCurrentSynchronizationContext() : TaskScheduler.Current); return inner; } }
输出:
testing#1 ... 上一个任务结果:0 这个任务arg:1000 上一个任务结果:1000 这个任务是900 上一个任务结果:900 这个任务arg:800 按任意键继续testing#2 ... 上一个任务结果:800 这个任务arg:100 上一个任务结果:100 这个任务arg:200
如果需要的_pending
,通过添加一个lock
来保护_pending
,现在也可以非常容易地使AsyncOp
线程安全的。
更新 ,下面是这个模式的最新版本,它使用TaskCompletionSource
并且是线程安全的:
/// <summary> /// AsyncOperation /// By Noseratio - http://stackoverflow.com/a/21427264 /// </summary> /// <typeparam name="T">Task result type</typeparam> class AsyncOperation<T> { readonly object _lock = new Object(); Task<T> _currentTask = null; CancellationTokenSource _currentCts = null; // a client of this class (eg a ViewModel) has an option // to handle TaskSucceeded or TaskFailed, if needed public event EventHandler<TaskEventArgs> TaskSucceeded = null; public event EventHandler<TaskEventArgs> TaskFailing = null; public Task<T> CurrentTask { get { lock (_lock) return _currentTask; } } public bool IsCurrent(Task task) { lock (_lock) return task == _currentTask; } public bool IsPending { get { lock (_lock) return _currentTask != null && !_currentTask.IsCompleted; } } public bool IsCancellationRequested { get { lock (_lock) return _currentCts != null && _currentCts.IsCancellationRequested; } } public void Cancel() { lock (_lock) { if (_currentTask != null && !_currentTask.IsCompleted) _currentCts.Cancel(); } } /// <summary> /// Start the task routine and observe the result of the previous task routine /// </summary> /// <param name="routine"></param> /// <param name="token"></param> /// <param name="cancelPrevious"></param> /// <param name="throwImmediately"></param> public Task<T> StartAsync( Func<CancellationToken, Task<T>> routine, CancellationToken token, bool cancelPrevious = true, bool throwImmediately = true) { Task<T> previousTask = null; // pending instance CancellationTokenSource previousCts = null; // pending instance CTS CancellationTokenSource thisCts = CancellationTokenSource.CreateLinkedTokenSource(token); TaskCompletionSource<T> thisTcs = new TaskCompletionSource<T>(); // this task CancellationToken thisToken; // this task's cancellation Token Task<T> routineTask = null; // as returned by routine lock (_lock) { // remember the _currentTask as previousTask previousTask = _currentTask; previousCts = _currentCts; thisToken = thisCts.Token; // set the new _currentTask _currentTask = thisTcs.Task; _currentCts = thisCts; } Action startAsync = async () => { // because startAsync is "async void" method, // any exception not handled inside it // will be immediately thrown on the current synchronization context, // more details: http://stackoverflow.com/a/22395161/1768303 // run and await this task try { // await the previous task instance if (previousTask != null) { if (cancelPrevious) previousCts.Cancel(); try { await previousTask; } catch (OperationCanceledException) { // ignore previous cancellations } } thisToken.ThrowIfCancellationRequested(); routineTask = routine(thisToken); await routineTask; } catch (Exception ex) { // ignore cancellation if (ex is OperationCanceledException) { System.Diagnostics.Debug.Print("Task cancelled, id={0}", thisTcs.Task.Id); thisTcs.SetCanceled(); return; } // fire TaskFailing System.Diagnostics.Debug.Print("Task failing, id={0}", thisTcs.Task.Id); if (this.TaskFailing != null) { var args = new TaskEventArgs(thisTcs.Task, ex); this.TaskFailing(this, args); if (args.Handled) { // exception handled // make thisTcs cancelled rather than faulted thisTcs.SetCanceled(); return; } } // exception unhandled thisTcs.SetException(ex); if (throwImmediately) throw; // rethrow on the current synchronization context // exception should be observed via CurrentTask.Exception return; } // success, fire TaskSucceeded System.Diagnostics.Debug.Print("Task succeded, id={0}", thisTcs.Task.Id); thisTcs.SetResult(routineTask.Result); if (this.TaskSucceeded != null) this.TaskSucceeded(this, new TaskEventArgs(thisTcs.Task)); }; startAsync(); return thisTcs.Task; } // StartAsync with CancellationToken.None public Task<T> StartAsync( Func<CancellationToken, Task<T>> routine, bool cancelPrevious = true, bool throwImmediately = true) { return StartAsync(routine, CancellationToken.None, cancelPrevious: true, throwImmediately: true); } /// <summary> /// TaskEventArgs /// </summary> public class TaskEventArgs : EventArgs { public Task<T> Task { get; private set; } public Exception Exception { get; private set; } public bool Handled { get; set; } public TaskEventArgs(Task<T> task, Exception exception = null) { this.Task = task; this.Exception = exception; } } }