固定大小的队列,在新的enques时自动出列旧值

我正在使用ConcurrentQueue共享数据结构,目的是持有传递给它的最后N个对象(种类的历史)。

假设我们有一个浏览器,我们想要有最后100个浏览的Url。 我想要一个队列,当容量变满(历史logging为100个地址)时,在插入新条目(入队)时自动删除(出队)最早的(第一个)入口。

我怎样才能做到这一点使用System.Collections

我会写一个包装类Enqueue将检查计数,然后出列计数超过限制。

  public class FixedSizedQueue<T> { ConcurrentQueue<T> q = new ConcurrentQueue<T>(); private object lockObject = new object(); public int Limit { get; set; } public void Enqueue(T obj) { q.Enqueue(obj); lock (lockObject) { T overflow; while (q.Count > Limit && q.TryDequeue(out overflow)) ; } } } 

我会去一个轻微的变体…扩展ConcurrentQueue,以便能够在FixedSizeQueue上使用Linq扩展

 public class FixedSizedQueue<T> : ConcurrentQueue<T> { private readonly object syncObject = new object(); public int Size { get; private set; } public FixedSizedQueue(int size) { Size = size; } public new void Enqueue(T obj) { base.Enqueue(obj); lock (syncObject) { while (base.Count > Size) { T outObj; base.TryDequeue(out outObj); } } } } 

对于任何认为有用的人来说,下面是一些基于Richard Schneider上面的答案的工作代码:

 public class FixedSizedQueue<T> { private readonly object privateLockObject = new object(); readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); public int Size { get; private set; } public FixedSizedQueue(int size) { Size = size; } public void Enqueue(T obj) { queue.Enqueue(obj); lock (privateLockObject) { while (queue.Count > Size) { T outObj; queue.TryDequeue(out outObj); } } } } 

对于它的价值,这里是一个轻量级的循环缓冲区,其中一些方法标记为安全和不安全的使用。

 public class CircularBuffer<T> : IEnumerable<T> { readonly int size; readonly object locker; int count; int head; int rear; T[] values; public CircularBuffer(int max) { this.size = max; locker = new object(); count = 0; head = 0; rear = 0; values = new T[size]; } static int Incr(int index, int size) { return (index + 1) % size; } private void UnsafeEnsureQueueNotEmpty() { if (count == 0) throw new Exception("Empty queue"); } public int Size { get { return size; } } public object SyncRoot { get { return locker; } } #region Count public int Count { get { return UnsafeCount; } } public int SafeCount { get { lock (locker) { return UnsafeCount; } } } public int UnsafeCount { get { return count; } } #endregion #region Enqueue public void Enqueue(T obj) { UnsafeEnqueue(obj); } public void SafeEnqueue(T obj) { lock (locker) { UnsafeEnqueue(obj); } } public void UnsafeEnqueue(T obj) { values[rear] = obj; if (Count == Size) head = Incr(head, Size); rear = Incr(rear, Size); count = Math.Min(count + 1, Size); } #endregion #region Dequeue public T Dequeue() { return UnsafeDequeue(); } public T SafeDequeue() { lock (locker) { return UnsafeDequeue(); } } public T UnsafeDequeue() { UnsafeEnsureQueueNotEmpty(); T res = values[head]; values[head] = default(T); head = Incr(head, Size); count--; return res; } #endregion #region Peek public T Peek() { return UnsafePeek(); } public T SafePeek() { lock (locker) { return UnsafePeek(); } } public T UnsafePeek() { UnsafeEnsureQueueNotEmpty(); return values[head]; } #endregion #region GetEnumerator public IEnumerator<T> GetEnumerator() { return UnsafeGetEnumerator(); } public IEnumerator<T> SafeGetEnumerator() { lock (locker) { List<T> res = new List<T>(count); var enumerator = UnsafeGetEnumerator(); while (enumerator.MoveNext()) res.Add(enumerator.Current); return res.GetEnumerator(); } } public IEnumerator<T> UnsafeGetEnumerator() { int index = head; for (int i = 0; i < count; i++) { yield return values[index]; index = Incr(index, size); } } System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { return this.GetEnumerator(); } #endregion } 

我喜欢使用Foo()/SafeFoo()/UnsafeFoo()约定:

  • Foo方法默认调用UnsafeFoo
  • UnsafeFoo方法在不locking的情况下自由修改状态,只能调用其他不安全的方法。
  • SafeFoo方法调用锁内的UnsafeFoo方法。

它有点冗长,但是它会产生明显的错误,比如在一个被认为是线程安全的方法中调用锁之外的不安全的方法。

只是为了好玩,这里是另一个实现,我相信大部分评论者的关注。 特别是,线程安全是在不locking的情况下实现的,实现被包装类隐藏。

 public class FixedSizeQueue<T> : IReadOnlyCollection<T> { private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>(); private int _count; public int Limit { get; private set; } public FixedSizeQueue(int limit) { this.Limit = limit; } public void Enqueue(T obj) { _queue.Enqueue(obj); Interlocked.Increment(ref _count); // Calculate the number of items to be removed by this thread in a thread safe manner int currentCount; int finalCount; do { currentCount = _count; finalCount = Math.Min(currentCount, this.Limit); } while (currentCount != Interlocked.CompareExchange(ref _count, finalCount, currentCount)); T overflow; while (currentCount > finalCount && _queue.TryDequeue(out overflow)) currentCount--; } public int Count { get { return _count; } } public IEnumerator<T> GetEnumerator() { return _queue.GetEnumerator(); } System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { return _queue.GetEnumerator(); } } 

为了您的编码乐趣,我向您提交“ ConcurrentDeck

 public class ConcurrentDeck<T> { private readonly int _size; private readonly T[] _buffer; private int _position = 0; public ConcurrentDeck(int size) { _size = size; _buffer = new T[size]; } public void Push(T item) { lock (this) { _buffer[_position] = item; _position++; if (_position == _size) _position = 0; } } public T[] ReadDeck() { lock (this) { return _buffer.Skip(_position).Union(_buffer.Take(_position)).ToArray(); } } } 

用法示例:

 void Main() { var deck = new ConcurrentDeck<Tuple<string,DateTime>>(25); var handle = new ManualResetEventSlim(); var task1 = Task.Factory.StartNew(()=>{ var timer = new System.Timers.Timer(); timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task1",DateTime.Now));}; timer.Interval = System.TimeSpan.FromSeconds(1).TotalMilliseconds; timer.Enabled = true; handle.Wait(); }); var task2 = Task.Factory.StartNew(()=>{ var timer = new System.Timers.Timer(); timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task2",DateTime.Now));}; timer.Interval = System.TimeSpan.FromSeconds(.5).TotalMilliseconds; timer.Enabled = true; handle.Wait(); }); var task3 = Task.Factory.StartNew(()=>{ var timer = new System.Timers.Timer(); timer.Elapsed += (s,a) => {deck.Push(new Tuple<string,DateTime>("task3",DateTime.Now));}; timer.Interval = System.TimeSpan.FromSeconds(.25).TotalMilliseconds; timer.Enabled = true; handle.Wait(); }); System.Threading.Thread.Sleep(TimeSpan.FromSeconds(10)); handle.Set(); var outputtime = DateTime.Now; deck.ReadDeck().Select(d => new {Message = d.Item1, MilliDiff = (outputtime - d.Item2).TotalMilliseconds}).Dump(true); } 

这是我的队列版本:

 public class FixedSizedQueue<T> { private object LOCK = new object(); ConcurrentQueue<T> queue; public int MaxSize { get; set; } public FixedSizedQueue(int maxSize, IEnumerable<T> items = null) { this.MaxSize = maxSize; if (items == null) { queue = new ConcurrentQueue<T>(); } else { queue = new ConcurrentQueue<T>(items); EnsureLimitConstraint(); } } public void Enqueue(T obj) { queue.Enqueue(obj); EnsureLimitConstraint(); } private void EnsureLimitConstraint() { if (queue.Count > MaxSize) { lock (LOCK) { T overflow; while (queue.Count > MaxSize) { queue.TryDequeue(out overflow); } } } } /// <summary> /// returns the current snapshot of the queue /// </summary> /// <returns></returns> public T[] GetSnapshot() { return queue.ToArray(); } } 

我发现有一个基于IEnumerable的构造函数是有用的,我觉得让GetSnapshot在调用的时候有一个multithreading安全列表(在这个例子中是数组)是很有用的,如果底层集合发生变化,则会出错。

双计数检查是为了防止某些情况下的locking。