你如何在C中实现一个循环缓冲区?

我需要一个固定的大小(创build时可以在运行时select,而不是编译时)可以容纳任何types的对象的循环缓冲区,它需要非常高的性能。 我不认为会有资源争夺问题,因为虽然它是在一个多任务embedded式环境中,但它是一个合作的环境,所以任务本身可以pipe理这个问题。

我最初的想法是存储一个简单的结构在缓冲区中将包含types(简单的枚举/定义)和一个无效的指针有效载荷,但我希望这是尽可能快,所以我打开的build议,涉及绕过堆。

其实我很高兴绕过任何标准库的原始速度 – 从我所看到的代码,它没有严格优化的CPU:它看起来像他们只是编译C代码的东西,如strcpy()等,没有手工编码的组件。

任何代码或想法将不胜感激。 所需的操作是:

  • 创build一个特定大小的缓冲区。
  • 放在尾部。
  • 从头开始。
  • 返回计数。
  • 删除一个缓冲区。

你可以列举你编写缓冲区时需要的types,还是需要在运行时通过dynamic调用添加types? 如果前者,那么我会创build缓冲区作为一个堆结构n结构数组,其中每个结构由两个元素组成:标识数据types的枚举标记和所有数据types的联合。 对于小型元素来说,额外的存储空间会损失什么,因此无需处理分配/解除分配以及由此产生的内存碎片。 然后你只需要跟踪定义缓冲区头部和尾部元素的开始和结束索引,并确保在增加/减less索引时计算mod n。

最简单的解决scheme是跟踪项目大小和项目数量,然后创build适当字节数的缓冲区:

 typedef struct circular_buffer { void *buffer; // data buffer void *buffer_end; // end of data buffer size_t capacity; // maximum number of items in the buffer size_t count; // number of items in the buffer size_t sz; // size of each item in the buffer void *head; // pointer to head void *tail; // pointer to tail } circular_buffer; void cb_init(circular_buffer *cb, size_t capacity, size_t sz) { cb->buffer = malloc(capacity * sz); if(cb->buffer == NULL) // handle error cb->buffer_end = (char *)cb->buffer + capacity * sz; cb->capacity = capacity; cb->count = 0; cb->sz = sz; cb->head = cb->buffer; cb->tail = cb->buffer; } void cb_free(circular_buffer *cb) { free(cb->buffer); // clear out other fields too, just to be safe } void cb_push_back(circular_buffer *cb, const void *item) { if(cb->count == cb->capacity){ // handle error } memcpy(cb->head, item, cb->sz); cb->head = (char*)cb->head + cb->sz; if(cb->head == cb->buffer_end) cb->head = cb->buffer; cb->count++; } void cb_pop_front(circular_buffer *cb, void *item) { if(cb->count == 0){ // handle error } memcpy(item, cb->tail, cb->sz); cb->tail = (char*)cb->tail + cb->sz; if(cb->tail == cb->buffer_end) cb->tail = cb->buffer; cb->count--; } 
 // Note power of two buffer size #define kNumPointsInMyBuffer 1024 typedef struct _ringBuffer { UInt32 currentIndex; UInt32 sizeOfBuffer; double data[kNumPointsInMyBuffer]; } ringBuffer; // Initialize the ring buffer ringBuffer *myRingBuffer = (ringBuffer *)calloc(1, sizeof(ringBuffer)); myRingBuffer->sizeOfBuffer = kNumPointsInMyBuffer; myRingBuffer->currentIndex = 0; // A little function to write into the buffer // NB First argument of writeIntoBuffer() just happens to have the // same as the one calloc'ed above. It will only point to the same // space in memory if the calloc'ed pointer is passed to // writeIntoBuffer() as an arg when the function is called. Consider // using another name for clarity void writeIntoBuffer(ringBuffer *myRingBuffer, double *myData, int numsamples) { // -1 for our binary modulo in a moment int buffLen = myRingBuffer->sizeOfBuffer - 1; int lastWrittenSample = myRingBuffer->currentIndex; int idx; for (int i=0; i < numsamples; ++i) { // modulo will automagically wrap around our index idx = (i + lastWrittenSample) & buffLen; myRingBuffer->data[idx] = myData[i]; } // Update the current index of our ring buffer. myRingBuffer->currentIndex += numsamples; myRingBuffer->currentIndex &= myRingBuffer->sizeOfBuffer - 1; } 

只要你的环形缓冲区的长度是2的幂,令人难以置信的快速二进制“&”操作将环绕你的索引。 对于我的应用程序,我正在从麦克风获取的audio的环形缓冲区中向用户显示一段audio。

我总是确保可以在屏幕上显示的最大音量远小于环形缓冲区的大小。 否则,您可能正在读取和写入相同的块。 这可能会给你奇怪的显示文物。

首先,标题。 如果您使用位整数来保存头部和尾部“指针”,则不需要模算术来封装缓冲区,并调整它们的大小以使它们完全同步。 IE:4096塞入一个12位无符号整数本身是0,以任何方式unmolested。 消除模运算,即使是2的幂运算,几乎完全是双倍的速度。

在我的第三代i7戴尔XPS 8500上,使用Visual Studio 2010的C ++编译器进行默认内联处理,其中1 / 8192nd用于服务数据,在填充和排空4096个任意types数据元素的缓冲区时,需要花费52万次。

我会RX重写在main()中的testing循环,所以他们不再控制stream量 – 这是,并应该由返回值指示缓冲区是满或空,以及随之而来的中断控制; 声明。 IE:填料和沥液应该能够相互殴打而不会腐败或不稳定。 在某个时候,我希望multithreading这个代码,因此这种行为将是至关重要的。

QUEUE_DESC(队列描述符)和初始化函数强制此代码中的所有缓冲区是2的幂。否则上述scheme将不起作用。 关于这个问题,请注意,QUEUE_DESC不是硬编码的,它使用一个清单常量(#define BITS_ELE_KNT)来构造。 (我假设2的幂是足够的灵活性在这里)

为了使缓冲区大小运行时间可选,我尝试了不同的方法(这里没有显示),并使用能够pipe理FIFO缓冲区的USHRT(Head,Tail,EleKnt)进行结算[USHRT]。 为了避免模算术,我用Head,Tail创build了一个掩码,但是这个掩码变成了(EleKnt-1) 在安静的机器上使用USHRTS,而不是使用bit ints,性能提高了〜15%。 英特尔的CPU内核总是比总线快,所以在一个繁忙的共享机器上,打包你的数据结构可以让你加载并在其他竞争线程之前执行。 权衡。

注意,缓冲区的实际存储空间是通过calloc()分配在堆上的,指针位于结构体的底部,因此结构体和指针的地址是完全相同的。 IE浏览器; 没有偏移需要被添加到结构地址绑定寄存器。

与此同时,所有与缓冲区服务相关的variables在物理上都与缓冲区相邻,并绑定到同一个结构中,因此编译器可以制作漂亮的汇编语言。 你将不得不杀死内联优化来查看任何程序集,因为否则它会被淹没。

为了支持任何数据types的多态,我使用了memcpy()而不是赋值。 如果您只需要灵活性来支持每个编译一个随机variablestypes,那么这个代码完美的工作。

对于多态,你只需要知道types和它的存储需求。 描述符的DATA_DESC数组提供了一种方法来跟踪每个放入QUEUE_DESC.pB​​uffer中的数据,以便可以正确地检索。 我只是分配足够的pBuffer内存来保存最大数据types的所有元素,但要loggingDATA_DESC.dBytes中给定数据实际使用的存储量。 另一种方法是重新创build一个堆pipe理器。

这意味着QUEUE_DESC的UCHAR * pBuffer将有一个并行伴随数组来跟踪数据types和大小,而pBuffer中的数据存储位置将保持原样。 如果你可以find一种方法来打败你的编译器提交这样一个前向引用,那么新成员就像DATA_DESC * pDataDesc,或者DATA_DESC DataDesc [2 ^ BITS_ELE_KNT]。 Calloc()在这些情况下总是更加灵活。

您仍然可以在Q_Put(),Q_Get中使用memcpy(),但实际复制的字节数由DATA_DESC.dBytes而不是QUEUE_DESC.EleBytes决定。 对于任何给定的input或输出,这些元素可能都是不同的types/大小。

我相信这个代码可以满足速度和缓冲区大小的要求,可以满足6种不同数据types的需求。 我以printf()语句的forms离开了很多testing装置,所以你可以满足自己(或不)代码正常工作。 随机数生成器演示了该代码适用于任何随机头/尾组合。

 enter code here // Queue_Small.cpp : Defines the entry point for the console application. // #include "stdafx.h" #include <stdio.h> #include <time.h> #include <limits.h> #include <stdlib.h> #include <malloc.h> #include <memory.h> #include <math.h> #define UCHAR unsigned char #define ULONG unsigned long #define USHRT unsigned short #define dbl double /* Queue structure */ #define QUEUE_FULL_FLAG 1 #define QUEUE_EMPTY_FLAG -1 #define QUEUE_OK 0 // #define BITS_ELE_KNT 12 //12 bits will create 4.096 elements numbered 0-4095 // //typedef struct { // USHRT dBytes:8; //amount of QUEUE_DESC.EleBytes storage used by datatype // USHRT dType :3; //supports 8 possible data types (0-7) // USHRT dFoo :5; //unused bits of the unsigned short host's storage // } DATA_DESC; // This descriptor gives a home to all the housekeeping variables typedef struct { UCHAR *pBuffer; // pointer to storage, 16 to 4096 elements ULONG Tail :BITS_ELE_KNT; // # elements, with range of 0-4095 ULONG Head :BITS_ELE_KNT; // # elements, with range of 0-4095 ULONG EleBytes :8; // sizeof(elements) with range of 0-256 bytes // some unused bits will be left over if BITS_ELE_KNT < 12 USHRT EleKnt :BITS_ELE_KNT +1;// 1 extra bit for # elements (1-4096) //USHRT Flags :(8*sizeof(USHRT) - BITS_ELE_KNT +1); // flags you can use USHRT IsFull :1; // queue is full USHRT IsEmpty :1; // queue is empty USHRT Unused :1; // 16th bit of USHRT } QUEUE_DESC; // --------------------------------------------------------------------------- // Function prototypes QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz); int Q_Put(QUEUE_DESC *Q, UCHAR *pNew); int Q_Get(UCHAR *pOld, QUEUE_DESC *Q); // --------------------------------------------------------------------------- QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz) { memset((void *)Q, 0, sizeof(QUEUE_DESC));//init flags and bit integers to zero //select buffer size from powers of 2 to receive modulo // arithmetic benefit of bit uints overflowing Q->EleKnt = (USHRT)pow(2.0, BitsForEleKnt); Q->EleBytes = DataTypeSz; // how much storage for each element? // Randomly generated head, tail a test fixture only. // Demonstrates that the queue can be entered at a random point // and still perform properly. Normally zero srand(unsigned(time(NULL))); // seed random number generator with current time Q->Head = Q->Tail = rand(); // supposed to be set to zero here, or by memset Q->Head = Q->Tail = 0; // allocate queue's storage if(NULL == (Q->pBuffer = (UCHAR *)calloc(Q->EleKnt, Q->EleBytes))) { return NULL; } else { return Q; } } // --------------------------------------------------------------------------- int Q_Put(QUEUE_DESC *Q, UCHAR *pNew) { memcpy(Q->pBuffer + (Q->Tail * Q->EleBytes), pNew, Q->EleBytes); if(Q->Tail == (Q->Head + Q->EleKnt)) { // Q->IsFull = 1; Q->Tail += 1; return QUEUE_FULL_FLAG; // queue is full } Q->Tail += 1; // the unsigned bit int MUST wrap around, just like modulo return QUEUE_OK; // No errors } // --------------------------------------------------------------------------- int Q_Get(UCHAR *pOld, QUEUE_DESC *Q) { memcpy(pOld, Q->pBuffer + (Q->Head * Q->EleBytes), Q->EleBytes); Q->Head += 1; // the bit int MUST wrap around, just like modulo if(Q->Head == Q->Tail) { // Q->IsEmpty = 1; return QUEUE_EMPTY_FLAG; // queue Empty - nothing to get } return QUEUE_OK; // No errors } // // --------------------------------------------------------------------------- int _tmain(int argc, _TCHAR* argv[]) { // constrain buffer size to some power of 2 to force faux modulo arithmetic int LoopKnt = 1000000; // for benchmarking purposes only int k, i=0, Qview=0; time_t start; QUEUE_DESC Queue, *Q; if(NULL == (Q = Q_Init(&Queue, BITS_ELE_KNT, sizeof(int)))) { printf("\nProgram failed to initialize. Aborting.\n\n"); return 0; } start = clock(); for(k=0; k<LoopKnt; k++) { //printf("\n\n Fill'er up please...\n"); //Q->Head = Q->Tail = rand(); for(i=1; i<= Q->EleKnt; i++) { Qview = i*i; if(QUEUE_FULL_FLAG == Q_Put(Q, (UCHAR *)&Qview)) { //printf("\nQueue is full at %i \n", i); //printf("\nQueue value of %i should be %i squared", Qview, i); break; } //printf("\nQueue value of %i should be %i squared", Qview, i); } // Get data from queue until completely drained (empty) // //printf("\n\n Step into the lab, and see what's on the slab... \n"); Qview = 0; for(i=1; i; i++) { if(QUEUE_EMPTY_FLAG == Q_Get((UCHAR *)&Qview, Q)) { //printf("\nQueue value of %i should be %i squared", Qview, i); //printf("\nQueue is empty at %i", i); break; } //printf("\nQueue value of %i should be %i squared", Qview, i); } //printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail); } printf("\nQueue time was %5.3f to fill & drain %i element queue %i times \n", (dbl)(clock()-start)/(dbl)CLOCKS_PER_SEC,Q->EleKnt, LoopKnt); printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail); getchar(); return 0; } 

这里是C中的一个简单的解决scheme。假设每个函数的中断都被closures。 没有多态性和东西,只是常识。


 #define BUFSIZE 128 char buf[BUFSIZE]; char *pIn, *pOut, *pEnd; char full; // init void buf_init() { pIn = pOut = buf; // init to any slot in buffer pEnd = &buf[BUFSIZE]; // past last valid slot in buffer full = 0; // buffer is empty } // add char 'c' to buffer int buf_put(char c) { if (pIn == pOut && full) return 0; // buffer overrun *pIn++ = c; // insert c into buffer if (pIn >= pEnd) // end of circular buffer? pIn = buf; // wrap around if (pIn == pOut) // did we run into the output ptr? full = 1; // can't add any more data into buffer return 1; // all OK } // get a char from circular buffer int buf_get(char *pc) { if (pIn == pOut && !full) return 0; // buffer empty FAIL *pc = *pOut++; // pick up next char to be returned if (pOut >= pEnd) // end of circular buffer? pOut = buf; // wrap around full = 0; // there is at least 1 slot return 1; // *pc has the data to be returned } 

一个简单的实现可以包括:

  • 一个缓冲区,实现为一个大小为n的数组,可以是任何你需要的types
  • 读指针或索引(以处理器的效率更高为准)
  • 写指针或索引
  • 指示缓冲区中有多less数据的计数器(可以从读指针和写指针推导出来,但是可以更快地分别进行跟踪)

每次写入数据时,都会提前写入指针并递增计数器。 读取数据时,增加读取指针并减less计数器。 如果任一指​​针到达n,则将其设置为零。

如果counter = n,你不能写。 如果计数器= 0,则无法读取。

C风格,整数简单的环形缓冲区。 首先使用init比使用put和get。 如果缓冲区不包含任何数据,则返回“0”零。

 //===================================== // ring buffer address based //===================================== #define cRingBufCount 512 int sRingBuf[cRingBufCount]; // Ring Buffer int sRingBufPut; // Input index address int sRingBufGet; // Output index address Bool sRingOverWrite; void GetRingBufCount(void) { int r; ` r= sRingBufPut - sRingBufGet; if ( r < cRingBufCount ) r+= cRingBufCount; return r; } void InitRingBuffer(void) { sRingBufPut= 0; sRingBufGet= 0; } void PutRingBuffer(int d) { sRingBuffer[sRingBufPut]= d; if (sRingBufPut==sRingBufGet)// both address are like ziro { sRingBufPut= IncRingBufferPointer(sRingBufPut); sRingBufGet= IncRingBufferPointer(sRingBufGet); } else //Put over write a data { sRingBufPut= IncRingBufferPointer(sRingBufPut); if (sRingBufPut==sRingBufGet) { sRingOverWrite= Ture; sRingBufGet= IncRingBufferPointer(sRingBufGet); } } } int GetRingBuffer(void) { int r; if (sRingBufGet==sRingBufPut) return 0; r= sRingBuf[sRingBufGet]; sRingBufGet= IncRingBufferPointer(sRingBufGet); sRingOverWrite=False; return r; } int IncRingBufferPointer(int a) { a+= 1; if (a>= cRingBufCount) a= 0; return a; }