【RocketMQ】消息的存储

当Broker收到生产者的消息发送请求时,会对请求进行处理,从请求中解析发送的消息数据,接下来以单个消息的接收为例,看一下消息的接收过程。

数据校验

封装消息

首先Broker会创建一个MessageExtBrokerInner对象封装从请求中解析到的消息数据,它会将Topic信息、队列ID、消息内容、消息属性、发送消息时间、发送消息的主机地址等信息设置到MessageExtBrokerInner中,后续都使用这个MessageExtBrokerInner对象来操纵消息。

接下来会判断是否开启事务,开启事务与未开启事务时调用的方法不一致,这里以未开启事务为例,看下消息的持久化过程。

消息校验

在存储消息之前,需要对消息进行一系列的校验,保证收到的消息有效合法。

Broker存储合法性检查

主要对Broker是否可以写入消息进行检查,包含以下几个方面:

  1. 判断是否处于关闭消息存储的状态,如果处于关闭状态则不再受理消息的存储;
  2. Broker是否是从节点,从节点只能读不能写;
  3. Broker是否有写权限,如果没有写入权限,不能进行写入操作;
  4. 操作系统是否处于PAGECACHE繁忙状态,处于繁忙状态同样不能进行写入操作;

消息长度检查

主要是对主题的长度校验和消息属性的长度校验。

LMQ(Light Message Queue)

主要判断在开启LMQ(Light Message Queue)时是否超过了最大消费数量。

消息写入

对消息进行校验完毕之后,就可以对消息进行写入了。

前面说到Broker将收到的消息封装为了MessageExtBrokerInner对象,这里会新增以下设置:

(1)设置消息存储的时间(当前时间);
(2)计算消息体的CRC值,并设置到对应的成员变量中;
(3)如果发送消息的主机地址或者当前存储消息的Broker地址使用了IPV6,设置相应的IPV6标识;

写入缓冲区

RocketMQ会将消息数据先写入内存buffer,写入之前还会做一些校验:
(1)对消息属性数据的长度进行校验判断是否超过限定值;
(2)对消息整体内容长度进行校验,判断是否超过最大的长度限制;

校验通过之后,会根据消息总体内容的长度对buffer进行初始化,也就是根据需要的大小申请一块内存区域,开始写入以下数据:

  • 消息总长度,占4个字节;
  • 魔数,占4个字节;
  • 消息体CRC校验和,占4个字节;
  • 队列ID,占4个字节;
  • 标识,占4个字节;
  • 队列的偏移量,占8个字节;
  • 消息在文件的物理偏移量,占8个字节;
  • 系统标识,占4个字节;
  • 发送消息的时间戳,占8个字节;
  • 发送消息的主机地址,占8个字节;
  • 存储时间戳,占8个字节;
  • 存储消息的主机地址,占8个字节;
  • 消息的重试次数,占4个字节;
  • 事务相关偏移量,占8个字节;
  • 消息内容的长度,占4个字节;
  • 消息内容,由于消息内容不固定,所以长度不固定;
  • 主题名称的长度,占1个字节;
  • 主题名称内容,长度不固定;
  • 消息属性长度,占2个字节;
  • 消息属性内容,长度不固定;

整体存储格式如下:

获取CommitLog

RocketMQ将每一条消息存储到CommitLog文件中,存储文件的根目录由配置参数storePathRootDir决定:

默认每一个CommitLog的文件大小为1G,如果文件写满会新建一个CommitLog文件,以该文件中第一条消息的偏移量为文件名,小于20位用0补齐。

比如第一个文件中第一条消息的偏移量为0,那么第一个文件的名称为00000000000000000000,当这个文件存满之后,需要重新建立一个CommitLog文件,一个文件大小为1G,
1GB = 1024*1024*1024 = 1073741824 Bytes,所以下一个文件就会被命名为00000000001073741824。

在持久化消息之前,需要知道消息要写入哪个CommitLog文件,RocketMQ通过一个队列(对应MappedFileQueue)存储了记录了所有的CommitLog文件(对应MappedFile),并提供了相关方法获取到当前正在使用的那个CommitLog。

mappedFileQueue是所有mappedFile的集合,可以理解为CommitLog文件所在的那个目录。
MappedFile可以看做是每一个Commitlog文件的映射对象,每一个CommitLog对于一个MappedFile对象。

如果获取到的CommitLog取为空或者已写满,可能是首次写入消息还未创建文件或者上一次写入的文件已达到规定的大小(1G),此时会新建一个CommitLog文件。

需要注意,在获取CommitLog之前会加锁,一是防止在多线程情况下创建多个CommitLog,二是接下来要往CommitLog中写入消息内容,防止多线程情况下数据错乱。

写入CommitLog

知道要写入哪个CommitLog之后,就可以将之前已经写入缓冲区buffer的消息数据写入到CommitLog了。

RocketMQ提供了两种方式进行写入:

(1)通过暂存池将数据写入缓冲区
在开启暂存池时,会先将数据都写入字节缓冲区ByteBuffer中,ByteBuffer在申请内存时,可以申请JVM堆内内存(HeapByteBuffer),也可以申请堆外内存(DirectByteBuffer),RocketMQ使用的是堆外内存DirectByteBuffer

暂存池
类似线程池,只不过池中存放的是提前申请好的内存(ByteBuffer),RocketMQ会预先申请一些内存,从源码中可以看到申请的是堆外内存,然后放入池中,需要用时从池中获取,使用完毕后会归还到池中。

暂存池的开启条件
需要同时满足以下三个条件时才会开启暂存池:

  1. 配置中允许开启暂存池;
  2. Broker的角色不能是SLAVE
  3. 刷盘策略为异步刷盘;

从条件3中可以看出异步刷盘时才可以开启暂存池的使用,因为异步刷盘,很有可能是积攒了一批消息,需要同时刷入磁盘,所以使用暂存池可以将之前写入的消息先暂存在内存缓冲区中,等待执行刷盘时,将积攒的消息一起刷入磁盘中,而同步刷盘由于每次写入完毕之后要立刻刷回磁盘,那么就没有必要使用暂存池缓存数据了。

(2)通过文件映射
未开启暂存池时使用文件映射,使用MappedByteBuffer映射对应的CommitLog文件,MappedByteBuffer是ByteBuffer的子类,它可以将磁盘的文件内容映射到虚拟地址空间,通过虚拟地址访问物理内存中映射的文件内容,对文件内容进行操作。
使用MappedByteBuffer可以减少数据的拷贝,详细内容可参考【Java】Java中的零拷贝。

消息写入流程

了解了写入方式之后,来看下消息的写入流程:

  1. CommitLog对应的MappedFile对象中记录了当前文件的写入位置,首先会判断准备写入的位置是否小于文件总大小,如果小于意味着当前文件可以进行内容写入,反之说明此文件已写满,不能继续下一步,需要返回错误信息;

  2. 判断是否开启暂存池,开启暂存池时使用MappedFile中的ByteBuffer来开辟共享内存,否则使用MappedFile中的;
    MappedByteBuffer来开辟。

开辟共享内存之后,往共享内存中写入的数据,会影响到开辟它那个ByteBuffer或者MappedByteBuffer中;

  1. 将之前已经写入缓冲区的消息数据写入到开辟的共享内存中;

  2. 返回消息写入结果,有以下几种状态:

    • PUT_OK:写入成功;
    • END_OF_FILE:超过文件大小;
    • MESSAGE_SIZE_EXCEEDED:消息长度超过最大允许长度:
    • PROPERTIES_SIZE_EXCEEDED:消息、属性超过最大允许长度;
    • UNKNOWN_ERROR:未知异常;

需要注意,此时消息驻留在操作系统的PAGECACHE中,接下来需要根据刷盘策略决定何时将内容刷入到硬盘中。

RocketMQ消息存储相关源码可参考:【RocketMQ】【源码】消息的存储

刷盘处理

在以上的消息写入步骤完成之后,会进行刷盘操作。

有两种刷盘策略:

同步刷盘:表示消息写入到内存之后需要立刻刷到磁盘文件中。

异步刷盘:表示消息写入内存成功之后就返回,由MQ定时将数据刷入到磁盘中,会有一定的数据丢失风险。

不管同步刷盘还是异步刷盘,都是唤醒对应的刷盘线程来进行,这里不对唤醒的具体过程进行讲解,如果想了解可参考【RocketMQ】【源码】消息的刷盘机制。

同步刷盘

前面讲到,暂存池只有在异步刷盘时才可以启用,所以设置为同步刷盘时,使用的是文件映射的方式写入数据,在同步刷盘时直接通过MappedByteBufferforce方法将数据flush到磁盘文件即可。

异步刷盘

异步刷盘有开启暂存池和未开启两种情况。

开启暂存池

开启暂存池时,可以分为Commit和Flush两个阶段。

(1)Commit阶段

在开启暂存池时,数据会先写入缓冲区ByteBuffer中,并未映射到CommitLog文件中,所以首先会唤醒Commit线程,将ByteBuffer中的数据写入到CommitLog对应的FileChannel中。

(2)Flush阶段

数据被写入FileChannel之后,就会唤醒Flush线程,再调用FileChannel的force方法将数据flush到磁盘。

未开启暂存池

未开启暂存池时使用文件映射的方式,直接唤醒Flush线程,调用MappedByteBufferforce方法将数据flush到磁盘文件即可。

总结

通过上面分析消息的持久化过程,来看下RocketMQ提升性能的一些地方。

(1)RocketMQ在写入数据到CommitLog时,采用的是顺序写的方式,顺序写比随机写文件效率要高很多。

(2)在异步刷盘时,可以使用暂存池,暂存池会提前申请好内存,申请内存是一个比较重的操作,所以避免在消息写入时申请内存,以此提高效率。

(3)RocketMQ使用了MappedByteBuffer文件映射的方式,向CommitLog写入数据,可以减少数据的拷贝过程。

参考

RocketMQ官方文档

郭慕荣-RocketMQ消息存储原理总结(一)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/131213.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

社区版IDEA导入Eclipse项目注意事项

经过正常导入教程之后。。。 有些jar包没有依赖导入,我这里直接把tomcat的lib全导入

【Spring Boot系列】- Spring Boot侦听器Listener

【Spring Boot系列】- Spring Boot侦听器Listener 文章目录 【Spring Boot系列】- Spring Boot侦听器Listener一、概述二、监听器Listener分类2.1 监听ServletContext的事件监听器2.2 监听HttpSeesion的事件监听器2.3 监听ServletRequest的事件监听器 三、SpringMVC中的监听器3…

骨传导耳机对人体有危险吗?会损害听力吗?

如果在使用骨传导耳机的时候控制好时间和音量,是不会对人体带来危险和造成伤害的。 下面跟大家解释一下为什么骨传导耳机对人体没有危害,最大的原因就是骨传导耳机不需要空气传导,而是通过颅骨传到听觉中枢,传输过程中几乎没有噪…

docker容器技术实战-2

03docker hub 首先注册上号: https://hub.docker.com/ 上传自己的镜像仓库 创建自己的仓库 webserver 拉取镜像 配置加速器 04搭建私有仓库 上传镜像 在主机1上 在主机2 上 激活内核选项 激活内核选项文件传输过去 配置使用非加密端口 05 docker私有仓库 仓库加…

langchain主要模块(二):数据连接

langchain2之数据连接 langchain1.概念2.主要模块模型输入/输出 (Model I/O)数据连接 (Data connection)链式组装 (Chains)代理 (Agents)内存 (Memory)回调 (Callbacks) 3.数据连接1.数据加载:2.文档分割:3.文档向量化:4.存储和检索向量数据:…

Webpack Sourcemap文件泄露漏洞

Webpack Sourcemap文件泄露漏洞 前言一、Webpack和Sourcemap1.1 什么是Webpack1.2 什么是Sourcemap 二、漏洞利用2.1 使用reverse-sourcemap工具2.1 直接看前端代码 三、漏洞挖掘漏洞修复 前言 Webpack主要是用于前端框架进行打包的工具,打包后形成.js.map文件&…

单例模式-饿汉模式、懒汉模式

单例模式,是设计模式的一种。 在计算机这个圈子中,大佬们针对一些典型的场景,给出了一些典型的解决方案。 目录 单例模式 饿汉模式 懒汉模式 线程安全 单例模式 单例模式又可以理解为是单个实例(对象) 在有些场…

LeetCode 35. 搜索插入位置

题目链接 力扣(LeetCode)官网 - 全球极客挚爱的技术成长平台 题目解析 该题我们可以采用二分查找的方式,我们可以把数组分为,小于target的一边儿和大于等于target的一边儿。 当midleft(right-left)下标所对应的数大于等于targ…

多版本CUDA安装切换

系统中默认的安装CUDA为12.0,现在需要在个人用户下安装CUDA11.7。 CUDA 下载 CUDA官网下载 安装 Log file not open.Segmentation fault (core dumped)错误 将/tmp/cuda-installer.log删除即可。重新安装,去掉驱动的安装,设置Toolkit的安装…

骨科常用评估量表汇总,详细评分标准分享,建议收藏!

根据骨科医生的量表使用情况,笔者整理了9个常用的骨科量表,可在线评测并直接出结果,还可转发或生成二维码使用,或创建项目进行数据管理,有需要的小伙伴赶紧收藏! Harris髋关节评分系统 Harris髋关节评分系…

3D点云处理:Opencv Pcl实现深度图转点云(附源码)

文章目录 0. 测试效果1. 代码实现文章目录:3D视觉个人学习目录0. 测试效果 处理结果1. 代码实现 文章中提供的深度图像,深度图像一般以.tiff和.png保存,可以通过Opencv中的 c v : : i m r

fabirc 将图像绘制原点定为图形内部

添加元素时,设置属性: originX: center, originY: center, 如我定义两个矩形: addrect () { // 矩形1var rect new fabric.Rect({top: 50,left: 100,width: 100,height: 70,fill: #F56C6C,strokeUniform: true // 限制边框宽度缩放})canva…

LINUX 网络管理

目录 一、NetworkManager的特点 二、配置网络 1、使用ip命令临时配置 1)查看网卡在网络层的配置信息 2)查看网卡在数据链路层的配置信息 3)添加或者删除临时的网卡 4)禁用和启动指定网卡 2、修改配置文件 3、nmcli命令行…

时序分解 | MATLAB实现基于SSA奇异谱分析的信号分解分量可视化

时序分解 | MATLAB实现基于LMD局部均值分解的信号分解分量可视化 目录 时序分解 | MATLAB实现基于LMD局部均值分解的信号分解分量可视化效果一览基本介绍程序设计参考资料 效果一览 基本介绍 奇异谱分解奇异谱分析SSA 可直接替换txt数据运行 Matlab 1.包含3D分解效果图 频谱图等…

深圳唯创知音电子将参加IOTE 2023第二十届国际物联网展•深圳站

​ 2023年9月20~22日,深圳唯创知音电子将在 深圳宝安国际会展中心(9号馆9B1)为您全面展示最新的芯片产品及应用方案,助力传感器行业的发展。 作为全球领先的芯片供应商之一,深圳唯创知音电子一直致力于为提供高质量、…

分类预测 | Matlab实现基于BP-Adaboost数据分类预测

分类预测 | Matlab实现基于BP-Adaboost数据分类预测 目录 分类预测 | Matlab实现基于BP-Adaboost数据分类预测效果一览基本介绍研究内容程序设计参考资料 效果一览 基本介绍 1.Matlab实现基于BP-Adaboost数据分类预测(Matlab完整程序和数据) 2.多特征输入…

K8S基础概念

1、Node Node作为集群中的工作节点,运行真正的应用程序,在Node上Kubernetes管理的最小运行单元是Pod。Node上运行着Kubernetes的Kubelet、kube-proxy服务进程,这些服务进程负责Pod的创建、启动、监控、重启、销毁、以及实现软件模式的负载均…

UMA 2 - Unity Multipurpose Avatar☀️二.概念介绍

文章目录 🟥 UMA核心🟧 UMA Data 数据类1️⃣ DNA2️⃣ Slots 插槽Overlays 纹理贴图🟨 Base Recipe 基础人形Recipes🟩 Wardrobe Recipes 服饰Recipes🟥 UMA核心 UMA核心组件是 DynamicCharacterAvatar ,后续我们跟插件交互的API,例如捏脸的参数,都是与之交互完成的…

基于Android的生鲜农产品商城交易设计与实现

摘 要 人们生活水平随着发展不断的提升,人们对生鲜产品消费比越来越依赖,都希望吃到新鲜的食品。消费的加大给生鲜了全新的供应链及销售模式,那种传统的生鲜配送模式也在发生着变化。生鲜系统电商平台在我国目前是属于盛行的电商行业&#x…

算法:经典贪心算法--跳一跳[2]

1、题目: 给定一个长度为 n 的 0 索引整数数组 nums。初始位置为 nums[0]。 每个元素 nums[i] 表示从索引 i 向前跳转的最大长度。换句话说,如果你在 nums[i] 处,你可以跳转到任意 nums[i j] 处: 返回到达 nums[n - 1] 的最小跳跃次数。生…