LMAX的干扰模式如何工作?

我正试图了解干扰模式 。 我观看了InfoQ的video并试图阅读他们的论文。 我知道有一个环形缓冲区涉及,它被初始化为一个非常大的数组,以利用caching局部性,消除新内存的分配。

这听起来像是有一个或多个primefaces整数跟踪位置。 每个“事件”似乎都得到了一个唯一的ID,并且它的环中的位置是通过相对于环的大小等来查找其模数而find的。

不幸的是,我并没有直观的感觉。 我做了很多交易应用,研究了演员模型 ,看了SEDA等。

他们在介绍中提到,这种模式基本上是路由器如何工作的; 但是我还没有find路由器如何工作的任何好的描述。

有没有更好的解释指针?

谷歌代码项目确实参考了一个关于环形缓冲区实现的技术文章 ,但是对于想要了解它是如何工作的人来说,这有点枯燥,学术和艰难。 不过也有一些博客文章已经开始以更可读的方式解释内部。 有关环形缓冲区的解释是破坏者模式的核心, 描述了消费者障碍 (涉及从破坏者处读取的部分)以及处理多个生产者的一些信息 。

Disruptor最简单的描述是:这是一种以最有效的方式在线程之间发送消息的方式。 它可以作为一个队列的替代品,但它也与SEDA和Actors共享一些function。

与队列相比:

Disruptor提供了将消息传递到另一个线程的能力,如果需要的话将其唤醒(类似于BlockingQueue)。 但是,有三个明显的差异。

  1. Disruptor的用户通过扩展Entry类并提供工厂来执行预分配来定义消息的存储方式。 这允许内存重用(复制)或者Entry可以包含对另一个对象的引用。
  2. 将消息放入Disruptor是一个2阶段的过程,首先在环形缓冲区中声明一个插槽,为用户提供可以填充相应数据的Entry。 然后必须提交条目,这个两阶段的方法是必要的,以便灵活使用上面提到的内存。 这是使消息线程可见消息的提交。
  3. 跟踪从环形缓冲区消耗的消息是消费者的责任。 将这个责任从环形缓冲区本身移开,有助于减less写入争用的数量,因为每个线程维护自己的计数器。

相比于演员

Actor模型比大多数其他编程模型更接近Disruptor,特别是如果使用提供的BatchConsumer / BatchHandler类。 这些类隐藏了维护消耗序列号的所有复杂性,并在发生重要事件时提供一组简单的callback。 但是,有一些细微的差别。

  1. Disruptor使用1个线程1的消费者模型,其中Actor使用N:M模型,也就是说,你可以拥有尽可能多的angular色,他们将分布在固定数量的线程中(通常每个核心1个)。
  2. BatchHandler接口为onEndOfBatch()提供了额外的(也是非常重要的)callback。 这允许缓慢的消费者,例如那些正在进行I / O的用户将事件一起分批以提高吞吐量。 可以在其他Actor框架中执行批处理,但是由于几乎所有其他框架在批处理结束时都不提供callback,因此需要使用超时来确定批处理的结束,从而导致延迟较差。

相比SEDA

LMAX构build了Disruptor模式来取代基于SEDA的方法。

  1. 它提供的SEDA的主要改进是能够并行工作。 为此,Disruptor支持将相同的消息(以相同的顺序)多次转换为多个消费者。 这避免了在stream水线中需要分叉阶段。
  2. 我们也允许消费者等待其他消费者的结果,而不必在他们之间进行另一个排队阶段。 消费者可以简单地观察它所依赖的消费者的序列号。 这避免了pipe道中的连接阶段的需要。

与内存障碍相比

另一种思考方式是作为一个结构化的,有序的记忆障碍。 生产者壁垒形成写屏障,消费壁垒成为阅读障碍。

首先,我们想了解它提供的编程模型。

有一个或多个作家。 有一个或多个读者。 有一行条目,完全从旧到新(从左到右)。 作家可以在右端添加新条目。 每个读者从左到右依次读取条目。 读者显然不能读过去的作家。

没有条目删除的概念。 我使用“读者”而不是“消费者”来避免使用条目的图像。 但是我们明白,最后一位读者左边的条目变得毫无用处。

一般读者可以同时独立阅读。 但是,我们可以宣布读者之间的依赖关系。 读取器依赖关系可以是任意的非循环图。 如果阅读者B依赖于阅读者A,则阅读者B不能阅读过去的阅读者A.

读者依赖性的出现是因为读者A可以注释一个条目,而读者B依赖于该注释。 例如,A对条目进行一些计算,并将结果存储在条目的字段a中。 然后A继续前进,现在B可以读取该条目,并存储A的值。 如果读者C不依赖于A,则C不应该尝试读取a

这确实是一个有趣的编程模型。 无论性能如何,只有这个模型可以使许多应用程序受益。

当然,LMAX的主要目标是performance。 它使用预先分配的条目环。 这个环足够大,但是这个环是有限的,所以系统不会超出devise容量。 如果戒指已满,作者将等待最慢的读者前进并腾出空间。

入口对象被预分配并永久生存,以减less垃圾收集成本。 我们不插入新的入口对象或删除旧的入口对象,而是写入者要求预先存在的入口,填充其字段并通知读者。 这种明显的两阶段行动实际上只是一个primefaces行动

 setNewEntry(EntryPopulator); interface EntryPopulator{ void populate(Entry existingEntry); } 

预分配条目还意味着相邻条目(非常可能)位于相邻的存储器单元中,并且由于读取器顺序地读取条目,所以利用CPU高速caching很重要。

还有很多努力来避免locking,CAS,甚至是内存屏障(例如,如果只有一个作者,使用非易失性序列variables)

对于读者的开发人员:不同的注释读者应写入不同的领域,以避免写争用。 (其实他们应该写入不同的caching行)。注释读者不应该触及其他非依赖读者可能读取的任何东西。 这就是为什么我说这些读者注释条目,而不是修改条目。

Martin Fowler写了一篇关于LMAX的文章和LMAX架构的干扰模式,这可能会进一步澄清。

我真的花时间研究真正的来源,出于纯粹的好奇心,其背后的想法很简单。 在写这篇文章的时候最新的版本是3.2.1。

有一个缓冲区存储预先分配的事件,将保存消费者阅读的数据。

缓冲区由一个长度为数组的标志(整型数组)来支持,描述了缓冲区时隙的可用性(详见进一步的细节)。 这个数组像java#AtomicIntegerArray一样被访问,所以为了这个完美的目的,你可能会认为它是一个。

可以有任何数量的生产者。 当生产者想写入缓冲区时,会产生一个很长的数字(如调用AtomicLong#getAndIncrement,Disruptor实际上使用它自己的实现,但是它以相同的方式工作)。 我们称之为生成长的producerCallId。 以类似的方式,当消费者ENDS从缓冲区读取一个插槽时,产生一个consumerCallId。 最近的consumerCallId被访问。

(如果有很多消费者,则select最低ID的电话)。

然后比较这些ID,如果两者之间的差异小于缓冲器一侧,则允许生产者写入。

(如果producerCallId大于最近的consumerCallId + bufferSize,则意味着缓冲区已满,生产者被迫等待总线等待,直到有一个点可用。

生产者然后基于他的callId(它是prducerCallId模缓冲区大小,但是由于缓冲区大小总是2的幂(在缓冲区创build时强制执行))在缓冲区中分配槽,所使用的执行操作是producerCallId&(bufferSize_1 ))。 然后可以自由修改该插槽中的事件。

(实际的algorithm有点复杂,为了优化目的,涉及将最近的consumerIdcaching在单独的primefaces参考中。)

当事件被修改时,更改是“已发布”。 在发布标志arrays中的相应插槽时填充更新的标志。 标志值是循环的编号(producerCallId除以bufferSize(因为bufferSize是2的幂,实际操作是右移)。

以类似的方式,可以有任何数量的消费者。 每次消费者想要访问缓冲区时,都会生成一个consumerCallId(取决于消费者如何添加到干扰源,id生成中使用的primefaces可能是共享的,或者每个都是独立的)。 然后将这个ConsumerCallId与最近的produCallId进行比较,如果两者中的较小者,则允许读者进行。

(类似的,如果producerCallId与consumerCallId相同,则意味着缓冲区是有效的,并且消费者被迫等待。在创build干扰程序时,等待的方式由WaitStrategy定义。

对于个人消费者(拥有自己的ID发生器的消费者),接下来检查的是批量消费的能力。 缓冲器中的槽按照从consumerCallId(相对于生产者相同的方式确定的索引)到相应于最近的producerCallId的顺序被检查。

通过比较在标志数组中写入的标志值与为consumerCallId生成的标志值来循环检查它们。 如果标志匹配,则意味着填充插槽的制作者已经进行了更改。 如果没有,则循环被破坏,并返回最高提交的changeId。 从ConsumerCallId到changeId中接收的插槽可以批量使用。

如果一组消费者一起阅读(共享ID生成器),则每个消费者只需要一个callId,并且只检查并返回该单个callId的插槽。

从这篇文章 :

干扰模式是一个批处理队列,由循环arrays(即环形缓冲区)填充,预先分配的传输对象使用内存屏障通过序列同步生产者和消费者。

记忆障碍是一种很难解释,Trisha的博客在我看来这个post做了最好的尝试: http : //mechanitis.blogspot.com/2011/08/dissecting-disruptor-why-its-so-fast。 HTML

但是如果你不想深入底层的细节,你可以知道Java中的内存障碍是通过volatile关键字或通过java.util.concurrent.AtomicLong 。 破坏者模式序列是AtomicLong ,并通过记忆障碍而不是locking在生产者和消费者之间来回传达。

我发现通过代码更容易理解一个概念,所以下面的代码是一个简单的来自CoralQueue的 helloworld ,它是由我所属的CoralBlocks完成的干扰模式实现。 在下面的代码中,您可以看到干扰程序模式如何实现批处理以及环缓冲区(即循环数组)如何允许两个线程之间进行无垃圾通信:

 package com.coralblocks.coralqueue.sample.queue; import com.coralblocks.coralqueue.AtomicQueue; import com.coralblocks.coralqueue.Queue; import com.coralblocks.coralqueue.util.MutableLong; public class Sample { public static void main(String[] args) throws InterruptedException { final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class); Thread consumer = new Thread() { @Override public void run() { boolean running = true; while(running) { long avail; while((avail = queue.availableToPoll()) == 0); // busy spin for(int i = 0; i < avail; i++) { MutableLong ml = queue.poll(); if (ml.get() == -1) { running = false; } else { System.out.println(ml.get()); } } queue.donePolling(); } } }; consumer.start(); MutableLong ml; for(int i = 0; i < 10; i++) { while((ml = queue.nextToDispatch()) == null); // busy spin ml.set(System.nanoTime()); queue.flush(); } // send a message to stop consumer... while((ml = queue.nextToDispatch()) == null); // busy spin ml.set(-1); queue.flush(); consumer.join(); // wait for the consumer thread to die... } } 
Interesting Posts