WaitHandle.WaitAll 64句柄限制的解决方法?
我的应用程序通过ThreadPool.QueueUserWorkItem
产生不同的小工作者线程的负载,我通过多个ManualResetEvent
实例跟踪。 我使用WaitHandle.WaitAll
方法阻止我的应用程序closures,直到这些线程完成。
我从来没有任何问题,但是,因为我的应用程序正在更多的负载,即更多的线程被创build,我现在开始得到这个exception:
WaitHandles must be less than or equal to 64 - missing documentation
什么是最好的替代解决scheme呢?
代码片段
List<AutoResetEvent> events = new List<AutoResetEvent>(); // multiple instances of... var evt = new AutoResetEvent(false); events.Add(evt); ThreadPool.QueueUserWorkItem(delegate { // do work evt.Set(); }); ... WaitHandle.WaitAll(events.ToArray());
解决方法
int threadCount = 0; ManualResetEvent finished = new ManualResetEvent(false); ... Interlocked.Increment(ref threadCount); ThreadPool.QueueUserWorkItem(delegate { try { // do work } finally { if (Interlocked.Decrement(ref threadCount) == 0) { finished.Set(); } } }); ... finished.WaitOne();
创build一个跟踪正在运行的任务数量的variables:
int numberOfTasks = 100;
创build一个信号:
ManualResetEvent signal = new ManualResetEvent(false);
减less任务完成时的任务数量:
if (Interlocked.Decrement(ref numberOftasks) == 0) {
如果没有剩余任务,请设置信号:
signal.Set(); }
同时,在别的地方,等待信号被设置:
signal.WaitOne();
从.NET 4.0开始,您可以使用另外两个(和IMO,更干净的)选项。
首先是使用CountdownEvent
类 。 它可以防止需要自行处理递增和递减:
int tasks = <however many tasks you're performing>; // Dispose when done. using (var e = new CountdownEvent(tasks)) { // Queue work. ThreadPool.QueueUserWorkItem(() => { // Do work ... // Signal when done. e.Signal(); }); // Wait till the countdown reaches zero. e.Wait(); }
但是,还有一个更强大的解决scheme,就是使用Task
类 ,如下所示:
// The source of your work items, create a sequence of Task instances. Task[] tasks = Enumerable.Range(0, 100).Select(i => // Create task here. Task.Factory.StartNew(() => { // Do work. } // No signalling, no anything. ).ToArray(); // Wait on all the tasks. Task.WaitAll(tasks);
使用Task
类和对WaitAll
的调用要WaitAll
得多,IMO,因为你在整个代码中编织的线程原语较less(注意,没有等待句柄)。 您不必设置计数器,处理递增/递减,您只需设置您的任务,然后等待。 这可以让代码在你想做什么的时候更具performance力,而不是如何 (至less在pipe理它的并行化方面)的基本原理 。
.NET 4.5提供了更多选项,您可以通过调用Task
类上的静态Run
方法来简化Task
实例序列的生成:
// The source of your work items, create a sequence of Task instances. Task[] tasks = Enumerable.Range(0, 100).Select(i => // Create task here. Task.Run(() => { // Do work. }) // No signalling, no anything. ).ToArray(); // Wait on all the tasks. Tasks.WaitAll(tasks);
或者,您可以利用TPL DataFlow库 (它位于System
命名空间中,所以它是官方的,即使它是从NuGet下载的,如entity framework),并使用ActionBlock<TInput>
,如下所示:
// Create the action block. Since there's not a non-generic // version, make it object, and pass null to signal, or // make T the type that takes the input to the action // and pass that. var actionBlock = new ActionBlock<object>(o => { // Do work. }); // Post 100 times. foreach (int i in Enumerable.Range(0, 100)) actionBlock.Post(null); // Signal complete, this doesn't actually stop // the block, but says that everything is done when the currently // posted items are completed. actionBlock.Complete(); // Wait for everything to complete, the Completion property // exposes a Task which can be waited on. actionBlock.Completion.Wait();
请注意, ActionBlock<TInput>
默认一次处理一个项目,所以如果你想让它一次处理多个动作,你必须通过传递一个ExecutionDataflowBlockOptions
来设置你想要在构造器中处理的并发项目的数量实例并设置MaxDegreeOfParallelism
属性 :
var actionBlock = new ActionBlock<object>(o => { // Do work. }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
如果您的操作确实是线程安全的,那么您可以将MaxDegreeOfParallelsim
属性设置为DataFlowBlockOptions.Unbounded
:
var actionBlock = new ActionBlock<object>(o => { // Do work. }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataFlowBlockOptions.Unbounded });
关键在于,您对于希望select的平行性有着很好的控制。
当然,如果你有一系列想要传入你的ActionBlock<TInput>
实例的项目,那么你可以链接一个ISourceBlock<TOutput>
实现来提供ActionBlock<TInput>
,如下所示:
// The buffer block. var buffer = new BufferBlock<int>(); // Create the action block. Since there's not a non-generic // version, make it object, and pass null to signal, or // make T the type that takes the input to the action // and pass that. var actionBlock = new ActionBlock<int>(o => { // Do work. }); // Link the action block to the buffer block. // NOTE: An IDisposable is returned here, you might want to dispose // of it, although not totally necessary if everything works, but // still, good housekeeping. using (link = buffer.LinkTo(actionBlock, // Want to propagate completion state to the action block. new DataflowLinkOptions { PropagateCompletion = true, }, // Can filter on items flowing through if you want. i => true) { // Post 100 times to the *buffer* foreach (int i in Enumerable.Range(0, 100)) buffer.Post(i); // Signal complete, this doesn't actually stop // the block, but says that everything is done when the currently // posted items are completed. actionBlock.Complete(); // Wait for everything to complete, the Completion property // exposes a Task which can be waited on. actionBlock.Completion.Wait(); }
取决于你需要做什么,TPL数据stream库成为一个更有吸引力的select,因为它处理所有链接在一起的任务的并发性,它允许你非常具体地说明你想要每个片断是多么平行同时保持每个街区的关注点的适当分离。
您的解决方法是不正确的。 原因是Set
和WaitOne
可能会竞争,如果最后一个工作项导致threadCount
在排队线程必须排队所有工作项之前归零。 修复很简单。 把你的排队线程看作是一个工作项目本身。 初始化threadCount
为1,并在排队完成时进行递减和信号。
int threadCount = 1; ManualResetEvent finished = new ManualResetEvent(false); ... Interlocked.Increment(ref threadCount); ThreadPool.QueueUserWorkItem(delegate { try { // do work } finally { if (Interlocked.Decrement(ref threadCount) == 0) { finished.Set(); } } }); ... if (Interlocked.Decrement(ref threadCount) == 0) { finished.Set(); } finished.WaitOne();
作为个人喜好,我喜欢使用CountdownEvent
类来为我计数。
var finished = new CountdownEvent(1); ... finished.AddCount(); ThreadPool.QueueUserWorkItem(delegate { try { // do work } finally { finished.Signal(); } }); ... finished.Signal(); finished.Wait();
添加到DTB的答案,你可以把它包装成一个很好的简单的类。
public class Countdown : IDisposable { private readonly ManualResetEvent done; private readonly int total; private long current; public Countdown(int total) { this.total = total; current = total; done = new ManualResetEvent(false); } public void Signal() { if (Interlocked.Decrement(ref current) == 0) { done.Set(); } } public void Wait() { done.WaitOne(); } public void Dispose() { ((IDisposable)done).Dispose(); } }
添加到dtb的答案,当我们想要callback。
using System; using System.Runtime.Remoting.Messaging; using System.Threading; class Program { static void Main(string[] args) { Main m = new Main(); m.TestMRE(); Console.ReadKey(); } } class Main { CalHandler handler = new CalHandler(); int numberofTasks =0; public void TestMRE() { for (int j = 0; j <= 3; j++) { Console.WriteLine("Outer Loop is :" + j.ToString()); ManualResetEvent signal = new ManualResetEvent(false); numberofTasks = 4; for (int i = 0; i <= 3; i++) { CalHandler.count caller = new CalHandler.count(handler.messageHandler); caller.BeginInvoke(i, new AsyncCallback(NumberCallback),signal); } signal.WaitOne(); } } private void NumberCallback(IAsyncResult result) { AsyncResult asyncResult = (AsyncResult)result; CalHandler.count caller = (CalHandler.count)asyncResult.AsyncDelegate; int num = caller.EndInvoke(asyncResult); Console.WriteLine("Number is :"+ num.ToString()); ManualResetEvent mre = (ManualResetEvent)asyncResult.AsyncState; if (Interlocked.Decrement(ref numberofTasks) == 0) { mre.Set(); } } } public class CalHandler { public delegate int count(int number); public int messageHandler ( int number ) { return number; } }
protected void WaitAllExt(WaitHandle[] waitHandles) { //workaround for limitation of WaitHandle.WaitAll by <=64 wait handles const int waitAllArrayLimit = 64; var prevEndInd = -1; while (prevEndInd < waitHandles.Length - 1) { var stInd = prevEndInd + 1; var eInd = stInd + waitAllArrayLimit - 1; if (eInd > waitHandles.Length - 1) { eInd = waitHandles.Length - 1; } prevEndInd = eInd; //do wait var whSubarray = waitHandles.Skip(stInd).Take(eInd - stInd + 1).ToArray(); WaitHandle.WaitAll(whSubarray); } }
我只是通过分类事件的数量来解决这个问题,而没有太多的性能损失,而且它在生产环境中完美地工作。 遵循代码:
var events = new List<ManualResetEvent>(); // code omited var newEvent = new ManualResetEvent(false); events.Add(newEvent); ThreadPool.QueueUserWorkItem(c => { //thread code newEvent.Set(); }); // code omited var wait = true; while (wait) { WaitHandle.WaitAll(events.Take(60).ToArray()); events.RemoveRange(0, events.Count > 59 ? 60 : events.Count); wait = events.Any(); }
Windows XP SP3支持最多两个WaitHandles。 对于超过2个WaitHandles应用程序过早终止的情况。