Waiting for pool threads to complete

I apologize if this is a redundant question. However, every solution I've found wasn't explained very well, and I'm hoping it can be made clear here.

My C# application's main thread spawns 1..n background workers using the ThreadPool. I want the original thread to block until all of the workers have finished. I have looked into ManualResetEvent in particular, but I'm not clear on how to use it.

In pseudo-code:

 foreach (var o in collection)
 {
     queue new worker(o);
 }

 while (workers not completed)
 {
     continue;
 }

If necessary, I will know the number of workers that are about to be queued.

Try this. The function accepts a list of Action delegates. It will add a ThreadPool worker entry for each item in the list, and it will wait for every action to complete before returning.

 public static void SpawnAndWait(IEnumerable<Action> actions)
 {
     var list = actions.ToList();
     var handles = new ManualResetEvent[actions.Count()];
     for (var i = 0; i < list.Count; i++)
     {
         handles[i] = new ManualResetEvent(false);
         var currentAction = list[i];
         var currentHandle = handles[i];
         Action wrappedAction = () =>
         {
             try { currentAction(); }
             finally { currentHandle.Set(); }
         };
         ThreadPool.QueueUserWorkItem(x => wrappedAction());
     }

     WaitHandle.WaitAll(handles);
 }
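For illustration, a hypothetical call might look like the following, where DoSomeWork stands in for whatever each worker actually does:

 var actions = new List<Action>();
 foreach (var o in collection)
 {
     var tmp = o; // avoid the closure-capture issue noted elsewhere on this page
     actions.Add(() => DoSomeWork(tmp));
 }
 SpawnAndWait(actions);

Note that WaitHandle.WaitAll is limited to 64 handles, so this approach is only suitable for relatively small batches of work items.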

Here's a different approach – encapsulation; your code can be as simple as:

 Forker p = new Forker();
 foreach (var obj in collection)
 {
     var tmp = obj;
     p.Fork(delegate { DoSomeWork(tmp); });
 }
 p.Join();

The Forker class is given below (I got bored on the train ;-p)… again, this avoids OS objects, but wraps things up quite neatly (IMO):

 using System;
 using System.Threading;

 /// <summary>Event arguments representing the completion of a parallel action.</summary>
 public class ParallelEventArgs : EventArgs
 {
     private readonly object state;
     private readonly Exception exception;
     internal ParallelEventArgs(object state, Exception exception)
     {
         this.state = state;
         this.exception = exception;
     }

     /// <summary>The opaque state object that identifies the action (null otherwise).</summary>
     public object State { get { return state; } }

     /// <summary>The exception thrown by the parallel action, or null if it completed without exception.</summary>
     public Exception Exception { get { return exception; } }
 }

 /// <summary>Provides a caller-friendly wrapper around parallel actions.</summary>
 public sealed class Forker
 {
     int running;
     private readonly object joinLock = new object(), eventLock = new object();

     /// <summary>Raised when all operations have completed.</summary>
     public event EventHandler AllComplete
     {
         add { lock (eventLock) { allComplete += value; } }
         remove { lock (eventLock) { allComplete -= value; } }
     }
     private EventHandler allComplete;

     /// <summary>Raised when each operation completes.</summary>
     public event EventHandler<ParallelEventArgs> ItemComplete
     {
         add { lock (eventLock) { itemComplete += value; } }
         remove { lock (eventLock) { itemComplete -= value; } }
     }
     private EventHandler<ParallelEventArgs> itemComplete;

     private void OnItemComplete(object state, Exception exception)
     {
         EventHandler<ParallelEventArgs> itemHandler = itemComplete; // don't need to lock
         if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception));
         if (Interlocked.Decrement(ref running) == 0)
         {
             EventHandler allHandler = allComplete; // don't need to lock
             if (allHandler != null) allHandler(this, EventArgs.Empty);
             lock (joinLock)
             {
                 Monitor.PulseAll(joinLock);
             }
         }
     }

     /// <summary>Adds a callback to invoke when each operation completes.</summary>
     /// <returns>Current instance (for fluent API).</returns>
     public Forker OnItemComplete(EventHandler<ParallelEventArgs> handler)
     {
         if (handler == null) throw new ArgumentNullException("handler");
         ItemComplete += handler;
         return this;
     }

     /// <summary>Adds a callback to invoke when all operations are complete.</summary>
     /// <returns>Current instance (for fluent API).</returns>
     public Forker OnAllComplete(EventHandler handler)
     {
         if (handler == null) throw new ArgumentNullException("handler");
         AllComplete += handler;
         return this;
     }

     /// <summary>Waits for all operations to complete.</summary>
     public void Join()
     {
         Join(-1);
     }

     /// <summary>Waits (with timeout) for all operations to complete.</summary>
     /// <returns>Whether all operations had completed before the timeout.</returns>
     public bool Join(int millisecondsTimeout)
     {
         lock (joinLock)
         {
             if (CountRunning() == 0) return true;
             Thread.SpinWait(1); // try our luck...
             return (CountRunning() == 0) ||
                 Monitor.Wait(joinLock, millisecondsTimeout);
         }
     }

     /// <summary>Indicates the number of incomplete operations.</summary>
     /// <returns>The number of incomplete operations.</returns>
     public int CountRunning()
     {
         return Interlocked.CompareExchange(ref running, 0, 0);
     }

     /// <summary>Enqueues an operation.</summary>
     /// <param name="action">The operation to perform.</param>
     /// <returns>The current instance (for fluent API).</returns>
     public Forker Fork(ThreadStart action) { return Fork(action, null); }

     /// <summary>Enqueues an operation.</summary>
     /// <param name="action">The operation to perform.</param>
     /// <param name="state">An opaque object, allowing the caller to identify operations.</param>
     /// <returns>The current instance (for fluent API).</returns>
     public Forker Fork(ThreadStart action, object state)
     {
         if (action == null) throw new ArgumentNullException("action");
         Interlocked.Increment(ref running);
         ThreadPool.QueueUserWorkItem(delegate
         {
             Exception exception = null;
             try { action(); }
             catch (Exception ex) { exception = ex; }
             OnItemComplete(state, exception);
         });
         return this;
     }
 }
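Since the class also exposes OnItemComplete and OnAllComplete, you can get per-item and end-of-batch notifications as well; a rough usage sketch (DoSomeWork is again a placeholder for your own work method) might be:

 Forker p = new Forker();
 p.OnItemComplete((sender, args) =>
     {
         // args.State is whatever was passed to Fork; args.Exception is null on success
         if (args.Exception != null) Console.WriteLine("Item failed: " + args.Exception.Message);
     })
  .OnAllComplete((sender, args) => Console.WriteLine("All work finished"));

 foreach (var obj in collection)
 {
     var tmp = obj;
     p.Fork(delegate { DoSomeWork(tmp); }, tmp); // state = tmp, so the callback can identify the item
 }
 p.Join();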

First: how long do the workers run? Pool threads should generally be used for short-lived tasks – if they are going to run for a while, consider using manual threads.
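As a hedged sketch of that manual-thread route (DoSomeWork is a placeholder for the long-running work):

 // Sketch only: one dedicated thread per long-running item, then Join on each.
 var threads = new List<Thread>();
 foreach (var o in collection)
 {
     var tmp = o; // avoid "capture" issue
     var t = new Thread(() => DoSomeWork(tmp));
     t.Start();
     threads.Add(t);
 }
 foreach (var t in threads) t.Join(); // block until every worker finishes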

Regarding the problem: do you actually need to block the main thread? Could you use a callback instead? If so, something like the following will do the job:

 int running = 1; // start at 1 to prevent multiple callbacks if
                  // tasks finish faster than they are started
 Action endOfThread = delegate
 {
     if (Interlocked.Decrement(ref running) == 0)
     {
         // ****run callback method****
     }
 };
 foreach (var o in collection)
 {
     var tmp = o; // avoid "capture" issue
     Interlocked.Increment(ref running);
     ThreadPool.QueueUserWorkItem(delegate
     {
         DoSomeWork(tmp); // [A] should handle exceptions internally
         endOfThread();
     });
 }
 endOfThread(); // opposite of "start at 1"

This is a fairly lightweight way (no OS primitives) of tracking the workers.
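If you want the pattern in reusable form, one purely illustrative way to package it is shown below; QueueAllWithCallback is a made-up name, and work/onAllDone are the caller's delegates:

 // Hypothetical helper wrapping the counting pattern above.
 static void QueueAllWithCallback<T>(IEnumerable<T> items, Action<T> work, Action onAllDone)
 {
     int running = 1; // start at 1 so the callback can't fire before queuing finishes
     Action endOfThread = delegate
     {
         if (Interlocked.Decrement(ref running) == 0) onAllDone();
     };
     foreach (var item in items)
     {
         var tmp = item; // avoid "capture" issue
         Interlocked.Increment(ref running);
         ThreadPool.QueueUserWorkItem(delegate
         {
             try { work(tmp); }
             finally { endOfThread(); }
         });
     }
     endOfThread(); // balances the "start at 1"
 }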

If you need to block, you can do the same using Monitor (again, avoiding an OS object):

 object syncLock = new object();
 int running = 1;
 Action endOfThread = delegate
 {
     if (Interlocked.Decrement(ref running) == 0)
     {
         lock (syncLock) { Monitor.Pulse(syncLock); }
     }
 };
 lock (syncLock)
 {
     foreach (var o in collection)
     {
         var tmp = o; // avoid "capture" issue
         Interlocked.Increment(ref running);
         ThreadPool.QueueUserWorkItem(delegate
         {
             DoSomeWork(tmp); // [A] should handle exceptions internally
             endOfThread();
         });
     }
     endOfThread();
     Monitor.Wait(syncLock);
 }
 Console.WriteLine("all done");

I have been using the new parallel task library in the CTP:

 Parallel.ForEach(collection, o =>
 {
     DoSomeWork(o);
 });

Here is a solution using the CountdownEvent class.

 var complete = new CountdownEvent(1); // initial count of 1 keeps the count above zero while queuing
 foreach (var o in collection)
 {
     var capture = o;
     ThreadPool.QueueUserWorkItem((state) =>
     {
         try { DoSomething(capture); }
         finally { complete.Signal(); }
     }, null);
 }
 complete.Signal(); // release the initial count now that everything has been queued
 complete.Wait();

Of course, if you have access to the CountdownEvent class, then you have the whole TPL to work with. The Parallel class takes care of the waiting for you.

 Parallel.ForEach(collection, o =>
 {
     DoSomething(o);
 });

I think you were on the right track with ManualResetEvent. That link has a code sample that closely matches what you're trying to do. The key is to use WaitHandle.WaitAll and pass in an array of wait events. Each thread needs to set one of these wait events.

 // Simultaneously calculate the terms.
 ThreadPool.QueueUserWorkItem(new WaitCallback(CalculateBase));
 ThreadPool.QueueUserWorkItem(new WaitCallback(CalculateFirstTerm));
 ThreadPool.QueueUserWorkItem(new WaitCallback(CalculateSecondTerm));
 ThreadPool.QueueUserWorkItem(new WaitCallback(CalculateThirdTerm));

 // Wait for all of the terms to be calculated.
 WaitHandle.WaitAll(autoEvents);

 // Reset the wait handle for the next calculation.
 manualEvent.Reset();

Edit:

Make sure to set the event somewhere in your worker thread's code path (i.e. autoEvents[1].Set();). Once they are all signaled, WaitAll returns.

 void CalculateSecondTerm(object stateInfo)
 {
     double preCalc = randomGenerator.NextDouble();
     manualEvent.WaitOne();
     secondTerm = preCalc * baseNumber * randomGenerator.NextDouble();
     autoEvents[1].Set();
 }
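For context, this snippet relies on fields that are not shown in the answer (autoEvents, manualEvent, randomGenerator, baseNumber, secondTerm); they come from the linked example. A rough sketch of what those declarations might look like (the array size and types are assumptions here) is:

 // Assumed declarations (not shown above); one AutoResetEvent per term being calculated.
 static readonly AutoResetEvent[] autoEvents =
 {
     new AutoResetEvent(false),
     new AutoResetEvent(false),
     new AutoResetEvent(false),
     new AutoResetEvent(false)
 };
 static readonly ManualResetEvent manualEvent = new ManualResetEvent(false);
 static readonly Random randomGenerator = new Random();
 static double baseNumber, firstTerm, secondTerm, thirdTerm;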

I found a good solution here:

http://msdn.microsoft.com/en-us/magazine/cc163914.aspx

It may come in handy for the same problem.

Using the .NET 4.0 Barrier class:

 Barrier sync = new Barrier(1); // the main thread is the initial participant
 foreach (var o in collection)
 {
     WaitCallback worker = (state) =>
     {
         // do work
         sync.SignalAndWait(); // each worker signals when its item is done
     };
     sync.AddParticipant(); // register one participant per queued worker
     ThreadPool.QueueUserWorkItem(worker, o);
 }
 sync.SignalAndWait(); // the main thread blocks here until every worker has signaled