优质博文:IT-BLOG-CN
一个商业化消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。下面将从
Kafka
文件存储机制和物理结构角度,分析Kafka
是如何实现高效文件存储,及实际应用效果。Kafka
的基本存储单位是分区。在配置Kafka
的时候,管理员指定了一个用于存储分区的目录清单log.dirs
参数的值。
一、分区分配
创建主题时,Kafka
首先决定如何在broker
之间分配分区。假设有6
个broker
,打算创建一个包含10
个分区的主题。并且复制系数是3
,相当于30
个分区副本。在被分配到6
个broker
上时,要达到如下的目标:
【1】在broker
间平均分配分区副本。对于上述例子来说,就是要保证每个broker
可以分到5
个副本。
【2】确保每个分区的每个副本分布在不同的broker
上。
【3】如果为broker
指定了机架信息,那么尽可能把每个分区的副本分配到不同机架的broker
上。这样做是为了保证一个机架不可用不会导致整个分区不可用。
为了实现这个目标,我们先随机选择一个broker
(假设是2
),然后通过轮询给每个broker
分配分区来确定首领的位置。如果分区0
的首领在broker2
上,那么分区1
的首领就在broker3
上,以此类推。然后,从分区首领开始,以此分配跟随者副本。如分区0
首领在broker2
上,那么它的第一个副本会出现在broker3
上,第二个出现在 broker4
上。如果配置了机架信息,那么就不是按照数字顺序来选择broker
了,而是按照交替机架的方式来选择broker
。假设broker0
、broker1
、broker2
放在同一个机架,broker3
、broker4
、broker5
放在其他不同的机架。此时就不是按照0
到5
的顺序来选择broker
,而是按照0
,3
,1
,4
,2
,5
的顺序进行选择的。
二、文件管理
保留数据时Kafka
的一个基本特性,Kafka
不会一直保留数据,也不会等到所有消费者都读取了消息之后才删除消息。相反,Kafka
管理员为每个主题配置了数据保留期限,规定数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小。 因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以我们把分区分成若干个片段。默认情况下,index
大小为10M
,每个片段log
包含1GB
或一周数据,以较小的那个为准。当前正在写入数据的片段叫做活跃片段,活跃片段永远不会被删除。
三、文件格式
我们把Kafka
的消息和偏移量保存在文件里。保存在磁盘上的数据格式与从生产者发送过来或者发送给消费者的消息格式是一样的。因为使用相同的消息格式进行磁盘存储和网络传输,Kafka
可以使用零复制技术给消费者发送消息,同时避免了对生产者已经压缩过的消息进行解压缩。除了键、值和偏移量外,消息里还包含了消息大小、校验和、消息格式版本号、压缩算法和时间戳。时间戳可以是生产者发送消息的时间,也可以是消息到达broker
的时间,这个是可配置的。如果生产者发送的是压缩过的消息,那么同一个批次的消息会被再压缩一次,被当做包装消息进行发送。下面是普通消息和包装消息图:
四、文件存储机制
【1】Broker
: 消息中间件处理结点,一个Kafka
节点就是一个Broker
,多个Broker
可以组成一个Kafka
集群。
【2】Topic
: 主题,如page view
日志、click
日志等都可以以Topic
的形式存在,Kafka
集群能够同时负责多个Topic
的分发。
【3】Partition
: Topic
物理上的分组,一个Topic
可以分为多个Partition
,每个Partition
是一个有序的队列。
【4】Segment
: Partition
物理上由多个Segment
组成。
【5】offset
: 每个 Partition
都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition
中。Partition
中的每个消息都有一个连续的序列号叫做offset
,用于Partition
唯一标识一条消息。
分析过程分为以下4
个步骤:
【1】Topic
中Partition
存储分布: 假设Kafka
集群只有一个Broker
,xxx/message-folder
为数据文件存储根目录,在Kafka Broker
中server.properties
文件配置(参数log.dirs=xxx/message-folder
),例如创建2
个Topic
名称分别为report_push
、launch_info
,Partitions
数量都为partitions=4
(将一个Topic
分为4
个部分存储)存储路径和目录规则为:
xxx/message-folder
|--report_push-0
|--report_push-1
|--report_push-2
|--report_push-3
|--launch_info-0
|--launch_info-1
|--launch_info-2
|--launch_info-3
【2】Partiton
中文件存储方式: 每个Partion
(目录)相当于一个巨型文件被平均分配到多个大小相等Segment
(段)数据文件中。但每个段Segment file
消息数量不一定相等,这种特性方便old segment file
快速被删除。每个Partiton
只需要支持顺序读写就行了,Segment
文件生命周期由服务端配置参数决定。这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。
【3】partiton
中segment
文件存储结构:segment file
由2
大部分组成,分别为index file
和data file
,此2
个文件成对出现,后缀".index"
和“.log”
分别表示为segment
索引文件、数据文件。segment
文件命名规则:partion
全局的第一个segment
从0
开始,后续每个segment
文件名为上一个segment
文件最后一条消息的offset
值。数值最大为64
位long
大小,19
位数字字符长度,没有数字用0填充。下面文件列表是笔者在Kafka broker
上做的一个实验,创建一个topicXXX
包含1 partition
,设置每个segment
大小为500MB
,并启动producer
向Kafka broker
写入大量数据,如下图所示segment
文件列表形象说明了上述2
个规则以及segment
中index<—->data file
对应关系物理结构如下:
索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message
的物理偏移地址。 其中以索引文件中元数据3,497
为例,依次在数据文件中表示第3
个message
(在全局partiton
表示第368772
个message
)、以及该消息的物理偏移地址为497
。从上述图3
了解到segment data file
由许多 message
组成,下面详细说明message
物理结构如下:
【参数说明】:
8 byte offset
:在Parition
(分区)内的每条消息都有一个有序的id
号,这个id
号被称为偏移offset
,它可以唯一确定每条消息在Parition
内的位置。即offset
表示Partiion
的第多少message
。
4 byte message size
:message
大小;
4 byte CRC32
:用crc32
校验message
;
1 byte “magic"
:表示本次发布Kafka
服务程序协议版本号;
1 byte “attributes"
:表示为独立版本、或标识压缩类型、或编码类型;
4 byte key length
:表示key
的长度,当key
为-1
时,K byte key
字段不填;
value bytes payload
:表示实际消息数据;
`index文件结构:
offset: 783932 position: 69483992
offset: 784323 position: 69543233
offset: 784565 position: 69589443
offset: 784932 position: 69623433
offset: 785355 position: 69658994
offset: 785894 position: 69704355
offset: 786389 position: 69738993
offset: 786584 position: 69784345
log
文件结构: 有个眼缘即可
offset: 784932 CreateTime:1598161852389 keysize: -1 valuesize: 15 sequence: 9884 baseOffset: 7043213 lastOffset: 784932 count: 1 baseSequence: 907
【4】在partition
中如何通过offset
查找message
: 例如读取offset=368776
的Message
,需要通过下面2
个步骤查找。
【第一步】查找segment file
: 上图为例,其中00000000000000000000.index
表示最开始的文件,起始偏移量offset
为0。第二个文件00000000000000368769.index
的消息量起始偏移量为368770 = 368769 + 1
。同样,第三个文件00000000000000737337.index
的起始偏移量为737338=737337 + 1
,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset
二分查找文件列表,就可以快速定位到具体文件。当offset=368776
时定位到00000000000000368769.index|log
;
【第二步】通过segment file
查找message
: 通过第一步定位到segment file
,当offset=368776
时,依次定位到00000000000000368769.index
的元数据物理位置和00000000000000368769.log
的物理偏移地址,然后再通过00000000000000368769.log
顺序查找直到offset=368776
为止。从上述图可知这样做的优点,segment index file
采取稀疏索引存储方式,它减少索引文件大小,通过mmap
可以直接内存操作,稀疏索引为数据文件的每个对应message
设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。通过上述过程详细分析,我们就可以清楚认识到kafka
文件存储机制的奥秘。
五、Kafka文件实际运行效果
【实验环境】:Kafka
集群 = 由2
台虚拟机组成;CPU = 4
核;物理内存= 8GB
;网卡 = 千兆网卡;JVM HEAP = 4GB
;详细Kafka
服务端配置及其优化请参考:Kafka server.properties
配置详解:
从上述图可以看出,Kafka
运行时很少有大量读磁盘的操作,主要是定期批量写磁盘操作,因此操作磁盘很高效。这跟Kafka
文件存储中读写message
的设计是息息相关的。Kafka
中读写message
有如下特点:写message
,消息从java
堆转入page cache
即物理内存。由异步线程刷盘,消息从page cache
刷入磁盘。读message
消息直接从page cache
转入socket
发送出去。当从page cache
没有找到相应数据时,此时会产生磁盘IO
,从磁盘Load
消息到page cache
,然后直接从socket
发出去。
六、Kafka 中的 Partition 和 Offset
【1】Log
机制: 说到分区,就要说Kafka
对消息的存储,首先,kafka
是通过log
(日志)来记录消息发布的。每当产生一个消息,Kafka
会记录到本地的log
文件中,这个log
和我们平时的log
有一定的区别。这里可以参考一下The Log
,不多解释。这个log
文件默认的位置在config/server.properties
中指定的,默认的位置是log.dirs=/tmp/kafka-logs
,Linux
不用说,windows
的话就在你对应磁盘的根目录下。
分区Partition
: Kafka
是为分布式环境设计的,因此如果日志文件,其实也可以理解成消息数据库,放在同一个地方,那么必然会带来可用性的下降,一挂全挂,如果全量拷贝到所有的机器上,那么数据又存在过多的冗余,而且由于每台机器的磁盘大小是有限的,所以即使有再多的机器,可处理的消息还是被磁盘所限制,无法超越当前磁盘大小。因此有了Partition
的概念。Kafka
对消息进行一定的计算,通过hash
来进行分区。这样,就把一份log
文件分成了多份。如上面的分区读写日志图,分成多份以后,在单台Broker
上,比如快速上手中,如果新建Topic
的时候,我们选择replication-factor 1 partitions 2
,那么在log
目录里,我们会看到test-0
目录和test-1
目录就是两个分区了。你可能会想,这没啥区别呀。注意,当有了多个broker
之后,这个意义就存在了。这里上一张图:
【2】Kafka
分布式分区存储: 这是一个Topic
包含4
个Partition
,2 Replication
(拷贝),也就是说全部的消息被放在了4
个分区存储,为了高可用,将4
个分区做了2份冗余,然后根据分配算法。将总共8
份数据,分配到Broker
集群上。结果就是每个Broker
上存储的数据比全量数据要少,但每份数据都有冗余,这样,一旦一台机器宕机,并不影响使用。比如图中的Broker1
,宕机了那么剩下的三台Broker
依然保留了全量的分区数据。所以还能使用,如果再宕机一台,那么数据不完整了。当然你可以设置更多的冗余,比如设置了冗余是4
,那么每台机器就有了0123
完整的数据,宕机几台都行。需要在存储占用和高可用之间做衡量。至于宕机后,zookeeper
会选出新的 partition leader
。
偏移offset
: 上一段说了分区,分区就是一个有序的,不可变的消息队列。新来的commit log
持续往后面加数据。这些消息被分配了一个下标(或者偏移),就是offset
,用来定位这一条消息。消费者消费到了哪条消息,是保持在消费者这一端的。消息者也可以控制,消费者可以在本地保存最后消息的offset
,并间歇性的向zookeeper
注册offset
。也可以重置offset
。
如何通过offset
算出分区:其实Partition
存储的时候,又分成了多个segment
(段),然后通过一个index
索引,来标识第几段。这里先可以去看一下本地log
目录的分区文件夹。在我这里,test-0
这个分区里面,会有一个index
文件和一个log
文件,对于某个指定的分区,假设每5
个消息作为一个段大小,当产生了10
条消息的情况下,目前有会分段。
0.index
(表示这里index
是对0-4
做的索引)、5.index
(表示这里index
是对5-9
做的索引)、10.index
(表示这里index
是对10-15
做的索引,目前还没满) 和0.log
、5.log
、10.log
。当消费者需要读取offset=8
的时候,首先kafka
对index
文件列表进行二分查找,可以算出应该是在5.index
对应的log
文件中,然后对对应的5.log
文件,进行顺序查找,5->6->7->8
,直到顺序找到8
就好了。
七、索引
消费者可以从Kafka
的任意可用偏移量位置开始读取消息,假设消费者要读取从偏移量100
开始的1MB
消息,那么Broker
必须立即定位到偏移量100
,为了帮组broker
更快地定位到指定的偏移量,Kafka
为每个分区维护一个索引。索引把偏移量映射到片段文件和偏移量在文件里的位置。索引也被分成片段,所以再删除消息时,也可以删除相应的索引。Kafka
不维护索引的校验和。如果索引出现损坏,Kafka
会通过重新读取消息并录制偏移量和位置来重新生成索引。如果有必要,管理员是可以删除索引的,这样做是绝对安全的,Kafka
会自动重新生成这些索引。
八、Kafka高效文件存储设计特点
【1】Kafka
把topic
中一个parition
大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
【2】通过索引信息可以快速定位message
和确定response
的最大大小。
【3】通过index
元数据全部映射到memory
,可以避免segment file
的IO
磁盘操作。
【4】通过索引文件稀疏存储,可以大幅降低index
文件元数据占用空间大小。