前言
mq的优点:异步提速、解耦、流量削峰
mq的缺点: mq宕机可能导致消息丢失、消费者未成功消费如果保证整个系统架构的事务安全、消息可能被重复消费出现幂等问题、消息未被消费到引发死信问题、消息出现消费延迟或消费异常引发的顺序消费错乱问题...
mq的使用建议:系统扛不住了,扩容太贵了,不得不使用了
以下将根据官网提供的主流工作模式描述mq使用流程
一、队列生产消费模式
1、简单模式
消费者创建
public class SimpleConsumer {public static final String QUEUE_NAME = "hello world";//队列的名称public static void consumeMessage() throws IOException, TimeoutException {//创建工厂ConnectionFactory factory = new ConnectionFactory();//工厂IP 连接rabbitmq队列factory.setHost("127.0.0.1");//设置用户名称factory.setUsername("guest");//设置密码factory.setUsername("guest");//创建连接Connection connection = factory.newConnection();//获取信道Channel channel = connection.createChannel();//声明 接受消息DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println(new String(message.getBody()));};//取消消息回调CancelCallback cancelCallback = consumerTag -> {System.out.println("消息消费被中断");};//消费者消费消息:消费哪个队列、消费成功之后是否要自动应答(true:自动应答,false:手动应答)、消费失败的回调、取消消息的回调System.out.println(channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback));}
}
生产者创建
public class SimpleProducer {public static final String QUEUE_NAME = "hello world";//消息发送public static void buildMessage() throws IOException, TimeoutException {//创建工厂ConnectionFactory factory = new ConnectionFactory();//工厂IP 连接rabbitmq队列factory.setHost("127.0.0.1");//设置用户名称factory.setUsername("guest");//设置密码factory.setUsername("guest");//创建连接Connection connection = factory.newConnection();//获取信道Channel channel = connection.createChannel();//创建队列:// 设置队列名称、是否持久化、是否共享队列消息的消费(true:共享消费,false:只有一个消费者消费)、是否自动删除(true:消息用完后自动删除、false:)、其它参数(延迟、死信消息处理用)channel.queueDeclare(QUEUE_NAME, false, false, false, null);//发送消息String message = "hello world!!!";//消息发送channel.basicPublish("", QUEUE_NAME, null, message.getBytes());}
}
2、工作队列模式
工作队列(任务队列)主要是消费端注册多个消费者以加速消息消费,防止消息消费速度慢、延迟、死信等异常,适用于任务比较重的地方
模式特点:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息
代码和简单模式没区别,额外要做的就是对消费者线程多开,可以对线程做些标记观察是否真的有轮询消费
3、Pub/Sub订阅模式
与之前的模式相比,该模式可以提供消息共享,即一个消息被多个消费者消费,但是一个队列中的消息仍然只能消费一次,那如何做到的多消费呢?答案是利用交换订阅技术,即由交换机来路由消息给多个队列从而实现多次消费
Exchange(交换机)
交换机的工作非常简单,一方面接受来自生产者的消息,另一方面将消息推到队列。关键的点在于交配机必须精确处理消息,即把消息推到哪个队列或者是否应该丢弃
交换机类型描述:直接(direct/route)、主题(topic)、标题(headers)、扇出(fanout)
注:比如前面代码发布消息时channel.basicPublish方法指定的exchange是""或者null,实际走的是name为default的exchange,其类型为direct,routingkey默认是队列名称
临时队列
未设置持久配置都是临时队列,即durable参数配置的值不是durable,哦還要看生成者是否配置了autodelete,如果autodelete參數配置的值是true也是臨時隊列,因为用完就把队列删除了嘛谁还管队列是否配置了持久
创建临时队列的快速方法:String queueName = channel.queueDeclare().getQueue()
Bind
交换机绑定queue
Fanout
扇出这种类型就是广播,我不知道谁做的中文翻译,我*你**,就是消息广播到所绑定的所有队列,和routingkey没有关系
4、Direct订阅模式
这个和fanout模式差不多,区别在于exchange的type是direct,需要在bind队列时设置routing key,exchange根据routing key找到bind的queue,然后路由消息到这个queue
5、Topic
这个是对direct的扩展,direct的缺点在于需要硬编码到代码,如果以后要扩展别的队列还需要去生产者手动添加新routing key,当然我们可以通过服务配置来弥补这个问题,但是维护也需要人力,而topic可以很好的解决这个耦合问题
topic模式要求
交换机的routing key不能随意写,必须是一个单词列表,以 . 分隔,单词列表长度<=255
还有两个替换符,*代替一个单词,#代替零或多个单词,例如:*.orange.*;lazy.# ...
二、死信队列
死信:
死信是指无法被消费的消息,一般来说produer将消息投递到queue,consumer从queue取出消息进行消费,但是某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就是死信消息,为了保存这些死信消息就有了死信队列。
应用场景:
为了保证订单业务的消息数据不丢失,需要使用rabbitmq的死信队列机制,当消息消费发生异常,将消息投入死信队列中。
死信来源:
消息ttl过期(在当前队列中的存活时间,这段时间不被处理就被丢到dead queue)
队列达到最大长度(队列满了,无法再添加数据到mq)
消息被拒绝(basic.reject或者basic.nack)并且requeue=false
消费者都宕机了
三.延迟队列
延迟队列是用来存放需要在指定时间被处理的元素的队列
1.应用场景
外卖订单规定在15min之内下单,否则失效
新创建的店铺,如果十天内没有上传商品,自动发送消息提醒
用户注册账户成功,如果三天内没有登陆进行短信提醒
用户发起退款,如果三天内没有得到处理会通知线下服务人员
预定会议后,在预定时间的前15分钟消息提醒参会人员
2.代码操作
2.1.框架图
创建两个队列 QA 和 QB,两个队列 TTL 分别设置为 10s 和 40s,然后再创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:
2.2.配置信息
@Configuration
public class TtlQueueConfig {/*** 普通交换机名称*/public static final String X_EXCHANGE = "X";/*** 死信交换机名称*/public static final String Y_DEAD_LETTER_EXCHANGE = "Y";/*** 普通队列名称*/public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";/*** 死信队列名称*/public static final String DEAD_LETTER_QUEUE = "QD";/*** 声明 XExchange*/@Beanpublic DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}/*** 声明 yExchange*/@Beanpublic DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}/*** 声明队列QA*/@Beanpublic Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);// 设置死信交换机arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 设置死信路由键arguments.put("x-dead-letter-routing-key", "YD");// 设置过期时间arguments.put("x-message-ttl", 10000);return new Queue(QUEUE_A, true, false, false, arguments);}/*** 声明队列QB*/@Beanpublic Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);// 设置死信交换机arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 设置死信路由键arguments.put("x-dead-letter-routing-key", "YD");// 设置过期时间arguments.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}/*** 死信队列QD*/@Beanpublic Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}/*** 绑定*/@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}@Beanpublic Binding queueBBindingX(){return new Binding(QUEUE_B, Binding.DestinationType.QUEUE, X_EXCHANGE, "XB", null);}@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}
2.3.消息生产者代码
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")public String sendMsg(@PathVariable String message){log.info("当前时间:{}发送一条消息{}给两个队列", new Date(), message);rabbitTemplate.convertAndSend("X", "XA", "消息来自TTL为10s队列QA:"+message);rabbitTemplate.convertAndSend("X", "XB", "消息来自TTL为40s队列QB:"+message);return "发送成功";}
}
2.4.消息消费者代码
@Slf4j
@Component
public class DeadLetterConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message){String msg = new String(message.getBody());log.info("当前时间{},收到死信队列的消息:{}", new Date(), msg);}
}
2.5.测试
发送一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
第一条消息在 10s 后变成了死信消息,然后被消费者消费掉了,第二条消息在 40s 之后变成了死信消息,然后被消费掉,这样一个延时队列就完成了。
2.6.缺点
queueA和queueB都是通过queue设置ttl,如此一来生产者无法灵活设置消息的ttl
2.7.优化
让生产者也参与设置ttl,即生产者在生产消息时设置ttl,如此一来queue即使没有设置ttl也可以实现延时操作,扩展性大幅提高。
【注:】如果生产者和queue同时设置ttl,则以最短的ttl为有效值
如图,新增一个队列 QC,绑定关系如下,该队列不设置 TTL 时间
2.8.配置信息
@Component
public class MsgTtlQueueConfig {private static final String Y_DEAD_LETTER_EXCHANGE = "Y";private static final String QUEUE_C = "QC";@Bean("queueC")public Queue queueC(){Map<String, Object> arguments = new HashMap<>(2);// 声明当前队列绑定的死信交换机arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);// 声明当前队列的私信路由keyarguments.put("x-dead-letter-routing-key", "YD");return new Queue(QUEUE_C, false, false, false, arguments);}@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");}
}
2.9.消费生产者
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public String sendMsg(@PathVariable String message, @PathVariable String ttlTime){MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration(ttlTime);return message;}};rabbitTemplate.convertAndSend("X", "XC", message, messagePostProcessor);return "发送成功";
}
将程序执行,然后发送请求:
http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000
http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000
两条消息的过期时间一致,过期时间短的那条消息,在过期时间到了以后并没有立即被消费,而是和过期时间长的那条消息一起被消费了。所以,如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡”,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
2.10.Rabbitmq 插件实现延迟队列
上面提到的问题,确实是一个问题,如果不能实现TTL的细粒度,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列。
原理:
以往的ttl都是由队列处理,而插件延迟功能是由exchange来管理,当ttl达到截至时间再发送到对应的queue
安装:
这里不做演示,可百度去官网下载和安装对应版本的插件(好文推荐),安装完成后重启如下所示
新增一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
我们自定义的交换机中,这是一种新的交换机类型,该类型消息支持延迟投递机制,消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。
2.11.配置信息
@Configuration
public class DelayedQueueConfig {public static final String DELAYED_QUEUE_NAME = "delayed.queue";public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";@Bean("delayedQueue")public Queue delayedQueue(){return new Queue(DELAYED_QUEUE_NAME);}/*** 自定义交换机 定义一个延迟交换机* 不需要死信交换机和死信队列,支持消息延迟投递,消息投递之后没有到达投递时间,是不会投递给队列* 而是存储在一个分布式表,当投递时间到达,才会投递到目标队列* @return*/@Bean("delayedExchange")public CustomExchange delayedExchange(){Map<String, Object> args = new HashMap<>(1);// 自定义交换机的类型args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);}@Beanpublic Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange") CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}
2.12.生产消息
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";@GetMapping("/sendDelayMsg/{message}/{delayTime}")public String sendMsg(@PathVariable String message, @PathVariable Integer delayTime){rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, messagePostProcessor ->{messagePostProcessor.getMessageProperties().setDelay(delayTime);return messagePostProcessor;});log.info("当前时间:{},发送一条延迟{}毫秒的信息给队列delay.queue:{}", new Date(), delayTime, message);return "发送成功";}
}
2.13.消费消息
@Slf4j
@Component
public class DeadLetterConsumer {public static final String DELAYED_QUEUE_NAME = "delayed.queue";@RabbitListener(queues = DELAYED_QUEUE_NAME)public void receiveDelayedQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到延时队列的消息:{}", new Date(), msg);}
}
发起请求:
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
第二条消息被先消费掉了,符合预期
2.14.总结
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好地利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列,来保证消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好要的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
四、发布确认 & 消息回退
如果rabbitmq服务器异常或者宕机,虽然队列、队列中消息可以持久化,但是消息可能还没有经交换机成功进入队列,这段消息传递的空窗期如何捕获到消息状态?
发布确认是本次记录的一种方式(另外还有事务消息),发布确认是指rabbitmq服务在发布者和交换机这段起到状态反馈的作用,消息发送到交换机后,会反馈成功或失败状态给生产者
消息回退是指rabbitmq服务在交换机到队列这段起到状态反馈的作用,在发布确认成功之后,交换机将消息路由到队列,如果消息路由失败则反馈给发布者失败的消息信息
作用
如图,核心点就是缓存,即消息的备份,通过发布确认和消息回退可以返还消息或者消息标识给生产者,生产者可以据此重新获取 & 发送消息
发布确认使用方式
单消息发布确认:channel发布消息,然后channel执行waitForConfirms(),虽然简单但是低效,1000个简单消息处理耗时1s
批量消息发布确认:channel发布一批消息(例如for循环发布),然后channel执行waitForConfirms(),1000个简单消息处理耗时0.15s
异步发布确认: channel发布消息,然后channel执行 addReturnListener(ReturnCallback returnCallback),1000个简单消息处理耗时0.06s,不过需要你实现里面的两个回调接口并作为参数传入,一个是发布确认接口,另一个是不可达消息处理接口
springBoot下发布确认使用方式
spring-boot-starter-amqp提供了配置信息:spring.rabbitmq.publisher-confirm-type,主要用于设置发布确认类型
correlated:成功发布消息到交换机,可以异步触发回调
simple:支持rabbittemplate使用waitForConfirms方法进行同步确认
none:禁用发布模式
好文推荐
RabbitMQ确认消息Confirm详解 - 简书 (jianshu.com)https://www.jianshu.com/p/2dd4acabfb90
(63条消息) RabbitMQ学习笔记 - mandatory参数_setmandatory_mytt_10566的博客-CSDN博客https://blog.csdn.net/mytt_10566/article/details/90741398
消息回退使用方式
在仅开启生产者确认机制的情况下,交换机接收到消息后会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息被直接丢弃,生产者也感知不到消息被丢弃。这是一种不可靠的情况。解决方法就是设置mandatory参数,即生产者设置回退消息回调方法用来处理回退消息。
springBoot的mandatory设置方式:spring.rabbitmq.publisher-returns=true
发布确认&回退消息代码演示
演示架构
生产方演示发送消息到交换机失败、交换机路由失败等状态下回调函数的执行情况
配置信息
application.properties文件配置# 发布确认类型设置:correlated(关联性的,成功发布消息到交换机,可以触发回调)、none(禁用发布模式)、simple(支持rabbittemplate使用waitForConfirms方法进行同步确认)
spring.rabbitmq.publisher-confirm-type=correlated# 回退消息开启
spring.rabbitmq.publisher-returns=true
# mandatory代表强制发送: 即使交换机路由失败也会再次尝试,如果失败则回调ReturnCallback
spring.rabbitmq.template.mandatory=true
@Configuration
public class ConfirmConfig {//-----------------------------------------------发布确认配置----------------------------------------//交换机public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";//队列public static final String CONFIRM_QUEUE_NAME = "confirm_queue";//routing keypublic static final String CONFIRM_ROUTING_KEY = "confirm_routing_key";//声明交换机
// 创建发布确认交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}@Beanpublic Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(confirmQueue).to(exchange).with(CONFIRM_ROUTING_KEY);}}
回调接口实现
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){//注入发布确认回调接口rabbitTemplate.setConfirmCallback(this);
// 注入失败回退回调接口rabbitTemplate.setReturnsCallback(this);}/*** 交换机确认回调方法* 1.发送消息 交换机收到消息 回调* correlationData 保存回调消息的ID和相关信息* 交换机收到消息 ack = true* cause null* 2.发送消息 交换机接受失败 回调* correlationData 保存回调消息的ID和相关信息* 交换机收到消息 ack = false* cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData!=null ? correlationData.getId() : "";if(ack){log.info("确认回调:id:{}", id);}else {log.info("交换机未接受到消息,id:{},原因:{}", id, cause);}}//将不可达的消息传递给生产者@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error("消息:{};\r\n退回消息交换机:{};\r\n退回原因:{};\r\nrouting key:{}",new String(returnedMessage.getMessage().getBody()), returnedMessage.getExchange(), returnedMessage.getReplyText(), returnedMessage.getRoutingKey());}
}
消息消费
@Component
@Slf4j
public class Consumer {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void receiveConfirmMessage(Message message){log.info("接受到的消息:{}",new String(message.getBody()));}
}
消息发布
@Slf4j
@RestController
@RequestMapping("/pub_confirm")
public class PubConfirmSendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")public String sendMsg(@PathVariable("message") String message){log.info("当前时间:{}发送一条消息{}", new Date(), message);//1 关联实例用来设置消息的唯一标识,存储在消息的headers中CorrelationData correlationData = new CorrelationData();
// rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, "消息来自发布确认测试:"+message, correlationData);//2 模拟下异常情况,设置要给不存在的routing key,正常情况下会由回退消息回调方法处理rabbitTemplate.convertAndSend("ConfirmConfig.CONFIRM_EXCHANGE_NAME", "none_exist", "消息来自发布确认测试:"+message, correlationData);return "发送成功";}
【注:】
发布确认在失败阶段要注意下,此时correlationData没有设置returnedMessge,只存在消息标识id,但是仍可以通过消息入库 + id查询的方式重新发送消息
五、备份交换机
有了发布确认和回退消息,我们有机会去处理不可达的消息,但是有时候我们并不知道该如何处理这些不可达的消息,最多进行日志打印,然后触发告警,最后补偿数据,如何更方便地解决这个问题?备份交换机就可以很好地简化这个流程
备份交换机是指为普通交换机的alternate参数配置的一个交换机,当交换机无法路由消息时就转给备份交换机来处理
【注:】
如果同时设置了备份交换机和回退设置,会先执行备份交换机处理,回退消息回调方法来最后兜底。
比如confirm.exchange路由失败转给backup.exchange失败,则回退消息回调方法会被触发;再比如backup.exchange将消息路由到warning.queue失败,则回退消息回调方法仍然会被触发
配置信息
@Configuration
public class BackupConfig {//--------------------------------------------备份交换机配置-------------------------------------------//交换机public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";//队列public static final String BACKUP_QUEUE_NAME = "backup_queue";public static final String WARNING_QUEUE_NAME = "warning_queue";// 关联备份交换机然后创建发布确认交换机
// 如果broker已经有confirmExchange要提前清除掉@Bean("confirmExchange")public DirectExchange confirmExchange(){return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).alternate(BackupConfig.BACKUP_EXCHANGE_NAME).build();}//声明交换机@Bean("backupExchange")public FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//声明队列@Bean("backupQueue")public Queue backupQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}@Bean("warningQueue")public Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}@Beanpublic Binding queueBackupBindingExchange(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupExchange") FanoutExchange exchange){return BindingBuilder.bind(backupQueue).to(exchange);}@Beanpublic Binding queueWarningBindingExchange(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupExchange") FanoutExchange exchange){return BindingBuilder.bind(warningQueue).to(exchange);}
}
六、其它
1.幂等
幂等就是对同一操作的重复执行,如删除操作和查询操作,天然兼容幂等,但是新增和修改必须解决幂等影响,比如查询某个用户的流水,不管怎么重复操作都不会影响流水数据,删除也一样,但是新增就麻烦了,比如付款,对当前订单的付款如果重复付款肯定是业务异常的,修改也是如此
消费端的幂等保障一般是:1 唯一ID+指纹码机制,利用数据库去重;2 利用redis原子性实现。唯一码是一些规则或者时间戳加别的服务信息拼接成的唯一码,要么就是redis执行setnx指令,天然支持幂等
2.优先级队列
在系统中有一个订单催付的业务,商城会在订单有效期内推送消息给客户作为一个下单提醒,很细节的功能,但是展开来看还是有点东西的,比如系统会区分大小客户,比如高价订单会被优先处理,这涉及到队列的优先级设置,简单系统可以利用定时任务轮询,但是发杂系统还是要用消息队列,rabbitmq就提供了一个优先级配置参数
2.1.优先级队列控制台设置方式
标记会有个pri,表示该队列有权限参数配置
2.2.优先级队列代码设置方式
配置信息
@Configuration
public class PriorityConfig {@Beanpublic Queue queuePri(){
// 优先级以小为优先,设置范围0-255,不用设置太大消耗内存return QueueBuilder.durable("priority_queue").withArgument("x-max-priority", 10).build();}}
消费端
@Slf4j
@Component
public class PriorityConsumer {@RabbitListener(queues = "priority_queue")public void receiveConfirmMessage(Message message){log.warn("排序处理!消息:{}", new String(message.getBody()));}}
生产消息测试
@RestController
@RequestMapping("/priority")
public class PrioritySendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")private String sendMsg(@PathVariable("message") String message){MessageProperties messageProperties = new MessageProperties();Message msg = null;for(int i=1; i<11; i++){if(i==5){//优先级以小为优先messageProperties.setPriority(1);}else {messageProperties.setPriority(10);}msg = new Message((message+":"+i).getBytes(StandardCharsets.UTF_8), messageProperties);rabbitTemplate.convertAndSend("","priority_queue",msg.getBody());}return "发布成功!!!";}
}
3.惰性队列
从3.6.0开始引入惰性队列,功能是将消息最大限度存储到磁盘,当消息被消费时才被加载进内存。但是默认情况,生产者发送消息到队列时会最大限度存储到内存,这是为了更高效处理消息
优点:有磁盘兜底可以大量地存储消息、使消息更加可靠(比如防止宕机丢失)
缺点:在被写入磁盘时依然会在内存备份,当mq释放内存时会将内存的消息换页到磁盘,这个操作比较消耗内存,也会阻塞队列导致接受不到消息,尤其消息量高时问题明显
总结:常规消息量高效处理可以用默认队列(default mode);解决消息堆积可以用惰性队列(lazy mode),好文推荐https://blog.csdn.net/TheWindOfSon/article/details/130808424?spm=1001.2101.3001.6650.2&utm_medium=distribute.pc_relevant.none-task-blog-2~default~YuanLiJiHua~Position-2-130808424-blog-104037059.235%5Ev38%5Epc_relevant_anti_vip_base&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~YuanLiJiHua~Position-2-130808424-blog-104037059.235%5Ev38%5Epc_relevant_anti_vip_base&utm_relevant_index=5
控制台配置信息
代码配置信息
@Configuration
public class LazyModeConfig {@Beanpublic Queue queueLazyMode(){return QueueBuilder.durable("lazy_mode_queue").withArgument("x-queue-mode", "lazy").build();}
}
消息消费
@Slf4j
@Component
public class LazyModeConsumer {@RabbitListener(queues = "lazy_mode_queue")public void receiveConfirmMessage(Message message){log.warn("懒惰队列!消息:{}", new String(message.getBody()));}}
消息生产
@RestController
@RequestMapping("/lazyMode")
public class LazyModeController {@Autowiredprivate RabbitTemplate rabbitTemplate;// lazyMode队列的可执行测试@GetMapping("/sendMsg/{message}")private String sendMsg(@PathVariable("message") String message){//obj参数如果是非Message类型实例则默认执行消息持久化rabbitTemplate.convertAndSend("","lazy_mode_queue",message);return "发布成功!!!";}// lazyMode队列消息是否可持久化测试@GetMapping("/sendMsg2/{message}")private String sendMsg2(@PathVariable("message") String message){//obj参数设置Message类型实例,交付模式为非持久化。如果mq宕机恢复后消息依然存在则说明lazyMode队列确实是做了消息的持久化//测试流程:
// 1.关闭消费者LazyModeConsumer(测试结束后恢复开启)
// 2.执行接口
// 3.查看控制台,是否有待消费消息
// 4.如果有,stop mq服务
// 5.start mq服务,观察待消费消息是否依然存在MessageProperties messageProperties = new MessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);Message msg = new Message((message).getBytes(StandardCharsets.UTF_8), messageProperties);rabbitTemplate.convertAndSend("","lazy_mode_queue",msg.getBody());return "发布成功!!!";}
}
创建Message类型实例发送消息,通过模拟宕机测试出的结论:消息不会丢失,持久化得到了实际验证
6.4.交付模式(delivery mode)
delivery mode是指生产者发送消息的属性是否支持持久化,要么是none_persistent、要么是persistent
七、集群
集群搭建(主从高可用集群,一主多从,主机负责消息接收和消费,主机宕机后选举从机补上,RabbitMQ—集群原理)
镜像队列(主机宕机将无法消费队列,因此要在从机备份队列,以便宕机后选举出的新主机运行此备份队列,镜像队列)
federationExchange(非集群之间如何消息同步)
federationQueue(非集群之间消息同步)
Shovel(非集群之间消息同步)
待续:83-集群原理_哔哩哔哩_bilibili
演示代码gitee:rabbitmq-demo: rabbitmq-demohttps://gitee.com/songyahuiX2/rabbitmq-demo.git
参考资料: rabbitmq_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1cb4y1o7zz?p=1&vd_source=8be621c052fd9f705308579363b67881
好文推荐:
RabbitMQ镜像队列_胡晗-的博客-CSDN博客https://blog.csdn.net/phoenix9311/article/details/108998237?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522168982077116800215063044%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=168982077116800215063044&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-4-108998237-null-null.142^v90^chatsearch,239^v2^insert_chatgpt&utm_term=%E9%95%9C%E5%83%8F%E9%98%9F%E5%88%97%E5%AF%BC%E8%87%B4%E6%B6%88%E8%B4%B9%E9%87%8D%E5%A4%8D&spm=1018.2226.3001.4187
(63条消息) 【RabbitMQ】RabbitMQ如何做到保证消息100%不丢失?_rabbitmq确保消息不丢失_码农BookSea的博客-CSDN博客https://blog.csdn.net/bookssea/article/details/123119980
(63条消息) RabbitMQ消息丢失的场景,如何保证消息不丢失?(详细讲解,一文看懂)_rabbitmq如何保证消息不丢_十八岁讨厌Java的博客-CSDN博客https://blog.csdn.net/w20001118/article/details/126595970?spm=1001.2101.3001.6650.1&utm_medium=distribute.pc_relevant.none-task-blog-2~default~BlogCommendFromBaidu~Rate-1-126595970-blog-108402423.235%5Ev38%5Epc_relevant_anti_vip_base&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~BlogCommendFromBaidu~Rate-1-126595970-blog-108402423.235%5Ev38%5Epc_relevant_anti_vip_base&utm_relevant_index=2
(63条消息) Springboot 2.x ——RabbitTemplate为什么会默认消息持久化?_rabbittemplate 如何这是消息持久化_专注写bug的博客-CSDN博客https://blog.csdn.net/qq_38322527/article/details/122921878
RabbitMQ如何保证消息的可靠传输 - 知乎 (zhihu.com)https://zhuanlan.zhihu.com/p/156147569(63条消息) RabbitMQ(四):RabbitMQ高级特性_张凯锋zkf的博客-CSDN博客https://blog.csdn.net/m0_53142956/article/details/127792054
(63条消息) rabbitmq basicReject / basicNack / basicRecover区别_t0mCl0nes的博客-CSDN博客https://blog.csdn.net/fly_leopard/article/details/102821776
(63条消息) RabbitMQ学习笔记:addReturnListener监听回退消息_rabbitmq监听队列消息_OceanSky6的博客-CSDN博客https://blog.csdn.net/yaomingyang/article/details/102752906