System.Threading.Tasks – 限制并发任务的数量

我刚刚开始在.Net 4.0中查看新的“System.Threading.Tasks”的好处,并想知道是否有任何构build支持限制一次运行的并发任务的数量,或者如果这应该手动处理。

EG:如果我需要调用100次计算方法,有没有办法设置100个任务,但是只能同时执行5个? 答案可能只是创build5个任务,调用Task.WaitAny,并创build一个新的任务,每个前一个完成。 如果还有更好的方法,我只想确保我不会错过任何一个技巧。

基本上,是否有内置的方式来做到这一点:

Dim taskArray() = {New Task(Function() DoComputation1()), New Task(Function() DoComputation2()), ... New Task(Function() DoComputation100())} Dim maxConcurrentThreads As Integer = 5 RunAllTasks(taskArray, maxConcurrentThreads) 

感谢您的帮助。

我知道这已经快一年了,但是我发现了一个更简单的方法来实现这个目标,所以我想我会分享一下:

 Dim actionsArray() As Action = new Action(){ New Action(Sub() DoComputation1()), New Action(Sub() DoComputation2()), ... New Action(Sub() DoComputation100()) } System.Threading.Tasks.Parallel.Invoke(New Tasks.ParallelOptions() With {.MaxDegreeOfParallelism = 5}, actionsArray) 

瞧!

我知道这是一个古老的线程,但我只是想分享我的解决scheme,这个问题:使用信号量。

(这是在C#中)

 private void RunAllActions(IEnumerable<Action> actions, int maxConcurrency) { using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency)) { foreach(Action action in actions) { Task.Factory.StartNew(() => { concurrencySemaphore.Wait(); try { action(); } finally { concurrencySemaphore.Release(); } }); } } } 

一个解决scheme可能是在这里看一下微软的预制代码。

描述如下:“提供一个任务调度程序,确保在ThreadPool上运行时具有最大的并发级别”,并且就我已经能够testing它而言,似乎可以做到这一点,就像ParallelOptions中的MaxDegreeOfParallelism属性。

C#相当于James提供的样本

 Action[] actionsArray = new Action[] { new Action(() => DoComputation1()), new Action(() => DoComputation2()), //... new Action(() => DoComputation100()) }; System.Threading.Tasks.Parallel.Invoke(new Tasks.ParallelOptions {MaxDegreeOfParallelism = 5 }, actionsArray) 

简短的回答:如果你想要的是限制工作任务的数量,使他们不饱和你的networking服务,那么我认为你的方法是好的。

长答案: .NET 4.0中的新System.Threading.Tasks引擎运行在.NET ThreadPool之上。 由于每个进程只有一个ThreadPool,并且默认最多有250个工作线程。 因此,如果要将ThreadPool的最大线程数设置为更适中的数,则可以减less并发执行的线程数,从而减less使用ThreadPool.SetMaxThreads (...) API的任务数。

不过,请注意,您可能不会单独使用ThreadPool,因为您使用的许多其他类也可能会将项目排列到ThreadPool。 因此,这样做很可能最终导致应用程序的其他部分崩溃。 还要注意的是,由于ThreadPool使用一种algorithm来优化对给定机器底层内核的使用,限制线程池可以排列到任意低数量的线程数量可能会导致一些相当严重的性能问题。

同样,如果你想执行less量的工作任务/线程来执行一些任务,那么只有创buildless量的任务(对比100)是最好的方法。

我的博客文章展示了如何使用“任务”和“操作”来完成此任务,并提供了一个示例项目,您可以下载并运行这些项目以查看这两个实例。

与行动

如果使用Actions,则可以使用内置的.Net Parallel.Invoke函数。 这里我们限制它并行运行最多5个线程。

 var listOfActions = new List<Action>(); for (int i = 0; i < 100; i++) { // Note that we create the Action here, but do not start it. listOfActions.Add(() => DoSomething()); } var options = new ParallelOptions {MaxDegreeOfParallelism = 5}; Parallel.Invoke(options, listOfActions.ToArray()); 

与任务

既然你在这里使用任务,没有内置函数。 不过,您可以使用我在我的博客上提供的那个。

  /// <summary> /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> /// </summary> /// <param name="tasksToRun">The tasks to run.</param> /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> /// <param name="cancellationToken">The cancellation token.</param> public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken()) { StartAndWaitAllThrottled(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken); } /// <summary> /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> /// </summary> /// <param name="tasksToRun">The tasks to run.</param> /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param> /// <param name="cancellationToken">The cancellation token.</param> public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken()) { // Convert to a list of tasks so that we don&#39;t enumerate over it multiple times needlessly. var tasks = tasksToRun.ToList(); using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel)) { var postTaskTasks = new List<Task>(); // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running. tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release()))); // Start running each task. foreach (var task in tasks) { // Increment the number of tasks currently running and wait if too many are running. throttler.Wait(timeoutInMilliseconds, cancellationToken); cancellationToken.ThrowIfCancellationRequested(); task.Start(); } // Wait for all of the provided tasks to complete. // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler&#39;s using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object. Task.WaitAll(postTaskTasks.ToArray(), cancellationToken); } } 

然后创build任务列表并调用函数让它们运行,并且每次最多同时运行5次,那么可以这样做:

 var listOfTasks = new List<Task>(); for (int i = 0; i < 100; i++) { var count = i; // Note that we create the Task here, but do not start it. listOfTasks.Add(new Task(() => Something())); } Tasks.StartAndWaitAllThrottled(listOfTasks, 5); 

它看起来不像,虽然你可以创build一个TaskScheduler的子类来实现这样的行为。

如果您的程序使用webservices,同时连接的数量将被限制为ServicePointManager.DefaultConnectionLimit属性。 如果你想要5个同时连接,使用Arrow_Raider解决scheme是不够的。 您还应该增加ServicePointManager.DefaultConnectionLimit因为它默认只有2个。