🔥博客主页: 【小扳_-CSDN博客】
❤感谢大家点赞👍收藏⭐评论✍
文章目录
1.0 RabbitMQ 的可靠性
2.0 发送者的可靠性
2.1 生产者重试机制
2.2 生产者确认机制
2.2.1 开启生产者确认机制
2.2.2 定义 ReturnCallback 机制
2.2.3 定义 ConfirmCallback 机制
3.0 MQ 可靠性
3.1 交换机持久化
3.2 队列持久化
3.3 消息持久化
3.4 LazyQueue 懒惰队列
3.4.1 声明懒惰队列的方式
4.0 消费者的可靠性
4.1 消费者确认机制
4.2 失败重试机制
4.3 失败处理策略
5.0 使用 DelayExchange 插件
5.1 安装 DelayExchange 插件
5.2 声明延迟交换机
5.3 发送延迟消息
1.0 RabbitMQ 的可靠性
消息从生产者到消费者的每一步都可能导致消息丢失:
1)发送消息时丢失:
- 生产者发送消息时连接 MQ 失败。
- 生产者发送消息到达 MQ 后未找到 Exchange 。
- 生产者发送消息到达 MQ 的 Exchange 后,未找到合适的 Queue 。
- 消息到达 MQ 后,处理消息的进程发生异常。
2)MQ 导致消息丢失:
- 消息到达 MQ,保存到队列后,尚未消费就突然宕机了。
3)消费者处理消息时:
- 消息接收后尚未处理突然宕机。
- 消息接收后处理过程中抛出异常。
综上,要解决消息丢失问题,保证 MQ 可靠性,就必须从三个方面入手:
- 确保生产者一定把消息发送到 MQ 。
- 确保 MQ 不会将消息弄丢。
- 确保消费者一定要处理消息。
2.0 发送者的可靠性
通过生产者重试机制、确认机制来确保发送者的可靠性。
2.1 生产者重试机制
首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。为了解决这个问题,SpringAMQP 提供的消息发送时的重试机制。即:当 RabbitTemplate 与 MQ 连接超时后,多次重试。
修改 application.yml 文件,添加下面的内容:
spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数
利用命令停掉 RabbitMQ 服务:
docker stop mq
然后测试发送一条消息,会发现会每隔 1 秒重试 1 次,总共重试了 3 次。消息发送的超时重试机制配置成功了。
注意,当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过 SpringAMQP 提供的重试机制时阻塞式的重试,也就是说多次重试等待的过程中,当前线程式被阻塞的。
如果对业务性能有要求,建议禁用重试机制,如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
2.2 生产者确认机制
一般情况下,只要生产者与 MQ 之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。
不过,在少数情况下,也会出现消息发送到 MQ 之后丢失的现象,比如:
- MQ 内部处理消息的进程发生了异常。
- 生产者发送消息到达 MQ 后未找到 Exchange 。
- 生产者发送消息到达 MQ 的 Exchange 后,未找到合适的 Queue,因此无法路由。
当消息投递到 MQ,但是路由失败时,通过 Return 返回异常信息,同时返回 ack 确认信息,代表投递成功。
1)临时消息投递到了 MQ,并且入队成功,返回 ACK,告知投递成功。
2)持久消息投递到了 MQ,并且入队完成持久化,返回 ACK,告知投递成功。
3)其他情况都会返回 NACK,告知投递失败。
其中 ACK 和 NACK 属于 Publisher Confirm 机制,ACK 是投递成功;NACK 是投递失败。而 return 则属于Publisher Return机制。
2.2.1 开启生产者确认机制
默认两种机制都是关闭状态,需要通过配置文件来开启。
在 application.yml 中添加配置:
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制
这里 publisher-confirm-type 有三种模式可选:
1)none:关闭 confirm 机制
2)simple:同步阻塞等待 MQ 的回执
3)correlated:MQ 异步回调返回回执
一般我们推荐使用 correlated,回调机制。
2.2.2 定义 ReturnCallback 机制
每个 RabbitTemplate 只能配置一个 ReturnCallback,因此可以在配置类中统一设置。
代码如下:
package com.itheima.publisher.config;import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Slf4j @AllArgsConstructor @Configuration public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});} }
测试:
当发送消息的时候,设置一个不存在的路由:
在交换机中,不存在 "xbss" 路由关键字,则会执行 returnedMessage 方法。
// 首先引入依赖@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid contextLoads() {//发送Object类型的消息textObject();}@Testpublic void textObject(){User user = new User(1,"xbs","123456",null,null);rabbitTemplate.convertAndSend("textExchange", "xbss", user, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setMessageId("123456");return message;}});}
执行结果:
2.2.3 定义 ConfirmCallback 机制
由于每个消息发送时的处理逻辑不一定相同,因此 ConfirmCallback 需要在每次发送消息时定义。具体来说,是在调用 RabbitTemplate 中的 convertAndSend 方法时,多传递一个参数:
代码如下:
@Testvoid testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData();// 2.给Future添加ConfirmCallbackcd.getFuture().thenAccept(result -> {if (result.isAck()) { // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.info("发送消息成功,收到 ack!");} else { // result.getReason(),String类型,返回nack时的异常描述log.info("发送消息失败,收到 nack, reason : {}", result.getReason());}}).exceptionally(ex -> {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);return null;});// 3.发送消息rabbitTemplate.convertAndSend("xbs.direct", "red", "hello", cd);}
执行结果:
执行成功之后,日志输出:
接收消息:
注意:
开启生产者确认比较消耗 MQ 性能,一般不建议开启。而且大家思考一下触发确认的几种情况:
-
路由失败:一般是因为 RoutingKey 错误导致,往往是编程导致
-
交换机名称错误:同样是编程错误导致
-
MQ 内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启 ConfirmCallback 处理 nack 就可以了。
3.0 MQ 可靠性
消息到达MQ以后,如果 MQ 不能及时保存,也会导致消息丢失,所以 MQ 的可靠性也非常重要。
为了提升性能,默认情况下 MQ 的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:
1)交换机持久化
2)队列持久化
3)消息持久化
3.1 交换机持久化
在控制台的 Exchange 页面,添加交换机时可以配置交换机的 Durability 参数:
设置为 Durable 就是持久化模式,Transient 就是临时模式。
3.2 队列持久化
在控制台的 Queues 页面,添加队列时,同样可以配置队列的 Durability 参数:
设置为 Durable 就是持久化模式,Transient 就是临时模式。
3.3 消息持久化
在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个Delivery mode:设置为 Persistent 消息持久化。
说明:
在开启持久化机制以后,如果同时还开启了生产者确认,那么 MQ 会在消息持久化以后才发送 ACK 回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少 IO 次数,发送到 MQ 的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在 100 毫秒左右,这就会导致 ACK 有一定的延迟,因此建议生产者确认全部采用异步方式。
3.4 LazyQueue 懒惰队列
1)在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:
-
消费者宕机或出现网络故障
-
消息发送量激增,超过了消费者处理速度
-
消费者处理业务发生阻塞
2)一旦出现消息堆积问题,RabbitMQ 的内存占用就会越来越高,直到触发内存预警上限。此时 RabbitMQ 会将内存消息刷到磁盘上,这个行为成为 PageOut 。PageOut 会耗费一段时间,并且会阻塞队列进程。因此在这个过程中 RabbitMQ 不会再处理新的消息,生产者的所有请求都会被阻塞。
为了解决这个问题,从 RabbitMQ 的 3.6.0 版本开始,就增加了 Lazy Queues 的模式,也就是惰性队列。惰性队列的特征如下:
-
接收到消息后直接存入磁盘而非内存
-
消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
-
支持数百万条的消息存储
而在 3.12 版本之后,LazyQueue 已经成为所有队列的默认格式。因此官方推荐升级 MQ 为 3.12 版本或者所有队列都设置为 LazyQueue 模式。
3.4.1 声明懒惰队列的方式
1)在添加队列的时候,添加 x-queue-mode 参数即可设置队列为 Lazy 模式:
2)在利用 SpringAMQP 声明队列的时候,添加 x-queue-mode 参数也可设置队列为 Lazy 模式:
@Beanpublic Queue queue3(){return QueueBuilder.durable("queue7") //队列名称.lazy() //开启懒惰模式.build();}
3)基于注解来声明队列并设置为 Lazy 模式:
@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy") )) public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg); }
4.0 消费者的可靠性
当 RabbitMQ 向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多。
一旦发生上述情况,消息也会丢失。因此,RabbitMQ 必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。
4.1 消费者确认机制
1)为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息处理状态。回执有三种可选值:
-
ack:成功处理消息,RabbitMQ 从队列中删除该消息。
-
nack:消息处理失败,RabbitMQ 需要再次投递消息。
-
reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息。
2)一般 reject 方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过 try catch 机制捕获,消息处理成功时返回 ack,处理失败时返回 nack 。
由于消息回执的处理代码比较统一,因此 SpringAMQP 帮我们实现了消息确认。并允许我们通过配置文件设置 ACK 处理方式,有三种模式:
-
none
:不处理。即消息投递给消费者后立刻 ack,消息会立刻从 MQ 删除。非常不安全,不建议使用 -
manual
:手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject,存在业务入侵,但更灵活 -
auto
:自动模式。SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack. 当业务出现异常时,根据异常判断返回不同结果:-
如果是业务异常,会自动返回 nack;
-
如果是消息处理或校验异常,自动返回 reject;
-
因此,推荐使用 aotu 自动模式。
application 配置如下:
spring:rabbitmq:listener:simple:acknowledge-mode: auto # 自动ack
在开启消费者确认机制时,手动抛出异常模拟接收信息失败时:
1)发送消息:
// 首先引入依赖@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid contextLoads() {//发送Object类型的消息testSendMessage();}@Testpublic void testSendMessage(){//发送消息rabbitTemplate.convertAndSend("yt","xbs","hello rabbitmq");}
2)接收消息:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "wyt",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")),exchange = @Exchange(name = "yt",type = ExchangeTypes.DIRECT),key = "xbs"))public void receiveMessage(String massage) throws InterruptedException {log.info("sendMessage发送的消息为: " + massage);//模拟接送消息失败throw new RuntimeException("模拟接收消息失败");}
执行结果:
由于抛出了异常,则返回给 MQ 为 nack,就会重复发送消息给客户端:
在队列中,由于消息没有被正确处理,则消息会一直在队列中且不断发送消息给客户端:
4.2 失败重试机制
当消费者出现异常后,消息会不断 requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次 requeue 到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息 requeue 就会无限循环,导致mq的消息处理飙升,带来不必要的压力:
应对上述情况 Spring 又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的 requeue 到 mq 队列。
修改 application.yml 文件,添加内容:
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
再来尝试手动抛出异常模拟接收消息失败:
结果如下:
MQ 向接收者发送了三次信息,接着进程结束。
此时队列中的信息不存在了,丢失了:
结论:
-
开启本地重试时,消息处理过程中抛出异常,不会 requeue 到队列,而是在消费者本地重试
-
重试达到最大次数后,Spring 会返回 reject,消息会被丢弃。
4.3 失败处理策略
在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此 Spring 允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由 MessageRecovery 接口来定义的,它有3个不同实现:
-
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject
,丢弃消息。默认就是这种方式 -
ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack
,消息重新入队 -
RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是 RepublishMessageRecoverer
,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
代码实现:
1)定义处理失败消息的交换机和队列:
@Bean public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){return new Queue("error.queue", true); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); }
2)定义一个 RepublishMessageRecoverer,关联队列和交换机
@Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); }
完整代码:
package com.itheima.consumer.config;import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.context.annotation.Bean;@Configuration @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true") public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");} }
当重试失败之后,不会直接将消息丢失而是交给专门接收失败信息的交换机,再由专门的消费者进行消费,比如说监听到队列中有接收失败的信息,将其写入日志中等等处理的方法。
测试:
1)发送消息:
// 首先引入依赖@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid contextLoads() {//发送Object类型的消息testSendMessage();}@Testpublic void testSendMessage(){//发送消息rabbitTemplate.convertAndSend("yt","xbs","hello rabbitmq");}
2)接收消息:
接收消息之后,手动抛出异常模拟接收消息失败。
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "wyt",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")),exchange = @Exchange(name = "yt",type = ExchangeTypes.DIRECT),key = "xbs"))public void receiveMessage(String massage) throws InterruptedException {log.info("sendMessage发送的消息为: " + massage);//模拟接送消息失败throw new RuntimeException("模拟接收消息失败");}
执行结果:
在本地重试三次发送失败之后,则将该消息交给 "errorMessageExchange" 交换机,再路由给 "errorMessageQueue" 队列,后续可以监听该队列进行接收消息再做处理。
5.0 使用 DelayExchange 插件
RabbitMQ 官方推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列中。
5.1 安装 DelayExchange 插件
基于 Docker 安装,所以需要先查看 RabbitMQ 的插件目录对应的数据卷。
docker volume inspect mq-plugins
结果如下:
[{"CreatedAt": "2024-06-19T09:22:59+08:00","Driver": "local","Labels": null,"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data","Name": "mq-plugins","Options": null,"Scope": "local"} ]
插件目录被挂载到了 docker volume inspect mq-plugins 这个目录,我们上传插件到该目录下。
DelayExchange 插件下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
接着文件放到 docker volume inspect mq-plugins 这个目录中。
接下来执行命令,安装插件:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
安装结果:
5.2 声明延迟交换机
1)基于注解方式:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay" )) public void listenDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}", msg); }
2)基于 @Bean 的方式:
package com.itheima.consumer.config;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Slf4j @Configuration public class DelayExchangeConfig {@Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct") // 指定交换机类型和名称.delayed() // 设置delay的属性为true.durable(true) // 持久化.build();}@Beanpublic Queue delayedQueue(){return new Queue("delay.queue");}@Beanpublic Binding delayQueueBinding(){return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");} }
5.3 发送延迟消息
发送消息时,必须通过 x-delay 属性设定延迟时间:
@Test void testPublisherDelayMessage() {// 1.创建消息String message = "hello, delayed message";// 2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}}); }
测试:
1)接收消息:
@AutowiredMessageConverter messageConverter;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")),exchange = @Exchange(name = "delay.exchange",type = ExchangeTypes.DIRECT,delayed = "true"),key = "xbs"))public void receiveMessage(Message massage) throws InterruptedException {log.info("sendMessage发送的消息ID为: " + massage.getMessageProperties().getMessageId());log.info("sendMessage发送的消息延迟时间为: " + massage.getMessageProperties().getDelayLong());String str = (String) messageConverter.fromMessage(massage);log.info("sendMessage发送的消息内容为: " + str);}
2)发送消息:
// 首先引入依赖@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid contextLoads() {//发送Object类型的消息testSendDelayMessage();}@Testvoid testSendDelayMessage(){//发送消息rabbitTemplate.convertAndSend("delay.exchange", "xbs", "hello rabbitmq delay message", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//先设置唯一IDmessage.getMessageProperties().setMessageId("123456");//设置延迟发送时间message.getMessageProperties().setDelayLong(5000L);return message;}});}
执行结果:
注意:
延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的 CPU 开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息。