RocketMQ 通过一系列的机制来保证消息的可靠性传递,确保在面对各种异常和故障情况时,消息系统能够稳定地处理和传递消息。以下是 RocketMQ 保证可靠性传递的关键机制:
1. 同步双写机制 (Synchronous Write Mechanism):
RocketMQ的同步双写机制通过操作系统的页缓存、同步刷盘和CommitLog的持久化操作,为消息的可靠性传递提供了坚实的基础。在生产环境中,根据实际需求可以适度调整相关配置,以平衡性能和可靠性。
1.1 同步双写机制工作流程:
- 消息生产:当生产者发送消息到RocketMQ时,消息首先被写入操作系统的页缓存。
- 同步刷盘(Sync Flush):RocketMQ会在消息写入页缓存后进行同步刷盘操作。这意味着将消息从页缓存刷写到磁盘的存储介质,确保消息已经被持久化。
- CommitLog持久化:一旦消息成功同步刷盘,RocketMQ将消息写入 CommitLog。CommitLog是消息的物理存储,包含了所有已发送的消息。
- 返回成功响应:一旦消息被成功写入CommitLog,生产者收到成功响应,可以认为消息已经被成功持久化。
1.2 同步双写机制流程:
- 消息发送:
- 生产者将消息发送到 RocketMQ 主节点。
- 主节点写入消息存储文件(Master CommitLog):
- 主节点负责将消息写入自身的 CommitLog 文件,保证消息在主节点的存储文件中持久化。
- 主节点同步消息到从节点:
- 主节点将刚刚写入的消息同步到所有配置的从节点。
- 同步可以采用不同的方式,如同步、异步或半同步,这取决于 RocketMQ 的配置。
- 从节点写入消息存储文件(Slave CommitLog):
- 从节点接收到主节点的同步消息后,将消息写入自身的 CommitLog 文件。
- 从节点确认消息同步完成:
- 从节点向主节点发送确认消息已成功同步的信号。
- 主节点收到所有从节点的确认后,认为消息在从节点的同步操作已经完成。
- 消息标记为可投递:
- 一旦消息在主节点和所有从节点的 CommitLog 文件中都成功写入,消息被标记为可投递状态。
- 这意味着消息已经在多个节点上得到了持久化。
- 消息可供消费者消费:
- 消费者可以消费已被确认写入的消息,保证消息在多节点之间的同步。
1.2.2 流程图:
1.2 关键概念:
- 页缓存(Page Cache):操作系统中用于存储文件系统缓存的内存区域。RocketMQ通过将消息首先写入页缓存,实现了消息在内存中的持久化。
- 同步刷盘(Sync Flush):是指将内存中的数据同步刷写到磁盘。RocketMQ确保消息在被发送后,首先在内存中得到持久化,然后再刷写到磁盘,从而防止数据的丢失。
- CommitLog:是RocketMQ中消息的物理存储结构,包含了所有已发送的消息。CommitLog的持久化保证了即使在异常情况下,如Broker宕机,消息也能够被恢复。
1.3 为什么使用同步双写机制?
- 提高持久化可靠性:同步双写机制确保了消息在写入磁盘之前已经在内存中得到持久化,从而提高了消息的可靠性。
- 防止数据丢失:在异常情况下,如机器宕机,通过同步刷盘机制可以避免数据在写入磁盘之前丢失。
- 加速消息的发送速度:同步双写机制实际上将消息发送和持久化的操作异步化,提高了消息发送的吞吐量。
1.4 配置同步双写机制:
在RocketMQ的配置文件中,可以通过配置 isolateDiskReadAndWrite 参数为 true 来启用同步双写机制。该配置项在broker.conf中配置:
# broker配置文件中启用同步双写机制的配置
isolateDiskReadAndWrite=true
这样的配置将确保消息的同步刷盘机制被启用,提高了消息持久化的可靠性。
2. 主从复制机制 (Master-Slave Replication):
RocketMQ采用主从复制机制来提高系统的可用性。每个Broker都有一个主Broker和若干个从Broker。主Broker负责消息的读写,而从Broker用于备份和容灾。如果主Broker发生故障,RocketMQ能够迅速切换到从Broker以提供服务,确保消息的持久化和传递。
2.1 主从复制的工作原理:
- 角色定义: 在 RocketMQ 的 Broker 集群中,每个 Broker 都有主 Broker 和若干个从 Broker。主 Broker 负责消息的读写,而从 Broker 用于备份和容灾。
- 主节点写入消息: 当生产者发送消息时,消息首先被写入主节点的 CommitLog 中。
- 同步复制到从节点: 主节点将写入的消息同步复制到所有从节点。这确保了从节点具有与主节点相同的消息副本。
- 从节点备份: 从节点保存了主节点的消息备份,以备主节点宕机或发生故障时使用。
- 切换到从节点: 当主节点发生故障或宕机时,RocketMQ 能够快速切换到某个从节点,并确保消息服务的持续性。切换后,从节点成为新的主节点。
2.2 主从复制的配置:
在 RocketMQ 的配置文件中,可以通过配置 brokerRole 参数来指定 Broker 的角色,即是主节点还是从节点。以下是一个简化的配置示例:
# broker配置文件中的master配置
brokerClusterName=MyBrokerCluster
brokerName=broker-a
brokerId=0
listenPort=10911
namesrvAddr=localhost:9876
brokerRole=ASYNC_MASTER
# broker配置文件中的slave配置
brokerClusterName=MyBrokerCluster
brokerName=broker-a
brokerId=1
listenPort=10912
namesrvAddr=localhost:9876
brokerRole=SLAVE
在这个配置中,brokerRole 参数指定了 Broker 的角色,主节点使用 ASYNC_MASTER,从节点使用 SLAVE。
2.3 关键概念:
- 主 Broker(Master Broker): 负责消息的读写和写入 CommitLog。
- 从 Broker(Slave Broker): 用于备份主 Broker 的消息,确保在主 Broker 故障时可以顺利切换。
- 同步复制: 主节点将消息同步复制到所有从节点,确保从节点具有相同的消息副本。
- 切换(Failover): 在主节点发生故障时,从节点可以快速切换为新的主节点,确保消息服务的持续性。
2.4 为什么使用主从复制机制?
- 提高可用性: 主从复制机制提高了系统的可用性,即使主节点发生故障,从节点可以迅速切换为主节点,保障消息服务的持续性。
- 容灾备份: 从节点充当主节点的备份,确保在主节点宕机或发生故障时,仍能够提供服务。
- 负载均衡: 主从复制机制支持在多个节点之间分摊消息读写的负载,提高系统的整体性能。
2.5 配置主从复制机制:
在配置文件中通过设置 brokerRole 参数,可以指定 Broker 的角色。主节点使用 ASYNC_MASTER,从节点使用 SLAVE。通过合理配置主从节点的数量和位置,可以优化系统的性能和可靠性。
3. 消息拉取和消费确认机制 (Message Pull and Consumer Acknowledgment):
消费者在拉取消息时,RocketMQ记录每条消息的消费状态。消费者在处理完消息后,会向Broker提交消息的消费确认。Broker将这些消费确认的信息持久化到CommitLog中。这确保了消息在被成功消费后会被正确地持久化,而且在消费者或者Broker出现问题时,能够根据这些记录进行恢复。
3.1 消息拉取机制:
在 RocketMQ 中,消费者通过拉取(Pull)的方式获取消息。这种机制相对于推送(Push)的方式更为灵活,允许消费者按照自己的速度主动拉取消息。消息拉取的流程如下:
- 消费者向 Broker 发起拉取消息的请求。
- Broker 返回一批消息给消费者。
- 消费者处理拉取到的消息。
3.2 消费确认机制:
在 RocketMQ 中,消费者在成功消费一条消息后,需要向 Broker 发送消费确认。这个机制确保了消息被成功消费,并避免消息在消费者宕机或发生其他故障时被重复消费。消费确认的流程如下:
- 消费者成功处理一条消息后,向 Broker 发送消费确认。
- Broker 接收到消费确认后,将消息的消费状态记录在 CommitLog 中。
- 消费者定期向 Broker 发送消费进度,以便 Broker 知晓消费的最新状态。
3.3 消费确认的两种模式:
RocketMQ 提供了两种消费确认模式:
- 自动确认(Auto Acknowledge): 消费者在成功消费消息后,会自动向 Broker 发送消费确认,无需手动处理。适用于那些不需要精确控制消费确认的场景。
- 手动确认(Manual Acknowledge): 消费者需要手动调用确认接口,显式地告知 Broker 消息已经被成功消费。这种模式更为灵活,适用于需要精确控制消费确认的场景,例如在事务消息中。
3.4 防止消息重复消费:
RocketMQ 通过消费确认机制防止消息重复消费。即使在消息推送和拉取的情况下,一旦消息被成功消费,消费者向 Broker 发送确认后,Broker 将记录消息的消费状态,防止消息被重复消费。
3.5 代码示例(Java):
// RocketMQ消费者示例代码
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("example_topic", "*");consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {for (MessageExt message : messages) {// 处理消息的业务逻辑// 手动确认消息已经被消费consumer.acknowledge(message.getMsgId());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});consumer.start();
System.out.println("Consumer Started.");
在这个示例中,消费者注册了消息监听器,对拉取到的消息进行处理,并使用 consumer.acknowledge() 手动确认消息的消费。这种手动确认的方式可以确保消息在被成功处理后才发送确认,防止消息重复消费。
3.6 消息拉取和消费确认机制流程:
3.6.1 流程说明:
1)消费者订阅主题和标签:
消费者通过调用 subscribe 方法订阅对应的主题和标签。
调用的组件:DefaultMQPushConsumer(消费者实例)
源码示例:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic", "tag");
2)拉取消息:
消费者通过调用 poll 或者注册消息监听器,并启动消息消费线程。
消费者主动向 RocketMQ 服务器发起拉取消息的请求。
调用的组件:MQClientAPIImpl(RocketMQ 客户端 API 实现)
源码示例:
PullResult pullResult = pullMessageSync(queue, subExpression, expressionType,nextOffset, pullBatchSize, sysFlag, commitOffset, brokerAddr);
3)服务器返回消息:
RocketMQ 服务器收到拉取消息请求后,从消息队列中获取消息,并将消息返回给消费者。
调用的组件:MessageQueue(消息队列)
源码示例:
MessageExt msg = queueMessageList.get(i);
4)消费者处理消息:
消费者收到消息后,执行消息的业务处理逻辑。
消费者可以在业务逻辑中判断消息是否处理成功。
5)确认消息消费:
如果消息处理成功,消费者调用 acknowledge 方法确认消息已被成功消费。
如果消息处理失败,消费者可以选择不确认消息,以触发消息的重新投递。
调用的组件:ConsumeMessageService(消息消费服务)
源码示例:
this.consumeMessageService.submitConsumeRequest(msgs,processQueue,messageQueue,dispathToConsume);
6)服务器更新消费进度:
消费者确认消息后,RocketMQ 服务器更新消费者的消费进度。
消费进度用于记录消费者在消息队列中的消费位置,以便下次拉取消息时从正确的位置开始。
调用的组件:ConsumerOffsetManager(消费者位移管理器)
源码示例:
this.consumerOffsetManager.commitOffset(consumerGroup,clientid,messageQueue,offset);
7)消费者定时发送心跳:
消费者定时向 RocketMQ 服务器发送心跳,以保持与服务器的连接。
调用的组件:HeartbeatProducer(心跳生产者)
源码示例:
this.heartbeatProducer.sendHeartbeatToAllBrokerWithLock();
8)消息拉取轮询:
消费者定期发起消息拉取请求,获取新的消息。
消费者拉取的频率可以根据业务需求和性能调优进行配置。
调用的组件:DefaultMQPushConsumerImpl(消费者实现)
源码示例:
this.mQClientAPIImpl.pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);
9)重复以上流程:
消费者将不断重复以上流程,持续从 RocketMQ 服务器拉取消息并进行消费。
整个流程是在 DefaultMQPushConsumer
内部进行的,包含了对 MQClientAPIImpl
、ConsumeMessageService
、ConsumerOffsetManager
等多个组件的调用
3.7 为什么使用消息拉取和消费确认机制?
- 灵活性: 消息拉取机制使得消费者可以按照自己的速度主动拉取消息,适用于不同业务场景的需求。
- 可靠性: 消费确认机制确保了消息被成功消费后才进行确认,防止消息在异常情况下被重复消费。
- 控制权: 消费者可以通过手动确认的方式更加灵活地控制消息的消费进度,适用于需要更细粒度控制的场景。
4. 消息的重新投递机制 (Message Redelivery Mechanism):
RocketMQ支持消息的重新投递机制。当消息发送失败或者消费失败时,RocketMQ会根据预设的重试策略将消息重新投递给消费者。这包括了设置最大重试次数和重试间隔时间,以确保消息在遇到短暂故障后能够尽快被重新处理。
4.1 工作原理:
- 消息发送失败: 当消息发送到 Broker 时,如果发生发送失败,RocketMQ 将对该消息进行重新投递。
- 消息消费失败: 当消费者处理消息时发生失败,消费者可以选择将消息标记为重新投递。这通常发生在消息处理的业务逻辑出现异常或者无法处理当前消息的情况下。
- 重试次数控制: RocketMQ 允许设置消息的最大重试次数,即消息最多被重新投递的次数。超过重试次数后,消息将被标记为死信消息,不再重新投递。
- 重试间隔控制: 为了避免短时间内频繁地重试,RocketMQ 支持设置消息的重试间隔时间。在每次重新投递时,消息将等待一定的时间再次被尝试。
4.2 代码示例(Java):
生产者端:
DefaultMQProducer producer = new DefaultMQProducer("example_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();Message message = new Message("example_topic", "TagA", "Hello, RocketMQ".getBytes());// 发送消息,并指定最大重试次数和重试间隔时间
SendResult sendResult = producer.send(message, (m, ex, retryTimes) -> {if (retryTimes < 2) { // 最多重试两次// 返回重新投递的策略return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;} else {// 不再重试,消息将被标记为死信消息return ConsumeOrderlyStatus.SUCCESS;}
}, 3, 3000);System.out.println("Message ID: " + sendResult.getMsgId());producer.shutdown();
消费者端:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("example_topic", "*");consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {for (MessageExt message : messages) {try {// 处理消息的业务逻辑// 如果处理失败,可以选择重新投递消息if (someBusinessLogic(message)) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} else {// 返回重新投递的策略,RocketMQ 将根据配置进行重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}} catch (Exception e) {// 异常情况也可以选择重新投递return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});consumer.start();
System.out.println("Consumer Started.");
在这个示例中,生产者通过 producer.send() 方法发送消息,并通过 SendCallback 设置了消息发送失败后的处理逻辑。消费者在 MessageListener 中处理消息的业务逻辑,并根据处理结果返回不同的消费状态,以决定是否重新投递消息。
4.3 为什么使用消息的重新投递机制?
- 处理瞬时故障: 在分布式系统中,瞬时故障是难以避免的。可能由于网络抖动、服务暂时不可用等原因导致消息发送失败或者消费失败。重新投递机制能够帮助系统在这些短暂的故障发生后,通过重新尝试来最终成功地将消息发送或者处理。
- 提高消息处理成功率: 在一些场景下,消息的处理可能涉及到复杂的业务逻辑、依赖外部服务,或者受到临时资源限制等。第一次处理时可能由于各种原因失败,但通过重新投递机制,系统有机会在稍后重新尝试,从而提高消息的最终成功率。
- 应对消费端问题: 消费端可能由于程序bug、处理逻辑错误或者依赖服务故障等原因导致消息处理失败。通过重新投递,系统有机会在修复问题后重新处理这些消息。
- 系统的自愈能力: 重新投递机制使得系统在遇到短期故障或处理失败的情况下,能够自动尝试修复问题,降低了对人工干预的依赖,提高了系统的自愈能力。
- 降低手动处理成本: 对于某些失败的消息,人工介入可能是昂贵的,尤其在大规模系统中。通过自动的重新投递机制,系统能够更加高效地处理这些失败的消息,降低了手动处理的成本。
- 避免消息丢失: 在一些异常情况下,如网络分区、服务器故障等,消息可能在发送或者消费的过程中丢失。通过重新投递机制,系统有机会在这些异常恢复后重新处理消息,避免消息的永久丢失。
5. 事务消息机制 (Transactional Message Mechanism):
RocketMQ提供了事务消息的支持,用于处理分布式事务。在事务消息中,生产者发送半消息(half message),然后等待事务的执行结果。最终,根据事务的结果,生产者提交或回滚这条消息。这个机制保证了在分布式事务场景中消息的一致性。
5.1 工作原理:
- 事务消息发送: 事务消息发送分为两个阶段。首先,生产者发送半消息(Half Message),该消息处于预提交状态,但尚未提交到 CommitLog 中。
- 本地事务执行: 生产者在发送半消息后,会执行本地事务。本地事务可能成功、失败或者状态不确定。
- 事务消息状态回查: RocketMQ 定期进行事务消息的状态回查。对于处于预提交状态的半消息,RocketMQ 会向生产者发送回查请求。
- 生产者处理回查请求: 生产者收到回查请求后,需要根据本地事务的执行结果来确定如何处理该消息。如果本地事务成功,则提交该消息;如果本地事务失败,则回滚该消息。
- 消息提交或回滚: 在回查后,生产者根据本地事务的执行结果提交或回滚事务消息。如果提交,消息变为可投递状态;如果回滚,消息将被删除。
- 消息可投递: 一旦事务消息提交,RocketMQ 将允许该消息被消费者消费。
5.2 事务消息机制流程:
5.2.1 流程描述:
- 半消息发送: 生产者发送半消息(Half Message)到 RocketMQ 服务器。
- 消息状态为 Prepared: 半消息的状态被设置为 "Prepared",表示消息处于预提交状态。
- 本地事务执行: 生产者执行本地事务逻辑,可能是数据库操作、文件写入等。
- 事务消息状态回查: RocketMQ 定期进行事务消息的状态回查,向生产者询问半消息的本地事务执行结果。
- 生产者处理回查请求: 生产者根据本地事务的执行结果,可能是提交、回滚或保持不变,返回相应的事务状态。
- 消息提交或回滚: 如果本地事务成功,生产者将消息状态设置为 "Commit",表示提交;如果本地事务失败,生产者将消息状态设置为 "Rollback",表示回滚。
- 消息可投递或删除: 如果消息状态为 "Commit",RocketMQ 允许该消息被消费者消费;如果消息状态为 "Rollback",该消息将被删除。
5.2.2 流程图:
5.3 代码示例(Java):
事务消息生产者:
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
producer.setNamesrvAddr("localhost:9876");// 设置事务监听器
producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务逻辑,返回事务执行状态// 可能是 COMMIT_MESSAGE、ROLLBACK_MESSAGE、UNKNOW// 具体返回值要根据本地事务的执行结果来确定}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 回查本地事务状态,返回事务执行状态// 可能是 COMMIT_MESSAGE、ROLLBACK_MESSAGE、UNKNOW// 具体返回值要根据本地事务的执行结果来确定}
});producer.start();// 发送事务消息
Message message = new Message("transaction_topic", "TagA", "Hello, RocketMQ".getBytes());
SendResult sendResult = producer.sendMessageInTransaction(message, null);System.out.println("Transaction Send Result: " + sendResult.getSendStatus());producer.shutdown();
事务消息消费者:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("transaction_topic", "*");consumer.registerMessageListener((List<MessageExt> messages, ConsumeConcurrentlyContext context) -> {for (MessageExt message : messages) {// 处理事务消息的业务逻辑// 注意:由于事务消息的状态可能是 UNKNOW,所以需要确保消息的幂等性}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});consumer.start();
System.out.println("Transaction Consumer Started.");
5.4 为什么使用事务消息机制?
- 确保消息的一致性: 事务消息机制允许生产者和消费者参与到分布式事务中,确保消息在发送和消费的过程中的一致性。
- 处理本地事务: 生产者可以在发送半消息后执行本地事务。如果本地事务成功,生产者提交该消息;如果本地事务失败,生产者回滚该消息。
- 状态回查: RocketMQ 定期回查半消息的状态,以确保在网络分区、节点故障等情况下,能够最终处理事务消息的状态。
- 消息的幂等性: 由于事务消息可能经历多次状态回查,消费者需要确保消息的处理是幂等的,以避免因为重复处理消息而引发的问题。
- 消息的可靠性: 事务消息机制提供了一种可靠的消息传递方式,特别适用于要求事务一致性的场景,如订单支付、库存扣减等。