一、持久化
1.1 持久化对象
rabbitmq的持久化分为三个部分:
- 交换器的持久化。
- 队列的持久化。
- 消息的持久化。
1.1.1 交换器持久化
- 交换器的持久化是通过在声明交换器时, 指定Durability参数为durable实现的。
- 若交换器不设置持久化,在rabbitmq服务重启之后,相关的交换器元数据会丢失,但消息不会丢失,只是不能将消息发送到这个交换器中。
所以在声明交换器时,都要设置持久化。 - 在web监控创建时,默认也是持久化模式,指定持久化模式带有标识“D”。
springboot监听器,实现交换器持久化示例
1.1.2 队列持久化
- 队列的持久化是通过在声明队列时, 指定Durability参数为durable实现的。
- 若队列不设置持久化,在rabbitmq服务重启之后,相关队列的元数据和消息数据同时丢失。
- 若队列设置持久化,只能保证队列本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将消息设置为持久化。
- 在web监控创建时,默认也是持久化模式,指定持久化模式带有标识“D”。
springboot监听器,实现队列持久化示例
1.1.3 消息持久化
消息的持久化可以通过消息的投递模式来实现,属于代码层面上的。可以控制每一条消息是否久化。
但是将所有消息都设置为持久化,会严重影响rabbitmq服务器性能,写入磁盘的速度比写入内存的速度慢得不只一点点。所以对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做一个权衡。
springboot代码设置消息的持久化示例
1.2 总结要点
- 交换器、队列、消息都可以设置是否持久化。交换器和队列持久化的含义是元数据持久化。消息持久化的含义是消息本身持久化。
将交换器、队列、消息都设置了持久化之后能百分之百保证数据不丢失吗?答案是不能
- 从消费者来说,如果在订阅消费队列时将 autoAck 参数设置为 true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据丢失。这种情况很好解决,将autoAck 参数设置为 false,并进行手动确认。
- 在持久化的消息正确存入rabbitmq之后,还需要有一段时间(虽然很短,但是不可忽视) 才能存入磁盘之中。如果在这段时间内rabbitmq服务节点发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。这种情况可以使用镜像队列来解决。
二、存储机制
前面提到的消息持久化,其实是在rabbitmq的“持久层”中完成的。不管是持久化的消息,还是非持久化的消息都可以被写入到磁盘。
- 持久化的消息在到达队列时就入盘,而且还可以设置持久化的消息在内存中也保存一份备份,这么做可以提高业务效率,当内存吃紧时会从内存中清除。
- 非持久化的消息一般只保存在内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存空间。
2.1 存储方式
持久层是一个逻辑上的概念,实际包含两个部分:
- 队列索引 (rabbit_queue_index):负责维护队列中落盘消息的信息,包括消息的存储地点、消息在队列中的位置、是否已被交付给消费者、是否已被消费者 ack 等。每个队列都有与之对应的一个队列索引。
- 消息存储(rabbit_msg_store):而消息存储是以键值对的形式存储消息,它被所有队列共享,所以在每个节点中有且只有一个。从技术层面上来说,rabbit_msg_store 具体还可以分两类:
- msg_store_persistent :负责持久化消息的持久化,重启后消息不会丢失。
- msg_store_transient:负责非持久化消息的持久化,重启后消息会丢失。
我们一般说消息存储,是习惯性地将 msg_store_persistent 和 msg_store_transient 看成 rabbit_msg_store 一个整体。
I have no name!@ed73deb9f1c5:/bitnami/rabbitmq/mnesia/rabbit@stats/msg_stores/vhosts/9PIHRMVSJH6VBOR100H7141ZT$ ls -al
drwxr-xr-x. 2 1001 root 19 Oct 7 02:57 msg_store_persistent
drwxr-xr-x. 2 1001 root 19 Oct 7 02:57 msg_store_transient
- 存在队列索引里的好处?
性能上的优化。相比存在消息存储里,直接存在队列索引仅需进行一次写操作。而存储在消息存储中的消息则需要两次写操,先写一次索引,再写一次消息存储,因此会有一定的性能提升。
注意事项: - 若消息直接存在队列索引中,则当消息通过exchange同时路由到多个队列时,此消息会被写到每个队列的索引文件中。
- 若消息是存在消息存储中,就仅仅只有一个副本。
2.2 存储文件
- 上面提到的消息,是包括消息体、属性和 headers,可以直接存储在队列索引中,也可以保存在消息存储中。
- rabbitmq启动后,会针对每个vhost会启动两个进程:msg_store_persistent和msg_store_transient,这两个进程作为服务端负责将消息写入文件,从文件读取消息。
- msg_store_persistent负责将持久化消息写入文件与从文件中读取消息。
- msg_store_transient负责非持久化消息写入文件与从文件中读取消息。
- 默认存储文件位置:通过日志可以看到存储文件地址,包含queues、msg_store_persistent、msg_store_transient 这3个文件夹。如下图,我这里是指定了存储文件地址。
I have no name!@ed73deb9f1c5:/bitnami/rabbitmq/mnesia/rabbit@stats/msg_stores/vhosts/9PIHRMVSJH6VBOR100H7141ZT$ ls -al
total 16
drwxr-xr-x. 5 1001 root 125 Oct 7 02:57 .
drwxr-xr-x. 4 1001 root 72 Oct 7 01:15 ..
-rw-r--r--. 1 1001 root 83 Oct 7 01:15 .config
drwxr-xr-x. 2 1001 root 19 Oct 7 02:57 msg_store_persistent
drwxr-xr-x. 2 1001 root 19 Oct 7 02:57 msg_store_transient
drwxr-xr-x. 3 1001 root 38 Oct 7 01:18 queues
-rw-r--r--. 1 1001 root 5464 Oct 7 02:57 recovery.dets
-rw-r--r--. 1 1001 root 9 Oct 7 02:57 .vhost
上面的地址/bitnami/rabbitmq/mnesia/rabbit@stats,是队列的数据存放目录,这个在在哪里找呢,可以通过日志来查看,如下图所示:
日志中还显示了,9PIHRMVSJH6VBOR100H7141ZT这个目录,对应着virtual01这个vhost的目录。对于rabbitmq来说,每一个租户vhost的消息存储,都是放在不同的目录的
2.2.1 队列索引.idx文件
rabbit_queue_index 中以顺序(文件名从 0 开始累加) 的段文件来进行存储,后缀为“ .idx "。
每个段文件中包含定的 SEGMENT_ENTRY_COUNT 条记录,SEGMENT_ENTRY_COUNT 默认值为16384字节。
每个rabbit_queue_index 从磁盘中读取消息的时候至少要在内存中维护一个段文件,所以设置queue_index_embed_msgs_below参数指定阈值大小时要格外谨慎,一点点增大也可能会引起内存爆炸式的增长。
2.2.2 消息存储.rdq文件
经过 rabbit_msg_store 处理的所有消息都会以追加的方式写入到文件中,当一个文件的大小超过指定的限制 (file_size_lmit)后,关闭这个文件再创建一个新的文件以供新的消息写入,文件后缀是“ .rdq ”。
文件名从0开始进行累加,所以文件名最小的文件也是最老的文件。
如下所示0.rdq文件
I have no name!@ed73deb9f1c5:/bitnami/rabbitmq/mnesia/rabbit@stats/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L/msg_store_persistent$ ls -al
total 0
drwxr-xr-x. 2 1001 root 19 Oct 7 02:57 .
drwxr-xr-x. 4 1001 root 111 Oct 7 02:57 ..
-rw-r--r--. 1 1001 root 0 Oct 7 02:57 0.rdq
在进行消息的存储时,rabbitmq会在ETS (Erlang Term Storage) 表中记录消息在文件中的位置映射 (Index) 和文件的相关信息 (FileSummary)。
- 读取文件信息:
- 在读取消息的时候,先根据消息的 ID (msg_id)找到对应存储的文件。
- 若文件存在并且未被锁住,则直接打开文件,从指定位置读取消息的内容。
- 若文件不存在,或被锁住,则发送请求由 rabbit_msg_store 进行处理。
- 删除文件信息:
- 消息的删除只是从 ETS 表删除指定消息的相关信息,同时更新消息对应的存储文件的相关信息。
- 执行消息删除操作时,不会立即对在文件中的消息进行删除,先是标记为垃圾数据。
- 若一个文件中都是垃圾数据时,则删除文件。
- 若一个文件中存在有效数据,则触发垃圾回收机制,进行文件合并选择性删除。
- 垃圾回收文件合并机制:
- 当检测到前后两个文件中的有效数据可以合并在一个文件中,并且所有的垃圾数据的大小和所有文件(至少有3 个文件存在的情况下)的数据大小的比值超过设置的值 GARBAGE_ERACTION (默认值为 0.5) 时,才会触发垃圾回收将两个文件合并。
2.2.3 垃圾回收机制(文件合并)
文件合并前提:
执行合并的两个文件一定是逻辑上相邻的两个文件。
文件合并流程:
- 第一步,执行合并时首先锁定这两个文件。
- 第二步,先对前面文件中的有效数据进行整理。
- 第三步,再将后面文件的有效数据写入到前面的文件。
- 第四步,更新消息在 ETS 表中的记录。
- 第五步,最后删除后面的文件。
2.3 存储原理
- 从3.5.0版本开始,较小的消息是直接存储在队列索引.rdx中。
- 较大的消息存在.rdq队列文件中
如下图所示,我发布的消息,消息比较小时,在0.idx中,即存在索引中
下面是通过查看0.idx,发现里面有消息的正文内容
当消息体比较大时,存放的是rdq文件时面
- 在进行消息的存储时,rabbitmq会在ETS表中记录消息在文件中的映射,以及文件的相关信息。
- 消息读取时,根据消息ID找到该消息所存储的文件,在文件中的偏移量,然后打开文件进行读取。
- 消息的删除只是从ETC表删除指定消息的相关信息,同时更新消息对应存储的文件的相关信息(更新文件有效数据大小)。
2.3.1 生产者消息写入原理
每个队列则看成是一个客户端,当生产者发送的消息达到队列时,向服务端请求写,写入过程如下:
- 第一步,rabbitmq启动后,针对每个vhost开启两个进程,msg_store_persistent进程和msg_store_transient进程。两进程作为服务端,每个队列作为客户端。
- 第二步,当生产者发送消息到队列时,每个队列都会向两进程发起写入请求。
- 第三步,两进程开始往磁盘里写入消息。
- msg_store_persistent进程将持久化消息写入到服务器的msg_store_persistent目录下,文件名称依次为0.rdq、1.rdq、2.rdq等等。
- msg_store_transient进程将非持久化消息写入到服务器的msg_store_transient目录下,文件名称依次为0.rdq、1.rdq、2.rdq等等。
2.3.2 消费者消息读取原理
- 第一步,消费者向队列获取消息体。
- 第二步,队列汇聚消息ID去找落盘文件。
- 若文件存在,且未被锁住,则直接读取文件内容,返回消息给消费者。
- 若文件不存在,或已被锁住,则让rabbit_msg_store进程处理。
- 第三步,队列向两进程发起请求,进程先是通过GC进程去查看文件是否被锁住,同时也会清理垃圾,进行有效数据合并。
- 若被锁住则解锁,获取消息,返回给消费者。
- 若清理垃圾后,发现还是没有此消息,则向rabbitmq其他节点发送询问请求。
- 第四步,其他节点会根据消息ID挨个寻找,直至将rabbitmq集群每个节点找遍,之后返回结果给消费者。