共享内存IPC同步(无锁)

考虑以下情况:

要求:

  • 英特尔x64服务器(多个CPUsockets=> NUMA)
  • Ubuntu 12,GCC 4.6
  • 两个进程通过(命名)共享内存共享大量数据
  • 古典生产者 – 消费者情景
  • 内存被安排在一个循环缓冲区(有M个元素)

程序序列(伪代码):

过程A(制作人):

int bufferPos = 0; while( true ) { if( isBufferEmpty( bufferPos ) ) { writeData( bufferPos ); setBufferFull( bufferPos ); bufferPos = ( bufferPos + 1 ) % M; } } 

stream程B(消费者):

 int bufferPos = 0; while( true ) { if( isBufferFull( bufferPos ) ) { readData( bufferPos ); setBufferEmpty( bufferPos ); bufferPos = ( bufferPos + 1 ) % M; } } 

现在这个古老的问题:如何有效地同步它们!?

  1. 使用互斥锁保护每个读/写访问
  2. 引入一个“宽限期”,以允许写入完成:当缓冲区(N + 3)被标记为已满(危险,但似乎正常工作…)时,读取缓冲区N中的数据。
  3. ?!?

理想情况下,我希望沿着内存屏障的方向行事,保证所有先前的读/写都可以在所有的CPU上看到,如下所示:

 writeData( i ); MemoryBarrier(); //All data written and visible, set flag setBufferFull( i ); 

这样,我只需要监视缓冲区标志,然后可以安全地读取大数据块。

一般来说,我正在按照Preshing所述的方式寻找获得/释放栅栏的东西:

http://preshing.com/20130922/acquire-and-release-fences/

(如果我理解正确,C ++ 11primefaces只对单个进程的线程有效,而不是对多个进程有效)。

然而,GCC自己的内存屏障(__sync_synchronize结合编译器屏障asm volatile(“”:::“memory”)肯定)似乎并不像预期的那样工作,因为写入在屏障后变得可见,当我预计他们将完成。

任何帮助将不胜感激…

顺便说一句:在Windows下,这只是使用易变的variables(微软的具体行为)正常工作…

Boost Interprocess支持共享内存。

Boost Lockfree具有单生产者单消费者队列types( spsc_queue )。 这基本上就是你所说的循环缓冲区。

这是一个演示,它以无锁的方式传递IPC消息(在本例中为stringtypes)。

定义types

首先,我们来定义我们的types:

 namespace bip = boost::interprocess; namespace shm { template <typename T> using alloc = bip::allocator<T, bip::managed_shared_memory::segment_manager>; using char_alloc = alloc<char>; using shared_string = bip::basic_string<char, std::char_traits<char>, char_alloc >; using string_alloc = alloc<shared_string>; using ring_buffer = boost::lockfree::spsc_queue< shared_string, boost::lockfree::capacity<200> // alternatively, pass // boost::lockfree::allocator<string_alloc> >; } 

为了简单起见,我select了演示运行时大小的spsc_queue实现,随机请求200个元素的容量。

shared_string typedef定义了一个将从共享内存段透明分配的string,所以它们也“神奇”地与其他进程共享。

消费者方面

这是最简单的,所以:

 int main() { // create segment and corresponding allocator bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536); shm::string_alloc char_alloc(segment.get_segment_manager()); shm::ring_buffer *queue = segment.find_or_construct<shm::ring_buffer>("queue")(); 

这将打开共享内存区域,find共享队列(如果存在)。 注意这应该在现实生活中同步。

现在进行实际的演示:

 while (true) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); shm::shared_string v(char_alloc); if (queue->pop(v)) std::cout << "Processed: '" << v << "'\n"; } 

消费者只是无限地监视待处理作业的队列,每处理一个~10ms。

制片人方面

制片方非常相似:

 int main() { bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536); shm::char_alloc char_alloc(segment.get_segment_manager()); shm::ring_buffer *queue = segment.find_or_construct<shm::ring_buffer>("queue")(); 

再次,添加适当的同步到初始化阶段。 另外,你可能会让生产者在适当的时候释放共享内存段。 在这个演示中,我只是“让它挂起来”。 这对testing很好,见下文。

那么,生产者是做什么的?

  for (const char* s : { "hello world", "the answer is 42", "where is your towel" }) { std::this_thread::sleep_for(std::chrono::milliseconds(250)); queue->push({s, char_alloc}); } } 

对,制作人在〜750ms内准确地产生 3条消息,然后退出。

请注意,因此,如果我们这样做(假设一个POSIXshell与作业控制):

 ./producer& ./producer& ./producer& wait ./consumer& 

将“立即”打印3×3消息,同时使消费者运行。 干

 ./producer& ./producer& ./producer& 

在此之后,再次显示“滴入”消息(以〜250ms间隔的3次突发),因为消费者仍然在后台运行

在这个要点在线查看完整的代码: https : //gist.github.com/sehe/9376856

Interesting Posts