目录
日志存储设计
1. 日志存储的目录结构
2. 日志内容格式设计
3. 日志索引设计
4. 设计优势
消息写入流程
示例
流程图
消息读取流程
示例
关键设计细节
流程图
日志存储设计
Kafka的日志存储是其高吞吐、持久化能力的核心设计,其结构包含目录组织、消息格式、索引设计三部分。
1. 日志存储的目录结构
- 分区目录:每个Topic分区对应一个目录,命名格式为
<topic>-<partition>
,例如orders-0
。 - Segment文件:每个分区目录下包含多个日志段(Segment),每个段由两个文件组成:
-
- .log文件:存储实际消息(如
00000000000000000000.log
),文件名基于基准位移(Base Offset),即该段第一条消息的Offset。 - .index文件:稀疏索引文件(如
00000000000000000000.index
),记录Offset到物理位置的映射。
- .log文件:存储实际消息(如
- 活跃段(Active Segment):当前正在写入的Segment,文件名格式为
<nextOffset>.log
,例如新段的第一条消息Offset为100,则文件名为00000000000000000100.log
。
源码关键类:
Log
:管理分区的所有Segment(ConcurrentNavigableMap<Long, LogSegment>
)。LogSegment
:封装单个Segment的.log和.index文件操作。
2. 日志内容格式设计
Kafka消息以**批次(RecordBatch)**为单位存储,每个批次包含多条消息,减少I/O开销。单条消息格式如下:
RecordBatch Header:Base Offset (int64)Length (int32)Partition Leader Epoch (int32)Magic (int8)CRC (int32)Attributes (int16)Last Offset Delta (int32)First Timestamp (int64)Max Timestamp (int64)Producer ID (int64)Producer Epoch (int16)Base Sequence (int32)Records Count (int32)Record (多条):Length (varint)Attributes (int8)Timestamp Delta (varint)Offset Delta (varint)Key (varint bytes)Value (varint bytes)Headers (varint array)
特点:
- 紧凑二进制格式:通过变长类型(varint)和位移差值(Delta)压缩空间。
- 批量写入:多个Record打包成RecordBatch,减少网络和磁盘I/O。
- 幂等与事务支持:通过
Producer ID
、Epoch
、Sequence
字段实现。
3. 日志索引设计
- 稀疏索引(Sparse Index):.index文件不记录每条消息的Offset,而是每隔一定消息量(如4KB)记录一个索引项。
- 索引项结构:每个索引项占8字节,包含两个字段:
-
- Relative Offset:相对于基准位移的差值(4字节)。
- Physical Position:对应.log文件中的物理位置(4字节)。
索引查询流程(源码见OffsetIndex
类):
- 根据目标Offset,通过二分查找找到最近的索引项。
- 从.log文件的对应位置开始顺序扫描,直到找到目标消息。
4. 设计优势
- 高效查询:稀疏索引+顺序扫描平衡了索引大小与查询速度。
- 快速扩容:Segment文件按基准Offset分割,易于清理旧数据和扩展新文件。
- 高吞吐:批量写入、页缓存、零拷贝等技术最大化磁盘和网络效率。
通过这种设计,Kafka在保证消息持久化的同时,实现了百万级TPS的吞吐能力。
消息写入流程
示例
生产者发送一条消息{"order_id": 1001}
到Topic orders
的Partition 0。
写入流程:
- 选择分区:根据
Partitioner
确定消息写入orders-0
。 - 追加到活跃段:
-
- Broker将消息封装为RecordBatch,追加到当前活跃段(如
00000000000000001000.log
)。 - 更新对应的索引文件
00000000000000001000.index
(每隔4KB或一定时间写入索引项)。
- Broker将消息封装为RecordBatch,追加到当前活跃段(如
- 刷盘策略:根据
log.flush.interval.messages
或log.flush.interval.ms
决定何时将数据从页缓存刷到磁盘。
源码关键方法:
Log.append()
:处理消息追加。LogSegment.append()
:写入.log文件并更新索引。
流程图
消息读取流程
示例
消费者请求读取Offset为1005的消息。
1. 消费者发送FetchRequest:请求包含目标Topic、Partition和Offset(例如Offset=1005)。
2. Broker验证Offset有效性:
- 检查Offset是否在
LogStartOffset
(日志起始位移)和HighWatermark
(已提交消息的最大位移)之间。 - 若Offset无效(如小于LogStartOffset或大于HighWatermark),返回错误
OFFSET_OUT_OF_RANGE
。
- 定位Segment文件:
-
- Broker根据Offset值,在分区的
Log
对象中通过二分查找找到对应的LogSegment
。 - 具体逻辑:在
LogSegments
(一个有序的ConcurrentNavigableMap
)中调用floorEntry(Offset)
,找到基准Offset ≤ 目标Offset的Segment。 - 示例:Offset=1005 → 找到基准Offset=1000的Segment(文件
00000000000000001000.log
)。
- Broker根据Offset值,在分区的
- 加载索引文件:
-
- 打开对应Segment的
.index
文件(稀疏索引),通过内存映射(MappedByteBuffer
)加载到内存。
- 打开对应Segment的
- 解析Offset并查询索引:
-
- 计算相对Offset:
目标Offset - 基准Offset
(如1005 - 1000 = 5)。 - 在
.index
文件中二分查找最接近且≤相对Offset的索引项。 - 示例:索引项可能为
[4 → 4096]
(相对Offset=4,对应.log文件的物理位置4096字节)。
- 计算相对Offset:
- 定位消息物理位置:
-
- 根据索引项中的物理位置(4096),从
.log
文件的该位置开始顺序扫描。 - 逐条解析消息头中的Offset,直到找到目标Offset=1005的消息。
- 根据索引项中的物理位置(4096),从
- 操作系统缓存与文件缓存:
-
- 页缓存(Page Cache):Kafka依赖操作系统的页缓存机制,
.log
和.index
文件会被缓存到内存,后续读取直接从内存访问,避免磁盘I/O。 - 内存映射(Memory-Mapped Files):索引文件通过
MappedByteBuffer
映射到内存,加速索引查询。
- 页缓存(Page Cache):Kafka依赖操作系统的页缓存机制,
- 返回消息数据:
-
- 将找到的消息数据封装为
FetchResponse
返回给消费者,消息内容可能直接从页缓存中读取(零拷贝优化)
- 将找到的消息数据封装为
关键设计细节
1. Offset解析与索引查询
- 相对Offset计算:
索引文件中存储的是相对于Segment基准Offset的差值(如基准Offset=1000,索引项中的相对Offset=5 → 实际Offset=1005)。 - 稀疏索引优化:
索引文件仅记录部分Offset(如每隔4KB),通过二分查找快速定位到近似位置,再顺序扫描少量数据。
2. 文件定位与缓存机制
- Segment文件定位:
LogSegments
使用ConcurrentNavigableMap
维护所有Segment,floorEntry(Offset)
方法通过跳表(Skip List)快速查找。 - 操作系统缓存:
-
- 页缓存:Kafka的.log文件读写完全依赖操作系统的页缓存,消息读取时直接从内存访问,避免磁盘寻址。
- 内存映射文件:.index文件通过
FileChannel.map()
映射到内存,索引查询几乎无磁盘I/O。
3. 顺序扫描优化
- 批量读取:从.log文件的物理位置开始,按块(如8KB)读取数据,减少小文件I/O次数。
- 零拷贝(Zero-Copy):
消息数据通过FileChannel.transferTo()
直接从页缓存发送到网络Socket,无需经过用户态(源码见FileRecords.writeTo()
)。
流程图