引入了 Disruptor 后,系统性能大幅提升!

Disruptor 是一个很受欢迎的内存消息队列,它源于 LMAX 对并发、性能和非阻塞算法的研究。今天一起来学习一下这个消息队列。

简介

对于主流的分布式消息队列来说,一般会包含 Producer、Broker、Consumer、注册中心等模块。比如 RocketMQ 架构如下:

图片

Disruptor 并不是分布式消息队列,它是一款内存消息队列,因此架构上跟分布式消息队列有很大差别。下面是一张 LMAX 使用 Disruptor 的案例图:

图片

我们介绍一下 Disruptor 架构中的核心概念。

1.1 Ring Buffer

Ring Buffer 通常被认为是 Disruptor 的最主要的设计,但是从 3.0 版本开始,Ring Buffer 只负责存储和更新经过 Disruptor 的数据。在一些高级的使用场景,它甚至完全可以被用户替换。

1.2 Sequence

Disruptor 使用 Sequence 来识别特定组件的位置。每个 Consumer(也就是事件处理器)都像 Disruptor 一样持有一个 Sequence。并发相关的核心代码依赖 Sequence 的自增值,因此 Sequence 具有跟 AtomicLong 相似的特性,事实上唯一的不同就是不同的 Sequence 之间不存在伪共享问题。

伪共享:CPU 缓存是以缓存行为单位进行加载和存储,CPU 每次从主存中拉取数据时,会把相邻的数据也存入同一个缓存行。即使多个线程操作的是同一缓存行中不同的变量,只要有一个线程修改了缓存行中的某一个变量值,该缓存行就会被标记为无效,需要重新从主从中加载。在多线程环境下,频繁地重新加载缓存行,会严重影响了程序执行效率。

1.3 Sequencer

Sequencer 是 Disrupter 的真正核心,有单个生产者和多个生产者两种实现(SingleProducerSequencer 和 MultiProducerSequencer)。为了让数据在生产者和消费者之间快速、准确地传输,它们都实现了所有并发算法。

1.4 Sequence Barrier

Sequencer 生成一个 Sequence Barrier,它包含由 Sequencer 生成的 Sequence 和消费者拥有的 Sequence 的引用。Sequence Barrier 决定是否有事件给消费者处理。

1.5 Wait Strategy

消费者怎样等待事件的到来。

1.6 Event Processor

主要负责循环处理来自 Disruptor 事件,它拥有消费者 Sequence 的所有权。有一个单独的实现类 BatchEventProcessor,这个类拥有高效的事件循环处理能力并且处理完成后可以回调实现 EventHandler 接口的用户。

1.7 Event Handler

由用户来实现并且代表 Disruptor 消费者的接口。

2 Disruptor 特性

2.1 多播事件

多播事件是 Disruptor 区别于其他队列的最大差异。其他队列都是一个事件消息只能被单个消费者消费,而 Disruptor 如果有多个消费者监听,则可以将所有事件消息发送给所有消费者。

在前面 LMAX 使用 Disruptor 的案例图中,有 JournalConsumer、ReplicationConsumer 和 ApplicationConsumer 三个消费者监听了 Disruptor,这三个消费者将收到来了 Disruptor 的所有消息。

2.2 消费者依赖关系图

为了支持并发处理在实际业务场景中的需要,有时消费者直接需要做协调。再回到前面 LMAX 使用 Disruptor 的案例,在 journalling 和 replication 这两个消费者处理完成之前,有必要阻止业务逻辑消费者开始处理。我们称这个特征为“gating”(或者更准确地说,该特征是“gating”的一种形式)。

首先,确保生产者数量不会超过消费者。这通过调用 RingBuffer.addGatingConsumers()来将相关消费者添加到 Disruptor。其次,消费者依赖关系的实现是通过构建一个 SequenceBarrier,SequenceBarrier 拥有需要在它前面完成处理逻辑的消费者的 Sequence。

就拿前面 LMAX 使用 Disruptor 的案例来说,ApplicationConsumer 的 SequenceBarrier 拥有 JournalConsumer 和 ReplicationConsumer 这 2 个消费者的 Sequence,所以 ApplicationConsumer 对 JournalConsumer 和 ReplicationConsumer 的依赖关系可以从 SequenceBarrier 到 Sequence 的连接中看到。

Sequencer 和下游消费者的关系也需要注意。Sequencer 的一个角色就是发布的事件消息不能超出 Ring Buffer。这就要求下游消费者的 Sequence 不能小于 Ring Buffer 的 Sequence,也不能小于 Ring Buffer 的大小。

上面图中,因为 ApplicationConsumer 的 Sequence 必须要保证小于等于 JournalConsumer 和 ReplicationConsumer 的 Sequence,因此 Sequencer 只需要关心 ApplicationConsumer 的 Sequence。

2.3 内存预分配

Disruptor 的目标是低延迟,因此减少或者去除内存分配是必要的。在基于 Java 的系统中,目标是减少 STW 次数。

为了支持这一点,用户可以在 Disruptor 中预分配事件所需的内存。在预分配内存时,用户提供的 EventFactory 将对 Ring Buffer 的所有元素进行调用。当生产者向 Disruptor 发送新的事件消息时,Disruptor 的 API 允许用户使用构造好的对象,他们可以调用对象的方法或者更新对象的字段。Disruptor 需要确保并发安全。

2.4 无锁并发

Disruptor 实现低延迟的另一个关键方法时使用无锁算法,通过使用内存屏障和 CAS 来实现内存可见性和正确性。Disruptor 唯一使用锁的地方就是在 BlockingWaitStrategy。

3 调优选项

虽然大多数场景下 Disruptor 可以表现出优秀的性能,但是仍然有一些调优参数可以改进 Disruptor 的性能。

3.1 单个/多个生产者

Disruptor<LongEvent> disruptor = new Disruptor(factory,bufferSize,DaemonThreadFactory.INSTANCE,ProducerType.SINGLE, new BlockingWaitStrategy() 
);

上面是 disruptor 的构造函数,ProducerType.SINGLE 表示创建单生产者的 Sequencer,ProducerType.MULTI  表示创建多生产者的 Sequencer。

在并发系统中提高系统性能的最好方式是遵循单写原则。下面是官方的一个 disruptor 吞吐量测试结果,测试环境是 i7 Sandy Bridge MacBook Air。

单生产者:

图片

多生产者:

图片

3.2 等待策略

  1. BlockingWaitStrategy

disruptor 的默认等待策略是 BlockingWaitStrategy,这种策略使用锁和唤醒锁的 Condition 变量。

  1. SleepingWaitStrategy

跟 BlockingWaitStrategy 策略类似,他是通过 LockSupport.parkNanos(1) 方法来实现等待,不需要给 Condition 变量发送信号来唤醒等待。

主要适用于对延时要求不高的场景,比如异步打印日志。

  1. YieldingWaitStrategy

‌YieldingWaitStrategy 策略使用 Busy spin‌(不释放 CPU 资源,通过循环检查条件直到条件满足为止)技术来等待 sequence 增长到一个合适的值。在循环内部会调用 Thread#yield() 方法允许其他排队线程去执行。

这种策略主要用于通过消耗 CPU 来实现低延迟的场景。当 EventHandler 数量消息逻辑 CPU 核数并且对延迟要求较高时,可以考虑这种等待策略。

  1. BusySpinWaitStrategy

BusySpinWaitStrategy 是性能最高的等待策略,它适用于低延迟系统,但是对部署环境要求很高。

这种等待策略的唯一适用场景是当 EventHandler 数量消息逻辑 CPU 核数并且超线程被禁用。

4 官方示例

下面是一个官方示例。这个例子比较简单,就是生产者向消费者发送一个 long 类型的值。

  1. 首先定义一个 Event。

public class LongEvent
{private long value;public void set(long value){this.value = value;}@Overridepublic String toString(){return "LongEvent{" + "value=" + value + '}';}
}
  1. 为了能让 Disruptor 预分配内存,这里定义一个 LongEventFactory。

public class LongEventFactory implements EventFactory<LongEvent>
{@Overridepublic LongEvent newInstance(){return new LongEvent();}
}
  1. 创建一个消费者来处理事件

public class LongEventHandler implements EventHandler<LongEvent>
{@Overridepublic void onEvent(LongEvent event, long sequence, boolean endOfBatch){System.out.println("Event: " + event);}
}
  1. 编写发送事件消息的逻辑

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.examples.longevent.LongEvent;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;public class LongEventMain
{public static void main(String[] args) throws Exception{int bufferSize = 1024; Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);disruptor.handleEventsWith((event, sequence, endOfBatch) ->System.out.println("Event: " + event)); disruptor.start(); RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++){bb.putLong(0, l);ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);Thread.sleep(1000);}}
}

5 总结

作为一款高性能的内存队列,Disruptor 有不少优秀的设计思想值得我们学习,比如内存预分配、无锁并发。同时它的使用非常简单,推荐大家使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/18989.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【WPSOffice】汇总

写在前面 PPT篇 幻灯片母版 通过母版功能统一幻灯片的样式、字体、颜色等&#xff0c;提高整体一致性。 统一设置模板样式 字体安装 查找到字体并安装。 在WPS PPT&#xff08;WPS演示&#xff09;中&#xff0c;以下是最常用的十个功能&#xff0c;能够帮助用户高效制作…

鸿蒙开发:熟知@BuilderParam装饰器

前言 本文代码案例基于Api13。 在实际的开发中&#xff0c;我们经常会遇到自定义组件的情况&#xff0c;比如通用的列表组件&#xff0c;选项卡组件等等&#xff0c;由于使用方的样式不一&#xff0c;子组件是动态变化的&#xff0c;针对这一情况&#xff0c;就不得不让使用方把…

在Nodejs中使用kafka(一)安装使用

安装 方法一、使用docker-compose安装 1、创建docker-compose.yml文件。 services:zookeeper:image: docker.io/bitnami/zookeeper:3.9ports:- "2181:2181"volumes:- "./data/zookeeper:/bitnami"environment:- ALLOW_ANONYMOUS_LOGINyeskafka:image: …

CRISPR spacers数据库;CRT和PILER-CR用于MAGs的spacers搜索

iPHoP&#xff1a;病毒宿主预测-CSDN博客 之前介绍了这个方法来预测病毒宿主&#xff0c;今天来介绍另一种比较用的多的方法CRISPR比对 CRISPR spacers数据库 Dash 在这可以下载作者搜集的spacers用于后期比对 CRT和PILER-CR 使用 CRT 和 PILERCR 识别 CRISPR 间隔区&#x…

深入理解Java的 JIT(即时编译器)

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/literature?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;…

LabVIEW开发CANopen紧急对象读取

本示例展示了如何通过CANopen协议读取设备的紧急对象&#xff08;Emergency object&#xff09;。紧急对象用于报告设备发生故障或异常情况时的紧急信息。通过该示例&#xff0c;用户可以配置并读取设备发送的紧急消息&#xff0c;确保在设备发生紧急状况时能够及时响应。 主要…

DeepSeek官方推荐的AI集成系统

DeepSeek模型虽然强大先进&#xff0c;但是模型相当于大脑&#xff0c;再聪明的大脑如果没有输入输出以及执行工具也白搭&#xff0c;所以需要有配套工具才能让模型发挥最大的作用。下面是一个典型AI Agent架构图&#xff0c;包含核心组件与数据流转关系&#xff1a; #mermaid-…

【第13章:自监督学习与少样本学习—13.4 自监督学习与少样本学习的未来研究方向与挑战】

凌晨三点的实验室里,博士生小张盯着屏幕上的训练曲线——他设计的跨模态少样本学习模型在医疗影像诊断任务上突然出现了诡异的性能断崖。前一秒还在92%的准确率高位运行,下一秒就暴跌到47%。这个看似灾难性的现象,却意外揭开了自监督学习与少样本学习技术深藏的核心挑战… 一…

论文解读之DeepSeek R1

今天带来DeepSeek R1的解读 一、介绍 deepseek主打复杂推理任务&#xff0c;如数学、代码任务。 R1以预训练过的V1-base初始化&#xff0c;主要发挥了RL在长思维链上的优势&#xff0c;R1-Zero直接RL而在前置步骤中不进行SFT&#xff0c;即缺少了有监督的指令微调阶段&#…

【Java学习】类和对象

目录 一、选择取块解 二、类变量 三、似复刻变量 四、类变量的指向对象 五、变量的解引用访问 1.new 类变量(参) 2.this(参) 3.类变量/似复刻变量. 六、代码块 七、复制变量的赋值顺序 八、访问限定符 1.private 2.default 九、导类 一、选择取块解 解引用都有可以…

使用css实现镂空效果

前言&#xff1a; 最近在公司完成小程序的新手引导中遇到了要将蒙层挖空&#xff0c;漏出后面内容的功能&#xff0c;找了各种资料之后&#xff0c;发现了一种就使用几行css代码就实现这个效果的方式&#xff0c;在这里分享给各位小伙伴们。 功能描述&#xff1a;实现下图的镂…

15.1 Process(进程)类

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 通常开发时想要获得进程是比较困难的事&#xff0c;必须要调用CreateToolhelpSnapshot、ProcessFirst、ProcessNext等API或者诸如 Zw…

【全栈开发】----Mysql基本配置与使用

本篇是在已下载Mysql的情况下进行的&#xff0c;若还未下载或未创建Mysql服务&#xff0c;请转到这篇: 2024 年 MySQL 8.0.40 安装配置、Workbench汉化教程最简易&#xff08;保姆级&#xff09;_mysql8.0.40下载安装教程-CSDN博客 本文对于mysql的操作均使用控制台sql原生代码…

数据恢复-01-机械硬盘的物理与逻辑结构

磁盘存储原理 磁盘存储数据的原理&#xff1a; 磁盘存储数据的原理是利用磁性材料在磁场作用下的磁化性质&#xff0c;通过在磁盘表面上划分成许多小区域&#xff0c;根据不同的磁化方向来表示0和1的二进制数据&#xff0c;通过读写磁头在磁盘上的移动&#xff0c;可以实现数据…

神经网络新手入门(3)光明顶复出(2006-2012)

让我们继续这场科技江湖的传奇&#xff0c;见证神经网络如何从寒冬中涅槃重生&#xff1a; 第五章&#xff1a;光明顶复出&#xff08;2006-2012&#xff09; 2006年&#xff0c;江湖人称"深度学习教主"的辛顿&#xff08;Geoffrey Hinton&#xff09;闭关修炼二十…

【C++】基础入门(详解)

&#x1f31f; Hello&#xff0c;我是egoist2023&#xff01; &#x1f30d; 种一棵树最好是十年前&#xff0c;其次是现在&#xff01; 目录 输入&输出 缺省参数(默认参数) 函数重载 引用 概念及定义 特性及使用 const引用 与指针的关系 内联inline和nullptr in…

【2025最新版】软件测试面试题总结(150道题含答案解析)

接口测试面试题 1&#xff1a;你平常做接口测试的过程中发现过哪些 bug? 2&#xff1a;平常你是怎么测试接口的&#xff1f; 3&#xff1a;平常用什么工具测接口? 4: webService 接口是如何测试的? 5&#xff1a;没有接口文档&#xff0c;如何做接口测试&#xff1f; 6&…

使用EVE-NE-锐捷实现NAT+ACL服务限制

一、项目拓扑 二、项目实现 1.NET配置 点击左侧的NetWorks,设置与图相同的配置&#xff0c;实现实验环境桥接到物理网络 2.GW配置 进入特权模式 enable进入全局模式 configure terminal 更改名称为GW hostname GW进入g0/0接口 interface g0/0将g0/0接口IP地址配置为192.168.…

nginx 实战配置

一、配置一个默认80端口的&#xff0c;静态页面&#xff0c;路径是path1。 http://192.168.0.111/path1 &#xff0c; /path1路径指向linux的/data/index1.html vi /data/nginx-1.24.0/conf/nginx.conf 文件添加以下配置 location /path1 { alias /data/…

kubekey一键部署k8s高可用与kubesphere

kubekey一键安装k8s与kubesphere还是蛮方便的&#xff0c;kubesphere官网上面也提到了高可用安装的一些事宜&#xff0c;但是没有涉及到kubesphere资深的redis的系统的部署问题&#xff0c;本文简单给出对应配置&#xff0c;其实这个配置在kubephere的cluster-configuration.ya…