共享内存IPC同步(无锁)
考虑以下情况:
要求:
- 英特尔x64服务器(多个CPUsockets=> NUMA)
- Ubuntu 12,GCC 4.6
- 两个进程通过(命名)共享内存共享大量数据
- 古典生产者 – 消费者情景
- 内存被安排在一个循环缓冲区(有M个元素)
程序序列(伪代码):
过程A(制作人):
int bufferPos = 0; while( true ) { if( isBufferEmpty( bufferPos ) ) { writeData( bufferPos ); setBufferFull( bufferPos ); bufferPos = ( bufferPos + 1 ) % M; } }
stream程B(消费者):
int bufferPos = 0; while( true ) { if( isBufferFull( bufferPos ) ) { readData( bufferPos ); setBufferEmpty( bufferPos ); bufferPos = ( bufferPos + 1 ) % M; } }
现在这个古老的问题:如何有效地同步它们!?
- 使用互斥锁保护每个读/写访问
- 引入一个“宽限期”,以允许写入完成:当缓冲区(N + 3)被标记为已满(危险,但似乎正常工作…)时,读取缓冲区N中的数据。
- ?!?
理想情况下,我希望沿着内存屏障的方向行事,保证所有先前的读/写都可以在所有的CPU上看到,如下所示:
writeData( i ); MemoryBarrier(); //All data written and visible, set flag setBufferFull( i );
这样,我只需要监视缓冲区标志,然后可以安全地读取大数据块。
一般来说,我正在按照Preshing所述的方式寻找获得/释放栅栏的东西:
http://preshing.com/20130922/acquire-and-release-fences/
(如果我理解正确,C ++ 11primefaces只对单个进程的线程有效,而不是对多个进程有效)。
然而,GCC自己的内存屏障(__sync_synchronize结合编译器屏障asm volatile(“”:::“memory”)肯定)似乎并不像预期的那样工作,因为写入在屏障后变得可见,当我预计他们将完成。
任何帮助将不胜感激…
顺便说一句:在Windows下,这只是使用易变的variables(微软的具体行为)正常工作…
Boost Interprocess支持共享内存。
Boost Lockfree具有单生产者单消费者队列types( spsc_queue
)。 这基本上就是你所说的循环缓冲区。
这是一个演示,它以无锁的方式传递IPC消息(在本例中为string
types)。
定义types
首先,我们来定义我们的types:
namespace bip = boost::interprocess; namespace shm { template <typename T> using alloc = bip::allocator<T, bip::managed_shared_memory::segment_manager>; using char_alloc = alloc<char>; using shared_string = bip::basic_string<char, std::char_traits<char>, char_alloc >; using string_alloc = alloc<shared_string>; using ring_buffer = boost::lockfree::spsc_queue< shared_string, boost::lockfree::capacity<200> // alternatively, pass // boost::lockfree::allocator<string_alloc> >; }
为了简单起见,我select了演示运行时大小的spsc_queue
实现,随机请求200个元素的容量。
shared_string
typedef定义了一个将从共享内存段透明分配的string,所以它们也“神奇”地与其他进程共享。
消费者方面
这是最简单的,所以:
int main() { // create segment and corresponding allocator bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536); shm::string_alloc char_alloc(segment.get_segment_manager()); shm::ring_buffer *queue = segment.find_or_construct<shm::ring_buffer>("queue")();
这将打开共享内存区域,find共享队列(如果存在)。 注意这应该在现实生活中同步。
现在进行实际的演示:
while (true) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); shm::shared_string v(char_alloc); if (queue->pop(v)) std::cout << "Processed: '" << v << "'\n"; }
消费者只是无限地监视待处理作业的队列,每处理一个~10ms。
制片人方面
制片方非常相似:
int main() { bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536); shm::char_alloc char_alloc(segment.get_segment_manager()); shm::ring_buffer *queue = segment.find_or_construct<shm::ring_buffer>("queue")();
再次,添加适当的同步到初始化阶段。 另外,你可能会让生产者在适当的时候释放共享内存段。 在这个演示中,我只是“让它挂起来”。 这对testing很好,见下文。
那么,生产者是做什么的?
for (const char* s : { "hello world", "the answer is 42", "where is your towel" }) { std::this_thread::sleep_for(std::chrono::milliseconds(250)); queue->push({s, char_alloc}); } }
对,制作人在〜750ms内准确地产生 3条消息,然后退出。
请注意,因此,如果我们这样做(假设一个POSIXshell与作业控制):
./producer& ./producer& ./producer& wait ./consumer&
将“立即”打印3×3消息,同时使消费者运行。 干
./producer& ./producer& ./producer&
在此之后,再次显示“滴入”消息(以〜250ms间隔的3次突发),因为消费者仍然在后台运行
在这个要点在线查看完整的代码: https : //gist.github.com/sehe/9376856