在.NET中创build一个阻塞队列<T>?

我有一个场景,我有多个线程添加到一个队列和多个线程从同一个队列读取。 如果队列达到特定的大小, 所有填充队列的线程将被阻塞,直到从队列中删除一个项目。

下面的解决scheme是我现在使用的,我的问题是:如何改进? 有没有一个对象已经在我应该使用的BCL中启用这种行为?

internal class BlockingCollection<T> : CollectionBase, IEnumerable { //todo: might be worth changing this into a proper QUEUE private AutoResetEvent _FullEvent = new AutoResetEvent(false); internal T this[int i] { get { return (T) List[i]; } } private int _MaxSize; internal int MaxSize { get { return _MaxSize; } set { _MaxSize = value; checkSize(); } } internal BlockingCollection(int maxSize) { MaxSize = maxSize; } internal void Add(T item) { Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.WaitOne(); List.Add(item); Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId)); checkSize(); } internal void Remove(T item) { lock (List) { List.Remove(item); } Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId)); } protected override void OnRemoveComplete(int index, object value) { checkSize(); base.OnRemoveComplete(index, value); } internal new IEnumerator GetEnumerator() { return List.GetEnumerator(); } private void checkSize() { if (Count < MaxSize) { Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.Set(); } else { Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.Reset(); } } } 

这看起来非常不安全(很less同步); 怎么样的东西:

 class SizeQueue<T> { private readonly Queue<T> queue = new Queue<T>(); private readonly int maxSize; public SizeQueue(int maxSize) { this.maxSize = maxSize; } public void Enqueue(T item) { lock (queue) { while (queue.Count >= maxSize) { Monitor.Wait(queue); } queue.Enqueue(item); if (queue.Count == 1) { // wake up any blocked dequeue Monitor.PulseAll(queue); } } } public T Dequeue() { lock (queue) { while (queue.Count == 0) { Monitor.Wait(queue); } T item = queue.Dequeue(); if (queue.Count == maxSize - 1) { // wake up any blocked enqueue Monitor.PulseAll(queue); } return item; } } } 

(编辑)

实际上,你需要一种方法来closures队列,以便读者能够干净地退出 – 也许是一个布尔标志 – 如果设置了,一个空的队列只是返回(而不是阻塞):

 bool closing; public void Close() { lock(queue) { closing = true; Monitor.PulseAll(queue); } } public bool TryDequeue(out T value) { lock (queue) { while (queue.Count == 0) { if (closing) { value = default(T); return false; } Monitor.Wait(queue); } value = queue.Dequeue(); if (queue.Count == maxSize - 1) { // wake up any blocked enqueue Monitor.PulseAll(queue); } return true; } } 

使用.net 4 BlockingCollection,使用Add()排队,使用Take()。 它在内部使用非阻塞的ConcurrentQueue。 更多信息在这里快速和最佳的生产者/消费者队列技术BlockingCollection vs并发队列

“这怎么能改进?”

那么,你需要看看你的类中的每一个方法,并考虑如果另一个线程同时调用该方法或其他方法会发生什么。 例如,您将一个锁放在Remove方法中,但不在Add方法中。 如果一个线程同时添加另一个线程,会发生什么情况? 坏事。

还要考虑一个方法可以返回第二个对象,该对象提供对第一个对象的内部数据的访问 – 例如GetEnumerator。 设想一个线程正在经历该枚举器,另一个线程正在同时修改列表。 不好。

一个好的经验法则是把这个类中的方法数量减less到最小,从而简化这个过程。

特别是,不要inheritance另一个容器类,因为您将公开所有该类的方法,为调用者提供了一种方法来破坏内部数据,或者看到部分完整的数据更改(同样也是一样糟糕,因为数据在那一刻出现损坏)。 隐藏所有的细节,并对你如何允许访问他们完全无情。

我强烈build议您使用现成的解决scheme – 获取关于线程化的书或使用第三方库。 否则,考虑到你正在尝试,你将会很长一段时间debugging你的代码。

另外,对于Remove来说,返回一个项目(比如,首先添加的项目,因为它是一个队列),而不是调用者select特定的项目会更有意义吗? 而当队列是空的,也许删除也应该阻止。

更新:Marc的答案实际上实现了所有这些build议! :)但是我会留在这里,因为它可能有助于理解为什么他的版本是这样的改进。

您可以使用System.Collections.Concurrent命名空间中的BlockingCollection和ConcurrentQueue

  public class ProducerConsumerQueue<T> : BlockingCollection<T> { /// <summary> /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality /// </summary> public ProducerConsumerQueue() : base(new ConcurrentQueue<T>()) { } /// <summary> /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality /// </summary> /// <param name="maxSize"></param> public ProducerConsumerQueue(int maxSize) : base(new ConcurrentQueue<T>(), maxSize) { } } 

我只是用反应式扩展(Reactive Extensions)来解决这个问题,并记住了这个问题:

 public class BlockingQueue<T> { private readonly Subject<T> _queue; private readonly IEnumerator<T> _enumerator; private readonly object _sync = new object(); public BlockingQueue() { _queue = new Subject<T>(); _enumerator = _queue.GetEnumerator(); } public void Enqueue(T item) { lock (_sync) { _queue.OnNext(item); } } public T Dequeue() { _enumerator.MoveNext(); return _enumerator.Current; } } 

不一定完全安全,但非常简单。

这是我来到一个线程安全的有界阻塞队列。

 using System; using System.Collections.Generic; using System.Text; using System.Threading; public class BlockingBuffer<T> { private Object t_lock; private Semaphore sema_NotEmpty; private Semaphore sema_NotFull; private T[] buf; private int getFromIndex; private int putToIndex; private int size; private int numItems; public BlockingBuffer(int Capacity) { if (Capacity <= 0) throw new ArgumentOutOfRangeException("Capacity must be larger than 0"); t_lock = new Object(); buf = new T[Capacity]; sema_NotEmpty = new Semaphore(0, Capacity); sema_NotFull = new Semaphore(Capacity, Capacity); getFromIndex = 0; putToIndex = 0; size = Capacity; numItems = 0; } public void put(T item) { sema_NotFull.WaitOne(); lock (t_lock) { while (numItems == size) { Monitor.Pulse(t_lock); Monitor.Wait(t_lock); } buf[putToIndex++] = item; if (putToIndex == size) putToIndex = 0; numItems++; Monitor.Pulse(t_lock); } sema_NotEmpty.Release(); } public T take() { T item; sema_NotEmpty.WaitOne(); lock (t_lock) { while (numItems == 0) { Monitor.Pulse(t_lock); Monitor.Wait(t_lock); } item = buf[getFromIndex++]; if (getFromIndex == size) getFromIndex = 0; numItems--; Monitor.Pulse(t_lock); } sema_NotFull.Release(); return item; } } 

我还没有完全探索TPL,但他们可能有一些符合您需要的东西,或者至less是一些Reflector饲料,以从中汲取灵感。

希望有所帮助。

那么你可以看看System.Threading.Semaphore类。 除此之外 – 不,你必须自己做。 AFAIK没有这样的内置集合。

如果您希望获得最大的吞吐量,允许多个读取器读取,并且只有一个写入器可以写入,BCL有一个名为ReaderWriterLockSlim的应用程序,可以帮助您细化代码…