C#生产者/消费者

我最近遇到了一个生产者/消费者模式的C#实现。 这非常简单,至less对我来说非常优雅。

它似乎是在2006年前后devise的,所以我想知道这个实现是否是
– 安全
– 仍然适用

代码如下(原代码被引用在http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375 )

using System; using System.Collections; using System.Threading; public class Test { static ProducerConsumer queue; static void Main() { queue = new ProducerConsumer(); new Thread(new ThreadStart(ConsumerJob)).Start(); Random rng = new Random(0); for (int i=0; i < 10; i++) { Console.WriteLine ("Producing {0}", i); queue.Produce(i); Thread.Sleep(rng.Next(1000)); } } static void ConsumerJob() { // Make sure we get a different random seed from the // first thread Random rng = new Random(1); // We happen to know we've only got 10 // items to receive for (int i=0; i < 10; i++) { object o = queue.Consume(); Console.WriteLine ("\t\t\t\tConsuming {0}", o); Thread.Sleep(rng.Next(1000)); } } } public class ProducerConsumer { readonly object listLock = new object(); Queue queue = new Queue(); public void Produce(object o) { lock (listLock) { queue.Enqueue(o); // We always need to pulse, even if the queue wasn't // empty before. Otherwise, if we add several items // in quick succession, we may only pulse once, waking // a single thread up, even if there are multiple threads // waiting for items. Monitor.Pulse(listLock); } } public object Consume() { lock (listLock) { // If the queue is empty, wait for an item to be added // Note that this is a while loop, as we may be pulsed // but not wake up before another thread has come in and // consumed the newly added object. In that case, we'll // have to wait for another pulse. while (queue.Count==0) { // This releases listLock, only reacquiring it // after being woken up by a call to Pulse Monitor.Wait(listLock); } return queue.Dequeue(); } } } 

代码比这个更早 – 在.NET 2.0推出之前我写了一些代码。 生产者/消费者队列的概念比这个更老了:)

是的,就我所知,该代码是安全的,但它有一些缺陷:

  • 这是非通用的。 现代版本肯定是通用的。
  • 它无法停止队列。 一个简单的停止队列的方法(使所有的消费者线程退出)是有一个“停止工作”令牌,可以放入队列中。 然后,您可以添加与线程一样多的标记。 另外,你有一个单独的标志,表明你想停止。 (这允许其他线程在完成队列中的所有当前工作之前停止。)
  • 如果工作量很小,一次只能工作一次可能不是最有效率的事情。

说实话,代码背后的思想比代码本身更重要。

你可以做下面的代码片段。 它是通用的,并有一个入队空值(或任何你想使用的标志)告诉工作线程退出的方法。

代码取自: http : //www.albahari.com/threading/part4.aspx#_Wait_and_Pulse

 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; namespace ConsoleApplication1 { public class TaskQueue<T> : IDisposable where T : class { object locker = new object(); Thread[] workers; Queue<T> taskQ = new Queue<T>(); public TaskQueue(int workerCount) { workers = new Thread[workerCount]; // Create and start a separate thread for each worker for (int i = 0; i < workerCount; i++) (workers[i] = new Thread(Consume)).Start(); } public void Dispose() { // Enqueue one null task per worker to make each exit. foreach (Thread worker in workers) EnqueueTask(null); foreach (Thread worker in workers) worker.Join(); } public void EnqueueTask(T task) { lock (locker) { taskQ.Enqueue(task); Monitor.PulseAll(locker); } } void Consume() { while (true) { T task; lock (locker) { while (taskQ.Count == 0) Monitor.Wait(locker); task = taskQ.Dequeue(); } if (task == null) return; // This signals our exit Console.Write(task); Thread.Sleep(1000); // Simulate time-consuming task } } } } 

回到那一天,我从上面的代码和文章系列中学到了Monitor.Wait / Pulse如何工作(以及一般的线程)。 正如Jon所说,它有很大的价值,而且确实是安全和适用的。

但是,从.NET 4开始, 框架中有一个生产者 – 消费者队列的实现 。 我只是自己find了它,但是到了这一点,它完成了我所需要的一切。

警告:如果你阅读评论,你会明白我的回答是错误的:)

你的代码有可能发生死锁

想象下面的情况,为了清晰起见,我使用了单线程的方法,但应该很容易转换为带睡眠的multithreading:

 // We create some actions... object locker = new object(); Action action1 = () => { lock (locker) { System.Threading.Monitor.Wait(locker); Console.WriteLine("This is action1"); } }; Action action2 = () => { lock (locker) { System.Threading.Monitor.Wait(locker); Console.WriteLine("This is action2"); } }; // ... (stuff happens, etc.) // Imagine both actions were running // and there's 0 items in the queue // And now the producer kicks in... lock (locker) { // This would add a job to the queue Console.WriteLine("Pulse now!"); System.Threading.Monitor.Pulse(locker); } // ... (more stuff) // and the actions finish now! Console.WriteLine("Consume action!"); action1(); // Oops... they're locked... action2(); 

请让我知道这是否没有任何意义。

如果这个被证实,那么你的问题的答案是“不,这是不安全的”;)我希望这有助于。