Parallel.ForEach是否限制活动线程的数量?
鉴于此代码:
var arrayStrings = new string[1000]; Parallel.ForEach<string>(arrayStrings, someString => { DoSomething(someString); });
将所有1000线程几乎同时产卵?
不,它不会启动1000个线程 – 是的,它会限制使用多less个线程。 并行扩展使用适当数量的内核,根据您实际拥有的数量以及已经占用多less内核。 它为每个核心分配工作,然后使用一种称为“ 工作窃取”的技术,让每个线程高效地处理自己的队列,只需要在需要时进行任何昂贵的跨线程访问。
查看PFX团队博客 ,了解有关如何分配工作和各种其他主题的大量信息。
请注意,在某些情况下,您也可以指定所需的并行度。
在单个核心机器上… Parallel.ForEach分区(块)在多个线程之间工作的集合,但是这个数字是基于一个algorithm计算出来的,该algorithm考虑到并似乎不断地监视由它分配给ForEach的线程。 因此, 如果ForEach的正文部分调用长时间运行的IO绑定/阻塞函数,这会使线程等待,algorithm会产生更多的线程并重新分配它们之间的集合 。 如果线程很快完成并且不会在IO线程上阻塞,例如简单地计算一些数字, 那么algorithm会将线程数量增加(或者确实减less)到algorithm认为最优的吞吐量(平均完成每次迭代的时间) 。
基本上,所有各种并行库函数后面的线程池,将会找出最佳的线程数量来使用。 物理处理器内核的数量仅构成等式的一部分。 内核数量和产生的线程数量之间不存在简单的一对一关系。
我没有find有关取消和处理同步线程的文档,非常有帮助。 希望MS能够在MSDN中提供更好的示例。
不要忘记,正文代码必须写成在多个线程上运行,以及所有常见的线程安全考虑,框架不会抽象出这个因素。
它根据处理器/内核的数量计算出最佳线程数。 他们不会马上产卵。
请参见并行。每次迭代使用一个任务? 为了一个“心理模型”使用的想法。 然而,作者确实说:“在一天结束的时候,记住实现细节随时可能改变是很重要的。”
伟大的问题。 在你的例子中,即使在四核处理器上,并行化水平也相当低,但是有一些等待并行化的水平可能会相当高。
// Max concurrency: 5 [Test] public void Memory_Operations() { ConcurrentBag<int> monitor = new ConcurrentBag<int>(); ConcurrentBag<int> monitorOut = new ConcurrentBag<int>(); var arrayStrings = new string[1000]; Parallel.ForEach<string>(arrayStrings, someString => { monitor.Add(monitor.Count); monitor.TryTake(out int result); monitorOut.Add(result); }); Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First()); }
现在看看添加等待操作来模拟HTTP请求时会发生什么。
// Max concurrency: 34 [Test] public void Waiting_Operations() { ConcurrentBag<int> monitor = new ConcurrentBag<int>(); ConcurrentBag<int> monitorOut = new ConcurrentBag<int>(); var arrayStrings = new string[1000]; Parallel.ForEach<string>(arrayStrings, someString => { monitor.Add(monitor.Count); System.Threading.Thread.Sleep(1000); monitor.TryTake(out int result); monitorOut.Add(result); }); Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First()); }
我还没有做出任何改变,并发/并行水平急剧上升。 ParallelOptions.MaxDegreeOfParallelism
可以提高ParallelOptions.MaxDegreeOfParallelism
。
// Max concurrency: 43 [Test] public void Test() { ConcurrentBag<int> monitor = new ConcurrentBag<int>(); ConcurrentBag<int> monitorOut = new ConcurrentBag<int>(); var arrayStrings = new string[1000]; var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue}; Parallel.ForEach<string>(arrayStrings, options, someString => { monitor.Add(monitor.Count); System.Threading.Thread.Sleep(1000); monitor.TryTake(out int result); monitorOut.Add(result); }); Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First()); } // Max concurrency: 391 [Test] public void Test() { ConcurrentBag<int> monitor = new ConcurrentBag<int>(); ConcurrentBag<int> monitorOut = new ConcurrentBag<int>(); var arrayStrings = new string[1000]; var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue}; Parallel.ForEach<string>(arrayStrings, options, someString => { monitor.Add(monitor.Count); System.Threading.Thread.Sleep(100000); monitor.TryTake(out int result); monitorOut.Add(result); }); Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First()); }
我build议设置ParallelOptions.MaxDegreeOfParallelism
。 它不一定会增加正在使用的线程数量,但它将确保您只能启动一个正常的线程数量,这似乎是您的担心。
最后回答你的问题,不,你不会立刻开始所有的线程。 如果您正在寻找并行调用,例如testing竞态条件,请使用Parallel.Invoke。
// 636462943623363344 // 636462943623363344 // 636462943623363344 // 636462943623363344 // 636462943623363344 // 636462943623368346 // 636462943623368346 // 636462943623373351 // 636462943623393364 // 636462943623393364 [Test] public void Test() { ConcurrentBag<string> monitor = new ConcurrentBag<string>(); ConcurrentBag<string> monitorOut = new ConcurrentBag<string>(); var arrayStrings = new string[1000]; var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue}; Parallel.ForEach<string>(arrayStrings, options, someString => { monitor.Add(DateTime.UtcNow.Ticks.ToString()); monitor.TryTake(out string result); monitorOut.Add(result); }); var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList(); Console.WriteLine(string.Join(Environment.NewLine, startTimes.Take(10))); }