Nesting await in Parallel.ForEach

In a metro app, I need to execute a number of WCF calls. There is a significant number of calls to be made, so I need to do them in a parallel loop. The problem is that the parallel loop exits before the WCF calls are all completed.

How would you refactor this to work as expected?

    var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
    var customers = new System.Collections.Concurrent.BlockingCollection<Customer>();

    Parallel.ForEach(ids, async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        var cust = await repo.GetCustomer(i);
        customers.Add(cust);
    });

    foreach (var customer in customers)
    {
        Console.WriteLine(customer.ID);
    }

    Console.ReadKey();

The whole idea behind Parallel.ForEach() is that you have a set of threads and each thread processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the asynchronous call.

You could "fix" that by blocking the ForEach() threads, but that defeats the whole point of async-await.
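For illustration only, that blocking version would be a sketch along these lines; each .Result call parks a thread-pool thread for the full duration of its WCF call:

    // Blocking "fix" (sketch only, not recommended): the loop now waits for the
    // delegates because they are synchronous, but every worker thread is blocked
    // while its WCF call is in flight.
    Parallel.ForEach(ids, i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        var cust = repo.GetCustomer(i).Result; // blocks instead of awaiting
        customers.Add(cust);
    });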

What you can do is use TPL Dataflow instead of Parallel.ForEach(), which supports asynchronous Tasks well.

Specifically, your code could be written using a TransformBlock that transforms each id into a Customer using an async lambda. This block can be configured for parallel execution. You would link that block to an ActionBlock that writes each Customer to the console. After you set up the block network, you can Post() each id to the TransformBlock.

In code:

    var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

    var getCustomerBlock = new TransformBlock<string, Customer>(
        async i =>
        {
            ICustomerRepo repo = new CustomerRepo();
            return await repo.GetCustomer(i);
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });
    var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
    getCustomerBlock.LinkTo(
        writeCustomerBlock,
        new DataflowLinkOptions
        {
            PropagateCompletion = true
        });

    foreach (var id in ids)
        getCustomerBlock.Post(id);

    getCustomerBlock.Complete();
    writeCustomerBlock.Completion.Wait();

You will probably want to limit the parallelism of the TransformBlock to some small constant, though. Also, you could limit the capacity of the TransformBlock and add the items to it asynchronously using SendAsync(), for example if the collection is too big.
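For example, a sketch of those options (the numbers 10 and 100 are arbitrary choices here) could look like this:

    // Sketch: limit parallelism to a small constant and bound the block's capacity,
    // then feed items asynchronously so the producer waits while the block is full.
    var getCustomerBlock = new TransformBlock<string, Customer>(
        async i =>
        {
            ICustomerRepo repo = new CustomerRepo();
            return await repo.GetCustomer(i);
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 10, // small constant instead of Unbounded
            BoundedCapacity = 100        // cap the number of buffered items
        });

    foreach (var id in ids)
        await getCustomerBlock.SendAsync(id); // waits when the block is at capacity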

As an added benefit compared to your code (if it worked), the writing will start as soon as a single item is finished, rather than waiting until all of the processing is done.

svick's answer is (as usual) excellent.

However, I find Dataflow more useful when you actually have large amounts of data to transfer, or when you need an async-compatible queue.

In your case, a simpler solution is to just use async-style parallelism:

    var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

    var customerTasks = ids.Select(i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return repo.GetCustomer(i);
    });
    var customers = await Task.WhenAll(customerTasks);

    foreach (var customer in customers)
    {
        Console.WriteLine(customer.ID);
    }

    Console.ReadKey();

Using Dataflow as svick suggested may be overkill, and Stephen's answer does not provide the means to control the concurrency of the operation. However, that can be achieved rather simply:

    public static async Task RunWithMaxDegreeOfConcurrency<T>(
        int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
    {
        var activeTasks = new List<Task>(maxDegreeOfConcurrency);
        foreach (var task in collection.Select(taskFactory))
        {
            activeTasks.Add(task);
            if (activeTasks.Count == maxDegreeOfConcurrency)
            {
                await Task.WhenAny(activeTasks.ToArray());
                //observe exceptions here
                activeTasks.RemoveAll(t => t.IsCompleted);
            }
        }
        await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t =>
        {
            //observe exceptions in a manner consistent with the above
        });
    }

The ToArray() calls can be optimized by using an array instead of a list and replacing completed tasks, but I doubt it would make much of a difference in most scenarios. Sample usage per the OP's question:

    RunWithMaxDegreeOfConcurrency(10, ids, async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        var cust = await repo.GetCustomer(i);
        customers.Add(cust);
    });

EDIT: Fellow SO user and TPL expert Eli Arbel pointed me to a related article by Stephen Toub. As usual, his implementation is both elegant and efficient:

    public static Task ForEachAsync<T>(
        this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current).ContinueWith(t =>
                        {
                            //observe exceptions
                        });
            }));
    }
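Applied to the question's code it could be used roughly like this (a sketch, with an arbitrary degree of parallelism of 10):

    // Sketch: process the ids with at most 10 concurrent GetCustomer calls.
    await ids.ForEachAsync(10, async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        var cust = await repo.GetCustomer(i);
        customers.Add(cust);
    });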

Wrap the Parallel.ForEach in a Task.Run(), and instead of the await keyword use [yourasyncmethod].Result

(you need to do the Task.Run thing so that you don't block the UI thread)

Something like this:

    var yourForeachTask = Task.Run(() =>
    {
        Parallel.ForEach(ids, i =>
        {
            ICustomerRepo repo = new CustomerRepo();
            var cust = repo.GetCustomer(i).Result;
            customers.Add(cust);
        });
    });
    await yourForeachTask;

This should be pretty efficient, and easier than getting the whole TPL Dataflow working:

    var customers = await ids.SelectAsync(async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    });

    ...

    public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(
        this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
    {
        var results = new List<TResult>();

        var activeTasks = new HashSet<Task<TResult>>();
        foreach (var item in source)
        {
            activeTasks.Add(selector(item));
            if (activeTasks.Count >= maxDegreesOfParallelism)
            {
                var completed = await Task.WhenAny(activeTasks);
                activeTasks.Remove(completed);
                results.Add(completed.Result);
            }
        }

        results.AddRange(await Task.WhenAll(activeTasks));
        return results;
    }

You can save effort with the newer AsyncEnumerator NuGet package, which didn't exist 4 years ago when the question was originally posted. It allows you to control the degree of parallelism:

    using System.Collections.Async;
    ...

    await ids.ParallelForEachAsync(async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        var cust = await repo.GetCustomer(i);
        customers.Add(cust);
    },
    maxDegreeOfParallelism: 10);

I'm a little late to the party, but you may want to consider using GetAwaiter().GetResult() to run your asynchronous code in a sync context, like so:

    Parallel.ForEach(ids, i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        // Run this on the thread which the Parallel library occupied.
        var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
        customers.Add(cust);
    });

After introducing a bunch of helper methods, you will be able to run parallel queries with this simple syntax:

    const int DegreeOfParallelism = 10;

    IEnumerable<double> result = await Enumerable.Range(0, 1000000)
        .Split(DegreeOfParallelism)
        .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
        .ConfigureAwait(false);

What happens here is: we split the source collection into 10 chunks (.Split(DegreeOfParallelism)), then run 10 tasks, each processing its items one by one (.SelectManyAsync(...)), and merge the results back into a single list.

It's worth mentioning that there is a simpler approach:

    double[] result2 = await Enumerable.Range(0, 1000000)
        .Select(async i => await CalculateAsync(i).ConfigureAwait(false))
        .WhenAll()
        .ConfigureAwait(false);

However, it needs a precaution: if your source collection is too big, it will schedule a Task for every item right away, which may cause a significant performance hit.

The extension methods used in the examples above look as follows:

    public static class CollectionExtensions
    {
        /// <summary>
        /// Splits collection into number of collections of nearly equal size.
        /// </summary>
        public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
        {
            if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));

            List<T> source = src.ToList();
            var sourceIndex = 0;
            for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
            {
                var list = new List<T>();
                int itemsLeft = source.Count - targetIndex;
                while (slicesCount * list.Count < itemsLeft)
                {
                    list.Add(source[sourceIndex++]);
                }

                yield return list;
            }
        }

        /// <summary>
        /// Takes collection of collections, projects those in parallel and merges results.
        /// </summary>
        public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
            this IEnumerable<IEnumerable<T>> source,
            Func<T, Task<TResult>> func)
        {
            List<TResult>[] slices = await source
                .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
                .WhenAll()
                .ConfigureAwait(false);
            return slices.SelectMany(s => s);
        }

        /// <summary>Runs selector and awaits results.</summary>
        public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(
            this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
        {
            List<TResult> result = new List<TResult>();
            foreach (TSource source1 in source)
            {
                TResult result1 = await selector(source1).ConfigureAwait(false);
                result.Add(result1);
            }
            return result;
        }

        /// <summary>Wraps tasks with Task.WhenAll.</summary>
        public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
        {
            return Task.WhenAll<TResult>(source);
        }
    }
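The snippets above assume some CalculateAsync method, which isn't shown here; any method returning Task&lt;double&gt; will do. A minimal hypothetical stand-in, just to make the examples compile, might be:

    // Hypothetical stand-in for CalculateAsync, only so the examples above compile.
    static async Task<double> CalculateAsync(int i)
    {
        await Task.Delay(1); // simulate some asynchronous work
        return Math.Sqrt(i);
    }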