【RocketMQ 存储】- 同步刷盘和异步刷盘

文章目录

  • 1. 前言
  • 2. 概述
  • 3. submitFlushRequest 提交刷盘请求
  • 4. FlushDiskWatcher 同步刷盘监视器
  • 5. 同步刷盘但是不需要等待刷盘结果
  • 6. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

RocketMQ 存储部分系列文章:

  • 【RocketMQ 存储】- RocketMQ存储类 MappedFile
  • 【RocketMQ 存储】- 一文总结 RocketMQ 的存储结构-基础
  • 【RocketMQ 存储】- broker 端存储单条消息的逻辑
  • 【RocketMQ 存储】- broker 端存储批量消息的逻辑

上一篇文章中,我们解析了 RocketMQ 是如何存储单条消息和批量消息的,但是消息也只是存储到了 CommitLog#MappedFile#ByteBuffer,也就是内容还是没有刷盘,那么这篇文章就来介绍下刷盘的逻辑。


2. 概述

RocketMQ 的刷盘有两种策略,同步刷盘异步刷盘

  • 同步刷盘:只有在消息真正持久化至磁盘后 RocketMQ 的 Broker 端才会真正返回给 Producer 端一个成功的 ACK 响应。同步刷盘对 MQ 消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。
  • 异步刷盘:能够充分利用 OS 的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。

消息写入 CommitLog 的 ByteBuffer 后,通过 submitFlushRequest 方法添加了一个刷盘请求,下面我们就来看下这个方法。


3. submitFlushRequest 提交刷盘请求

/*** 提交刷盘请求* @param result* @param messageExt* @return*/
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {// 同步刷盘,默认是异步刷盘,也就是 FlushDiskType.ASYNC_FLUSHif (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {// 同步刷盘服务 GroupCommitServicefinal GroupCommitService service = (GroupCommitService) this.flushCommitLogService;// 消息是否需要等待存储完成后才返回if (messageExt.isWaitStoreMsgOK()) {// 创建刷盘请求,这里偏移量 nextOffset = 消息写入位置 + 写入的消息长度,意思是刷完盘之后偏移量应该设置为 nextOffsetGroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),// 同步刷盘的等待时间是 5sthis.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());// 添加请求到 flushDiskWatcher 中flushDiskWatcher.add(request);// 把请求添加到刷盘请求集合 requestsWrite 中,等待刷盘服务 GroupCommitService 处理service.putRequest(request);// 返回刷盘请求的 flushOKFuture,但是并没有填充结果return request.future();} else {// 如果不需要等待刷盘结果,那么唤醒刷盘服务就可以直接返回添加刷盘请求成功的状态码(PUT_OK)了service.wakeup();return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}}// 异步刷盘else {// 看看是否开启了堆外缓存if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {// 如果没有启动堆外缓存,那么唤醒异步刷盘服务 FlushRealTimeServiceflushCommitLogService.wakeup();} else  {// 这里就是开启了堆外缓存,那么唤醒异步提交服务 CommitRealTimeServicecommitLogService.wakeup();}// 返回结果return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}
}

下面就以这个方法引出同步刷盘和异步刷盘,在解析之前先说下消息刷盘的标记,分别是同步刷盘(FlushDiskType.SYNC_FLUSH ),异步刷盘(FlushDiskType.ASYNC_FLUSH) ,这个方法逻辑如下:

  • 如果刷盘服务设定是同步刷盘,也就是 FlushDiskType.SYNC_FLUSH

    • 获取同步刷盘服务 GroupCommitService。
    • 接着判断消息的配置是否需要等待存储完成后才返回,如果需要存储完成才返回,那么会等待刷盘请求被处理之后才返回结果,当然不是在这里等待,后面会解释。
    • 如果不需要等待刷盘结果,那么唤醒同步刷盘线程,随后直接返回PUT_OK。
  • 如果刷盘服务设定是异步刷盘,也就是 FlushDiskType.ASYNC_FLUSH

    • 判断是否开启了堆外缓存,如果开启了,那么说明消息是写入的 writeBuffer,我们上一篇文章就已经解释过了,writeBuffer 不是使用 mmap 映射到 page cache 的,所以写入这里面的消息得先 commit 提交到 page cache,所以开启了堆外缓存那么会唤醒 CommitRealTimeService
    • 如果没有开启了堆外缓存,证明消息是写入了 mappedByteBuffer,这时候直接唤醒异步刷盘服务 FlushRealTimeService

好了,上面就是这个方法的全部逻辑,那么顺着上面的代码,我提出两个问题:

  1. FlushDiskWatcher 这个 Watcher 是干什么用的?
  2. 同步刷盘服务不需要等待刷盘结果的情况下,为什么不需要添加一个 request 到同步刷盘请求集合中?

其实上面第二个问题可以从同步刷盘服务 GroupCommitService 得到解答,当然我这里只是做一个总结,具体逻辑会在后续慢慢看,下面还是先看下第一个问题,也就是 FlushDiskWatcher 是干什么的?


4. FlushDiskWatcher 同步刷盘监视器

FlushDiskWatcher 是同步刷盘请求的监视器,还记得前面创建的刷盘请求呢?

// 创建刷盘请求,这里偏移量 nextOffset = 消息写入位置 + 写入的消息长度,意思是刷完盘之后偏移量应该设置为 nextOffset
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),// 同步刷盘的等待时间是 5sthis.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());/*** 同步刷盘请求*/
public static class GroupCommitRequest {// 最新刷盘位置,意思就是这个刷盘请求处理完成后最新的刷盘位置 flushedWhere >= nextOffsetprivate final long nextOffset;// 同步刷盘的阻塞等待结果private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();// 超时时间private final long deadLine;public GroupCommitRequest(long nextOffset, long timeoutMillis) {this.nextOffset = nextOffset;this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000);}public long getDeadLine() {return deadLine;}public long getNextOffset() {return nextOffset;}public void wakeupCustomer(final PutMessageStatus putMessageStatus) {this.flushOKFuture.complete(putMessageStatus);}public CompletableFuture<PutMessageStatus> future() {return flushOKFuture;}}

这里设置的刷盘等待时间是 5 s,所以 deadLine = System.nanoTime() + (5 * 1_000_000)。那么下面来看下 FlushDiskWatcher 这个类里面的方法和属性。

// 同步刷盘的请求集合
private final LinkedBlockingQueue<GroupCommitRequest> commitRequests = new LinkedBlockingQueue<>();

这个就是同步刷盘的请求集合,FlushDiskWatcher 里面只有这一个属性参数。

public class FlushDiskWatcher extends ServiceThread {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);// 同步刷盘的请求集合private final LinkedBlockingQueue<GroupCommitRequest> commitRequests = new LinkedBlockingQueue<>();@Overridepublic String getServiceName() {return FlushDiskWatcher.class.getSimpleName();}@Overridepublic void run() {...}public void add(GroupCommitRequest request) {commitRequests.add(request);}public int queueSize() {return commitRequests.size();}
}

上面就是 FlushDiskWatcher 里面的方法,比较简单,核心逻辑还是在 run 方法中,下面就来看下 run 方法的逻辑。

首先就是一个 while 循环,FlushDiskWatcher 只要不 shutdown 都会一直循环,就算 while 循环里面出异常也不会退出。

// 如果服务没有停止
while (!isStopped()) {...
}

在 while 循环中,首先从 commitRequests 集合中阻塞获取刷盘请求,当消息同步刷盘的时候,就会把请求往 commitRequests 里面添加,这里就能获取到了。

try {// 从 commitRequests 集合中阻塞获取刷盘请求,当消息同步刷盘的时候,就会把请求往 commitRequests 里面添加,这里就能获取到了request = commitRequests.take();} catch (InterruptedException e) {// 中断异常,继续执行log.warn("take flush disk commit request, but interrupted, this may caused by shutdown");continue;}

到这里就是能获取到请求,那么就会阻塞等待这个请求到时间,下面又是一个 while 循环,这个 while 循环就是专门处理这个 request 的。

// 下面就是 while 循环判断这个 request 有没有返回结果了
while (!request.future().isDone()) {// 当前时间long now = System.nanoTime();// 看看是不是已经超时了if (now - request.getDeadLine() >= 0) {// 刷盘请求超时了,就是往请求里面写入 FLUSH_DISK_TIMEOUT 的结果request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);// 跳出煦暖break;}// 这里的截至时间是 nacos,所以求睡眠时间是 / 1 000 000long sleepTime = (request.getDeadLine() - now) / 1_000_000;// 最小不能低于 10 mssleepTime = Math.min(10, sleepTime);if (sleepTime == 0) {// 如果是 0,就是到点了,返回结果超时,因为下面就是 sleep,所以这里不超时下一次 while 循环也会超时request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);break;}try {// 睡眠Thread.sleep(sleepTime);} catch (InterruptedException e) {// 睡眠被中断就跳出循环log.warn("An exception occurred while waiting for flushing disk to complete. this may caused by shutdown");break;}
}

上面就是阻塞等待的核心逻辑了,里面的逻辑和 Timer 的有点像,但是吧看上面的逻辑,如果说里面同时添加了 10000 个 request 请求,那么第 10000 个请求就得遍历 10000 次才能判断到,这时候就有可能已经超时了。所以这里感觉使用 ScheduledExecutorService 来处理会好一点。

我倒是想不明白这里为什么不直接在同步处理请求的时候用当前时间和 deadline 作比较,如果超时直接返回,为什么还要单独写一个监视器呢?如果有知道的朋友可以评论下。

所以 FlushDiskWatcher 这个类的作用就是不断检测同步刷盘请求看看有没有超时,如果超时了就直接返回结果 FLUSH_DISK_TIMEOUT


5. 同步刷盘但是不需要等待刷盘结果

好了,上面第四小结就解释了第三小节给出的问题:FlushDiskWatcher 这个 Watcher 是干什么用的?,下面这一小节就来回答下最后一个问题:同步刷盘服务不需要等待刷盘结果的情况下,为什么不需要添加一个 request 到同步刷盘请求集合中?

我们重新看下为什么会提出这个问题,还是回到 submitFlushRequest 方法,那么我们来截下图:
在这里插入图片描述
当刷盘请求是同步刷盘但是不需要返回结果的时候,这里只是唤醒同步刷盘服务 GroupCommitService,然后直接返回 PUT_OK。上两篇文章就解释过了,这里消息提交只是提交到 writeBuffer 或者 mappedByteBuffer,还没有刷盘的。

要解答这个问题,就得先去 GroupCommitService 里面看下刷盘逻辑,里面刷盘的核心逻辑在 doCommit,不过这篇文章我先不详细解释如何刷盘的,主要是整体看下里面的逻辑。

/*** 同步刷盘*/
private void doCommit() {// 从读队列中获取刷盘请求if (!this.requestsRead.isEmpty()) {...} else {// 有些消息是同步刷盘不等待,就不需要走上面的流程去读取 requestsRead 处理刷盘请求,这类的也不会往 requestsWrite 里面设置刷盘请求CommitLog.this.mappedFileQueue.flush(0);}
}

上面同步刷盘服务不需要等待刷盘结果的情况下,请求不会添加到 requestsRead 里面,所以就会走 else 的逻辑,在里面直接进行刷盘,这个方法我们后面也会解析。

但是又有问题了,我们知道如果开启了堆外缓存,消息先写入 writeBuffer,要知道这部分数据是不能直接通过 fileChannel 刷盘的,看刷盘逻辑,就是 MappedFile#flush 方法,就看这里:
在这里插入图片描述
finalChannel 初始化的时候是通过 mmap 映射出 mappedByteBuffer 的。在这里插入图片描述
要知道这个 finalChannel 是当前用户空间到 Page Cache 中的区域的通道,跟 writeBuffer 这个堆外缓存可没什么关系,所以上面的 MappedFile#flush 方法在通过 fileChannel.force 的时候其实是把 Page Cache 里面的数据给刷盘,但是此时 GroupCommitService 通过 flush 刷盘的时候,writeBuffer 里面的消息可还没有提交到 Page Cache,所以这部分数据是怎么刷盘的呢?

这里后面说到 GroupCommitService 源码的时候会解析,这里就先不说了。


6. 小结

好了,本文介绍了刷盘里面同步刷盘和异步刷盘,同时也解析了添加同步刷盘请求的逻辑,下一篇文章就开始介绍这几种刷盘服务。





如有错误,欢迎指出!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/14450.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

了解传输层TCP协议

目录 一、TCP协议段格式 二、TCP原理 1.确认应答 2.超时重传 3.连接管理 建立连接 断开连接 4.滑动窗口 5.流量控制 6.拥塞控制 7.延时应答 8.捎带应答 9.面向字节流 10.TCP异常情况 TCP&#xff0c;即Transmission Control Protocol&#xff0c;传输控制协议。人如…

第 26 场 蓝桥入门赛

3.电子舞龙【算法赛】 - 蓝桥云课 问题描述 话说这年头&#xff0c;连舞龙都得电子化&#xff01;这不&#xff0c;蓝桥村的老程序员王大爷突发奇想&#xff0c;用LED灯带和一堆传感器鼓捣出了一条“电子舞龙”&#xff0c;它能根据程序指令在村里的广场上“翩翩起舞”。 广…

老游戏回顾:TL2

TL2是一部ARPG游戏&#xff0c;是TL的续作游戏&#xff0c;由位于美国西雅图的Runic Games开发&#xff0c;游戏于2012年9月20日上市&#xff0c;简体中文版于2013年4月10日在国内上市。 2有非常独特的艺术风格&#xff0c;这些在1中就已经形成&#xff0c;经过升级将使这款游…

前端实现 GIF 图片循环播放

前言 使用 img 加载 GIF 图片&#xff0c;内容只会播放一次&#xff0c;之后就会自动暂停&#xff1b; 通过定时器在一段时间后重新加载图片的方式&#xff0c;会导致浏览器内存不断增大&#xff0c;并且可能会有闪烁、卡顿的问题&#xff1b; ImageDecoder WebCodecs API 的…

1-2 面向对象编程方法

1.0 面向对象编程思维 在面向对象风格中&#xff0c;结构体被看做数据&#xff08;data&#xff09;&#xff0c;而操作数据的函数称作方法&#xff08;method&#xff09;。目前函数 和数据是分离的&#xff0c;函数并不直接操作数据&#xff0c;我们需要拿到函数返回的结果&a…

LVGL4种输入设备详解(触摸、键盘、实体按键、编码器)

lvgl有触摸、键盘、实体按键、编码器四种输入设备 先来分析一下这四种输入设备有什么区别 &#xff08;1&#xff09;LV_INDEV_TYPE_POINTER 主要用于触摸屏 用到哪个输入设备保留哪个其他的也是&#xff0c;保留触摸屏输入的任务注册&#xff0c;其它几种种输入任务的注册&…

让文物“活”起来,以3D数字化技术传承文物历史文化!

文物&#xff0c;作为不可再生的宝贵资源&#xff0c;其任何毁损都是无法逆转的损失。然而&#xff0c;当前文物保护与修复领域仍大量依赖传统技术&#xff0c;同时&#xff0c;文物管理机构和专业团队的力量相对薄弱&#xff0c;亟需引入数字化管理手段以应对挑战。 积木易搭…

如何通过 ESPN API 获取 NBA 球队的赛程表

对于 NBA 爱好者和开发者来说&#xff0c;通过 API 获取球队赛程表是一项非常实用的功能&#xff0c;尤其是如果你正在构建一个应用或网站&#xff0c;需要自动化获取比赛安排的情况下。今天&#xff0c;我将为大家介绍如何通过 ESPN 提供的 API 获取 NBA 球队的赛程表。 1. ES…

LM Studio 部署本地大语言模型

一、下载安装 1.搜索&#xff1a;lm studio LM Studio - Discover, download, and run local LLMs 2.下载 3.安装 4.更改成中文 二、下载模型(软件内下载) 1.选择使用代理&#xff0c;否则无法下载 2.更改模型下载目录 默认下载位置 C:\Users\用户名\.lmstudio\models 3.搜…

【开源免费】基于SpringBoot+Vue.JS智能学习平台系统(JAVA毕业设计)

本文项目编号 T 181 &#xff0c;文末自助获取源码 \color{red}{T181&#xff0c;文末自助获取源码} T181&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

【R语言】环境空间

一、环境空间的特点 环境空间是一种特殊类型的变量&#xff0c;它可以像其它变量一样被分配和操作&#xff0c;还可以以参数的形式传递给函数。 R语言中环境空间具有如下3个特点&#xff1a; 1、对象名称唯一性 此特点指的是在不同的环境空间中可以有同名的变量出现&#x…

黑马 Linux零基础快速入门到精通 笔记

初识Linux Linux简介 提及操作系统&#xff0c;我们可能最先想到的是windows和mac&#xff0c;这两者都属于个人桌面操作系统领域&#xff0c;而Linux则属于服务器操作系统领域。无论是后端软件、大数据系统、网页服务等等都需要运行在Linux操作系统上。 Linux是一个开源的操作…

Golang:精通sync/atomic 包的Atomic 操作

在本指南中&#xff0c;我们将探索sync/atomic包的细节&#xff0c;展示如何编写更安全、更高效的并发代码。无论你是经验丰富的Gopher还是刚刚起步&#xff0c;你都会发现有价值的见解来提升Go编程技能。让我们一起开启原子运算的力量吧&#xff01; 理解Go中的原子操作 在快…

网络安全ITP是什么 网络安全产品ips

DS/IPS都是专门针对计算机病毒和黑客入侵而设计的网络安全设备 1、含义不同 IDS &#xff1a;入侵检测系统&#xff08;发现非法入侵只能报警不能自己过滤&#xff09; 做一个形象的比喻&#xff1a;假如防火墙是一幢大楼的门锁&#xff0c;那么IDS就是这幢大楼里的监视系统…

高速网络的未来:零拷贝Zero-Copy架构

在当今高速发展的信息技术领域&#xff0c;追求极致的性能和效率是永恒的主题。而当我们深入探索计算机系统的内部奥秘时&#xff0c;一个令人瞩目的概念 —— 零拷贝&#xff08;Zero-Copy&#xff09;架构&#xff0c;逐渐走入我们的视野。想象一下&#xff0c;在数据如洪流般…

备忘录模式

引言 当我们和朋友下棋的时候&#xff0c;我们很多情况下会发现下了一步臭棋&#xff0c;这时候就会和朋友开玩笑要悔棋&#xff0c;即撤回刚刚下的一步棋。在程序中&#xff0c;很多时候也会出错&#xff0c;我们也希望程序可以恢复出错前的状态&#xff0c;这就需要备忘录模式…

Element UI 表单源码原理

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

使用deepseek快速创作ppt

目录 1.在DeekSeek生成PPT脚本2.打开Kimi3.最终效果 DeepSeek作为目前最强大模型&#xff0c;其推理能力炸裂&#xff0c;但是DeepSeek官方没有提供生成PPT功能&#xff0c;如果让DeepSeek做PPT呢&#xff1f; 有个途径&#xff1a;在DeepSeek让其深度思考做出PPT脚本&#xf…

深入理解小波变换:信号处理的强大工具

引言 在科学与工程领域&#xff0c;信号处理一直是关键环节&#xff0c;傅里叶变换与小波变换作为重要的分析工具&#xff0c;在其中发挥着重要作用。本文将深入探讨小波变换&#xff0c;阐述其原理、优势以及与傅里叶变换的对比&#xff0c;并通过具体案例展示其应用价值。 一…

Kafka 入门与实战

一、Kafka 基础 1.1 创建topic kafka-topics.bat --bootstrap-server localhost:9092 --topic test --create 1.2 查看消费者偏移量位置 kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group test 1.3 消息的生产与发送 #生产者 kafka-cons…