限制asynchronous任务
我想运行一堆asynchronous任务,限制在任何给定时间有多less任务可能正在等待完成。
假设您有1000个url,并且您只希望一次打开50个请求; 但只要一个请求完成,您就可以打开到列表中下一个URL的连接。 那样的话,一次只能打开50个连接,直到URL列表被耗尽。
如果可能的话,我也想利用给定数量的线程。
我想出了一个扩展方法, ThrottleTasksAsync
,做我想要的。 那里有一个更简单的解决scheme吗? 我会假设这是一个常见的情况。
用法:
class Program { static void Main(string[] args) { Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait(); Console.WriteLine("Press a key to exit..."); Console.ReadKey(true); } }
这里是代码:
static class IEnumerableExtensions { public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun) { var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>()); var semaphore = new SemaphoreSlim(maxConcurrentTasks); // Run the throttler on a separate thread. var t = Task.Run(() => { foreach (var item in enumerable) { // Wait for the semaphore semaphore.Wait(); blockingQueue.Add(item); } blockingQueue.CompleteAdding(); }); var taskList = new List<Task<Result_T>>(); Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism }, _ => { Enumerable_T item; if (blockingQueue.TryTake(out item, 100)) { taskList.Add( // Run the task taskToRun(item) .ContinueWith(tsk => { // For effect Thread.Sleep(2000); // Release the semaphore semaphore.Release(); return tsk.Result; } ) ); } }); // Await all the tasks. return await Task.WhenAll(taskList); } static IEnumerable<bool> IterateUntilTrue(Func<bool> condition) { while (!condition()) yield return true; } }
该方法利用BlockingCollection
和SemaphoreSlim
使其工作。 throttler在一个线程上运行,所有的asynchronous任务在另一个线程上运行。 为了实现并行性,我添加了一个传递给Parallel.ForEach
循环的maxDegreeOfParallelism参数,作为while
循环重新使用。
旧版本是:
foreach (var master = ...) { var details = ...; Parallel.ForEach(details, detail => { // Process each detail record here }, new ParallelOptions { MaxDegreeOfParallelism = 15 }); // Perform the final batch updates here }
但是,线程池快速耗尽,无法进行async
/ await
。
奖金:为了解决BlockingCollection
中的问题,当调用CompleteAdding()
时,在Take()
引发exception,我使用TryTake
重载超时。 如果我没有在TryTake
使用超时,它将失败使用BlockingCollection
的目的,因为TryTake
不会阻塞。 有没有更好的办法? 理想情况下,会有一个TakeAsync
方法。
如所build议,使用TPL数据stream。
TransformBlock<TInput, TOutput>
可能就是你要找的东西。
您定义了一个MaxDegreeOfParallelism
来限制可以并行转换多less个string(即可以下载多less个url)。 然后,您将url发布到该块,完成后,您会告诉该块您已完成添加项目并获取响应。
var downloader = new TransformBlock<string, HttpResponse>( url => Download(url), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 } ); var buffer = new BufferBlock<HttpResponse>(); downloader.LinkTo(buffer); foreach(var url in urls) downloader.Post(url); //or await downloader.SendAsync(url); downloader.Complete(); await downloader.Completion; IList<HttpResponse> responses; if (buffer.TryReceiveAll(out responses)) { //process responses }
注意: TransformBlock
缓冲其input和输出。 那么为什么我们需要把它连接到一个BufferBlock
?
因为直到所有项目( HttpResponse
)已被使用,并且await downloader.Completion
者, TransformBlock
才会完成。完成将挂起。 相反,我们让downloader
将其所有输出转发到一个专用的缓冲区块,然后等待downloader
器完成,然后检查缓冲区块。
假设您有1000个url,并且您只希望一次打开50个请求; 但只要一个请求完成,您就可以打开到列表中下一个URL的连接。 那样的话,一次只能打开50个连接,直到URL列表被耗尽。
以下简单的解决scheme已经在SO上多次出现。 它不使用阻塞代码,也不会显式创build线程,因此可以很好地扩展:
const int MAX_DOWNLOADS = 50; static async Task DownloadAsync(string[] urls) { using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS)) using (var httpClient = new HttpClient()) { var tasks = urls.Select(async url => { await semaphore.WaitAsync(); try { var data = await httpClient.GetStringAsync(url); Console.WriteLine(data); } finally { semaphore.Release(); } }); await Task.WhenAll(tasks); } }
问题是,下载数据的处理应该在不同的stream水线上完成,具有不同的并行级别,特别是如果是CPU限制的处理。
例如,你可能希望有4个线程同时进行数据处理(CPU核心数量),以及多达50个待处理的更多数据请求(根本不使用线程)。 AFAICT,这不是你的代码正在做什么。
这就是TPL Dataflow或Rx可以派上用场的首选解决scheme。 然而,用普通的TPL来实现这样的东西当然是可能的。 请注意,这里唯一的阻止代码是在Task.Run
进行实际数据处理的Task.Run
:
const int MAX_DOWNLOADS = 50; const int MAX_PROCESSORS = 4; // process data class Processing { SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS); HashSet<Task> _pending = new HashSet<Task>(); object _lock = new Object(); async Task ProcessAsync(string data) { await _semaphore.WaitAsync(); try { await Task.Run(() => { // simuate work Thread.Sleep(1000); Console.WriteLine(data); }); } finally { _semaphore.Release(); } } public async void QueueItemAsync(string data) { var task = ProcessAsync(data); lock (_lock) _pending.Add(task); try { await task; } catch { if (!task.IsCanceled && !task.IsFaulted) throw; // not the task's exception, rethrow // don't remove faulted/cancelled tasks from the list return; } // remove successfully completed tasks from the list lock (_lock) _pending.Remove(task); } public async Task WaitForCompleteAsync() { Task[] tasks; lock (_lock) tasks = _pending.ToArray(); await Task.WhenAll(tasks); } } // download data static async Task DownloadAsync(string[] urls) { var processing = new Processing(); using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS)) using (var httpClient = new HttpClient()) { var tasks = urls.Select(async (url) => { await semaphore.WaitAsync(); try { var data = await httpClient.GetStringAsync(url); // put the result on the processing pipeline processing.QueueItemAsync(data); } finally { semaphore.Release(); } }); await Task.WhenAll(tasks.ToArray()); await processing.WaitForCompleteAsync(); } }
按照要求,这是我最终结束的代码。
工作在主从configuration中进行设置,每个主服务器都作为批处理进行处理。 每个工作单位都以这种方式排队:
var success = true; // Start processing all the master records. Master master; while (null != (master = await StoredProcedures.ClaimRecordsAsync(...))) { await masterBuffer.SendAsync(master); } // Finished sending master records masterBuffer.Complete(); // Now, wait for all the batches to complete. await batchAction.Completion; return success;
大师一次缓冲一个,以节省其他外部stream程的工作。 通过masterTransform
TransformManyBlock
调度每个master的详细信息。 BatchedJoinBlock
也被创build来收集一批中的细节。
实际工作是在detailTransform
TransformBlock
中asynchronous完成的, detailTransform
150个。 BoundedCapacity
被设置为300,以确保太多的Masters在链的开始处不被缓冲,同时留出足够的细节logging排队以允许一次处理150个logging。 该块输出一个object
到它的目标,因为它通过链接被过滤,取决于它是一个Detail
还是Exception
。
batchAction
ActionBlock
收集所有批次的输出,并为每个批次执行批量数据库更新,错误logging等。
将会有几个BatchedJoinBlock
,每个主人一个。 由于每个ISourceBlock
是按顺序输出的,每个批次只接受与一个主文件关联的详细logging的数量,所以批次将按顺序处理。 每个块只输出一个组,并在完成时取消链接。 只有最后一个批处理块将其完成传播到最终的ActionBlock
。
数据streamnetworking:
// The dataflow network BufferBlock<Master> masterBuffer = null; TransformManyBlock<Master, Detail> masterTransform = null; TransformBlock<Detail, object> detailTransform = null; ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null; // Buffer master records to enable efficient throttling. masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 }); // Sequentially transform master records into a stream of detail records. masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord => { var records = await StoredProcedures.GetObjectsAsync(masterRecord); // Filter the master records based on some criteria here var filteredRecords = records; // Only propagate completion to the last batch var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0; // Create a batch join block to encapsulate the results of the master record. var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 }); // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block. var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail); var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception); var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion }); // Unlink batchjoinblock upon completion. // (the returned task does not need to be awaited, despite the warning.) batchjoinblock.Completion.ContinueWith(task => { detailLink1.Dispose(); detailLink2.Dispose(); batchLink.Dispose(); }); return filteredRecords; }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); // Process each detail record asynchronously, 150 at a time. detailTransform = new TransformBlock<Detail, object>(async detail => { try { // Perform the action for each detail here asynchronously await DoSomethingAsync(); return detail; } catch (Exception e) { success = false; return e; } }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 }); // Perform the proper action for each batch batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch => { var details = batch.Item1.Cast<Detail>(); var errors = batch.Item2.Cast<Exception>(); // Do something with the batch here }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }); masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true }); masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });