在C ++中是否有生产就绪的无锁队列或哈希实现?
我已经在C ++中使用了一个非locking的队列。 我发现了一些代码和一些试验 – 但我没有能够编译。 无锁哈希也是受欢迎的。
总结:到目前为止,我没有正面的答案。 没有“生产准备”库,而且现有的库都不符合STL容器的API。
从1.53开始,boost提供了一组无锁的数据结构 ,包括队列,堆栈和单生产者/单消费者队列(即环形缓冲区)。
起点可以是Herb Sutter的DDJ文章,可以是单一的生产者和消费者 ,也可以是多个 。 他给出的代码(从每篇文章的第二页开始内联)使用C ++ 0x风格的primefaces<T>模板types; 您可以使用Boost进程库来模拟。
boost代码被隐藏在进程间库的深处,但是通过相应的头文件(atomic.hpp)读取了我熟悉的系统上所需的比较和交换操作的实现。
Facebook的Folly似乎locking了基于C ++ 11 <atomic>
免费数据结构:
-
ProducerConsumerQueue与文档和示例代码在这里 。
-
AtomicHashMap与文档和示例代码在这里
我敢说这些是目前用于生产,所以我想他们可以安全地用于其他项目。
干杯!
是!
我写了一个无锁队列 。 它具有Features™function:
- 完全等待(没有CAS循环)
- 超快速 (每秒超过一亿次入队/出队操作)
- 使用C ++ 11移动语义
- 随着需要增长(但只有当你想要它)
- 对元素进行无锁内存pipe理(使用预分配的连续块)
- 独立(两个标题加上许可证和自述文件)
- 在MSVC2010 +,Intel ICC 13和GCC 4.7.2下编译(并且应该在任何C ++ 11完全兼容的编译器下工作)
它可以在简化的BSD许可下在GitHub上使用(随意分享!)。
注意事项:
- 仅针对单生产者单消费者体系结构 (即两个线程)
- 在x86(-64)上进行了彻底testing,并且应该在ARM,PowerPC和其他CPU上工作,其中alignment的原生大小的整数和指针加载和存储本质上是primefaces的,但是尚未在非x86 CPU上进行现场testing一个来testing它,让我知道)
- 不知道是否有任何专利被侵犯(风险自负等)。 请注意,我从头开始devise和实现它。
boost.lockfree是一个尝试创buildlockfree栈和fifo类的c ++实现。
公共git存储库
在检查了大多数给出的答案后,我只能说:
答案是否定的 。
没有这样的东西可以用来开箱即用。
我知道的最接近的是Windows互锁单链表 。 当然,这只是Windows。
如果你有一个多生产者/单消费者队列/ FIFO,你可以使用SLIST或一个简单的Lock Free LIFO堆栈轻松地创build一个LockFree。 你所做的是为消费者提供第二个“私人”堆栈(这也可以作为一个简单的SLIST或者你select的其他堆栈模型来完成)。 消费者从私人堆栈中popup物品。 每当私有LIFO被使用时,你执行一次Flush而不是从共享并发SLIST(抓取整个SLIST链)中popup,然后按顺序遍历Flushed列表将项目推送到私有堆栈。
这适用于单一生产者/单一消费者和多个生产者/单一消费者。
但是,它不适用于多个消费者的情况(无论是单一生产者还是多个生产者)。
而且,就散列表而言,它们是“条带化”的理想候选者,这只是将散列分成每段caching具有锁的段。 这是Java并发库如何做(使用32条纹)。 如果你有一个轻量级的读写器锁,哈希表可以被同时访问以便同时读取,并且只有在有争用的条纹上发生写入时(以及可能的话,如果允许增长哈希表),它才会停止。
如果你自己推出,确保将你的锁与哈希条目交错,而不是把所有的锁放在一个数组中,这样你就不太可能发生错误的共享。
然后英特尔线程构build模块来了。 一段时间,这是好的。
PS:你正在寻找concurrent_queue和concurrent_hash_map
我可能会迟一点来。
没有解决scheme(在问题被问到)主要是由于C ++(C ++ 0x / 11之前)的一个重要问题:C ++有(没有)并发内存模型。
现在,使用std :: atomic,你可以控制内存顺序问题,并有适当的比较和交换操作。 我已经使用C ++ 11和Micheal的危害指针(IEEE TPDS 2004)为自己写了一个Micheal&Scott的无锁队列(PODC96)的实现,以避免早期的自由和ABA问题。 这工作正常,但是这是一个快速和肮脏的实施,我不满意的实际performance。 代码在bitbucket上可用: LockFreeExperiment
也可以使用双字CAS来实现没有危险指针的无锁队列(但是使用cmpxchg16b只能在x86-64上使用64位版本),我有一篇关于这个(有未经testing的队列代码)的博客文章: 为x86 / x86-64实现通用双字比较和交换 (LSE博客)
我自己的基准testing表明,双locking队列(同样在Micheal&Scott 1996年的论文中)和无锁的队列(我还没有达到足够的争用,所以locking的数据结构有性能问题,但是我的替补队列太轻现在),英特尔TBB的并发队列似乎更好(快两倍),相对较小的数量(取决于操作系统,在FreeBSD 9下,我发现迄今为止的最低界限,这个数字是8个线程i7有4个核心,因此有8个逻辑CPU),并且有非常奇怪的行为(我简单基准testing的执行时间从几秒钟到几个小时)!
关于STL风格的无锁队列的另一个限制:在无锁队列上有迭代器没有灵敏度。
据我所知,还没有这样的东西公开。 实现者需要解决的一个问题是,你需要一个无锁的内存分配器,虽然我现在似乎无法find链接。
以下是来自Herb Sutter关于并发locking队列的文章http://www.drdobbs.com/parallel/writing-a-generalized-concurrent-queue/211601363?pgno=1 。 我做了一些像编译器重新sorting的东西。 一个需要GCC v4.4 +来编译这个代码。
#include <atomic> #include <iostream> using namespace std; //compile with g++ setting -std=c++0x #define CACHE_LINE_SIZE 64 template <typename T> struct LowLockQueue { private: struct Node { Node( T* val ) : value(val), next(nullptr) { } T* value; atomic<Node*> next; char pad[CACHE_LINE_SIZE - sizeof(T*)- sizeof(atomic<Node*>)]; }; char pad0[CACHE_LINE_SIZE]; // for one consumer at a time Node* first; char pad1[CACHE_LINE_SIZE - sizeof(Node*)]; // shared among consumers atomic<bool> consumerLock; char pad2[CACHE_LINE_SIZE - sizeof(atomic<bool>)]; // for one producer at a time Node* last; char pad3[CACHE_LINE_SIZE - sizeof(Node*)]; // shared among producers atomic<bool> producerLock; char pad4[CACHE_LINE_SIZE - sizeof(atomic<bool>)]; public: LowLockQueue() { first = last = new Node( nullptr ); producerLock = consumerLock = false; } ~LowLockQueue() { while( first != nullptr ) { // release the list Node* tmp = first; first = tmp->next; delete tmp->value; // no-op if null delete tmp; } } void Produce( const T& t ) { Node* tmp = new Node( new T(t) ); asm volatile("" ::: "memory"); // prevent compiler reordering while( producerLock.exchange(true) ) { } // acquire exclusivity last->next = tmp; // publish to consumers last = tmp; // swing last forward producerLock = false; // release exclusivity } bool Consume( T& result ) { while( consumerLock.exchange(true) ) { } // acquire exclusivity Node* theFirst = first; Node* theNext = first-> next; if( theNext != nullptr ) { // if queue is nonempty T* val = theNext->value; // take it out asm volatile("" ::: "memory"); // prevent compiler reordering theNext->value = nullptr; // of the Node first = theNext; // swing first forward consumerLock = false; // release exclusivity result = *val; // now copy it back delete val; // clean up the value delete theFirst; // and the old dummy return true; // and report success } consumerLock = false; // release exclusivity return false; // report queue was empty } }; int main(int argc, char* argv[]) { //Instead of this Mambo Jambo one can use pthreads in Linux to test comprehensively LowLockQueue<int> Q; Q.Produce(2); Q.Produce(6); int a; Q.Consume(a); cout<< a << endl; Q.Consume(a); cout<< a << endl; return 0; }
我发现另一个解决scheme写在c:
我大概早在2010年就写过这篇文章,我确信在不同的参考文献的帮助下。 它是多生产者单一消费者。
template <typename T> class MPSCLockFreeQueue { private: struct Node { Node( T val ) : value(val), next(NULL) { } T value; Node* next; }; Node * Head; __declspec(align(4)) Node * InsertionPoint; //__declspec(align(4)) forces 32bit alignment this must be changed for 64bit when appropriate. public: MPSCLockFreeQueue() { InsertionPoint = new Node( T() ); Head = InsertionPoint; } ~MPSCLockFreeQueue() { // release the list T result; while( Consume(result) ) { //The list should be cleaned up before the destructor is called as there is no way to know whether or not to delete the value. //So we just do our best. } } void Produce( const T& t ) { Node * node = new Node(t); Node * oldInsertionPoint = (Node *) InterLockedxChange((volatile void **)&InsertionPoint,node); oldInsertionPoint->next = node; } bool Consume( T& result ) { if (Head->next) { Node * oldHead = Head; Head = Head->next; delete oldHead; result = Head->value; return true; } return false; // else report empty } };