深度解析RocketMq源码-IndexFile

1.绪论

在工作中,我们经常需要根据msgKey查询到某条日志。但是,通过前面对commitLog分析,producer将消息推送到broker过后,其实broker是直接消息到达broker的先后顺序写入到commitLog中的。我们如果想根据msgKey检索一条消息无疑大海捞针,所以们需要像数集一样建立一个目录,我们其实可以想到的是构建一个Map,key存储msgKey,value存储msg在commitLog中的物理偏移量。而这个目录其实就是indexFile。

2.indexFile的组成和原理

indexFile主要由两部分组成,分别是indexFile文件头和index的文件内容。

2.1 indexFile文件头 - IndexHeader 

indexHeader占据40个字节,其中最重要的是他记录了整个索引文件的最开始插入的索引的时间和最后一条数据插入的时间,主要是为了支持根据时间进行范围搜索。以及第一条和最后一条日志的索引位置。还有一个就是已经插入了多少条索引IndexCount。

public class IndexHeader {
//index文件头占4个字节public static final int INDEX_HEADER_SIZE = 40;private static int beginTimestampIndex = 0;private static int endTimestampIndex = 8;private static int beginPhyoffsetIndex = 16;private static int endPhyoffsetIndex = 24;private static int hashSlotcountIndex = 32;private static int indexCountIndex = 36;private final ByteBuffer byteBuffer;//开始的时间戳private final AtomicLong beginTimestamp = new AtomicLong(0);//结束时间戳private final AtomicLong endTimestamp = new AtomicLong(0);//开始的物理偏移量private final AtomicLong beginPhyOffset = new AtomicLong(0);//结束的物理偏移量private final AtomicLong endPhyOffset = new AtomicLong(0);//hash槽的数量private final AtomicInteger hashSlotCount = new AtomicInteger(0);//index的数量private final AtomicInteger indexCount = new AtomicInteger(1);
}

2.2 indexFile的组成

idnexFile的内容包括:

1. 40个字节的indexFile头

2. 4* 500w个字节hash槽,每个槽记录的其实是:根据key取hash值%槽数在当前hash槽的索引的序号(也即当前有多少条索引)

3. 20*2000w个自己的索引数,每条索引20个字节,包含4个字节索引key的hash值+8个字节的物理偏移量+4个字节的当前索引的插入时间距离该索引文件第一条索引的插入时间的差值+4个字节的上一个在当前hash槽的索引的序号。

我们可以画图来描述一下:

e5b98bd4baf04c1c9393127f0a93ebca.png

可以看出idnexFile是采用链地址法解决hash冲突的,每个索引存储有上一条拥有相同hash值索引的index值,相当于通过链表将这些hash冲突的索引串起来。

public class IndexFile {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);//一个hash槽的大小为4个字节private static int hashSlotSize = 4;//一条索引的大小为20字节private static int indexSize = 20;private static int invalidIndex = 0;//hash槽的数量private final int hashSlotNum;//index的总数量private final int indexNum;//index也是存储在mappedFile中的private final MappedFile mappedFile;private final MappedByteBuffer mappedByteBuffer;//index文件的头private final IndexHeader indexHeader;public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,final long endPhyOffset, final long endTimestamp) throws IOException {//文件总大小 = 头部所占40个字节 + hash槽数量(默认为500w) * 4个字节 + index数量 * 20个字节int fileTotalSize =IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);//新建mappedFilethis.mappedFile = new MappedFile(fileName, fileTotalSize);//获取到与文件建立映射关系的bufferthis.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();//hash槽数量this.hashSlotNum = hashSlotNum;//索引文件的数量this.indexNum = indexNum;ByteBuffer byteBuffer = this.mappedByteBuffer.slice();//index文件的头部this.indexHeader = new IndexHeader(byteBuffer);if (endPhyOffset > 0) {//够级整个索引文件的开始的物理偏移量和结束的偏移量this.indexHeader.setBeginPhyOffset(endPhyOffset);this.indexHeader.setEndPhyOffset(endPhyOffset);}if (endTimestamp > 0) {//够级整个索引文件的开始时间戳和结束时间戳this.indexHeader.setBeginTimestamp(endTimestamp);this.indexHeader.setEndTimestamp(endTimestamp);}}
}

3.向indexFile插入一条索引数据

主要的步骤如下:

1.获取msgKey的hash值;

2.通过hash值对总的hash槽数取模得到对应第几个槽;

3.40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址;

4.40个字节的索引头大小+hash槽总数*4个字节+现在存储了多少条索引*20个字节得到最新一条数据写入的物理偏移量;

5.分别写入索引内容:hash值,commitLog的物理偏移量,距离第一条索引的时间戳+上一条指向同一个hash槽的索引的序号(也即当前hash槽中存储的值);

6.将最新一条的索引序号写入到hash槽中。

    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {if (this.indexHeader.getIndexCount() < this.indexNum) {//1.获取msgKey的hash值int keyHash = indexKeyHashMethod(key);//2.通过hash值对总的hash槽数取模得到对应第几个槽int slotPos = keyHash % this.hashSlotNum;//3.40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;try {//获取到上一个hash槽的所指向的索引序号int slotValue = this.mappedByteBuffer.getInt(absSlotPos);if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {slotValue = invalidIndex;}//获取当前索引与第一条索引的差值long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();timeDiff = timeDiff / 1000;if (this.indexHeader.getBeginTimestamp() <= 0) {timeDiff = 0;} else if (timeDiff > Integer.MAX_VALUE) {timeDiff = Integer.MAX_VALUE;} else if (timeDiff < 0) {timeDiff = 0;}//40个字节的索引头大小+hash槽总数*4个字节+现在存储了多少条索引*20个字节得到最新一条数据写入的物理偏移量int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize+ this.indexHeader.getIndexCount() * indexSize;//分别写入索引内容:hash值,commitLog的物理偏移量,距离第一条索引的时间戳+上一条指向同一个hash槽的索引的序号this.mappedByteBuffer.putInt(absIndexPos, keyHash);this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);//将最新一条的索引序号写入到hash槽中this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());//更新idnex中的最后一条索引的时间戳和物理偏移量if (this.indexHeader.getIndexCount() <= 1) {this.indexHeader.setBeginPhyOffset(phyOffset);this.indexHeader.setBeginTimestamp(storeTimestamp);}if (invalidIndex == slotValue) {this.indexHeader.incHashSlotCount();}   //增加indexheader索引序号this.indexHeader.incIndexCount();this.indexHeader.setEndPhyOffset(phyOffset);this.indexHeader.setEndTimestamp(storeTimestamp);return true;} catch (Exception e) {log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);}} else {log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()+ "; index max num = " + this.indexNum);}return false;}

4.从indexFile中读取一条索引数据

1.获取索引key的hash值;

2.hash值对槽总数取模获得第几个槽;

3.40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址;

4.从槽中读取到该槽所指向的最新的一条索引序号;

5.40个字节的索引头大小+hash槽总数*4个字节+hash槽中存储的索引序号*20个字节得到最新一条数据写入的物理偏移量;

6.如果hash值相等,并且时间匹配,证明匹配到数据,跳出循环;

7.如果不匹配,便根据链表寻找到拥有相同hash值并且时间匹配的日志;

    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,final long begin, final long end) {if (this.mappedFile.hold()) {//获取索引key的hash值int keyHash = indexKeyHashMethod(key);//hash值对槽总数取模获得第几个槽int slotPos = keyHash % this.hashSlotNum;//40个字节的index头大小+第几个槽*4个字节寻得hash槽的物理地址int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;try {//从槽中读取到该槽所指向的最新的一条索引序号int slotValue = this.mappedByteBuffer.getInt(absSlotPos);if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()|| this.indexHeader.getIndexCount() <= 1) {} else {for (int nextIndexToRead = slotValue; ; ) {if (phyOffsets.size() >= maxNum) {break;}// 40个字节的索引头大小+hash槽总数*4个字节+hash槽中存储的索引序号*20个字节得到最新一条数据写入的物理偏移量int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize+ nextIndexToRead * indexSize;//获取索引的hash值int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);//获取到该索引的物理偏移量long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);//获取到时间戳差值long timeDiff = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);//获取到拥有相同槽数的上一条索引序号int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);if (timeDiff < 0) {break;}timeDiff *= 1000L;long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;boolean timeMatched = (timeRead >= begin) && (timeRead <= end);//如果hash值相等,并且时间匹配,证明匹配到数据,跳出循环if (keyHash == keyHashRead && timeMatched) {phyOffsets.add(phyOffsetRead);}//如果上一条索引非法,证明已经到达链表头部,跳出循环,证明该条索引就是需要寻找的索引if (prevIndexRead <= invalidIndex|| prevIndexRead > this.indexHeader.getIndexCount()|| prevIndexRead == nextIndexToRead || timeRead < begin) {break;}nextIndexToRead = prevIndexRead;}}} catch (Exception e) {log.error("selectPhyOffset exception ", e);} finally {this.mappedFile.release();}}}

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/362715.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++精解【8】

文章目录 运算,- 加减法* / 乘除法逐元 乘法逐元 除法逐元综合运算矩阵乘法与加减法 转置、共轭、伴随矩阵点乘法,叉积 运算 ,- 加减法 逐元加减法 #include <iostream> #include "e:/eigen/Eigen/Dense" using namespace std;int main() {Eigen::Matrix2d …

IDEA版本推荐

推荐版本&#xff1a; IDEA 2024.1.4 下载链接&#xff1a;IDEA下载 &#xff08;下载时可以往下拖&#xff0c;选到自己想要的版本哦&#xff09; 本人由于项目开发需要&#xff0c;陆续用过几个版本的IDEA&#xff0c;包括&#xff1a; IDEA 2020.2.4 。这是在看韩顺平老师…

基于STM32的智能水质监测系统

目录 引言环境准备智能水质监测系统基础代码实现&#xff1a;实现智能水质监测系统 4.1 数据采集模块4.2 数据处理与分析4.3 控制系统实现4.4 用户界面与数据可视化应用场景&#xff1a;水质管理与优化问题解决方案与优化收尾与总结 1. 引言 智能水质监测系统通过使用STM32嵌…

2毛钱的SOT23-5封装28V、1.5A、1.2MHz DCDC转换器用于LCD偏置电源和白光LED驱动等MT3540升压芯片

前言 之前发了一个TI的BOOST升压芯片&#xff0c;用于LCD偏置电压或LED驱动&#xff0c;请访问以下链接。 6毛钱SOT-23封装28V、400mA 开关升压转换器&#xff0c;LCD偏置电源和白光LED应用芯片TPS61040 国产半导体厂家发展迅猛&#xff0c;今天推荐一个公司带“航天”的升压…

UE学习笔记--UE项目,IDE提示项目被卸载的解决方案

前言 我用的 IDE 是 Rider。 我不小心把 Intertmediate 文件夹给删掉了。 然后进入 Rider&#xff0c;报了一些错&#xff0c;然后编译也有问题。启动不了 UE。 解决办法 右键你的 uproject&#xff0c;点击 Generate visual studio project files。 让它重新生成对应的文件…

uniapp开发H5、手机APP、微信小程序 可拖动菜单按钮

ml-fab 插件地址&#xff1a;https://ext.dcloud.net.cn/plugin?id18909 1、可拖拽悬浮按钮 ml-fab&#xff0c;支持自定义插槽&#xff0c;点击可展开一个图标按钮菜单&#xff0c;可随意拖拽。 2、支持自定义插槽&#xff0c;可实现自定义配置。 3、操作简单易上手。 ml-f…

电脑开机之后,键盘鼠标需要重新插拔才能正常使用?

前言 小白平时修电脑修得多&#xff0c;总是会遇到各种各样的奇葩问题。这不&#xff0c;又有一位小伙伴来咨询&#xff1a;电脑开机之后&#xff0c;键盘鼠标都不能用&#xff0c;需要重新插拔一下才能正常使用。 啧啧啧&#xff0c;真的是很奇怪的问题&#xff0c;基本上没见…

华为HCIP Datacom H12-821 卷16

1.判断题 在 VRRP 中,当设备状态变为 Master 后,,会立刻发送免费 ARP 来刷新下游设备的 MAC 表项,从而把用户的流量引到此台设备上来 A、对 B、错 正确答案: A 解析: 2.判断题 路由选择工具 route- policy 能够基于预先定义的条件来进行过滤并设置 BGP

防火墙双双机热备

设备直路部署&#xff0c;上下行连接交换机 如 图所示&#xff0c;DeviceA和DeviceB的业务接口都工作在三层&#xff0c;上下行分别连接二层交换机。上行交换机连接运营商的接入点&#xff0c;运营商为企业分配的IP地址为1.1.1.3和1.1.1.4。现在希望DeviceA和DeviceB以负载分担…

prometheus+grafana搭建监控系统

1.prometheus服务端安装 1.1下载包 使用wget下载 &#xff08;也可以直接去官网下载包Download | Prometheus&#xff09; wget https://github.com/prometheus/prometheus/releases/download/v2.44.0/prometheus-2.44.0.linux-amd64.tar.gz1.2解压 tar xf prometheus-2.44…

面试突击:ArrayList源码详解

本文已收录于&#xff1a;https://github.com/danmuking/all-in-one&#xff08;持续更新&#xff09; 前言 哈喽&#xff0c;大家好&#xff0c;我是 DanMu。ArrayList 是我们日常开发中不可避免要使用到的一个类&#xff0c;并且在面试过程中也是一个非常高频的知识点&#…

02逻辑代数与硬件描述语言基础

2.1 逻辑代数&#xff08;简单逻辑的运算&#xff09; 2.2 逻辑函数的卡诺图&#xff08;从图论的角度&#xff09;化简法 2.3 硬件描述语言Verilog HDL基础&#xff08;研究生阶段才用得到&#xff09; 要求&#xff1a; 1、熟悉逻辑代数常用基本定律、恒等式和规则。 2、掌握…

华为200人园区网有线和无线

实验描述&#xff1a; 1 内网有有线业务、内部无线、外部无线三种业误。 2 内网服务器配置静态IP&#xff0c;网关192.168.108.1。 3 sW1和R1之间使用v1an200 192.168.200.9/30 互联。 4 R2向运营商申请企业宽带并获得了1个固定公网IP&#xff1a; 200.1.1.1 子网掩码 255.255.…

Bad owner or permissions on C:\\Users\\username/.ssh/config > 过程试图写入的管道不存在。

使用windows连接远程服务器出现Bad owner or permissions 错误 问题&#xff1a; 需要修复文件权限 SSH 配置文件应具有受限权限以防止未经授权的访问 确保只有用户对该.ssh/config文件具有读取权限 解决方案&#xff1a; 在windows下打开命令行&#xff0c;通过以下命令打开文…

一个去掉PDF背景水印的思路

起因 昨天测试 使用“https://github.com/VikParuchuri/marker” 将 pdf 转 Markdown的过程中&#xff0c;发现转换后的文件中会保护一些背景图片&#xff0c;是转换过程中&#xff0c;程序把背景图识别为了内容。于是想着怎么把背景图片去掉。 背景水印图片的特征 我这里拿…

仓库管理系统14--仓库设置

1、添加窗体 <UserControl x:Class"West.StoreMgr.View.StoreView"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"xmlns:mc"http://schemas.openxmlformats.…

[C#]基于opencvsharp实现15关键点人体姿态估计

数据集 正确选择数据集以对结果产生适当影响也是非常必要的。在此姿势检测中&#xff0c;模型在两个不同的数据集即COCO关键点数据集和MPII人类姿势数据集上进行了预训练。 1. COCO&#xff1a;COCO关键点数据集是一个多人2D姿势估计数据集&#xff0c;其中包含从Flickr收集的…

Ubuntu20.04使用Samba

目录 一、Samba介绍 Samba 的主要功能 二、启动samba 三、主机操作 四、Ubuntu与windows系统中文件互联 五、修改samba路径 一、Samba介绍 Samba 是一个开源软件套件&#xff0c;用于在 Linux 和 Unix 系统上实现 SMB&#xff08;Server Message Block&#xff09;协议…

leetcode-19-回溯

引自代码随想录 [77]组合 给定两个整数 n 和 k&#xff0c;返回 1 ... n 中所有可能的 k 个数的组合。 示例: 输入: n 4, k 2 输出: [ [2,4], [3,4], [2,3], [1,2], [1,3], [1,4]] 1、大致逻辑 k为树的深度&#xff0c;到叶子节点的路径即为一个结果 开始索引保证不重复…

MySQL高级-索引-使用规则-前缀索引

文章目录 1、前缀索引2、前缀长度3、查询表数据4、查询表的记录总数5、计算并返回具有电子邮件地址&#xff08;email&#xff09;的用户的数量6、从tb_user表中计算并返回具有不同电子邮件地址的用户的数量7、计算唯一电子邮件地址&#xff08;email&#xff09;的比例相对于表…