LMAX的干扰模式如何工作?
我正试图了解干扰模式 。 我观看了InfoQ的video并试图阅读他们的论文。 我知道有一个环形缓冲区涉及,它被初始化为一个非常大的数组,以利用caching局部性,消除新内存的分配。
这听起来像是有一个或多个primefaces整数跟踪位置。 每个“事件”似乎都得到了一个唯一的ID,并且它的环中的位置是通过相对于环的大小等来查找其模数而find的。
不幸的是,我并没有直观的感觉。 我做了很多交易应用,研究了演员模型 ,看了SEDA等。
他们在介绍中提到,这种模式基本上是路由器如何工作的; 但是我还没有find路由器如何工作的任何好的描述。
有没有更好的解释指针?
谷歌代码项目确实参考了一个关于环形缓冲区实现的技术文章 ,但是对于想要了解它是如何工作的人来说,这有点枯燥,学术和艰难。 不过也有一些博客文章已经开始以更可读的方式解释内部。 有关环形缓冲区的解释是破坏者模式的核心, 描述了消费者障碍 (涉及从破坏者处读取的部分)以及处理多个生产者的一些信息 。
Disruptor最简单的描述是:这是一种以最有效的方式在线程之间发送消息的方式。 它可以作为一个队列的替代品,但它也与SEDA和Actors共享一些function。
与队列相比:
Disruptor提供了将消息传递到另一个线程的能力,如果需要的话将其唤醒(类似于BlockingQueue)。 但是,有三个明显的差异。
- Disruptor的用户通过扩展Entry类并提供工厂来执行预分配来定义消息的存储方式。 这允许内存重用(复制)或者Entry可以包含对另一个对象的引用。
- 将消息放入Disruptor是一个2阶段的过程,首先在环形缓冲区中声明一个插槽,为用户提供可以填充相应数据的Entry。 然后必须提交条目,这个两阶段的方法是必要的,以便灵活使用上面提到的内存。 这是使消息线程可见消息的提交。
- 跟踪从环形缓冲区消耗的消息是消费者的责任。 将这个责任从环形缓冲区本身移开,有助于减less写入争用的数量,因为每个线程维护自己的计数器。
相比于演员
Actor模型比大多数其他编程模型更接近Disruptor,特别是如果使用提供的BatchConsumer / BatchHandler类。 这些类隐藏了维护消耗序列号的所有复杂性,并在发生重要事件时提供一组简单的callback。 但是,有一些细微的差别。
- Disruptor使用1个线程1的消费者模型,其中Actor使用N:M模型,也就是说,你可以拥有尽可能多的angular色,他们将分布在固定数量的线程中(通常每个核心1个)。
- BatchHandler接口为
onEndOfBatch()
提供了额外的(也是非常重要的)callback。 这允许缓慢的消费者,例如那些正在进行I / O的用户将事件一起分批以提高吞吐量。 可以在其他Actor框架中执行批处理,但是由于几乎所有其他框架在批处理结束时都不提供callback,因此需要使用超时来确定批处理的结束,从而导致延迟较差。
相比SEDA
LMAX构build了Disruptor模式来取代基于SEDA的方法。
- 它提供的SEDA的主要改进是能够并行工作。 为此,Disruptor支持将相同的消息(以相同的顺序)多次转换为多个消费者。 这避免了在stream水线中需要分叉阶段。
- 我们也允许消费者等待其他消费者的结果,而不必在他们之间进行另一个排队阶段。 消费者可以简单地观察它所依赖的消费者的序列号。 这避免了pipe道中的连接阶段的需要。
与内存障碍相比
另一种思考方式是作为一个结构化的,有序的记忆障碍。 生产者壁垒形成写屏障,消费壁垒成为阅读障碍。
首先,我们想了解它提供的编程模型。
有一个或多个作家。 有一个或多个读者。 有一行条目,完全从旧到新(从左到右)。 作家可以在右端添加新条目。 每个读者从左到右依次读取条目。 读者显然不能读过去的作家。
没有条目删除的概念。 我使用“读者”而不是“消费者”来避免使用条目的图像。 但是我们明白,最后一位读者左边的条目变得毫无用处。
一般读者可以同时独立阅读。 但是,我们可以宣布读者之间的依赖关系。 读取器依赖关系可以是任意的非循环图。 如果阅读者B依赖于阅读者A,则阅读者B不能阅读过去的阅读者A.
读者依赖性的出现是因为读者A可以注释一个条目,而读者B依赖于该注释。 例如,A对条目进行一些计算,并将结果存储在条目的字段a
中。 然后A继续前进,现在B可以读取该条目,并存储A的值。 如果读者C不依赖于A,则C不应该尝试读取a
。
这确实是一个有趣的编程模型。 无论性能如何,只有这个模型可以使许多应用程序受益。
当然,LMAX的主要目标是performance。 它使用预先分配的条目环。 这个环足够大,但是这个环是有限的,所以系统不会超出devise容量。 如果戒指已满,作者将等待最慢的读者前进并腾出空间。
入口对象被预分配并永久生存,以减less垃圾收集成本。 我们不插入新的入口对象或删除旧的入口对象,而是写入者要求预先存在的入口,填充其字段并通知读者。 这种明显的两阶段行动实际上只是一个primefaces行动
setNewEntry(EntryPopulator); interface EntryPopulator{ void populate(Entry existingEntry); }
预分配条目还意味着相邻条目(非常可能)位于相邻的存储器单元中,并且由于读取器顺序地读取条目,所以利用CPU高速caching很重要。
还有很多努力来避免locking,CAS,甚至是内存屏障(例如,如果只有一个作者,使用非易失性序列variables)
对于读者的开发人员:不同的注释读者应写入不同的领域,以避免写争用。 (其实他们应该写入不同的caching行)。注释读者不应该触及其他非依赖读者可能读取的任何东西。 这就是为什么我说这些读者注释条目,而不是修改条目。
Martin Fowler写了一篇关于LMAX的文章和LMAX架构的干扰模式,这可能会进一步澄清。
我真的花时间研究真正的来源,出于纯粹的好奇心,其背后的想法很简单。 在写这篇文章的时候最新的版本是3.2.1。
有一个缓冲区存储预先分配的事件,将保存消费者阅读的数据。
缓冲区由一个长度为数组的标志(整型数组)来支持,描述了缓冲区时隙的可用性(详见进一步的细节)。 这个数组像java#AtomicIntegerArray一样被访问,所以为了这个完美的目的,你可能会认为它是一个。
可以有任何数量的生产者。 当生产者想写入缓冲区时,会产生一个很长的数字(如调用AtomicLong#getAndIncrement,Disruptor实际上使用它自己的实现,但是它以相同的方式工作)。 我们称之为生成长的producerCallId。 以类似的方式,当消费者ENDS从缓冲区读取一个插槽时,产生一个consumerCallId。 最近的consumerCallId被访问。
(如果有很多消费者,则select最低ID的电话)。
然后比较这些ID,如果两者之间的差异小于缓冲器一侧,则允许生产者写入。
(如果producerCallId大于最近的consumerCallId + bufferSize,则意味着缓冲区已满,生产者被迫等待总线等待,直到有一个点可用。
生产者然后基于他的callId(它是prducerCallId模缓冲区大小,但是由于缓冲区大小总是2的幂(在缓冲区创build时强制执行))在缓冲区中分配槽,所使用的执行操作是producerCallId&(bufferSize_1 ))。 然后可以自由修改该插槽中的事件。
(实际的algorithm有点复杂,为了优化目的,涉及将最近的consumerIdcaching在单独的primefaces参考中。)
当事件被修改时,更改是“已发布”。 在发布标志arrays中的相应插槽时填充更新的标志。 标志值是循环的编号(producerCallId除以bufferSize(因为bufferSize是2的幂,实际操作是右移)。
以类似的方式,可以有任何数量的消费者。 每次消费者想要访问缓冲区时,都会生成一个consumerCallId(取决于消费者如何添加到干扰源,id生成中使用的primefaces可能是共享的,或者每个都是独立的)。 然后将这个ConsumerCallId与最近的produCallId进行比较,如果两者中的较小者,则允许读者进行。
(类似的,如果producerCallId与consumerCallId相同,则意味着缓冲区是有效的,并且消费者被迫等待。在创build干扰程序时,等待的方式由WaitStrategy定义。
对于个人消费者(拥有自己的ID发生器的消费者),接下来检查的是批量消费的能力。 缓冲器中的槽按照从consumerCallId(相对于生产者相同的方式确定的索引)到相应于最近的producerCallId的顺序被检查。
通过比较在标志数组中写入的标志值与为consumerCallId生成的标志值来循环检查它们。 如果标志匹配,则意味着填充插槽的制作者已经进行了更改。 如果没有,则循环被破坏,并返回最高提交的changeId。 从ConsumerCallId到changeId中接收的插槽可以批量使用。
如果一组消费者一起阅读(共享ID生成器),则每个消费者只需要一个callId,并且只检查并返回该单个callId的插槽。
从这篇文章 :
干扰模式是一个批处理队列,由循环arrays(即环形缓冲区)填充,预先分配的传输对象使用内存屏障通过序列同步生产者和消费者。
记忆障碍是一种很难解释,Trisha的博客在我看来这个post做了最好的尝试: http : //mechanitis.blogspot.com/2011/08/dissecting-disruptor-why-its-so-fast。 HTML
但是如果你不想深入底层的细节,你可以知道Java中的内存障碍是通过volatile
关键字或通过java.util.concurrent.AtomicLong
。 破坏者模式序列是AtomicLong
,并通过记忆障碍而不是locking在生产者和消费者之间来回传达。
我发现通过代码更容易理解一个概念,所以下面的代码是一个简单的来自CoralQueue的 helloworld ,它是由我所属的CoralBlocks完成的干扰模式实现。 在下面的代码中,您可以看到干扰程序模式如何实现批处理以及环缓冲区(即循环数组)如何允许两个线程之间进行无垃圾通信:
package com.coralblocks.coralqueue.sample.queue; import com.coralblocks.coralqueue.AtomicQueue; import com.coralblocks.coralqueue.Queue; import com.coralblocks.coralqueue.util.MutableLong; public class Sample { public static void main(String[] args) throws InterruptedException { final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class); Thread consumer = new Thread() { @Override public void run() { boolean running = true; while(running) { long avail; while((avail = queue.availableToPoll()) == 0); // busy spin for(int i = 0; i < avail; i++) { MutableLong ml = queue.poll(); if (ml.get() == -1) { running = false; } else { System.out.println(ml.get()); } } queue.donePolling(); } } }; consumer.start(); MutableLong ml; for(int i = 0; i < 10; i++) { while((ml = queue.nextToDispatch()) == null); // busy spin ml.set(System.nanoTime()); queue.flush(); } // send a message to stop consumer... while((ml = queue.nextToDispatch()) == null); // busy spin ml.set(-1); queue.flush(); consumer.join(); // wait for the consumer thread to die... } }