在分布式消息系统中,处理消费失败的消息是非常关键的一环。
RocketMQ 提供了一套完整的消息消费失败处理机制,下面我将简要介绍一下其处理逻辑。
截图代码版本:4.9.8
步骤1
当消息消费失败时,RocketMQ会发送一个code为36的请求到消息所在的 broker。broker接收到这个请求后,会将这个失败的消息放入到一个特殊的retry topic中,准备后续的重试消费。然而,在网络异常的情况下,有可能这里拿到的broker地址是空的,这将导致请求被发送到ns。由于ns并不处理这种类型的请求,因此,在网络异常的情况下,会出现请求超时等待的情况**(这个我们在多az做断网演练时遇到过)**。
步骤2
如果步骤1失败,即消息没有成功地被放入retry topic,客户端会尝试选择其他的broker进行发送。这样,即使某个broker出现问题,我们仍然可以确保消息得以在retry topic中得到存储,以便后续进行重试消费。
步骤3
如果步骤2也失败,即消息在所有的broker中都没有成功地放入retry topic,那么客户端会将消息塞回到消费的队列中,在5秒后再次尝试消费。
总结
总的来说,RocketMQ在处理消费失败的消息时,提供了一套从多个角度进行保障的策略。无论是通过发送到不同broker的retry topic,还是通过延迟再次消费,都能在一定程度上确保消息的最终一致性,降低消息丢失的风险。