springboot 整合 rabbitMQ(1)-CSDN博客
上期说了rabbitMQ的基础用法(普通队列模式)
这期学习一下如何防止消息重复消费和进阶用法(订阅者模式)
目录
重复消费问题
导致 RabbitMQ 重复消费问题的原因:
解决思路
代码实现(这里在上一期的代码上进行修改)
生产者:
消费者:
订阅者模式:
是什么:
代码实现:
生产者:
消费者:
效果:
重复消费问题
导致 RabbitMQ 重复消费问题的原因:
-
网络问题: 在分布式系统中,网络通信是不稳定的因素之一。如果生产者发送一条消息到 RabbitMQ 但尚未收到确认(acknowledgment),可能会导致 RabbitMQ 认为消息未被正确处理并重新发送。
-
消费者故障: 消费者在处理消息时可能会发生故障,例如应用程序崩溃或因某种原因终止。如果 RabbitMQ 未收到消费者的确认消息,它可能会认为消息未被消费并重新发送。
-
网络分区: 当分布式系统中的网络发生分区(网络隔离)时,可能会导致消息在不同部分之间重复传递。这是因为每个分区可能都会独立处理消息。
-
消息重复传递策略: RabbitMQ 提供了不同的消息传递策略,例如“至少一次传递”和“最多一次传递”。这些策略可能会导致消息的重复传递,尤其在异常情况下。
-
消费者超时设置不当: 如果消费者设置了较长的超时时间,在消费者未确认消息的情况下,RabbitMQ 可能会认为消息未被处理并重新发送。
解决思路
- 生产者发送消息时携带一个唯一的id
- 消费者每次消费前先判断一下在redis中是否在id,不存在就消费,消费完之后就把id存储到redis中
代码实现(这里在上一期的代码上进行修改)
生产者:
消费者:
@Autowiredprivate RedisTemplate redisTemplate;private static final String QUEUE_NAME="login_queue";@RabbitListener(queuesToDeclare = @Queue(QUEUE_NAME))public void test01(User getUser, Message message, Channel channel) throws IOException {//获取消息唯一idString messageId = message.getMessageProperties().getMessageId();long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//判断消息是否存在if(redisTemplate.opsForHash().hasKey(QUEUE_NAME,messageId)){//存在表示该消息已被消费,returnreturn;}//将该消息id存入redis 第三个参数随便字符串,不要填对象,没法序列化redisTemplate.opsForHash().put(QUEUE_NAME,messageId,"a");//发送消息到邮箱sendMail(getUser);System.out.println("消费成功");//手动确认消息 multiple 是一个布尔值,它决定了是否确认多个消息// false:只确认指定的单个消息 true:确认所有未确认的消息,这些消息的deliveryTag 小于或等于指定的 deliveryTagchannel.basicAck(deliveryTag,false);} catch (Exception e) {//消息回滚 deliveryTag:指定要拒绝的消息 false:表示只拒绝指定的这条消息 true:表示将消息重新入队,让其他消费者有机会再次处理这条消息。channel.basicNack(deliveryTag,false,true);System.out.println("消息消费异常");}}
订阅者模式:
是什么:
简单解释就是,可以将消息发送给不同类型的消费者。做到发布一次,消费多个。下图取自于官方网站(RabbitMQ)的发布/订阅模式的图例
那么要使交换机接受到消息后转发给队列,就需要将队列绑定给交换机,可以写一个配置类,项目启动,自动将队列绑定给交换机
代码实现:
/*** @ClassName QueueConfig* @Description 声明队列和交换机 并将队列和交换机绑定* @Author* @Date 2024/10/9 16:28*/
@Configuration
public class QueueConfig {//订阅模式交换机名称public static String EXCHANGENAME="fanout_exchange";public static String TESTQUEUE01="test_01";public static String TESTQUEUE02="test_02";@Beanpublic Queue queue01(){return new Queue(TESTQUEUE01);}@Beanpublic Queue queue02(){return new Queue(TESTQUEUE02);}@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(EXCHANGENAME);}@Beanpublic Binding bingingQueue01(Queue queue01,FanoutExchange fanoutExchange){return BindingBuilder.bind(queue01).to(fanoutExchange);}@Beanpublic Binding bingingQueue02(Queue queue02,FanoutExchange fanoutExchange){return BindingBuilder.bind(queue02).to(fanoutExchange);}
}
生产者:
只需要交换机的名字和传入的数据,队列名不需要填,因为交换机已经绑定队列
消费者:
效果:
发送一条数据,两个消费者消费