我怎样才能防止任务同步延续?
我有一些库(套接字networking)代码,基于TaskCompletionSource<T>
提供了一个基于Task
的API来处理对请求的响应。 然而,TPL的一个烦恼是,似乎不可能防止同步延续。 我希望能够做的是:
- 告诉一个
TaskCompletionSource<T>
,它不应该允许调用者使用TaskContinuationOptions.ExecuteSynchronously
来附加 - 设置结果(
SetResult
/TrySetResult
)的方式,指定应忽略TaskContinuationOptions.ExecuteSynchronously
,使用池而不是
具体来说,我遇到的问题是传入的数据正在由专门的阅读器处理,如果调用者可以附加TaskContinuationOptions.ExecuteSynchronously
他们可以停止阅读器(这不仅影响他们)。 以前,我曾经通过一些骇客来解决这个问题,它检测是否存在任何延续,如果是,则将完成推送到ThreadPool
,但是如果调用者已经饱和他们的工作队列,则会产生显着的影响,因为完成不会得到及时处理。 如果他们使用Task.Wait()
(或类似的),他们将基本上自己死锁。 同样,这就是为什么读者是在一个专门的线程,而不是使用工人。
所以; 在我尝试唠叨TPL团队之前,我是否错过了一个选项?
关键点:
- 我不希望外部呼叫者能够劫持我的线程
- 我不能使用
ThreadPool
作为实现,因为它需要在池饱和时工作
下面的例子产生输出(sorting可能因时间而异):
Continuation on: Main thread Press [return] Continuation on: Thread pool
问题是一个随机的调用者设法在“主线程”上得到延续。 在真实的代码中,这会中断初级读者; 坏事!
码:
using System; using System.Threading; using System.Threading.Tasks; static class Program { static void Identify() { var thread = Thread.CurrentThread; string name = thread.IsThreadPoolThread ? "Thread pool" : thread.Name; if (string.IsNullOrEmpty(name)) name = "#" + thread.ManagedThreadId; Console.WriteLine("Continuation on: " + name); } static void Main() { Thread.CurrentThread.Name = "Main thread"; var source = new TaskCompletionSource<int>(); var task = source.Task; task.ContinueWith(delegate { Identify(); }); task.ContinueWith(delegate { Identify(); }, TaskContinuationOptions.ExecuteSynchronously); source.TrySetResult(123); Console.WriteLine("Press [return]"); Console.ReadLine(); } }
.NET 4.6新增function:
.NET 4.6包含一个新的TaskCreationOptions
: RunContinuationsAsynchronously
。
既然你愿意使用reflection访问私人领域…
您可以使用TASK_STATE_THREAD_WAS_ABORTED
标记来标记TCS的任务,这将导致所有延续不被内联。
const int TASK_STATE_THREAD_WAS_ABORTED = 134217728; var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance); stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);
编辑:
而不是使用reflection发射,我build议你使用expression式。 这是更可读性和PCL兼容的优势:
var taskParameter = Expression.Parameter(typeof (Task)); const string stateFlagsFieldName = "m_stateFlags"; var setter = Expression.Lambda<Action<Task>>( Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName), Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();
没有使用reflection:
如果任何人有兴趣,我已经find了一个没有reflection的方法,但它有点“脏”,当然还有一个不可忽视的性能损失:
try { Thread.CurrentThread.Abort(); } catch (ThreadAbortException) { source.TrySetResult(123); Thread.ResetAbort(); }
我不认为TPL中有什么可以通过TaskCompletionSource.SetResult
延续提供显式的 API控制。 我决定保留我的初始答案来控制async/await
场景的这种行为。
如果tcs.SetResult
触发的延续发生在调用SetResult
的同一个线程上,则这是另一种在ContinueWith
上强加asynchronous的解决scheme:
public static class TaskExt { static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks = new ConcurrentDictionary<Task, Thread>(); // SetResultAsync static public void SetResultAsync<TResult>( this TaskCompletionSource<TResult> @this, TResult result) { s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread); try { @this.SetResult(result); } finally { Thread thread; s_tcsTasks.TryRemove(@this.Task, out thread); } } // ContinueWithAsync, TODO: more overrides static public Task ContinueWithAsync<TResult>( this Task<TResult> @this, Action<Task<TResult>> action, TaskContinuationOptions continuationOptions = TaskContinuationOptions.None) { return @this.ContinueWith((Func<Task<TResult>, Task>)(t => { Thread thread = null; s_tcsTasks.TryGetValue(t, out thread); if (Thread.CurrentThread == thread) { // same thread which called SetResultAsync, avoid potential deadlocks // using thread pool return Task.Run(() => action(t)); // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread) // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning); } else { // continue on the same thread var task = new Task(() => action(t)); task.RunSynchronously(); return Task.FromResult(task); } }), continuationOptions).Unwrap(); } }
更新以解决评论:
我不控制呼叫者 – 我不能让他们使用一个特定的继续变式:如果我可以,问题不会存在在第一位
我不知道你不能控制来电者。 不过,如果你不控制它,你可能不直接传递TaskCompletionSource
对象给调用者。 从逻辑上讲,你会传递令牌的一部分,即tcs.Task
。 在这种情况下,通过在上面添加另一种扩展方法,解决scheme可能会更容易:
// ImposeAsync, TODO: more overrides static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this) { return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent => { Thread thread = null; s_tcsTasks.TryGetValue(antecedent, out thread); if (Thread.CurrentThread == thread) { // continue on a pool thread return antecedent.ContinueWith(t => t, TaskContinuationOptions.None).Unwrap(); } else { return antecedent; } }), TaskContinuationOptions.ExecuteSynchronously).Unwrap(); }
使用:
// library code var source = new TaskCompletionSource<int>(); var task = source.Task.ImposeAsync(); // ... // client code task.ContinueWith(delegate { Identify(); }, TaskContinuationOptions.ExecuteSynchronously); // ... // library code source.SetResultAsync(123);
这实际上适用于await
和ContinueWith
( 小提琴 ),并没有反省黑客。
怎么样,而不是做
var task = source.Task;
你这样做
var task = source.Task.ContinueWith<Int32>( x => x.Result );
因此,你总是添加一个将被asynchronous执行的延续,然后不pipe用户是否想要在相同的上下文中继续。 这是一种curl的任务,不是吗?
如果你可以并且准备好使用反思,那么应该这样做;
public static class MakeItAsync { static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result) { var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var continuations = (List<object>)continuation.GetValue(source.Task); foreach (object c in continuations) { var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance); var options = (TaskContinuationOptions)option.GetValue(c); options &= ~TaskContinuationOptions.ExecuteSynchronously; option.SetValue(c, options); } source.TrySetResult(result); } }
更新后 ,我发布了一个单独的答案来处理ContinueWith
而不是await
(因为ContinueWith
不关心当前的同步上下文)。
您可以使用哑同步上下文在通过调用SetResult/SetCancelled/SetException
上的SetResult/SetCancelled/SetException
触发的延续时强加asynchronous。 我相信当前的同步上下文(在await tcs.Task
)是TPL用来决定是否使这样的继续同步或asynchronous的标准。
以下为我工作:
if (notifyAsync) { tcs.SetResultAsync(null); } else { tcs.SetResult(null); }
SetResultAsync
是这样实现的:
public static class TaskExt { static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result) { FakeSynchronizationContext.Execute(() => tcs.SetResult(result)); } // FakeSynchronizationContext class FakeSynchronizationContext : SynchronizationContext { private static readonly ThreadLocal<FakeSynchronizationContext> s_context = new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext()); private FakeSynchronizationContext() { } public static FakeSynchronizationContext Instance { get { return s_context.Value; } } public static void Execute(Action action) { var savedContext = SynchronizationContext.Current; SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance); try { action(); } finally { SynchronizationContext.SetSynchronizationContext(savedContext); } } // SynchronizationContext methods public override SynchronizationContext CreateCopy() { return this; } public override void OperationStarted() { throw new NotImplementedException("OperationStarted"); } public override void OperationCompleted() { throw new NotImplementedException("OperationCompleted"); } public override void Post(SendOrPostCallback d, object state) { throw new NotImplementedException("Post"); } public override void Send(SendOrPostCallback d, object state) { throw new NotImplementedException("Send"); } } }
SynchronizationContext.SetSynchronizationContext
在增加的开销方面非常便宜 。 实际上, WPF Dispatcher.BeginInvoke
的实现采取了非常类似的方法。
TPL比较await
点处的目标同步上下文与tcs.SetResult
点的目标同步上下文。 如果同步上下文相同(或者在两个地方都没有同步上下文),则同步直接调用延续。 否则,它在目标同步上下文中使用SynchronizationContext.Post
进行排队,即正常的await
行为。 这种方法的作用是始终强加SynchronizationContext.Post
行为(如果没有目标同步上下文,则继续使用池线程)。
更新后 ,这将不适用于task.ContinueWith
,因为ContinueWith
不关心当前的同步上下文。 它然而工作await task
( 小提琴 )。 它也可以用于await task.ConfigureAwait(false)
。
OTOH, 这种方法适用于ContinueWith
。
模拟中止方法看起来非常好,但导致TPL 在某些情况下劫持线程。
然后我有一个类似于检查继续对象的实现 ,但只是检查任何延续,因为实际上给定的代码实际上有太多的场景工作,但是这意味着甚至像Task.Wait
这样的Task.Wait
导致线程 -池查找。
最终,在检查大量的IL之后,唯一安全且有用的场景是SetOnInvokeMres
场景(手动重置事件SetOnInvokeMres
延续)。 还有很多其他的情况:
- 有些不安全,并导致线程劫持
- 其余的都没有用,因为它们最终会导致线程池
所以最后我select了检查一个非空的连续对象; 如果是空的,罚款(不延续); 如果它是非空的,则对SetOnInvokeMres
特殊检查 – 如果是:fine(可以调用); 否则,让线程池执行TrySetComplete
,而不告诉任务做任何特别的,如欺骗中止。 Task.Wait
使用SetOnInvokeMres
方法,这是我们想要尝试的特定场景, 非常难以避免死锁。
Type taskType = typeof(Task); FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic); Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic); if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null) { var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true); var il = method.GetILGenerator(); var hasContinuation = il.DefineLabel(); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel(); // check if null il.Emit(OpCodes.Brtrue_S, nonNull); il.MarkLabel(goodReturn); il.Emit(OpCodes.Ldc_I4_1); il.Emit(OpCodes.Ret); // check if is a SetOnInvokeMres - if so, we're OK il.MarkLabel(nonNull); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, continuationField); il.Emit(OpCodes.Isinst, safeScenario); il.Emit(OpCodes.Brtrue_S, goodReturn); il.Emit(OpCodes.Ldc_I4_0); il.Emit(OpCodes.Ret); IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));