循环无锁缓冲区

我正在devise一个连接到一个或多个数据馈送stream的系统,并对数据进行一些分析,而不是基于结果触发事件。 在典型的multithreading生产者/消费者设置中,我将有多个生产者线程将数据放入一个队列,多个消费者线程读取数据,而消费者只对最新的数据点加上n个点感兴趣。 生产者线程将不得不阻止,如果缓慢的消费者无法跟上,当然消费者线程将阻止时,没有未经处理的更新。 使用读/写锁的典型并发队列可以很好地工作,但数据传入的速度可能很大,所以我想减less我的locking开销,尤其是制作者的locking锁。 我认为我需要一个循环的无锁缓冲区。

现在有两个问题:

  1. 循环锁无缓冲区的答案?

  2. 如果是这样,在我推出自己的之前,你知道任何符合我需要的公共实现吗?

任何实现循环无锁缓冲区的指针总是受欢迎的。

顺便说一句,在Linux上使用C ++来完成。

一些额外的信息:

响应时间对我的系统至关重要。 理想情况下,消费者线程将希望看到任何更新尽快进入,因为额外的1毫秒的延迟可能会使系统变得毫无价值,或价值更低。

我所倾向的devise思路是一个半无锁的循环缓冲区,其中生产者线程尽可能快地将数据放入缓冲区,让我们调用缓冲区A的头部,不阻塞,除非缓冲区满了, A满足缓冲区Z的结束。消费者线程将分别持有指向循环缓冲区P和P n的两个指针,其中P是线程的本地缓冲区头,P n是P之后的第n个项目。每个消费者线程将前进它的P和P n一旦完成处理当前的P,缓冲区指针Z的结束以最慢的P n前进。 当P赶上A,这意味着没有更多的新的更新处理,消费者旋转,并忙着等待A再次前进。 如果消费者线程旋转太久,它可以进入睡眠状态,并等待一个条件variables,但我可以消费者占用CPU周期等待更新,因为这不会增加我的延迟(我会有更多的CPU核心比线程)。 想象一下,你有一个循环轨道,生产者在一群消费者面前运行,关键是调整系统,使生产者通常跑在消费者的前面几步,这些操作大部分可以使用无锁技术完成。 我明白,要获得执行权的细节并不容易……好吧,非常辛苦,这就是为什么我想在从自己的angular度去学习别人的错误之前。

你想要的艺术术语是一个无锁的队列 。 有一套很好的笔记,可以链接到 Ross Bencina的代码和论文 。 那些我最信任的人是Maurice Herlihy (对美国人来说,他的名字叫“Morris”)。

在过去的几年中,我已经对无锁数据结构进行了专门的研究。 我已经阅读了大部分在这个领域的论文(只有大约四十篇左右 – 虽然只有十或十五个是真正的用途:-)

AFAIK,一个无锁循环缓冲区没有被发明。 这个问题将会解决读者超越写作者的复杂情况,或者相反。

如果你没有花费至less六个月的时间学习无锁数据结构,不要试图自己写一个。 你会弄错它,直到你的代码在部署之后失败,在新平台上出现错误,你可能不会明白。

我相信,但是有一个解决scheme,您的要求。

您应该将无锁队列与无锁免费列表配对。

免费列表将给你预先分配,从而避免了无锁分配器的(财政上昂贵的)要求; 当空闲列表为空时,通过立即从队列中取出一个元素并使用它来复制循环缓冲区的行为。

(当然,在一个基于锁的循环缓冲区中,一旦获得了锁,获得一个元素非常快 – 基本上只是一个指针取消引用 – 但是在任何无锁algorithm中你都不会得到这个结果;他们通常不得不去完全没有办法做事情;没有自由列表popup,然后是出列的开销与任何无锁algorithm需要做的工作量相当。

迈克尔和斯科特在1996年发展了一个非常好的无锁排队。下面的链接将给你足够的细节来追踪他们论文的PDF; 迈克尔和斯科特,FIFO

一个无锁的自由列表是最简单的无锁algorithm,事实上我不认为我已经看到了一个实际的文件。

要求生产者或消费者阻止缓冲区是空的还是满的,这表明你应该使用正常的locking数据结构,信号量或条件variables来阻止生产者和消费者,直到数据可用。 无锁的代码通常不会在这种情况下阻塞 – 它会旋转或放弃无法完成的操作,而不是阻止使用操作系统。 (如果你可以等到另一个线程产生或使用数据,那么为什么还要等待另一个线程的锁来更新数据结构呢?)

在(x86 / x64)Linux上,如果没有争用,使用互斥锁的线程内同步是相当便宜的。 把精力集中在最大限度地减less生产者和消费者所需要的时间上。 既然你说过你只关心最近N个logging的数据点,我认为一个循环缓冲区就可以做到这一点。 然而,我并不真正理解这是如何适应阻塞要求和消费者实际使用(移除)他们阅读的数据的想法。 (你想让消费者只看最后N个数据点,而不是删除它们吗?你想让生产者不关心消费者是否跟不上,只是覆盖旧数据?)

此外,正如Zan Lynx评论的那样,当有大量数据进来时,您可以将数据聚合/缓冲到更大的块中。您可以缓冲固定数量的点或在一定时间内收到的所有数据。 这意味着将会有更less的同步操作。 但是,它确实会引入延迟,但是如果您不使用实时Linux,则无论如何您都必须在某种程度上处理这个问题。

在DDJ上有很多这方面的文章。 作为这件事情有多困难的一个标志,这是对早期文章的一个错误纠正。 确保你理解错误之前,你自己推出) – ;

boost库中的实现值得考虑。 它很容易使用和相当高的性能。 我写了一个testing,在四核i7笔记本电脑上运行(8个线程),每秒进行〜4M入队/出队操作。 到目前为止没有提到的另一个实现是http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue上的MPMC队列。; 我已经在32个生产者和32个消费者的同一台笔记本电脑上做了一些简单的testing。 正如所宣称的那样,加速无锁队列的速度更快。

由于大多数其他答案状态无锁编程很难。 大多数实现将很难检测需要大量testing和debugging才能修复的angular落案例。 这些通常是在代码中仔细放置内存屏障的情况下修复的。 您还可以在许多学术文章中find正确的证据。 我更喜欢用暴力工具来testing这些实现。 应使用http://research.microsoft.com/en-us/um/people/lamport/tla/tla.html这样的工具检查计划在生产中使用的任何无锁algorithm的正确性。;

减less争用的一个有用的技术是将项目散列成多个队列,并使每个消费者致力于“主题”。

对于消费者感兴趣的最近数量的项目,您不希望locking整个队列并遍历它以find要覆盖的项目 – 只是以N元组(即所有N个最近的项目)发布项目。 生产者在完全队列中阻塞(当消费者无法跟上时)超时,更新其本地元组caching的实现奖励点 – 这样你就不会对数据源施加反压。

我会同意这篇文章,并build议不要使用无锁数据结构。 最近关于无锁fifo队列的文章是这样的 ,search同一作者的进一步的论文; 还有关于查尔莫斯关于无锁数据结构的博士论文(我失去了链接)。 然而,你没有说你的元素有多大 – 无锁的数据结构只能用字大小的项目有效地工作,所以如果元素大于一个机器字(32或64),你将不得不dynamic分配元素位)。 如果你dynamic地分配元素,那么你就需要一个无锁的内存分配器(例如Streamflow )来移动(假设,因为你没有分析你的程序,而且你基本上是在做过早的优化)瓶颈,所以你需要一个无锁的内存分配器,它与您的应用程序。

Sutter的队列是次优的,他知道这一点。 多核编程的艺术是一个很好的参考,但不要相信内存模型上的Java人,期间。 罗斯的联系不会给你一个明确的答案,因为他们的图书馆遇到这样的问题等等。

做无锁编程就是要求麻烦,除非你想在解决问题之前花费大量的时间去解决问题(从描述来看,这是一种常见的“寻求完美'在高速caching一致性)。 这需要花费数年的时间,而不是先解决问题,然后再优化,这是一种常见的疾病。

我不是硬件内存模型和locking免费数据结构的专家,我倾向于避免使用这些在我的项目中,我与传统的locking数据结构。

不过最近我注意到这个video: 基于环形缓冲区的Lockless SPSC队列

这是基于交易系统使用的称为LMAX distruptor的开源高性能Java库: LMAX Distruptor

根据上面的介绍,您可以将头部和尾部指针primefaces化并自动检查头部从后面捕捉尾部的情况,反之亦然。

下面你可以看到一个非常基本的C ++ 11实现:

// USING SEQUENTIAL MEMORY #include<thread> #include<atomic> #include <cinttypes> using namespace std; #define RING_BUFFER_SIZE 1024 // power of 2 for efficient % class lockless_ring_buffer_spsc { public : lockless_ring_buffer_spsc() { write.store(0); read.store(0); } bool try_push(int64_t val) { const auto current_tail = write.load(); const auto next_tail = increment(current_tail); if (next_tail != read.load()) { buffer[current_tail] = val; write.store(next_tail); return true; } return false; } void push(int64_t val) { while( ! try_push(val) ); // TODO: exponential backoff / sleep } bool try_pop(int64_t* pval) { auto currentHead = read.load(); if (currentHead == write.load()) { return false; } *pval = buffer[currentHead]; read.store(increment(currentHead)); return true; } int64_t pop() { int64_t ret; while( ! try_pop(&ret) ); // TODO: exponential backoff / sleep return ret; } private : std::atomic<int64_t> write; std::atomic<int64_t> read; static const int64_t size = RING_BUFFER_SIZE; int64_t buffer[RING_BUFFER_SIZE]; int64_t increment(int n) { return (n + 1) % size; } }; int main (int argc, char** argv) { lockless_ring_buffer_spsc queue; std::thread write_thread( [&] () { for(int i = 0; i<1000000; i++) { queue.push(i); } } // End of lambda expression ); std::thread read_thread( [&] () { for(int i = 0; i<1000000; i++) { queue.pop(); } } // End of lambda expression ); write_thread.join(); read_thread.join(); return 0; } 

这是一个古老的线程,但是由于它没有被提及,但是在JUCE C ++框架中有一个无锁的,循环的,1个生产者 – > 1个消费者的FIFO。

https://www.juce.com/doc/classAbstractFifo#details

这是我将如何做到这一点:

  • 将队列映射到一个数组
  • 保持下一次读取和下一次写入索引的状态
  • 保持一个空的全位vector

插入包括使用带有增量的CAS,并在下一次写入时滚动。 一旦你有一个插槽,添加你的价值,然后设置与它匹配​​的空/满位。

删除之前需要检查下溢的位,但除此之外,与写操作相同,但使用读索引并清空空/满位。

被警告,

  1. 我不是这方面的专家
  2. primefacesASM操作似乎是非常缓慢,当我使用它们,所以如果你最终结束了其中的几个,你可能会更快地使用内嵌embedded/删除function的embedded式锁。 这个理论是,一个单一的primefaces操作来获取锁,然后是非常less的非primefacesASM操作,可能比几个primefaces操作完成的操作要快。 但要使这项工作需要手动或自动内联,所以这是ASM的一小块。

为了完整起见 ,在OtlContainers中有很好的无锁循环缓冲区,但是它是用Delphi编写的(TOmniBaseBoundedQueue是循环缓冲区,TOmniBaseBoundedStack是有界的栈)。 在同一个单元中还有一个无限的队列(TOmniBaseQueue)。 dynamic无锁队列中描述了无界队列- 正确执行 。 有界队列(循环缓冲区)的初始实现在一个无锁队列中描述,最后! 但代码从那以后被更新了。

查看Disruptor ( 如何使用它 ),这是一个multithreading可以订阅的环形缓冲区:

虽然这是一个老问题,但没有人提到DPDK的无锁环形缓冲区。 这是一个高吞吐量环形缓冲区,支持多个生产者和多个消费者。 它还提供单用户和单生产者模式,并且在SPSC模式下环形缓冲器是等待的。 它用C语言编写,支持多种体系结构。

另外,它支持批量和突发模式,在这些模式下,物品可以成批/排队。 devise让多个消费者或多个生产者通过移动primefaces指针来简单地保留空间,同时写入队列。