前言
Disruptor的高性能,是多种技术结合以及本身架构的结果。本文主要讲源码,涉及到的相关知识点需要读者自行去了解,以下列出:
- 锁和CAS
- 伪共享和缓存行
- volatile和内存屏障
原理
添加了中文注释的源码:Disruptor
下图来自官方文档
官方原图有点乱,我翻译一下
在讲原理前,先了解 Disruptor 定义的术语
-
Event
存放数据的单位,对应 demo 中的
LongEvent
-
Ring Buffer
环形数据缓冲区:这是一个首尾相接的环,用于存放 Event ,用于生产者往其存入数据和消费者从其拉取数据
-
Sequence
序列:用于跟踪进度(生产进度、消费进度)
-
Sequencer
Disruptor的核心,用于在生产者和消费者之间传递数据,有单生产者和多生产者两种实现。
-
Sequence Barrier
序列屏障,消费者之间的依赖关系就靠序列屏障实现
-
Wait Strategy
-
等待策略,消费者等待生产者将发布的策略
-
Event Processor
事件处理器,循环从 RingBuffer 获取 Event 并执行 EventHandler。
-
Event Handler
事件处理程序,也就是消费者
-
Producer
生产者
Ring Buffer
环形数据缓冲区(RingBuffer),逻辑上是首尾相接的环,在代码中用数组来表示Object[]
。Disruptor生产者发布分两步
- 步骤一:申请写入 n 个元素,如果可以写入,这返回最大序列号
- 步骤二:根据序列号去 RingBuffer 中获取 Event,修改并发布
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); | |
// 获取下一个可用位置的下标(步骤1) | |
long sequence = ringBuffer.next(); | |
try { | |
// 返回可用位置的元素 | |
LongEvent event = ringBuffer.get(sequence); | |
// 设置该位置元素的值 | |
event.set(l); | |
} finally { | |
// 发布 | |
ringBuffer.publish(sequence); | |
} |
这两个步骤由 Sequencer 完成,分为单生产者和多生产者实现
Sequencer
单生产者
如果申请 2 个元素,则如下图所示(圆表示 RingBuffer)
// 一般不会有以下写法,这里为了讲解源码才使用next(2) | |
// 向RingBuffer申请两个元素 | |
long sequence = ringBuffer.next(2); | |
for (long i = sequence-1; i <= sequence; i++) { | |
try { | |
// 返回可用位置的元素 | |
LongEvent event = ringBuffer.get(i); | |
// 设置该位置元素的值 | |
event.set(1); | |
} finally { | |
ringBuffer.publish(i); | |
} | |
} |
next 申请成功的序列,cursor 消费者最大可用序列,gatingSequence 表示能申请的最大序列号。红色表示待发布,绿色表示已发布。申请相当于占位置,发布需要一个一个按顺序发布
如果 RingBuffer 满了呢,在上图步骤二的基础上,生产者发布了3个元素,消费者消费1个。此时生产者再申请 2个元素,就会变成下图所示
只剩下 1 个空间,但是要申请 2个元素,此时程序会自旋等待空间足够。
接下来结合代码看,单生产者的 Sequencer 实现为 SingleProducerSequencer,先看看构造方法
abstract class SingleProducerSequencerPad extends AbstractSequencer | |
{ | |
protected long p1, p2, p3, p4, p5, p6, p7; | |
SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) | |
{ | |
super(bufferSize, waitStrategy); | |
} | |
} | |
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad | |
{ | |
SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) | |
{ | |
super(bufferSize, waitStrategy); | |
} | |
long nextValue = Sequence.INITIAL_VALUE; | |
long cachedValue = Sequence.INITIAL_VALUE; | |
} | |
public final class SingleProducerSequencer extends SingleProducerSequencerFields | |
{ | |
protected long p1, p2, p3, p4, p5, p6, p7; | |
public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) | |
{ | |
super(bufferSize, waitStrategy); | |
} | |
} |
这是 Disruptor 高性能的技巧之一,SingleProducerSequencer 需要的类变量只有 nextValue 和cachedValue,p1 ~ p7 的作用是填充缓存行,这能保证 nextValue 和cachedValue 必定在独立的缓存行,我们可以用ClassLayout
打印内存布局看看
接下来看如何获取序列号(也就是步骤一)
// 调用路径 | |
// RingBuffer#next() | |
// SingleProducerSequencer#next() | |
public long next(int n) | |
{ | |
if (n < 1) | |
{ | |
throw new IllegalArgumentException("n must be > 0"); | |
} | |
long nextValue = this.nextValue; | |
//生产者当前序号值+期望获取的序号数量后达到的序号值 | |
long nextSequence = nextValue + n; | |
//减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’ | |
long wrapPoint = nextSequence - bufferSize; | |
//从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前最新的‘消费者中最小序号值’,而是上次程序进入到下面的if判定代码段时,被赋值的当时的‘消费者中最小序号值’ | |
//这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence()即可。 | |
long cachedGatingSequence = this.cachedValue; | |
//(wrapPoint > cachedGatingSequence) : 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’ | |
//(cachedGatingSequence > nextValue) : https://github.com/LMAX-Exchange/disruptor/issues/76 | |
// 这里判断就是生产者生产的填满BingBUffer,需要等待消费者消费 | |
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) | |
{ | |
cursor.setVolatile(nextValue); // StoreLoad fence | |
//gatingSequences就是消费者队列末尾的序列,也就是消费者消费到哪里了 | |
//实际上就是获得处理的队尾,如果队尾是current的话,说明所有的消费者都执行完成任务在等待新的事件了 | |
long minSequence; | |
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) | |
{ | |
// 等待1纳秒 | |
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? | |
} | |
this.cachedValue = minSequence; | |
} | |
this.nextValue = nextSequence; | |
return nextSequence; | |
} | |
public void publish(long sequence) | |
{ | |
// 更新序列号 | |
cursor.set(sequence); | |
// 等待策略的唤醒 | |
waitStrategy.signalAllWhenBlocking(); | |
} |
要解释的都在注释里了,gatingSequences 是消费者队列末尾的序列,对应着就是下图中的 ApplicationConsumer 的 Sequence
多生产者
看完单生产者版,接下来看多生产者的实现。因为是多生产者,需要考虑并发的情况。
如果有A、B两个消费者都来申请 2 个元素
cursor 申请成功的序列,HPS 消费者最大可用序列,gatingSequence 表示能申请的最大序列号。红色表示待发布,绿色表示已发布。HPS 是我自己编的缩写,表示 getHighestPublishedSequence
方法的返回值
如图所示,只要申请成功,就移动 cursor 的位置。RingBuffer 并没有记录发布情况(图中的红绿颜色),这个发布情况由 MultiProducerSequencer
的availableBuffer
来维护。
下面看代码
public final class MultiProducerSequencer extends AbstractSequencer | |
{ | |
// 缓存的消费者中最小序号值,相当于SingleProducerSequencerFields的cachedValue | |
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); | |
// 标记元素是否可用 | |
private final int[] availableBuffer; | |
public long next(int n) | |
{ | |
if (n < 1) | |
{ | |
throw new IllegalArgumentException("n must be > 0"); | |
} | |
long current; | |
long next; | |
do | |
{ | |
current = cursor.get(); | |
next = current + n; | |
//减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’ | |
long wrapPoint = next - bufferSize; | |
//从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前最新的‘消费者中最小序号值’,而是上次程序进入到下面的if判定代码段时,被赋值的当时的‘消费者中最小序号值’ | |
//这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence()即可。 | |
long cachedGatingSequence = gatingSequenceCache.get(); | |
//(wrapPoint > cachedGatingSequence) : 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’ | |
//(cachedGatingSequence > nextValue) : https://github.com/LMAX-Exchange/disruptor/issues/76 | |
// 这里判断就是生产者生产的填满BingBUffer,需要等待消费者消费 | |
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) | |
{ | |
long gatingSequence = Util.getMinimumSequence(gatingSequences, current); | |
if (wrapPoint > gatingSequence) | |
{ | |
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? | |
continue; | |
} | |
gatingSequenceCache.set(gatingSequence); | |
} | |
// 使用cas保证只有一个生产者能拿到next | |
else if (cursor.compareAndSet(current, next)) | |
{ | |
break; | |
} | |
} | |
while (true); | |
return next; | |
} | |
...... | |
} |
MultiProducerSequencer
和SingleProducerSequencer
的 next()方法逻辑大致一样,只是多了CAS的步骤来保证并发的正确性。接着看发布方法
public void publish(final long sequence) | |
{ | |
// 记录发布情况 | |
setAvailable(sequence); | |
// 等待策略的唤醒 | |
waitStrategy.signalAllWhenBlocking(); | |
} | |
private void setAvailable(final long sequence) | |
{ | |
// calculateIndex(sequence):获取序号 | |
// calculateAvailabilityFlag(sequence):RingBuffer的圈数 | |
setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence)); | |
} | |
private void setAvailableBufferValue(int index, int flag) | |
{ | |
long bufferAddress = (index * SCALE) + BASE; | |
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag); | |
// 上面相当于 availableBuffer[index] = flag 的高性能版 | |
} |
记录发布情况,其实相当于 availableBuffer[sequence] = 圈数
,前面说了,availableBuffer
是用来标记元素是否可用的,如果消费者的圈数 ≠ availableBuffer中的圈数,则表示元素不可用
public boolean isAvailable(long sequence) | |
{ | |
int index = calculateIndex(sequence); | |
// 计算圈数 | |
int flag = calculateAvailabilityFlag(sequence); | |
long bufferAddress = (index * SCALE) + BASE; | |
// UNSAFE.getIntVolatile(availableBuffer, bufferAddress):相当于availableBuffer[sequence] 的高性能版 | |
return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag; | |
} | |
private int calculateAvailabilityFlag(final long sequence) | |
{ | |
// 相当于 sequence % bufferSize ,但是位操作更快 | |
return (int) (sequence >>> indexShift); | |
} |
isAvailable() 方法判断元素是否可用,此方法的调用堆栈看完消费者就清楚了。
消费者
本小节介绍两个方面,一是 Disruptor 的消费者如何实现依赖关系的,二是消费者如何拉取消息并消费
消费者的依赖关系实现
我们看回这张图,每个消费者前都有一个 SequenceBarrier ,这就是消费者之间能实现依赖的关键。每个消费者都有一个 Sequence,表示自身消费的进度,如图中,ApplicationConsumer 的 SequenceBarrier 就持有 ReplicaionConsumer 和 JournalConsumer 的 Sequence,这样就能控制 ApplicationConsumer 的消费进度不超过其依赖的消费者。
下面看源码,这是 disruptor 配置消费者的代码。
EventHandler journalConsumer = xxx; | |
EventHandler replicaionConsumer = xxx; | |
EventHandler applicationConsumer = xxx; | |
disruptor.handleEventsWith(journalConsumer, replicaionConsumer) | |
.then(applicationConsumer); | |
// 下面两行等同于上面这行 | |
// disruptor.handleEventsWith(journalConsumer, replicaionConsumer); | |
// disruptor.after(journalConsumer, replicaionConsumer).then(applicationConsumer); |
先看ReplicaionConsumer 和 JournalConsumer 的配置 disruptor.handleEventsWith(journalConsumer, replicaionConsumer)
/** 代码都在Disruptor类 **/ | |
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) | |
{ | |
// 没有依赖的消费者就创建新的Sequence | |
return createEventProcessors(new Sequence[0], handlers); | |
} | |
/** | |
* 创建消费者 | |
* @param barrierSequences 当前消费者组的屏障序列数组,如果当前消费者组是第一组,则取一个空的序列数组;否则,barrierSequences就是上一组消费者组的序列数组 | |
* @param eventHandlers 事件消费逻辑的EventHandler数组 | |
*/ | |
EventHandlerGroup<T> createEventProcessors( | |
final Sequence[] barrierSequences, | |
final EventHandler<? super T>[] eventHandlers) | |
{ | |
checkNotStarted(); | |
// 对应此事件处理器组的序列组 | |
final Sequence[] processorSequences = new Sequence[eventHandlers.length]; | |
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); | |
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) | |
{ | |
final EventHandler<? super T> eventHandler = eventHandlers[i]; | |
// 创建消费者,注意这里传入了SequenceBarrier | |
final BatchEventProcessor<T> batchEventProcessor = | |
new BatchEventProcessor<>(ringBuffer, barrier, eventHandler); | |
if (exceptionHandler != null) | |
{ | |
batchEventProcessor.setExceptionHandler(exceptionHandler); | |
} | |
consumerRepository.add(batchEventProcessor, eventHandler, barrier); | |
processorSequences[i] = batchEventProcessor.getSequence(); | |
} | |
// 每次添加完事件处理器后,更新门控序列,以便后续调用链的添加 | |
// 所谓门控,就是RingBuffer要知道在消费链末尾的那组消费者(也是最慢的)的进度,避免消息未消费就被写入覆盖 | |
updateGatingSequencesForNextInChain(barrierSequences, processorSequences); | |
return new EventHandlerGroup<>(this, consumerRepository, processorSequences); | |
} |
createEventProcessors() 方法主要做了3件事,创建消费者、保存eventHandler和消费者的映射关系、更新 gatingSequences
- EventProcessor 是消费者
- SequenceBarrier 是消费者屏障,保证了消费者的依赖关系
- consumerRepository 保存了eventHandler和消费者的映射关系
gatingSequences 我们在前面说过,生产者通过 gatingSequences 知道消费者的进度,防止生产过快导致消息被覆盖,更新操作在 updateGatingSequencesForNextInChain() 方法中
// 为消费链下一组消费者,更新门控序列 | |
// barrierSequences是上一组事件处理器组的序列(如果本次是第一次,则为空数组),本组不能超过上组序列值 | |
// processorSequences是本次要设置的事件处理器组的序列 | |
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) | |
{ | |
if (processorSequences.length > 0) | |
{ | |
// 将本组序列添加到Sequencer中的gatingSequences中 | |
ringBuffer.addGatingSequences(processorSequences); | |
// 将上组消费者的序列从gatingSequences中移除 | |
for (final Sequence barrierSequence : barrierSequences) | |
{ | |
ringBuffer.removeGatingSequence(barrierSequence); | |
} | |
// 取消标记上一组消费者为消费链末端 | |
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); | |
} | |
} |
让我们把视线再回到消费者的设置方法
disruptor.handleEventsWith(journalConsumer, replicaionConsumer) | |
.then(applicationConsumer); |
journalConsumer 和 replicaionConsumer 已经设置了,接下来是 applicationConsumer
/** 代码在EventHandlerGroup类 **/ | |
public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers) | |
{ | |
return handleEventsWith(handlers); | |
} | |
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) | |
{ | |
return disruptor.createEventProcessors(sequences, handlers); | |
} |
可以看到,设置 applicationConsumer 最终调用的也是 createEventProcessors() 方法,区别就在于 createEventProcessors() 方法的第一个参数,这里的 sequences 就是 journalConsumer 和 replicaionConsumer 这两个消费者的 Sequence
消费者的消费逻辑
消费者的主要消费逻辑在 EventProcessor#run()
方法中,下面以BatchEventProcessor
举例
// BatchEventProcessor#run() | |
// BatchEventProcessor#processEvents() | |
private void processEvents() | |
{ | |
T event = null; | |
long nextSequence = sequence.get() + 1L; | |
while (true) | |
{ | |
try | |
{ | |
// 获取最大可用序列 | |
final long availableSequence = sequenceBarrier.waitFor(nextSequence); | |
... | |
// 执行消费逻辑 | |
while (nextSequence <= availableSequence) | |
{ | |
// dataProvider就是RingBuffer | |
event = dataProvider.get(nextSequence); | |
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); | |
nextSequence++; | |
} | |
sequence.set(availableSequence); | |
} | |
catch () | |
{ | |
// 异常处理 | |
} | |
} | |
} |
方法简洁明了,在死循环中通过 sequenceBarrier 获取最大可用序列,然后从 RingBuffer 中获取 Event 并调用 EventHandler 进行消费。重点在 sequenceBarrier.waitFor(nextSequence); 中
public long waitFor(final long sequence) | |
throws AlertException, InterruptedException, TimeoutException | |
{ | |
checkAlert(); | |
// 获取可用的序列,这里返回的是Sequencer#next方法设置成功的可用下标,不是Sequencer#publish | |
// cursorSequence:生产者的最大可用序列 | |
// dependentSequence:依赖的消费者的最大可用序列 | |
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); | |
if (availableSequence < sequence) | |
{ | |
return availableSequence; | |
} | |
// 获取最大的已发布成功的序号(对于发布是否成功的校验在此方法中) | |
return sequencer.getHighestPublishedSequence(sequence, availableSequence); | |
} |
熟悉的 getHighestPublishedSequence() 方法,忘了就回去看看生产者小节。waitStrategy.waitFor() 对应着图片中的 waitFor() 。
消费者的启动
前面讲了消费者的处理逻辑,但是 BatchEventProcessor#run() 是如何被调用的呢,关键在于disruptor.start();
// Disruptor#start() | |
public RingBuffer<T> start() | |
{ | |
checkOnlyStartedOnce(); | |
for (final ConsumerInfo consumerInfo : consumerRepository) | |
{ | |
consumerInfo.start(executor); | |
} | |
return ringBuffer; | |
} | |
class EventProcessorInfo<T> implements ConsumerInfo | |
{ | |
public void start(final Executor executor) | |
{ | |
// eventprocessor就是消费者 | |
executor.execute(eventprocessor); | |
} | |
} |
还记得 consumerRepository
吗,没有就往上翻翻设置消费者那里的 disruptor.handleEventsWith() 方法。
所以启动过程就是
disruptor#start() → ConsumerInfo#start() → Executor#execute() → EventProcessor#run()
课后作业:Disruptor 的消费者使用了多少线程?
总结
本文讲了 Disruptor 大体逻辑和源码,当然其高性能的秘诀不止文中描述的那些。还有不同的等待策略,Sequence 中使用Unsafe
而不是JDK中的 Atomic 原子类等等。