How to limit the amount of concurrent async I/O operations?
// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) =>
{
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});
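Here is the problem: it starts 1000+ simultaneous web requests. Is there an easy way to limit the number of these async HTTP requests that run concurrently, so that no more than 20 web pages are being downloaded at any given time? How can this be done in the most efficient manner?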
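You can definitely do this with the latest version of async for .NET, using .NET 4.5 Beta. The previous post from "usr" points to a good article written by Stephen Toub, but the less publicized news is that the async semaphore actually made it into the .NET 4.5 Beta release.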
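If you look at our beloved SemaphoreSlim class (which you should be using, since it is more performant than the original Semaphore), it now has the WaitAsync(...) series of overloads, with all of the expected arguments: timeout intervals, cancellation tokens, all of your usual scheduling friends :)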
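As a rough illustration of those overloads, here is a minimal sketch that waits for a free slot with both a timeout and a cancellation token. It is not code from the original answer: the class name ThrottleDemo and the helper TryRunAsync are hypothetical names chosen for this example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Hypothetical helper: runs 'work' only if a slot becomes free within
    // 'timeout'. Returns false when the wait timed out instead.
    public static async Task<bool> TryRunAsync(
        SemaphoreSlim throttler,
        Func<Task> work,
        TimeSpan timeout,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        // WaitAsync(TimeSpan, CancellationToken) completes with true when the
        // semaphore was entered and with false when the timeout elapsed first.
        // It throws OperationCanceledException if the token is cancelled.
        bool entered = await throttler.WaitAsync(timeout, cancellationToken);
        if (!entered)
        {
            return false;
        }

        try
        {
            await work();
            return true;
        }
        finally
        {
            // Release only after a successful wait.
            throttler.Release();
        }
    }
}
```

Checking the boolean result matters: releasing a semaphore that was never entered would let more work through than intended.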
Stephen has also written a more recent blog post about the new features in .NET 4.5; see "What's New in .NET 4.5 Beta".
Finally, here is some sample code showing how to use SemaphoreSlim to throttle async methods:
// requires: System.Collections.Generic, System.Net.Http,
// System.Threading, System.Threading.Tasks
public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    // (an array initializer cannot be used with 'var', so the type is spelled out)
    string[] urls = { "http://google.com", "http://yahoo.com", ... };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}
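Last, but probably worth a mention, is a solution that uses TPL-based scheduling. You can create delegate-bound tasks on the TPL that have not yet been started, and let a custom task scheduler limit the concurrency. In fact, there is an MSDN sample for it here: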
See also TaskScheduler.
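The scheduler-based idea can be sketched roughly as follows. Instead of the custom scheduler from the MSDN sample, this example swaps in the built-in ConcurrentExclusiveSchedulerPair (available since .NET 4.5), whose ConcurrentScheduler caps how many delegate-bound tasks run at once. The class name SchedulerThrottleDemo and the simulated work (Thread.Sleep) are assumptions made for illustration.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerThrottleDemo
{
    // Runs 'itemCount' simulated jobs with at most 'maxConcurrency' running
    // at once, and returns the highest concurrency actually observed.
    public static int Run(int itemCount, int maxConcurrency)
    {
        var pair = new ConcurrentExclusiveSchedulerPair(
            TaskScheduler.Default, maxConcurrency);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        int running = 0;
        int peak = 0;
        var tasks = new List<Task>();

        for (int i = 0; i < itemCount; i++)
        {
            tasks.Add(factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref running);

                // Record the peak concurrency with a compare-and-swap loop.
                int seen;
                while (now > (seen = Volatile.Read(ref peak)))
                {
                    Interlocked.CompareExchange(ref peak, now, seen);
                }

                Thread.Sleep(20); // stand-in for synchronous work
                Interlocked.Decrement(ref running);
            }));
        }

        Task.WaitAll(tasks.ToArray());
        return peak;
    }
}
```

A scheduler only limits delegates that are actually running. An async lambda gives up its slot at the first await, so this approach fits synchronous work bodies better than awaited I/O.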
Unfortunately, the .NET Framework is missing the most important combinators for orchestrating parallel async tasks. There is no such thing built in.
Look at the AsyncSemaphore class built by the most respectable Stephen Toub. What you want is called a semaphore, and you need an async version of it.
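To make the idea concrete, here is a minimal sketch of what such an async semaphore can look like. It is written in the spirit of the AsyncSemaphore mentioned above, not copied from it; the class name SimpleAsyncSemaphore and its members are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class SimpleAsyncSemaphore
{
    // Callers that could not enter immediately wait on these sources, in FIFO order.
    private readonly Queue<TaskCompletionSource<bool>> _waiters =
        new Queue<TaskCompletionSource<bool>>();
    private int _currentCount;

    public SimpleAsyncSemaphore(int initialCount)
    {
        if (initialCount < 0)
        {
            throw new ArgumentOutOfRangeException("initialCount");
        }
        _currentCount = initialCount;
    }

    // Returns a completed task if a slot is free, otherwise a task that
    // completes when another caller releases its slot.
    public Task WaitAsync()
    {
        lock (_waiters)
        {
            if (_currentCount > 0)
            {
                _currentCount--;
                return Task.FromResult(true);
            }

            var waiter = new TaskCompletionSource<bool>();
            _waiters.Enqueue(waiter);
            return waiter.Task;
        }
    }

    // Hands the slot to the oldest waiter, or returns it to the pool.
    public void Release()
    {
        TaskCompletionSource<bool> toRelease = null;
        lock (_waiters)
        {
            if (_waiters.Count > 0)
            {
                toRelease = _waiters.Dequeue();
            }
            else
            {
                _currentCount++;
            }
        }

        // Complete the waiter outside the lock so continuations never run under it.
        if (toRelease != null)
        {
            toRelease.SetResult(true);
        }
    }
}
```

No thread is blocked while a caller waits; the wait is represented purely by an incomplete task, which is what makes the semaphore suitable for async code.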
Theo Yaung's example is nice, but there is a variant that does not keep a list of tasks to wait on.
class SomeChecker
{
    private const int ThreadCount = 20;
    private CountdownEvent _countdownEvent;
    private SemaphoreSlim _throttler;

    public Task Check(IList<string> urls)
    {
        _countdownEvent = new CountdownEvent(urls.Count);
        _throttler = new SemaphoreSlim(ThreadCount);

        return Task.Run( // prevent UI thread lock
            async () =>
            {
                foreach (var url in urls)
                {
                    // do an async wait until we can schedule again
                    await _throttler.WaitAsync();
                    ProccessUrl(url); // NOT await
                }
                // instead of await Task.WhenAll(allTasks);
                _countdownEvent.Wait();
            });
    }

    private async Task ProccessUrl(string url)
    {
        try
        {
            var page = await new WebClient()
                .DownloadStringTaskAsync(new Uri(url));
            ProccessResult(page);
        }
        finally
        {
            _throttler.Release();
            _countdownEvent.Signal();
        }
    }

    private void ProccessResult(string page) { /*....*/ }
}
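If you have an IEnumerable (i.e. strings of URLs) and you want to run an I/O-bound operation on each of them concurrently (i.e. make an async HTTP request), and optionally also want to cap the maximum number of concurrent I/O requests, here is how you can do that. This way you do not use the thread pool and the like. The method uses a semaphore to control the maximum number of concurrent I/O requests, similar to a sliding window: one request completes and leaves the semaphore, and the next one gets in.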
Usage: await ForEachAsync(urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);
public static Task ForEachAsync<TIn>(
    IEnumerable<TIn> inputEnumerable,
    Func<TIn, Task> asyncProcessor,
    int? maxDegreeOfParallelism = null)
{
    // DefaultMaxDegreeOfParallelism is a constant assumed to be defined elsewhere in the class.
    int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
    SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

    // The projection is lazy; Task.WhenAll enumerates it and starts every item.
    IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
    {
        await throttler.WaitAsync().ConfigureAwait(false);
        try
        {
            await asyncProcessor(input).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });

    return Task.WhenAll(tasks);
}
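Although 1000 tasks may be queued very quickly, the Parallel Tasks library can only handle as many concurrent tasks as there are CPU cores in the machine. That means that on a four-core machine only 4 tasks will be executing at a given time (unless you lower MaxDegreeOfParallelism).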
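Parallel computation should be used to speed up CPU-bound operations. Here we are talking about I/O-bound operations. Your implementation should be purely async, unless you are overwhelming a busy single core on your multi-core CPU.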
EDIT: I like the suggestion made by usr to use an "async semaphore" here.
Use MaxDegreeOfParallelism, which is an option you can specify in Parallel.ForEach():
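var options = new ParallelOptions { MaxDegreeOfParallelism = 20 };
Parallel.ForEach(urls, options, url =>
{
    var client = new HttpClient();
    // Parallel.ForEach does not await asynchronous work, so block until the
    // download finishes; otherwise the loop would not limit the requests in flight.
    var html = client.GetStringAsync(url).GetAwaiter().GetResult();
    // do stuff with html
});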
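On newer runtimes there is an async-aware counterpart to this loop. The following sketch assumes .NET 6 or later, where Parallel.ForEachAsync is available; that API postdates the answer above. The class name ForEachAsyncDemo and the method DownloadAllAsync are placeholders.

```csharp
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // A single shared HttpClient instead of one per request.
    private static readonly HttpClient Client = new HttpClient();

    // Downloads every URL with at most 20 requests in flight at a time.
    public static async Task DownloadAllAsync(IEnumerable<string> urls)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = 20 };

        await Parallel.ForEachAsync(urls, options, async (url, cancellationToken) =>
        {
            // The body is awaited, so no thread is blocked during the download.
            string html = await Client.GetStringAsync(url, cancellationToken);
            // do stuff with html
        });
    }
}
```

Unlike Parallel.ForEach, the body here returns a ValueTask that the loop awaits, so MaxDegreeOfParallelism limits the number of outstanding requests rather than the number of busy threads.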
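Essentially, you will want to create an Action or Task for each URL you want to hit, put them in a List, and then process that list while limiting how many can be processed in parallel.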
My blog post shows how to do this with both Tasks and Actions, and provides a sample project that you can download and run to see both in action.
With Actions
If you use Actions, you can use the built-in .NET Parallel.Invoke function. Here we limit it to running at most 20 threads in parallel.
var listOfActions = new List<Action>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Action here, but do not run it.
    listOfActions.Add(() => CallUrl(localUrl));
}

var options = new ParallelOptions { MaxDegreeOfParallelism = 20 };
Parallel.Invoke(options, listOfActions.ToArray());
With Tasks
With Tasks there is no built-in function. However, you can use the one that I provide on my blog.
/// <summary>
/// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
{
    await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
}

/// <summary>
/// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
/// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
{
    // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
    var tasks = tasksToRun.ToList();

    using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
    {
        var postTaskTasks = new List<Task>();

        // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
        tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

        // Start running each task.
        foreach (var task in tasks)
        {
            // Increment the number of tasks currently running and wait if too many are running.
            await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

            cancellationToken.ThrowIfCancellationRequested();
            task.Start();
        }

        // Wait for all of the provided tasks to complete.
        // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition
        // where the throttler's using block is exited before some Tasks have had their "post" action completed, which
        // references the throttler, resulting in an exception due to accessing a disposed object.
        await Task.WhenAll(postTaskTasks.ToArray());
    }
}
Then, to create your list of Tasks and call the function to run them, with at most 20 running simultaneously, you could do this:
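var listOfTasks = new List<Task>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    // The Task constructor takes an Action, so an async lambda here would compile to
    // async void and the Task would finish at its first await, releasing the throttler
    // too early. Blocking on the call keeps the Task running until the work is done.
    listOfTasks.Add(new Task(() => CallUrl(localUrl).GetAwaiter().GetResult()));
}

await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 20);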
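There are a lot of pitfalls, and using a semaphore directly can be tricky in error cases, so I would suggest using the AsyncEnumerator NuGet Package instead of reinventing the wheel: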
// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) =>
{
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParallelism: 20);
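Old question, new answer. @vitidev had a block of code that was reused almost intact in a project I reviewed. After discussing it with a few colleagues, one asked "Why don't you just use the built-in TPL methods?" ActionBlock looks like the winner there: https://msdn.microsoft.com/en-us/library/hh194773(v=vs.110).aspx . I probably won't end up changing any existing code, but I will definitely look to adopt this NuGet package and reuse Mr. Softy's best practice for throttled parallelism.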
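For readers who have not used it, a minimal sketch of the ActionBlock approach might look like this. It assumes the TPL Dataflow library is referenced (on .NET Framework it ships as the System.Threading.Tasks.Dataflow NuGet package); the class name ActionBlockDemo and the method DownloadAllAsync are hypothetical.

```csharp
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockDemo
{
    // A single shared HttpClient instead of one per request.
    private static readonly HttpClient Client = new HttpClient();

    // Downloads every URL with at most 20 requests in flight at a time.
    public static async Task DownloadAllAsync(IEnumerable<string> urls)
    {
        var block = new ActionBlock<string>(
            async url =>
            {
                string html = await Client.GetStringAsync(url);
                // do stuff with html
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 20 });

        foreach (var url in urls)
        {
            // The block's input queue is unbounded by default, so Post does not wait.
            block.Post(url);
        }

        // Signal that no more items are coming, then wait for the queue to drain.
        block.Complete();
        await block.Completion;
    }
}
```

One thing to keep in mind with this sketch: if the delegate throws, the block faults, stops processing the remaining items, and the exception surfaces when Completion is awaited. Catch inside the delegate if one failed download should not stop the rest.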