1. 确保消息不丢失
- 生产者确认机制
确保生产者的消息能到达队列,如果报错可以先记录到日志中,再去修复数据 - 持久化功能
确保消息未消费前在队列中不会丢失,其中的交换机、队列、和消息都要做持久化 - 消费者确认机制
由spring确认消息处理成功后完成ack,当然也需要设置一定的重试次数,我们当时设置了3次,如果重试3次还没有收到消息,就将失败后的消息投递到异常交换机,交由人工处理
2. 消息重复消费问题
出现背景:消费者设置了自动确认机制,在消费者消费完消息后的 ACK 确认消息因为网络波动或者宕机的原因没有发送到 MQ,MQ中的消息并没有删除,这就将会导致消息的重复消费
解决方案:
- 消息唯一ID
给每一条消息的设置一个唯一标识ID,我们在处理消息时,先到数据库查询一下,这个消息是否已经处理过,如果没有处理过,这个时候就可以正常处理这个消息了,处理前就可以把当前的 消息ID 标记为 处理中,处理完成后 再把此 消息ID 标记为 处理完。如果已经处理过这个消息了,就说明消息重复消费了,我们就不需要再消费了。 - 幂等问题:数据库的唯一约束和锁、分布式锁?
3. 延迟队列
💖 死信队列+消息过期时间
如果消息超时未消费就会变成死信,在 RabbitMQ 中如果消息成为死信,队列可以绑定一个死信交换机,在死信交换机上可以绑定其他队列,在我们发消息的时候可以按照需求指定TTL的时间,这样就实现了延迟队列的功能了。
💖 延时队列插件
- 使用 RabbitMQ 的延迟队列插件,如
rabbitmq-delayed-message-exchange
插件。 - 安装插件后,可以创建延迟交换(
Delayed Message Exchange
),它允许你指定消息的延迟时间。 - 发送消息时,指定消息的
x-delay
属性来设置延迟时间。
4. 消息堆积
- 提高消费者的消费能力,比如使用多线程消费消息
- 增加更多的消费者,提高消费速度
使用工作队列模式,设置多个消费者消费同一个队列中的消息 - 扩大队列的容积,提高堆积上限
- 可以使用 RabbitMQ 的惰性队列
- 接收消息后直接存入磁盘而非内存
- 消费者需要消费消息时才会从磁盘中读取并加载到内存
- 支持百万条消息的存储
- 可以使用 RabbitMQ 的惰性队列
5. 高可用
-
普通集群
-
镜像集群
-
仲裁队列(强一致性)