Throttling asynchronous tasks

I'd like to run a bunch of async tasks, with a limit on how many tasks may be pending completion at any given time.

Say you have 1000 URLs, and you only want to have 50 requests open at a time; but as soon as one request completes, you open up a connection to the next URL in the list. That way, there are always exactly 50 connections open at a time, until the URL list is exhausted.

I'd also like to utilize a given number of threads if possible.

I came up with an extension method, ThrottleTasksAsync, that does what I want. Is there a simpler solution already out there? I would assume this is a common scenario.

Usage:

    class Program
    {
        static void Main(string[] args)
        {
            Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i =>
            {
                Console.WriteLine(i);
                return i;
            }).Wait();

            Console.WriteLine("Press a key to exit...");
            Console.ReadKey(true);
        }
    }

Here is the code:

    static class IEnumerableExtensions
    {
        public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
        {
            var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

            var semaphore = new SemaphoreSlim(maxConcurrentTasks);

            // Run the throttler on a separate thread.
            var t = Task.Run(() =>
            {
                foreach (var item in enumerable)
                {
                    // Wait for the semaphore
                    semaphore.Wait();
                    blockingQueue.Add(item);
                }

                blockingQueue.CompleteAdding();
            });

            var taskList = new List<Task<Result_T>>();

            Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted),
                new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
                _ =>
                {
                    Enumerable_T item;

                    if (blockingQueue.TryTake(out item, 100))
                    {
                        taskList.Add(
                            // Run the task
                            taskToRun(item)
                            .ContinueWith(tsk =>
                            {
                                // For effect
                                Thread.Sleep(2000);

                                // Release the semaphore
                                semaphore.Release();

                                return tsk.Result;
                            })
                        );
                    }
                });

            // Await all the tasks.
            return await Task.WhenAll(taskList);
        }

        static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
        {
            while (!condition()) yield return true;
        }
    }

The method makes use of BlockingCollection and SemaphoreSlim to work. The throttler runs on one thread, and all the async tasks run on the other thread. To achieve parallelism, I added a maxDegreeOfParallelism parameter that's passed to a Parallel.ForEach loop re-purposed as a while loop.

The old version was:

    foreach (var master in ...)
    {
        var details = ...;

        Parallel.ForEach(details, detail =>
        {
            // Process each detail record here
        }, new ParallelOptions { MaxDegreeOfParallelism = 15 });

        // Perform the final batch updates here
    }

However, the thread pool gets exhausted quickly, and you can't do async/await.
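As a side note, a minimal illustration of why async/await doesn't mix with Parallel.ForEach (not the original code; ProcessDetailAsync is a hypothetical async worker): an async lambda there compiles to an async void delegate, so ForEach considers each iteration done at its first await.

    // Illustration only: the async lambda becomes an async void delegate.
    // Parallel.ForEach treats each iteration as "done" at the first await,
    // so it returns while most ProcessDetailAsync calls are still running,
    // and any exceptions they throw go unobserved.
    Parallel.ForEach(details, async detail =>
    {
        await ProcessDetailAsync(detail); // hypothetical async worker
    });
    // Execution reaches this point long before the work actually finishes.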

Bonus: To get around the problem in BlockingCollection where Take() throws an exception when CompleteAdding() is called, I'm using the TryTake overload with a timeout. If I didn't use the timeout in TryTake, it would defeat the purpose of using a BlockingCollection since TryTake won't block. Is there a better way? Ideally, there would be a TakeAsync method.
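For what it's worth, a hedged sketch of a TakeAsync-style consumer is possible if the BlockingCollection<T> is swapped for a BufferBlock<T> from System.Threading.Tasks.Dataflow; OutputAvailableAsync() awaits items without timeout-based polling (the class and method names below are made up for illustration):

    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    static class TakeAsyncSketch
    {
        // Consumes items until the block is marked complete and drained.
        public static async Task ConsumeAsync<T>(BufferBlock<T> queue, Func<T, Task> process)
        {
            // OutputAvailableAsync() completes with false once the block is
            // completed and empty, so no TryTake timeout is needed.
            while (await queue.OutputAvailableAsync())
            {
                T item;
                if (queue.TryReceive(out item))
                    await process(item);
            }
        }
    }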

As suggested, use TPL Dataflow.

A TransformBlock<TInput, TOutput> may be what you're looking for.

You define a MaxDegreeOfParallelism to limit how many strings can be transformed (i.e., how many URLs can be downloaded) in parallel. You then post the URLs to the block, and when you're done, you tell the block you've finished adding items and you fetch the responses.

    var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
    );

    var buffer = new BufferBlock<HttpResponse>();
    downloader.LinkTo(buffer);

    foreach (var url in urls)
        downloader.Post(url); //or await downloader.SendAsync(url);

    downloader.Complete();
    await downloader.Completion;

    IList<HttpResponse> responses;
    if (buffer.TryReceiveAll(out responses))
    {
        //process responses
    }

Note: the TransformBlock buffers both its input and its output. So why do we need to link it to a BufferBlock?

Because the TransformBlock won't complete until all items (HttpResponse) have been consumed, so await downloader.Completion would hang. Instead, we let the downloader forward all of its output to a dedicated buffer block, then we wait for the downloader to complete and inspect the buffer block.
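Alternatively, as a hedged variation on the example above (reusing its Download method, HttpResponse type and urls sequence), if the responses can be consumed as they arrive, linking the downloader to an ActionBlock with completion propagation avoids collecting everything at the end:

    // Hedged variation: consume responses as they arrive instead of buffering them.
    var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 });

    var consumer = new ActionBlock<HttpResponse>(response =>
    {
        // process each response here
    });

    downloader.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (var url in urls)
        downloader.Post(url);

    downloader.Complete();
    await consumer.Completion; // completes once every response has been handled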

Say you have 1000 URLs, and you only want to have 50 requests open at a time; but as soon as one request completes, you open up a connection to the next URL in the list. That way, there are always exactly 50 connections open at a time, until the URL list is exhausted.

The following simple solution has come up on SO many times. It doesn't use blocking code and doesn't create threads explicitly, so it scales very well:

    const int MAX_DOWNLOADS = 50;

    static async Task DownloadAsync(string[] urls)
    {
        using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
        using (var httpClient = new HttpClient())
        {
            var tasks = urls.Select(async url =>
            {
                await semaphore.WaitAsync();
                try
                {
                    var data = await httpClient.GetStringAsync(url);
                    Console.WriteLine(data);
                }
                finally
                {
                    semaphore.Release();
                }
            });

            await Task.WhenAll(tasks);
        }
    }

The problem is that the processing of the downloaded data should be done on a different pipeline, with a different level of parallelism, especially if it is CPU-bound processing.

E.g., you'd probably want to have 4 threads doing the data processing concurrently (the number of CPU cores), plus up to 50 pending requests for more data (which don't use threads at all). AFAICT, this is not what your code is currently doing.

That's where TPL Dataflow or Rx would come in handy as the preferred solution (a rough Dataflow sketch of this shape is shown right below). It's certainly possible to implement something like this with plain TPL, too; note that the only blocking code in the plain-TPL version further down is inside the Task.Run that does the actual data processing.
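Purely as a hedged illustration of those two limits (not code from this answer), a two-block TPL Dataflow pipeline could express 50 concurrent downloads feeding 4 concurrent processors; urls, the httpClient instance and the ProcessData helper are assumed names:

    // Hedged sketch only: 50 concurrent downloads feeding 4 concurrent processors.
    // "urls", "httpClient" and "ProcessData" are assumed to exist elsewhere.
    var downloadBlock = new TransformBlock<string, string>(
        url => httpClient.GetStringAsync(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 });

    var processBlock = new ActionBlock<string>(
        data => ProcessData(data), // CPU-bound work, at most 4 at a time
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

    downloadBlock.LinkTo(processBlock, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (var url in urls)
        await downloadBlock.SendAsync(url);

    downloadBlock.Complete();
    await processBlock.Completion;

The plain-TPL version from the answer follows: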

    const int MAX_DOWNLOADS = 50;
    const int MAX_PROCESSORS = 4;

    // process data
    class Processing
    {
        SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS);
        HashSet<Task> _pending = new HashSet<Task>();
        object _lock = new Object();

        async Task ProcessAsync(string data)
        {
            await _semaphore.WaitAsync();
            try
            {
                await Task.Run(() =>
                {
                    // simulate work
                    Thread.Sleep(1000);
                    Console.WriteLine(data);
                });
            }
            finally
            {
                _semaphore.Release();
            }
        }

        public async void QueueItemAsync(string data)
        {
            var task = ProcessAsync(data);

            lock (_lock)
                _pending.Add(task);

            try
            {
                await task;
            }
            catch
            {
                if (!task.IsCanceled && !task.IsFaulted)
                    throw; // not the task's exception, rethrow
                // don't remove faulted/cancelled tasks from the list
                return;
            }

            // remove successfully completed tasks from the list
            lock (_lock)
                _pending.Remove(task);
        }

        public async Task WaitForCompleteAsync()
        {
            Task[] tasks;
            lock (_lock)
                tasks = _pending.ToArray();

            await Task.WhenAll(tasks);
        }
    }

    // download data
    static async Task DownloadAsync(string[] urls)
    {
        var processing = new Processing();

        using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS))
        using (var httpClient = new HttpClient())
        {
            var tasks = urls.Select(async (url) =>
            {
                await semaphore.WaitAsync();
                try
                {
                    var data = await httpClient.GetStringAsync(url);
                    // put the result on the processing pipeline
                    processing.QueueItemAsync(data);
                }
                finally
                {
                    semaphore.Release();
                }
            });

            await Task.WhenAll(tasks.ToArray());
            await processing.WaitForCompleteAsync();
        }
    }

As requested, here's the code I ended up going with.

The work is set up in a master-detail configuration, and each master is processed as a batch. Each unit of work is queued up in this fashion:

    var success = true;

    // Start processing all the master records.
    Master master;
    while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
    {
        await masterBuffer.SendAsync(master);
    }

    // Finished sending master records
    masterBuffer.Complete();

    // Now, wait for all the batches to complete.
    await batchAction.Completion;

    return success;

Masters are buffered one at a time to save work for other outside processes. The details for each master are dispatched for work via the masterTransform TransformManyBlock. A BatchedJoinBlock is also created to collect the details in one batch.

The actual work is done asynchronously in the detailTransform TransformBlock, 150 at a time. BoundedCapacity is set to 300 to ensure that too many Masters don't get buffered at the beginning of the chain, while still leaving room for enough detail records to be queued to allow 150 records to be processed at once. The block outputs an object to its targets, because it's filtered across the links depending on whether it is a Detail or an Exception.

The batchAction ActionBlock collects the output from all the batches, and performs the bulk database updates, error logging, etc. for each batch.

There will be several BatchedJoinBlocks, one for each master. Since each ISourceBlock outputs sequentially and each batch only accepts the number of detail records associated with one master, the batches will be processed in order. Each block only outputs one group, and is unlinked upon completion. Only the last batch block propagates its completion to the final ActionBlock.

The dataflow network:

    // The dataflow network
    BufferBlock<Master> masterBuffer = null;
    TransformManyBlock<Master, Detail> masterTransform = null;
    TransformBlock<Detail, object> detailTransform = null;
    ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;

    // Buffer master records to enable efficient throttling.
    masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });

    // Sequentially transform master records into a stream of detail records.
    masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
    {
        var records = await StoredProcedures.GetObjectsAsync(masterRecord);

        // Filter the master records based on some criteria here
        var filteredRecords = records;

        // Only propagate completion to the last batch
        var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

        // Create a batch join block to encapsulate the results of the master record.
        var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

        // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the batch action block.
        var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
        var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
        var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

        // Unlink batchjoinblock upon completion.
        // (the returned task does not need to be awaited, despite the warning.)
        batchjoinblock.Completion.ContinueWith(task =>
        {
            detailLink1.Dispose();
            detailLink2.Dispose();
            batchLink.Dispose();
        });

        return filteredRecords;
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

    // Process each detail record asynchronously, 150 at a time.
    detailTransform = new TransformBlock<Detail, object>(async detail =>
    {
        try
        {
            // Perform the action for each detail here asynchronously
            await DoSomethingAsync();

            return detail;
        }
        catch (Exception e)
        {
            success = false;
            return e;
        }
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });

    // Perform the proper action for each batch
    batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
    {
        var details = batch.Item1.Cast<Detail>();
        var errors = batch.Item2.Cast<Exception>();

        // Do something with the batch here
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

    masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
    masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });