4、Rocketmq之存储原理

CommitLog ~ MappedFileQueue ~ MappedFile集合
在这里插入图片描述

正常情况下,RocketMQ支持消息体字节数最多为1个G。注意该消息体并不单单是消息体body。如果生产的消息其字节数超过1个G则该消息是无法被落盘处理的。因为没有一个MapperFile文件可以承载该消息所有的字节数。

1.AllocateMappedFileService

参考文章

异步初始化CommitLog文件时优先初始化nextFilePath、nextNextFilePath两个文件。

同时创建nextFilePath【/data/rocketmq/commitlog/00000000000000000000】、nextNextFilePath【/data/rocketmq/commitlog/00000000000000000050】两个文件是如何使用的呢?

  1. 优先返回nextFilePath,并添加到MappedFileQueue集合属性mappedFiles中。此时队列requestQueue为空。requestTable集合元素为nextNextFilePath【/data/rocketmq/commitlog/00000000000000000050】。
  2. 如果消息体的长度没有达到当前MapperFile中字节缓冲区capacity的大小,则不会创建新的MapperFile文件。
  3. 如果步骤2不成立,则创建新的nextFilePath【/data/rocketmq/commitlog/00000000000000000050】、nextNextFilePath【/data/rocketmq/commitlog/00000000000000000100】对应的MapperFile文件。但是由于requestTable集合不为空即存在nextFilePath对应的MapperFile文件【/data/rocketmq/commitlog/00000000000000000050】则删除并返回当前集合元素。
  4. 此时requestTable集合元素为nextNextFilePath【/data/rocketmq/commitlog/00000000000000000100】。MappedFileQueue中集合属性mappedFiles中存在00000000000000000000、00000000000000000050两个MappedFile文件。

如果真实发送的消息字节数没有超过当前字节缓冲区剩余空间则优先当前MapperFile文件处理。否则创建新的MapperFile文件。

public class AllocateMappedFileService extends ServiceThread {private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static int waitTimeOut = 1000 * 5;private ConcurrentMap<String, AllocateRequest> requestTable = new ConcurrentHashMap<>();private PriorityBlockingQueue<AllocateRequest> requestQueue = new PriorityBlockingQueue<>();private volatile boolean hasException = false;private DefaultMessageStore messageStore;public AllocateMappedFileService(DefaultMessageStore messageStore) {this.messageStore = messageStore;}// nextFilePath:CommitLog文件路径 /data/rocketmq/commitlog/00000000000000000000public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {int canSubmitRequests = 2;...AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;...boolean offerOK = this.requestQueue.offer(nextReq);AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;...boolean offerOK = this.requestQueue.offer(nextNextReq);...// 每次只是返回nextFilePath对应的MappedFile。此时 requestQueue 队列为空,requestTable集合中只是存在 nextNextFilePath 对应的MappedFile文件// 如果AllocateRequest result = this.requestTable.get(nextFilePath);messageStore.getPerfCounter().startTick("WAIT_MAPFILE_TIME_MS");// 阻塞等待 线程AllocateMappedFileService 初始化MapperFile文件。默认时间为5秒boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);messageStore.getPerfCounter().endTick("WAIT_MAPFILE_TIME_MS");if (!waitOK) {log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());return null;} else {this.requestTable.remove(nextFilePath);// 返回nextFilePath对应的MappedFile文件,并添加到MappedFileQueue中集合属性mappedFiles中return result.getMappedFile();}}...public void run() {// 初始化 MapperFile文件 任务while (!this.isStopped() && this.mmapOperation()) {}}/***  通过 putRequestAndReturnMappedFile 生成的文件名异步创建本地文件*/private boolean mmapOperation() {boolean isSuccess = false;AllocateRequest req = null;try {req = this.requestQueue.take();//移除并返回元素,否则阻塞等待AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());...if (req.getMappedFile() == null) {long beginTime = System.currentTimeMillis();//创建对应对应大小、对应磁盘地址的本地文件。并且建立磁盘 & 内核映射关系MappedFile mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize());;...// pre write mappedFile 预热处理if (mappedFile.getFileSize() >= mappedFileSizeCommitLog && warmMapedFileEnable) {FlushDiskType flushDiskType = this.messageStore.getMessageStoreConfig().getFlushDiskType();MessageStoreConfig messageStoreConfig = this.messageStore.getMessageStoreConfig();int flushLeastPagesWhenWarmMapedFile = messageStoreConfig.getFlushLeastPagesWhenWarmMapedFile();mappedFile.warmMappedFile(flushDiskType,flushLeastPagesWhenWarmMapedFile);}req.setMappedFile(mappedFile);this.hasException = false;isSuccess = true;}} finally {if (req != null && isSuccess)req.getCountDownLatch().countDown();//初始化完毕释放锁}return true;}static class AllocateRequest implements Comparable<AllocateRequest> {// Full file pathprivate String filePath;private int fileSize;private CountDownLatch countDownLatch = new CountDownLatch(1);private volatile MappedFile mappedFile = null;public AllocateRequest(String filePath, int fileSize) {this.filePath = filePath;this.fileSize = fileSize;}...public CountDownLatch getCountDownLatch() {return countDownLatch;}public void setCountDownLatch(CountDownLatch countDownLatch) {this.countDownLatch = countDownLatch;}public MappedFile getMappedFile() {return mappedFile;}public void setMappedFile(MappedFile mappedFile) {this.mappedFile = mappedFile;}}
}

2.DefaultMappedFile

public class DefaultMappedFile extends AbstractMappedFile {public DefaultMappedFile(final String fileName, final int mappedFileSizeCommitLog) throws IOException {init(fileName, mappedFileSizeCommitLog);}private void init(final String fileName, final int mappedFileSizeCommitLog) throws IOException {this.fileName = fileName;this.mappedFileSizeCommitLog = mappedFileSizeCommitLog;this.file = new File(fileName);this.fileFromOffset = Long.parseLong(this.file.getName());boolean ok = false;UtilAll.ensureDirOK(this.file.getParent());try {this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, mappedFileSizeCommitLog);TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(mappedFileSizeCommitLog);TOTAL_MAPPED_FILES.incrementAndGet();ok = true;}finally {if (!ok && this.fileChannel != null) {this.fileChannel.close();}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/91356.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux学习(自写shell)[11]

打印出提示信息获取用户键盘输入 cmd_line[NUM];用来保存完整的命令行 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <sys/wait.h>#define NUM 1024 char cmd_line[NUM]; //shell int main() {wh…

Kali Linux是什么?它的主要用途是什么?

1. Kali Linux是什么&#xff1f; Kali Linux是一款基于Debian Linux的发行版&#xff0c;专注于网络安全和渗透测试。它由全球顶尖的安全专家和黑客社区维护开发&#xff0c;提供了丰富的工具和资源&#xff0c;用于测试网络、系统和应用程序的安全性。Kali Linux以其强大的功…

C#字符串占位符替换

using System;namespace myprog {class test{static void Main(string[] args){string str1 string.Format("{0}今年{1}岁&#xff0c;身高{2}cm&#xff0c;月收入{3}元&#xff1b;", "小李", 23, 177, 5000);Console.WriteLine(str1);Console.ReadKey(…

聊一下互联网开源变现

(点击即可收听) 互联网开源变现其实是指通过开源软件或者开放源代码的方式&#xff0c;实现收益或盈利。这种方式越来越被广泛应用于互联网行业 在互联网开源变现的模式中&#xff0c;最常见的方式是通过捐款、广告、付费支持或者授权等方式获利。 例如&#xff0c;有些开源软件…

轻量级自动化测试框架WebZ

一、什么是WebZ WebZ是我用Python写的“关键字驱动”的自动化测试框架&#xff0c;基于WebDriver。 设计该框架的初衷是&#xff1a;用自动化测试让测试人员从一些简单却重复的测试中解放出来。之所以用“关键字驱动”模式是因为我觉得这样能让测试人员&#xff08;测试执行人员…

MySQL — 索引

文章目录 索引索引结构 — B树与B树B树B树 聚簇索引与非聚簇索引聚簇索引非聚簇索引优缺点 覆盖索引与回表联合索引索引覆盖最左前缀匹配 索引 索引是对数据库表中一列或多列的值进行排序的一种结构。MySQL索引的建立对于MySQL的高效运行是很重要的&#xff0c;索引可以大大提…

剑指 Offer 40. 最小的k个数

题目描述 输入整数数组 arr &#xff0c;找出其中最小的 k 个数。例如&#xff0c;输入4、5、1、6、2、7、3、8这8个数字&#xff0c;则最小的4个数字是1、2、3、4。 示例 思路 方法1 采用未改进的快速排序 class Solution {public int[] getLeastNumbers(int[] arr, int k…

k8s ------存储卷(PV、PVC)

目录 一&#xff1a;为什么需要存储卷&#xff1f; 二&#xff1a;emptyDir存储卷 ​三&#xff1a;hostPath存储卷 四&#xff1a;nfs共享存储卷 五&#xff1a;PVC 和 PV 1、PVC 和 PV介绍 2、PV和PVC之间的相互作用遵循的生命周期 3、PV 的4 种状态 4、一个PV从创…

【Spring MVC】Spring MVC基于注解的程序开发

目录 一、什么是Spring MVC 二、Spring MVC项目的创建和使用 1、实现客户端和服务器端之间的连接 1.1、RequsestMapping注解 1.2、RequestMapper的简单使用 1.3、使用GetMapping和POSTMapping注解来实现HTTP连接 三、获取参数 1、实现获取单个参数 2、实现获取对象 3…

FPGA:RS编码仿真过程

FPGA&#xff1a;RS编码仿真过程 RS码是一种纠错性能很强的线性纠错码&#xff0c;能够纠正随机错误和突发错误。RS码是一种多进制BCH码&#xff0c;能够同时纠正多个码元错误。 之前已经记录了在MATLAB中进行rs编解码的过程&#xff0c;现在利用FPGA的IP核实现RS编码的过程&…

怎么自己制作动图表情包?在线gif生成的操作步骤

gif表情包在我们平时的生活里斗图的时候经常会用到&#xff0c;那么如何用图片制作gif&#xff08;https://www.gif.cn&#xff09;表情包呢&#xff1f;今天就分享一个在线gif生成的简单方法&#xff0c;利用gif制作工具将图片转gif动图&#xff0c;下面是详细的操作步骤。 打…

测试相关Liunx基础知识

Linux的历史和安装 基本常识 Liunx目录结果 常见

(el-Table)操作(不使用 ts):Element-plus 中Table 表格组件:多选修改成支持单选及表格相关样式的调整

Ⅰ、Element-plus 提供的 Table 表格组件与想要目标情况的对比&#xff1a; 1、Element-plus 提供 Table 组件情况&#xff1a; 其一、Element-ui 自提供的 Table 代码情况为(示例的代码)&#xff1a; // Element-plus 自提供的代码&#xff1a; // 此时是使用了 ts 语言环境…

echart图案例

效果 代码&#xff1a; index.vue <template><div class"pageBox"><div class"oneLineBox"><div class"fourColorImgBox"><div class"titleBox">企业风险四色图</div><div class"conte…

LVS-DR集群(一台LVS,一台CIP,两台web,一台NFS)的构建

一.构建环境 1.五台关闭防火墙&#xff0c;关闭selinux&#xff0c;拥有固定IP&#xff0c;部署有http服务的虚拟机&#xff0c;LVS设备下载ipvsadm工具&#xff0c;NFS 设备需要下载rpcbind和nfs-utils 2.实现功能 3.ipvsadm命令部分参数介绍 二.配置和测试 1.LVS设备 &…

互联网发展历程:从布线到无线,AC/AP的崭新时代

互联网的发展&#xff0c;一直在追求更便捷、更灵活的连接方式。在网络的早期&#xff0c;布线问题常常让人头疼。一项革命性的技术应运而生&#xff0c;那就是“无线AC/AP”。 布线问题的烦恼&#xff1a;繁琐的布线 早期网络的布线工作常常耗费时间和精力&#xff0c;尤其在大…

虫情测报灯——监测预警分析

KH-CQPest虫情测报灯是专为田间虫害统计、农林虫情测报而研制的设备&#xff0c;利用光、电、数控等技术实现自动诱虫、杀虫、虫体分散、拍照、运输、收集、排水等系统作业等功能&#xff0c;当有害虫出现时&#xff0c;会受到诱集光源的影响&#xff0c;自动飞扑撞向撞击屏&am…

互联网+AI+智慧工地管理平台源码(Spring Cloud +Vue)

基于微服务JavaSpring Cloud VueUniApp MySql开发的智慧工地管理源码&#xff0c;SaaS模式。 一、智慧工地概念 智慧工地就是互联网建筑工地&#xff0c;是将互联网的理念和技术引入建筑工地&#xff0c;然后以物联网、移动互联网技术为基础&#xff0c;充分应用BIM、大数据、…

基于Python科研论文绘制学习 - task1

绘制原则 必要性&#xff08;避免图多字少&#xff09; 易读性&#xff08;完整准确的标题、标签&#xff09; 一致性&#xff08;配图需要和上下文一致&#xff09; 尝试运行代码的时候出现了很多bug&#xff0c;基本都是围绕Scienceplots库的&#xff0c;在更新pip、pandas…