我怎样才能防止任务同步延续?

我有一些库(套接字networking)代码,基于TaskCompletionSource<T>提供了一个基于Task的API来处理对请求的响应。 然而,TPL的一个烦恼是,似乎不可能防止同步延续。 我希望能够做的是:

  • 告诉一个TaskCompletionSource<T> ,它不应该允许调用者使用TaskContinuationOptions.ExecuteSynchronously来附加
  • 设置结果( SetResult / TrySetResult )的方式,指定应忽略TaskContinuationOptions.ExecuteSynchronously ,使用池而不是

具体来说,我遇到的问题是传入的数据正在由专门的阅读器处理,如果调用者可以附加TaskContinuationOptions.ExecuteSynchronously他们可以停止阅读器(这不仅影响他们)。 以前,我曾经通过一些骇客来解决这个问题,它检测是否存在任何延续,如果是,则将完成推送到ThreadPool ,但是如果调用者已经饱和他们的工作队列,则会产生显着的影响,因为完成不会得到及时处理。 如果他们使用Task.Wait() (或类似的),他们将基本上自己死锁。 同样,这就是为什么读者是在一个专门的线程,而不是使用工人。

所以; 在我尝试唠叨TPL团队之前,我是否错过了一个选项?

关键点:

  • 我不希望外部呼叫者能够劫持我的线程
  • 我不能使用ThreadPool作为实现,因为它需要在池饱和时工作

下面的例子产生输出(sorting可能因时间而异):

 Continuation on: Main thread Press [return] Continuation on: Thread pool 

问题是一个随机的调用者设法在“主线程”上得到延续。 在真实的代码中,这会中断初级读者; 坏事!

码:

 using System; using System.Threading; using System.Threading.Tasks; static class Program { static void Identify() { var thread = Thread.CurrentThread; string name = thread.IsThreadPoolThread ? "Thread pool" : thread.Name; if (string.IsNullOrEmpty(name)) name = "#" + thread.ManagedThreadId; Console.WriteLine("Continuation on: " + name); } static void Main() { Thread.CurrentThread.Name = "Main thread"; var source = new TaskCompletionSource<int>(); var task = source.Task; task.ContinueWith(delegate { Identify(); }); task.ContinueWith(delegate { Identify(); }, TaskContinuationOptions.ExecuteSynchronously); source.TrySetResult(123); Console.WriteLine("Press [return]"); Console.ReadLine(); } } 

.NET 4.6新增function:

.NET 4.6包含一个新的TaskCreationOptionsRunContinuationsAsynchronously


既然你愿意使用reflection访问私人领域…

您可以使用TASK_STATE_THREAD_WAS_ABORTED标记来标记TCS的任务,这将导致所有延续不被内联。

 const int TASK_STATE_THREAD_WAS_ABORTED = 134217728; var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance); stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED); 

编辑:

而不是使用reflection发射,我build议你使用expression式。 这是更可读性和PCL兼容的优势:

 var taskParameter = Expression.Parameter(typeof (Task)); const string stateFlagsFieldName = "m_stateFlags"; var setter = Expression.Lambda<Action<Task>>( Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile(); 

没有使用reflection:

如果任何人有兴趣,我已经find了一个没有reflection的方法,但它有点“脏”,当然还有一个不可忽视的性能损失:

 try { Thread.CurrentThread.Abort(); } catch (ThreadAbortException) { source.TrySetResult(123); Thread.ResetAbort(); } 

我不认为TPL中有什么可以通过TaskCompletionSource.SetResult延续提供显式的 API控制。 我决定保留我的初始答案来控制async/await场景的这种行为。

如果tcs.SetResult触发的延续发生在调用SetResult的同一个线程上,则这是另一种在ContinueWith上强加asynchronous的解决scheme:

 public static class TaskExt { static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks = new ConcurrentDictionary<Task, Thread>(); // SetResultAsync static public void SetResultAsync<TResult>( this TaskCompletionSource<TResult> @this, TResult result) { s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread); try { @this.SetResult(result); } finally { Thread thread; s_tcsTasks.TryRemove(@this.Task, out thread); } } // ContinueWithAsync, TODO: more overrides static public Task ContinueWithAsync<TResult>( this Task<TResult> @this, Action<Task<TResult>> action, TaskContinuationOptions continuationOptions = TaskContinuationOptions.None) { return @this.ContinueWith((Func<Task<TResult>, Task>)(t => { Thread thread = null; s_tcsTasks.TryGetValue(t, out thread); if (Thread.CurrentThread == thread) { // same thread which called SetResultAsync, avoid potential deadlocks // using thread pool return Task.Run(() => action(t)); // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread) // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning); } else { // continue on the same thread var task = new Task(() => action(t)); task.RunSynchronously(); return Task.FromResult(task); } }), continuationOptions).Unwrap(); } } 

更新以解决评论:

我不控制呼叫者 – 我不能让他们使用一个特定的继续变式:如果我可以,问题不会存在在第一位

我不知道你不能控制来电者。 不过,如果你不控制它,你可能不直接传递TaskCompletionSource对象给调用者。 从逻辑上讲,你会传递令牌的一部分,即tcs.Task 。 在这种情况下,通过在上面添加另一种扩展方法,解决scheme可能会更容易:

 // ImposeAsync, TODO: more overrides static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this) { return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent => { Thread thread = null; s_tcsTasks.TryGetValue(antecedent, out thread); if (Thread.CurrentThread == thread) { // continue on a pool thread return antecedent.ContinueWith(t => t, TaskContinuationOptions.None).Unwrap(); } else { return antecedent; } }), TaskContinuationOptions.ExecuteSynchronously).Unwrap(); } 

使用:

 // library code var source = new TaskCompletionSource<int>(); var task = source.Task.ImposeAsync(); // ... // client code task.ContinueWith(delegate { Identify(); }, TaskContinuationOptions.ExecuteSynchronously); // ... // library code source.SetResultAsync(123); 

这实际上适用于awaitContinueWith小提琴 ),并没有反省黑客。

怎么样,而不是做

 var task = source.Task; 

你这样做

 var task = source.Task.ContinueWith<Int32>( x => x.Result ); 

因此,你总是添加一个将被asynchronous执行的延续,然后不pipe用户是否想要在相同的上下文中继续。 这是一种curl的任务,不是吗?

如果你可以并且准备好使用反思,那么应该这样做;

 public static class MakeItAsync { static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result) { var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var continuations = (List<object>)continuation.GetValue(source.Task); foreach (object c in continuations) { var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var options = (TaskContinuationOptions)option.GetValue(c); options &= ~TaskContinuationOptions.ExecuteSynchronously; option.SetValue(c, options); } source.TrySetResult(result); } } 

更新后 ,我发布了一个单独的答案来处理ContinueWith而不是await (因为ContinueWith不关心当前的同步上下文)。

您可以使用哑同步上下文在通过调用SetResult/SetCancelled/SetException上的SetResult/SetCancelled/SetException触发的延续时强加asynchronous。 我相信当前的同步上下文(在await tcs.Task )是TPL用来决定是否使这样的继续同步或asynchronous的标准。

以下为我工作:

 if (notifyAsync) { tcs.SetResultAsync(null); } else { tcs.SetResult(null); } 

SetResultAsync是这样实现的:

 public static class TaskExt { static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result) { FakeSynchronizationContext.Execute(() => tcs.SetResult(result)); } // FakeSynchronizationContext class FakeSynchronizationContext : SynchronizationContext { private static readonly ThreadLocal<FakeSynchronizationContext> s_context = new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext()); private FakeSynchronizationContext() { } public static FakeSynchronizationContext Instance { get { return s_context.Value; } } public static void Execute(Action action) { var savedContext = SynchronizationContext.Current; SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance); try { action(); } finally { SynchronizationContext.SetSynchronizationContext(savedContext); } } // SynchronizationContext methods public override SynchronizationContext CreateCopy() { return this; } public override void OperationStarted() { throw new NotImplementedException("OperationStarted"); } public override void OperationCompleted() { throw new NotImplementedException("OperationCompleted"); } public override void Post(SendOrPostCallback d, object state) { throw new NotImplementedException("Post"); } public override void Send(SendOrPostCallback d, object state) { throw new NotImplementedException("Send"); } } } 

SynchronizationContext.SetSynchronizationContext在增加的开销方面非常便宜 。 实际上, WPF Dispatcher.BeginInvoke的实现采取了非常类似的方法。

TPL比较await点处的目标同步上下文与tcs.SetResult点的目标同步上下文。 如果同步上下文相同(或者在两个地方都没有同步上下文),则同步直接调用延续。 否则,它在目标同步上下文中使用SynchronizationContext.Post进行排队,即正常的await行为。 这种方法的作用是始终强加SynchronizationContext.Post行为(如果没有目标同步上下文,则继续使用池线程)。

更新后 ,这将不适用于task.ContinueWith ,因为ContinueWith不关心当前的同步上下文。 它然而工作await task ( 小提琴 )。 它也可以用于await task.ConfigureAwait(false)

OTOH, 这种方法适用于ContinueWith

模拟中止方法看起来非常好,但导致TPL 在某些情况下劫持线程。

然后我有一个类似于检查继续对象的实现 ,但只是检查任何延续,因为实际上给定的代码实际上有太多的场景工作,但是这意味着甚至像Task.Wait这样的Task.Wait导致线程 -池查找。

最终,在检查大量的IL之后,唯一安全且有用的场景是SetOnInvokeMres场景(手动重置事件SetOnInvokeMres延续)。 还有很多其他的情况:

  • 有些不安全,并导致线程劫持
  • 其余的都没有用,因为它们最终会导致线程池

所以最后我select了检查一个非空的连续对象; 如果是空的,罚款(不延续); 如果它是非空的,则对SetOnInvokeMres特殊检查 – 如果是:fine(可以调用); 否则,让线程池执行TrySetComplete ,而不告诉任务做任何特别的,如欺骗中止。 Task.Wait使用SetOnInvokeMres方法,这是我们想要尝试的特定场景, 非常难以避免死锁。

 Type taskType = typeof(Task); FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic); Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic); if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null) { var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true); var il = method.GetILGenerator(); var hasContinuation = il.DefineLabel(); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel(); // check if null il.Emit(OpCodes.Brtrue_S, nonNull); il.MarkLabel(goodReturn); il.Emit(OpCodes.Ldc_I4_1); il.Emit(OpCodes.Ret); // check if is a SetOnInvokeMres - if so, we're OK il.MarkLabel(nonNull); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); il.Emit(OpCodes.Isinst, safeScenario); il.Emit(OpCodes.Brtrue_S, goodReturn); il.Emit(OpCodes.Ldc_I4_0); il.Emit(OpCodes.Ret); IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));