我如何在ForEach中使用Async?
使用ForEach时可以使用Async吗? 以下是我正在尝试的代码:
using (DataContext db = new DataLayer.DataContext()) { db.Groups.ToList().ForEach(i => async { await GetAdminsFromGroup(i.Gid); }); }
我收到错误:
名称“Async”在当前上下文中不存在
using语句被封入的方法被设置为async。
List<T>.ForEach
在async
中performance得不是很好(出于同样的原因,LINQ到对象也是如此)。
在这种情况下,我build议将每个元素投影到asynchronous操作中,然后可以(asynchronous)等待它们全部完成。
using (DataContext db = new DataLayer.DataContext()) { var tasks = db.Groups.ToList().Select(i => GetAdminsFromGroupAsync(i.Gid)); var results = await Task.WhenAll(tasks); }
这种方法在向ForEach
提供async
ForEach
方面的好处是:
- error handling更合适。
async void
exception不能被catch
; 这种方法会在await Task.WhenAll
行传播exception,从而允许自然的exception处理。 - 你知道这个任务在这个方法结束时是完整的,因为它会
await Task.WhenAll
。 如果使用async void
,则无法轻易分辨操作何时完成。 - 这种方法有一个自然的语法来检索结果。
GetAdminsFromGroupAsync
听起来像是产生结果的操作(pipe理员),如果这样的操作可以返回结果而不是将值设置为副作用,那么这样的代码更自然。
这个小扩展方法应该给你exception安全的asynchronous迭代:
public static async Task ForEachAsync<T>(this List<T> list, Func<T, Task> func) { foreach (var value in list) { await func(value); } }
由于我们将lambda的返回types从void
更改为Task
,exception将正确传播。 这将允许你在实践中写这样的东西:
await db.Groups.ToList().ForEachAsync(async i => { await GetAdminsFromGroup(i.Gid); });
问题是, async
关键字需要出现在lambda之前,而不是在body之前:
db.Groups.ToList().ForEach(async (i) => { await GetAdminsFromGroup(i.Gid); });
添加这个扩展方法
public static class ForEachAsyncExtension { public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) { return Task.WhenAll(from partition in Partitioner.Create(source).GetPartitions(dop) select Task.Run(async delegate { using (partition) while (partition.MoveNext()) await body(partition.Current).ConfigureAwait(false); })); } }
然后像这样使用:
Task.Run(async () => { var s3 = new AmazonS3Client(Config.Instance.Aws.Credentials, Config.Instance.Aws.RegionEndpoint); var buckets = await s3.ListBucketsAsync(); foreach (var s3Bucket in buckets.Buckets) { if (s3Bucket.BucketName.StartsWith("mybucket-")) { log.Information("Bucket => {BucketName}", s3Bucket.BucketName); ListObjectsResponse objects; try { objects = await s3.ListObjectsAsync(s3Bucket.BucketName); } catch { log.Error("Error getting objects. Bucket => {BucketName}", s3Bucket.BucketName); continue; } // ForEachAsync (4 is how many tasks you want to run in parallel) await objects.S3Objects.ForEachAsync(4, async s3Object => { try { log.Information("Bucket => {BucketName} => {Key}", s3Bucket.BucketName, s3Object.Key); await s3.DeleteObjectAsync(s3Bucket.BucketName, s3Object.Key); } catch { log.Error("Error deleting bucket {BucketName} object {Key}", s3Bucket.BucketName, s3Object.Key); } }); try { await s3.DeleteBucketAsync(s3Bucket.BucketName); } catch { log.Error("Error deleting bucket {BucketName}", s3Bucket.BucketName); } } } }).Wait();
以下是上述asynchronousforeach变体顺序处理的实际工作版本:
public static async Task ForEachAsync<T>(this List<T> enumerable, Action<T> action) { foreach (var item in enumerable) await Task.Run(() => { action(item); }).ConfigureAwait(false); }
这是实现:
public async void SequentialAsync() { var list = new List<Action>(); Action action1 = () => { //do stuff 1 }; Action action2 = () => { //do stuff 2 }; list.Add(action1); list.Add(action2); await list.ForEachAsync(); }
关键区别是什么? .ConfigureAwait(false);
它保持主线程的上下文,同时对每个任务进行asynchronous顺序处理。