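Workaround for the WaitHandle.WaitAll 64 handle limit?

My application spawns loads of different small worker threads via ThreadPool.QueueUserWorkItem, which I keep track of via multiple ManualResetEvent instances. I use the WaitHandle.WaitAll method to block my application from closing until these threads have completed.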


WaitHandles must be less than or equal to 64 - missing documentation



    List<AutoResetEvent> events = new List<AutoResetEvent>();

    // multiple instances of...
    var evt = new AutoResetEvent(false);
    events.Add(evt);
    ThreadPool.QueueUserWorkItem(delegate
    {
        // do work
        evt.Set();
    });

    ...
    WaitHandle.WaitAll(events.ToArray());
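For reference, here is a minimal sketch that reproduces the limit in isolation. The names `WaitAllLimitDemo` and `ExceedsLimit` are made up for illustration, and it assumes the call runs on an ordinary (non-STA) thread, where `WaitHandle.WaitAll` is expected to throw `NotSupportedException` once the array holds more than 64 handles.

```csharp
using System;
using System.Threading;

public static class WaitAllLimitDemo
{
    // Hypothetical helper: returns true if WaitHandle.WaitAll rejects
    // an array containing `count` handles.
    public static bool ExceedsLimit(int count)
    {
        var handles = new WaitHandle[count];
        for (int i = 0; i < count; i++)
        {
            // Created already signaled, so a successful WaitAll returns at once.
            handles[i] = new ManualResetEvent(true);
        }

        try
        {
            WaitHandle.WaitAll(handles);
            return false;
        }
        catch (NotSupportedException)
        {
            // Thrown when the array is larger than the system permits.
            return true;
        }
        finally
        {
            foreach (var handle in handles)
            {
                handle.Dispose();
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine("64 handles rejected: " + ExceedsLimit(64));
        Console.WriteLine("65 handles rejected: " + ExceedsLimit(65));
    }
}
```

Using pre-signaled events keeps the sketch free of worker threads, so the only thing it exercises is the array-size check.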


    int threadCount = 0;
    ManualResetEvent finished = new ManualResetEvent(false);

    ...
    Interlocked.Increment(ref threadCount);
    ThreadPool.QueueUserWorkItem(delegate
    {
        try
        {
            // do work
        }
        finally
        {
            if (Interlocked.Decrement(ref threadCount) == 0)
            {
                finished.Set();
            }
        }
    });

    ...
    finished.WaitOne();


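    // Number of tasks still running.
    int numberOfTasks = 100;

    // Signaled once every task has finished.
    ManualResetEvent signal = new ManualResetEvent(false);

    // Run at the end of each task.
    if (Interlocked.Decrement(ref numberOfTasks) == 0)
    {
        signal.Set();
    }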



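Starting with .NET 4.0, you have two more (and IMO, cleaner) options available to you.

The first is to use the CountdownEvent class. It removes the need to handle the incrementing and decrementing yourself: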

    int tasks = <however many tasks you're performing>;

    // Dispose when done.
    using (var e = new CountdownEvent(tasks))
    {
        // Queue work (once per task). The callback receives a state
        // argument, which is ignored here.
        ThreadPool.QueueUserWorkItem(_ =>
        {
            // Do work
            ...

            // Signal when done.
            e.Signal();
        });

        // Wait till the countdown reaches zero.
        e.Wait();
    }

However, there's an even more robust solution, and that's to use the Task class, like so:

    // The source of your work items, create a sequence of Task instances.
    Task[] tasks = Enumerable.Range(0, 100).Select(i =>
        // Create task here.
        Task.Factory.StartNew(() =>
        {
            // Do work.
        })
        // No signalling, no anything.
    ).ToArray();

    // Wait on all the tasks.
    Task.WaitAll(tasks);

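Using the Task class and the call to WaitAll is much cleaner, IMO, as you're weaving fewer threading primitives throughout your code (notice, no wait handles). You don't have to set up a counter or handle incrementing/decrementing; you just set up your tasks and then wait on them. This lets the code be more expressive about what you want to do rather than the primitives of how (at least in terms of managing the parallelization).

.NET 4.5 offers even more options. You can simplify the generation of the sequence of Task instances by calling the static Run method on the Task class: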

    // The source of your work items, create a sequence of Task instances.
    Task[] tasks = Enumerable.Range(0, 100).Select(i =>
        // Create task here.
        Task.Run(() =>
        {
            // Do work.
        })
        // No signalling, no anything.
    ).ToArray();

    // Wait on all the tasks.
    Task.WaitAll(tasks);

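Or, you could take advantage of the TPL DataFlow library (it's in the System namespace, so it's official, even though it's a download from NuGet, like Entity Framework) and use an ActionBlock<TInput>, like so: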

    // Create the action block. Since there's not a non-generic
    // version, make it object, and pass null to signal, or
    // make T the type that takes the input to the action
    // and pass that.
    var actionBlock = new ActionBlock<object>(o =>
    {
        // Do work.
    });

    // Post 100 times.
    foreach (int i in Enumerable.Range(0, 100)) actionBlock.Post(null);

    // Signal complete, this doesn't actually stop
    // the block, but says that everything is done when the currently
    // posted items are completed.
    actionBlock.Complete();

    // Wait for everything to complete, the Completion property
    // exposes a Task which can be waited on.
    actionBlock.Completion.Wait();

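Note that the ActionBlock<TInput> processes one item at a time by default. If you want it to process multiple items at once, set the number of concurrent items in the constructor by passing an ExecutionDataflowBlockOptions instance with the MaxDegreeOfParallelism property set: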

    var actionBlock = new ActionBlock<object>(o =>
    {
        // Do work.
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });


    // Alternatively, place no limit on the degree of parallelism.
    var actionBlock = new ActionBlock<object>(o =>
    {
        // Do work.
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });


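Of course, if you have a sequence of items that you want passed into your ActionBlock<TInput> instance, then you can link an ISourceBlock<TOutput> implementation to feed the ActionBlock<TInput>, like so: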

    // The buffer block.
    var buffer = new BufferBlock<int>();

    // Create the action block. T is the type of the items
    // that flow in from the buffer.
    var actionBlock = new ActionBlock<int>(o =>
    {
        // Do work.
    });

    // Link the action block to the buffer block.
    // NOTE: An IDisposable is returned here, you might want to dispose
    // of it, although not totally necessary if everything works, but
    // still, good housekeeping.
    using (var link = buffer.LinkTo(actionBlock,
        // Want to propagate completion state to the action block.
        new DataflowLinkOptions { PropagateCompletion = true },
        // Can filter on items flowing through if you want.
        i => true))
    {
        // Post 100 times to the *buffer*
        foreach (int i in Enumerable.Range(0, 100)) buffer.Post(i);

        // Signal complete on the *buffer*. This doesn't actually stop
        // the blocks; completion propagates to the action block once
        // the posted items have been passed along.
        buffer.Complete();

        // Wait for everything to complete, the Completion property
        // exposes a Task which can be waited on.
        actionBlock.Completion.Wait();
    }


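Your workaround is not correct. The reason is that the Set and WaitOne calls could race if the last work item causes threadCount to go to zero before the queueing thread has had a chance to queue all work items. The fix is simple. Treat your queueing thread as if it were a work item itself. Initialize threadCount to 1 and do a decrement and signal when the queueing is complete.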

    int threadCount = 1;
    ManualResetEvent finished = new ManualResetEvent(false);

    ...
    Interlocked.Increment(ref threadCount);
    ThreadPool.QueueUserWorkItem(delegate
    {
        try
        {
            // do work
        }
        finally
        {
            if (Interlocked.Decrement(ref threadCount) == 0)
            {
                finished.Set();
            }
        }
    });

    ...
    if (Interlocked.Decrement(ref threadCount) == 0)
    {
        finished.Set();
    }
    finished.WaitOne();


    var finished = new CountdownEvent(1);

    ...
    finished.AddCount();
    ThreadPool.QueueUserWorkItem(delegate
    {
        try
        {
            // do work
        }
        finally
        {
            finished.Signal();
        }
    });

    ...
    finished.Signal();
    finished.Wait();
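To make the "count the queueing thread as a work item" idea concrete, here is a small self-contained sketch. The names `CountdownDemo`, `RunAll`, and `completed` are invented for illustration, and the "work" is just an atomic increment standing in for whatever the real work items do.

```csharp
using System;
using System.Threading;

public static class CountdownDemo
{
    // Hypothetical helper: queues `workItems` thread pool items, blocks
    // until all of them have run, and returns how many executed.
    public static int RunAll(int workItems)
    {
        int completed = 0;

        // Start at 1: the extra count belongs to the queueing thread,
        // so the event cannot reach zero while items are still being queued.
        using (var finished = new CountdownEvent(1))
        {
            for (int i = 0; i < workItems; i++)
            {
                finished.AddCount();
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try
                    {
                        // Stand-in for real work.
                        Interlocked.Increment(ref completed);
                    }
                    finally
                    {
                        finished.Signal();
                    }
                });
            }

            // Queueing is done, so release the queueing thread's own count.
            finished.Signal();
            finished.Wait();
        }

        return completed;
    }

    public static void Main()
    {
        // 100 work items, well past the 64 handle limit of WaitAll.
        Console.WriteLine(RunAll(100));
    }
}
```

Because only one synchronization object is involved, the number of work items is no longer tied to the 64 handle limit.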


    public class Countdown : IDisposable
    {
        private readonly ManualResetEvent done;
        private readonly int total;
        private long current;

        public Countdown(int total)
        {
            this.total = total;
            current = total;
            done = new ManualResetEvent(false);
        }

        public void Signal()
        {
            if (Interlocked.Decrement(ref current) == 0)
            {
                done.Set();
            }
        }

        public void Wait()
        {
            done.WaitOne();
        }

        public void Dispose()
        {
            ((IDisposable)done).Dispose();
        }
    }


    using System;
    using System.Runtime.Remoting.Messaging;
    using System.Threading;

    class Program
    {
        static void Main(string[] args)
        {
            Main m = new Main();
            m.TestMRE();
            Console.ReadKey();
        }
    }

    class Main
    {
        CalHandler handler = new CalHandler();
        int numberofTasks = 0;

        public void TestMRE()
        {
            for (int j = 0; j <= 3; j++)
            {
                Console.WriteLine("Outer Loop is :" + j.ToString());
                ManualResetEvent signal = new ManualResetEvent(false);
                numberofTasks = 4;
                for (int i = 0; i <= 3; i++)
                {
                    CalHandler.count caller = new CalHandler.count(handler.messageHandler);
                    caller.BeginInvoke(i, new AsyncCallback(NumberCallback), signal);
                }
                signal.WaitOne();
            }
        }

        private void NumberCallback(IAsyncResult result)
        {
            AsyncResult asyncResult = (AsyncResult)result;
            CalHandler.count caller = (CalHandler.count)asyncResult.AsyncDelegate;
            int num = caller.EndInvoke(asyncResult);
            Console.WriteLine("Number is :" + num.ToString());

            ManualResetEvent mre = (ManualResetEvent)asyncResult.AsyncState;
            if (Interlocked.Decrement(ref numberofTasks) == 0)
            {
                mre.Set();
            }
        }
    }

    public class CalHandler
    {
        public delegate int count(int number);

        public int messageHandler(int number)
        {
            return number;
        }
    }

    // Requires: using System.Linq;
    protected void WaitAllExt(WaitHandle[] waitHandles)
    {
        // Workaround for the WaitHandle.WaitAll limit of 64 wait handles:
        // wait on the array in consecutive chunks of at most 64.
        const int waitAllArrayLimit = 64;
        var prevEndInd = -1;
        while (prevEndInd < waitHandles.Length - 1)
        {
            var stInd = prevEndInd + 1;
            var eInd = stInd + waitAllArrayLimit - 1;
            if (eInd > waitHandles.Length - 1)
            {
                eInd = waitHandles.Length - 1;
            }
            prevEndInd = eInd;

            // do wait
            var whSubarray = waitHandles.Skip(stInd).Take(eInd - stInd + 1).ToArray();
            WaitHandle.WaitAll(whSubarray);
        }
    }

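I solved this by simply paginating the events to wait on, without much performance loss, and it works perfectly in a production environment. The code follows: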

    var events = new List<ManualResetEvent>();

    // code omitted
    var newEvent = new ManualResetEvent(false);
    events.Add(newEvent);
    ThreadPool.QueueUserWorkItem(c =>
    {
        // thread code
        newEvent.Set();
    });

    // code omitted
    var wait = true;
    while (wait)
    {
        // Wait on at most 60 events at a time, then drop them from the list.
        WaitHandle.WaitAll(events.Take(60).ToArray());
        events.RemoveRange(0, events.Count > 59 ? 60 : events.Count);
        wait = events.Any();
    }

Windows XP SP3 supports a maximum of two WaitHandles. With more than two WaitHandles, the application terminates prematurely.
