如何在Java或C#中编写一个高效的循环缓冲区?

我想要一个简单的类来实现一个固定大小的循环缓冲区。 它应该是高效的,容易在眼睛上,一般types。

编辑:现在不需要是MT能力。 我以后可以随时添加一个锁,在任何情况下都不会是高并发的。

方法应该是:.Add和我猜.List,我检索所有条目。 第二个想法,我认为应该通过索引器来完成检索。 在任何时候,我都希望能够通过索引检索缓冲区中的任何元素。 但请记住,从一个时刻到下一个元素[n]可能会不同,因为循环缓冲区填满并翻转。

这不是一个堆栈,它是一个循环缓冲区。 关于“溢出”:我期望内部会有一个数组来保存项目,随着时间的推移,缓冲区的头部和尾部将围绕该固定数组旋转。 但是这应该是用户不可见的。 应该没有外部可检测的“溢出”事件或行为。

这不是一个学校的任务 – 它最常用于MRUcaching或固定大小的事务或事件日志。

我会使用T,一个头部和尾部指针的数组,并添加和获取方法。

喜欢:(find用户的问题)

// Hijack these for simplicity import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; public class CircularBuffer<T> { private T[] buffer; private int tail; private int head; @SuppressWarnings("unchecked") public CircularBuffer(int n) { buffer = (T[]) new Object[n]; tail = 0; head = 0; } public void add(T toAdd) { if (head != (tail - 1)) { buffer[head++] = toAdd; } else { throw new BufferOverflowException(); } head = head % buffer.length; } public T get() { T t = null; int adjTail = tail > head ? tail - buffer.length : tail; if (adjTail < head) { t = (T) buffer[tail++]; tail = tail % buffer.length; } else { throw new BufferUnderflowException(); } return t; } public String toString() { return "CircularBuffer(size=" + buffer.length + ", head=" + head + ", tail=" + tail + ")"; } public static void main(String[] args) { CircularBuffer<String> b = new CircularBuffer<String>(3); for (int i = 0; i < 10; i++) { System.out.println("Start: " + b); b.add("One"); System.out.println("One: " + b); b.add("Two"); System.out.println("Two: " + b); System.out.println("Got '" + b.get() + "', now " + b); b.add("Three"); System.out.println("Three: " + b); // Test Overflow // b.add("Four"); // System.out.println("Four: " + b); System.out.println("Got '" + b.get() + "', now " + b); System.out.println("Got '" + b.get() + "', now " + b); // Test Underflow // System.out.println("Got '" + b.get() + "', now " + b); // Back to start, let's shift on one b.add("Foo"); b.get(); } } } 

这是我如何(或不)在Java中编写一个高效的循环缓冲区。 它由一个简单的数组支持。 对于我的特殊用例,我需要高并发吞吐量,所以我用CAS来分配索引。 然后,我创build了可靠副本的机制,包括整个缓冲区的CAS副本。 我在一篇简短的文章中详细介绍的案例中使用了这一点。

 import java.util.concurrent.atomic.AtomicLong; import java.lang.reflect.Array; /** * A circular array buffer with a copy-and-swap cursor. * * <p>This class provides an list of T objects who's size is <em>unstable</em>. * It's intended for capturing data where the frequency of sampling greatly * outweighs the frequency of inspection (for instance, monitoring).</p> * * <p>This object keeps in memory a fixed size buffer which is used for * capturing objects. It copies the objects to a snapshot array which may be * worked with. The size of the snapshot array will vary based on the * stability of the array during the copy operation.</p> * * <p>Adding buffer to the buffer is <em>O(1)</em>, and lockless. Taking a * stable copy of the sample is <em>O(n)</em>.</p> */ public class ConcurrentCircularBuffer <T> { private final AtomicLong cursor = new AtomicLong(); private final T[] buffer; private final Class<T> type; /** * Create a new concurrent circular buffer. * * @param type The type of the array. This is captured for the same reason * it's required by {@link java.util.List.toArray()}. * * @param bufferSize The size of the buffer. * * @throws IllegalArgumentException if the bufferSize is a non-positive * value. */ public ConcurrentCircularBuffer (final Class <T> type, final int bufferSize) { if (bufferSize < 1) { throw new IllegalArgumentException( "Buffer size must be a positive value" ); } this.type = type; this.buffer = (T[]) new Object [ bufferSize ]; } /** * Add a new object to this buffer. * * <p>Add a new object to the cursor-point of the buffer.</p> * * @param sample The object to add. */ public void add (T sample) { buffer[(int) (cursor.getAndIncrement() % buffer.length)] = sample; } /** * Return a stable snapshot of the buffer. * * <p>Capture a stable snapshot of the buffer as an array. The snapshot * may not be the same length as the buffer, any objects which were * unstable during the copy will be factored out.</p> * * @return An array snapshot of the buffer. */ public T[] snapshot () { T[] snapshots = (T[]) new Object [ buffer.length ]; /* Determine the size of the snapshot by the number of affected * records. Trim the size of the snapshot by the number of records * which are considered to be unstable during the copy (the amount the * cursor may have moved while the copy took place). * * If the cursor eliminated the sample (if the sample size is so small * compared to the rate of mutation that it did a full-wrap during the * copy) then just treat the buffer as though the cursor is * buffer.length - 1 and it was not changed during copy (this is * unlikley, but it should typically provide fairly stable results). */ long before = cursor.get(); /* If the cursor hasn't yet moved, skip the copying and simply return a * zero-length array. */ if (before == 0) { return (T[]) Array.newInstance(type, 0); } System.arraycopy(buffer, 0, snapshots, 0, buffer.length); long after = cursor.get(); int size = buffer.length - (int) (after - before); long snapshotCursor = before - 1; /* Highly unlikely, but the entire buffer was replaced while we * waited...so just return a zero length array, since we can't get a * stable snapshot... */ if (size <= 0) { return (T[]) Array.newInstance(type, 0); } long start = snapshotCursor - (size - 1); long end = snapshotCursor; if (snapshotCursor < snapshots.length) { size = (int) snapshotCursor + 1; start = 0; } /* Copy the sample snapshot to a new array the size of our stable * snapshot area. */ T[] result = (T[]) Array.newInstance(type, size); int startOfCopy = (int) (start % snapshots.length); int endOfCopy = (int) (end % snapshots.length); /* If the buffer space wraps the physical end of the array, use two * copies to construct the new array. */ if (startOfCopy > endOfCopy) { System.arraycopy(snapshots, startOfCopy, result, 0, snapshots.length - startOfCopy); System.arraycopy(snapshots, 0, result, (snapshots.length - startOfCopy), endOfCopy + 1); } else { /* Otherwise it's a single continuous segment, copy the whole thing * into the result. */ System.arraycopy(snapshots, startOfCopy, result, 0, endOfCopy - startOfCopy + 1); } return (T[]) result; } /** * Get a stable snapshot of the complete buffer. * * <p>This operation fetches a snapshot of the buffer using the algorithm * defined in {@link snapshot()}. If there was concurrent modification of * the buffer during the copy, however, it will retry until a full stable * snapshot of the buffer was acquired.</p> * * <p><em>Note, for very busy buffers on large symmetric multiprocessing * machines and supercomputers running data processing intensive * applications, this operation has the potential of being fairly * expensive. In practice on commodity hardware, dualcore processors and * non-processing intensive systems (such as web services) it very rarely * retries.</em></p> * * @return A full copy of the internal buffer. */ public T[] completeSnapshot () { T[] snapshot = snapshot(); /* Try again until we get a snapshot that's the same size as the * buffer... This is very often a single iteration, but it depends on * how busy the system is. */ while (snapshot.length != buffer.length) { snapshot = snapshot(); } return snapshot; } /** * The size of this buffer. */ public int size () { return buffer.length; } } 

我将使用ArrayBlockingQueue或其他预build队列实现之一,具体取决于需求。 很less有必要自己实现这样的数据结构(除非是学校作业)。

编辑:现在你已经添加了要求“通过索引检索缓冲区中的任何元素”,我想你需要实现自己的类(除非谷歌集合或一些其他库提供)。 正如JeeBee的例子所示,循环缓冲区很容易实现。 你也可以看看ArrayBlockingQueue的源代码 – 它的代码非常干净,只需删除locking和不需要的方法,并添加通过索引访问它的方法。

这是一个即时可用的用于生产代码的Java的CircularArrayList实现 。 通过以Java推荐的方式覆盖AbstractList,它支持Java Collections Framework中的标准List实现(通用元素types,子列表,迭代等)所期望的所有function。

O(1)中完成以下调用:

  • 添加(item) – 添加到列表的末尾
  • 删除(0) – 从列表开始删除
  • get(i) – 检索列表中的随机元素

使用Java的ArrayDeque

只要使用别人的实现:

Power Collections Deque <T>由循环缓冲区实现。

功率集合库是不完整的,但Deque是完全可以接受的扩展循环缓冲区。

既然你表明你不想扩展,而是想要覆盖,你可以很容易地修改代码覆盖。 这只需要删除逻辑上相邻的指针的检查,然后只是写。 同时,私有缓冲区可以只读。

System.Collections.Generic.Queue – 简单的循环缓冲区里面(T []与头部和尾部,就像在JeeBee样本 )。

在Guava 15中,我们引入了EvictingQueue ,它是一个非阻塞的有界队列,它在尝试将元素添加到完整队列时自动从队列的头部逐出(移除)元素。 这与传统的有界队列不同,当队列满时阻塞或拒绝新的元素。

这听起来像是应该满足你的需求,并且比直接使用ArrayDeque更简单的接口(尽pipe它使用了一个!)。

更多信息可以在这里find。

如果一个lrucaching可以工作,可以考虑使用http://java.sun.com/javase/6/docs/api/java/util/LinkedHashMap.html#LinkedHashMap(int,%20float,%20boolean),http:/ /java.sun.com/javase/6/docs/api/java/util/LinkedHashMap.html#removeEldestEntry(java.util.Map.Entry)

这是我自己编写的一个实现,但这可能是有用的。

缓冲区包含最大固定的一组项目。 该集是圆形的,旧的项目被自动删除。 调用者可以通过绝对增量索引(long)获得项目尾部,但是项目可能在时间间隔太远的呼叫之间丢失。 这个类是完全线程安全的。

 public sealed class ConcurrentCircularBuffer<T> : ICollection<T> { private T[] _items; private int _index; private bool _full; public ConcurrentCircularBuffer(int capacity) { if (capacity <= 1) // need at least two items throw new ArgumentException(null, "capacity"); Capacity = capacity; _items = new T[capacity]; } public int Capacity { get; private set; } public long TotalCount { get; private set; } public int Count { get { lock (SyncObject) // full & _index need to be in sync { return _full ? Capacity : _index; } } } public void AddRange(IEnumerable<T> items) { if (items == null) return; lock (SyncObject) { foreach (var item in items) { AddWithLock(item); } } } private void AddWithLock(T item) { _items[_index] = item; _index++; if (_index == Capacity) { _full = true; _index = 0; } TotalCount++; } public void Add(T item) { lock (SyncObject) { AddWithLock(item); } } public void Clear() { lock (SyncObject) { _items = new T[Capacity]; _index = 0; _full = false; TotalCount = 0; } } // this gives raw access to the underlying buffer. not sure I should keep that public T this[int index] { get { return _items[index]; } } public T[] GetTail(long startIndex) { long lostCount; return GetTail(startIndex, out lostCount); } public T[] GetTail(long startIndex, out long lostCount) { if (startIndex < 0 || startIndex >= TotalCount) throw new ArgumentOutOfRangeException("startIndex"); T[] array = ToArray(); lostCount = (TotalCount - Count) - startIndex; if (lostCount >= 0) return array; lostCount = 0; // this maybe could optimized to not allocate the initial array // but in multi-threading environment, I suppose this is arguable (and more difficult). T[] chunk = new T[TotalCount - startIndex]; Array.Copy(array, array.Length - (TotalCount - startIndex), chunk, 0, chunk.Length); return chunk; } public T[] ToArray() { lock (SyncObject) { T[] items = new T[_full ? Capacity : _index]; if (_full) { if (_index == 0) { Array.Copy(_items, items, Capacity); } else { Array.Copy(_items, _index, items, 0, Capacity - _index); Array.Copy(_items, 0, items, Capacity - _index, _index); } } else if (_index > 0) { Array.Copy(_items, items, _index); } return items; } } public IEnumerator<T> GetEnumerator() { return ToArray().AsEnumerable().GetEnumerator(); } IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } bool ICollection<T>.Contains(T item) { return _items.Contains(item); } void ICollection<T>.CopyTo(T[] array, int arrayIndex) { if (array == null) throw new ArgumentNullException("array"); if (array.Rank != 1) throw new ArgumentException(null, "array"); if (arrayIndex < 0) throw new ArgumentOutOfRangeException("arrayIndex"); if ((array.Length - arrayIndex) < Count) throw new ArgumentException(null, "array"); T[] thisArray = ToArray(); Array.Copy(thisArray, 0, array, arrayIndex, thisArray.Length); } bool ICollection<T>.IsReadOnly { get { return false; } } bool ICollection<T>.Remove(T item) { return false; } private static object _syncObject; private static object SyncObject { get { if (_syncObject == null) { object obj = new object(); Interlocked.CompareExchange(ref _syncObject, obj, null); } return _syncObject; } } } 

这是另一个使用Apache公共集合的BoundedFifoBuffer的实现。 如果您使用Apache的最新JAR,请使用CircularFifoQueue,因为下面的类已弃用

  BoundedFifoBuffer apiCallHistory = new BoundedFifoBuffer(20); for(int i =1 ; i < 25; i++){ if(apiCallHistory.isFull()){ System.out.println("removing :: "+apiCallHistory.remove()); } apiCallHistory.add(i); } 

我想在java的angular度来回答这个问题。

要用java实现一个循环缓冲区,你可能需要三件事情:一个循环缓冲类,通用的和less量的操作(为了了解你需要哪些操作以及这些操作的内部机制,你可能需要阅读wiki for首先是循环缓冲区 )。

其次,缓冲区满或空的判断应该非常谨慎。 在这里,我给出了两个本能的全面/空洞的判断。 在解决scheme一中,您需要创build两个整数variables来存储缓冲区的当前大小和缓冲区的最大大小。 显然,如果当前大小等于最大大小,则缓冲区已满。

在另一个解决scheme中,我们将最后一个存储空间设置为空闲(对于大小为7的循环缓冲区,我们将空间中的存储设置为7)。 据此,当expression式(rp+1)%MAXSIZE == fp;时,我们可以确定缓冲区已满(rp+1)%MAXSIZE == fp; 满意。

为了更清楚地说明,这里给出了一个使用java语言的实现。

 import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; public class CircularBuffer<T> { private int front; private int rear; private int currentSize; private int maxSize; private T[] buffer; public CircularBuffer(int n) { buffer = (T[]) new Object[n]; front = 0; rear = 0; currentSize = 0; maxSize = n; } public void push(T e) { if (!isFull()) { buffer[rear] = e; currentSize++; rear = (rear + 1) % maxSize; } else throw new BufferOverflowException(); } public T pop() { if (!isEmpty()) { T temp = buffer[front]; buffer[front] = null; front = (front + 1) % maxSize; currentSize--; return temp; } else throw new BufferUnderflowException(); } public T peekFirst() { if (!isEmpty()) { return buffer[front]; } else return null; } public T peekLast() { if (!isEmpty()) { return buffer[rear - 1]; } else return null; } public int size() { return currentSize; } public boolean isEmpty() { if (currentSize == 0) { return true; } else return false; } public boolean isFull() { if (currentSize == maxSize) { return true; } else return false; } public boolean clean() { front = 0; rear = 0; while (rear != 0) { buffer[rear] = null; rear = (rear + 1) % maxSize; } return true; } public static void main(String[] args) { CircularBuffer<Integer> buff = new CircularBuffer<>(7); buff.push(0); buff.push(1); buff.push(2); System.out.println(buff.size()); System.out.println("The head element is: " + buff.pop()); System.out.println("Size should be twoo: " + buff.size()); System.out.println("The last element is one: " + buff.peekLast()); System.out.println("Size should be two: " + buff.size()); buff.clean(); System.out.println("Size should be zero: " + buff.size()); } } 
 // The following is in C# public class fqueue { // The following code implements a circular queue of objects //private data: private bool empty; private bool full; private int begin, end; private object[] x; //public data: public fqueue() { empty = !(full = false); begin = end = 0xA2; x = new object[256]; return; } public fqueue(int size) { if (1 > size) throw new Exception("fqueue: Size cannot be zero or negative"); empty = !(full = false); begin = end = 0xA2; x = new object[size]; return; } public object write { set { if(full) throw new Exception("Write error: Queue is full"); end = empty ? end : (end + 1) % x.Length; full = ((end + 1) % x.Length) == begin; empty = false; x[end] = value; } } public object read { get { if(empty) throw new Exception("Read error: Queue is empty"); full = false; object ret = x[begin]; begin = (empty=end==begin) ? begin : (begin + 1) % x.Length; return ret; } } public int maxSize { get { return x.Length; } } public int queueSize { get { return end - begin + (empty ? 0 : 1 + ((end < begin) ? x.Length : 0)); } } public bool isEmpty { get { return empty; } } public bool isFull { get { return full; } } public int start { get { return begin; } } public int finish { get { return end; } } }