前置条件
docker 安装 mq
docker run \-e RABBITMQ_DEFAULT_USER=dudu \-e RABBITMQ_DEFAULT_PASS=123456 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hmall \-d \rabbitmq:3.8-management
可能会出现:docker: Error response from daemon: network hmall not found.
原因是在容器启动时,所需的网络环境没有正确配置。
检查网络列表
docker network ls
创建所需网络
docker network create hmall
运行容器时指定网络
docker run -d --net=hmall rabbitmq:3.8-management
重新启动容器
docker restart mq
新建初始工程
父工程引入依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
publisher 和 consumer 引入 yml 配置
spring:rabbitmq:host: 192.168.64.100 # 你的虚拟机IPport: 5672 # 端口virtual-host: /dudu # 虚拟主机username: dudu # 用户名password: 123456 # 密码
###基本消息模型
新建虚拟主机
新建 base.queue 队列
publisher 测试类发送消息
@SpringBootTest
public class BaseTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage(){//队列名称String queueName = "base.queue";//消息String message = "基本消息模型测试";//发送消息rabbitTemplate.convertAndSend(queueName,message);}
}
consumer 配置监听消息
@Component
public class RabbitMQListener {// 监听基本消息模型 base.queue队列@RabbitListener(queues = "base.queue")public void baseListener(String msg) {System.out.println("base.queue接收到消息:" + msg);}
}
work 消息模型
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置 prefetch 来控制消费者预取的消息数量
同基本消息模型一样新建队列:work.queue
publisher 测试类发送消息
@SpringBootTest
public class BaseTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendWorkMessage() {for (int i = 1; i <= 10; i++) {String message = "测试message" + i;rabbitTemplate.convertAndSend("work.queue",message);}}
}
consumer 配置监听消息
@Component
public class RabbitMQListener {// 监听 work 消息模型 work.queue队列@RabbitListener(queues = "work.queue")public void workListener1(String msg) {System.out.println("消费者一接收到work.queue的消息:"+ msg);}@RabbitListener(queues = "work.queue")public void workListener2(String msg) {System.err.println("消费者二接收到work.queue的消息:"+ msg);}
}
测试
默认是消费者平分消息,并没有考虑到消费者的处理能力。可能会存在一个消费者空闲,一个消费者忙,没有充分的利用消费者。
在 spring 中有一个简单的配置,可以解决这个问题。我们修改 consumer 服务的 application.yml 文件,修改配置:
spring:rabbitmq:host: 192.168.64.100 # 你的虚拟机IPport: 5672 # 端口virtual-host: /dudu # 虚拟主机username: dudu # 用户名password: 123456 # 密码listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
再次测试
Fanout 交换机消息模型(广播)
- 接收 publisher 发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange 的会将消息路由到每个绑定的队列
新建交换机和队列
将队列绑定到交换机
publisher 测试类发送消息
@SpringBootTest
public class BaseTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendFanoutMessage() {String exchangeName = "dudu.fanout";String message = "测试Fanout消息模型";rabbitTemplate.convertAndSend(exchangeName,"",message);}
}
consumer 配置监听消息
@Component
public class RabbitMQListener {// 监听 fanout 交换机消息模型(广播) work.queue队列@RabbitListener(queues = "fanout.queue1")public void fanoutListener1(String msg) {System.out.println("消费者一接收到 fanout.queue1 的消息:"+ msg);}@RabbitListener(queues = "fanout.queue2")public void fanoutListener2(String msg) {System.err.println("消费者二接收到 fanout.queue2 的消息:"+ msg);}
测试
Direct 交换机消息模型(发布-订阅)
- Fanout 交换机将消息路由给每一个与之绑定的队列
- Direct 交换机根据 RoutingKey 判断路由给哪个队列
- 如果多个队列具有相同的 RoutingKey,则与 Fanout 功能类似
新建交换机,队列,绑定路由 key
publisher 测试类发送消息
@SpringBootTest
public class BaseTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendDirectMessage() {String exchangeName = "dudu.direct";String message = "测试Direct消息模型";rabbitTemplate.convertAndSend(exchangeName,"red",message+"红色消息");rabbitTemplate.convertAndSend(exchangeName,"blue",message+"蓝色消息");rabbitTemplate.convertAndSend(exchangeName,"yellow",message+"黄色消息");}
}
consumer 配置监听消息
@Component
public class RabbitMQListener {// 监听 direct 交换机消息模型@RabbitListener(queues = "direct.queue1")public void directListener1(String msg) {System.out.println("消费者一接收到 direct.queue1 的消息:"+ msg);}@RabbitListener(queues = "direct.queue2")public void directListener2(String msg) {System.err.println("消费者二接收到 direct.queue2 的消息:"+ msg);
}
测试
Topic 交换机消息模型
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。
只不过Topic
类型Exchange
可以让队列在绑定BindingKey
的时候使用通配符!- #
:匹配一个或多个词 -*
:匹配不多不少恰好 1 个词
举例:
item.#
:能够匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
新建交换机,队列,绑定路由 key
publisher 测试类发送消息
@SpringBootTest
public class BaseTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendTopicMessage() {String exchangeName = "dudu.topic";String message = "测试Topic消息模型";rabbitTemplate.convertAndSend(exchangeName,"china.message",message+"中国消息");rabbitTemplate.convertAndSend(exchangeName,"blue.news",message+"蓝色新闻");rabbitTemplate.convertAndSend(exchangeName,"yellow.news",message+"新闻");}
}
consumer 配置监听消息
@Component
public class RabbitMQListener {// 监听 topic 交换机消息模型 work.queue队列@RabbitListener(queues = "topic.queue1")public void topictListener1(String msg) {System.out.println("消费者一接收到 topic.queue1 的消息:"+ msg);}@RabbitListener(queues = "topic.queue2")public void topicListener2(String msg) {System.err.println("消费者二接收到 topic.queue2 的消息:"+ msg);}
}
测试
声明队列和交换机
若 mq 没有以方法名的交换机或队列, 则根据方法中 return 的新建交换机和队列
DirectConfig
@Configuration
public class DirectConfig {/*** 声明交换机 若mq没有名为 fanoutExchange 的交换机, 则创建名为 hmall.direct 的交换机* @return Direct类型交换机*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("hmall.direct").build();}/*** 第1个队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2个队列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
FanoutConfig
@Configuration
public class FanoutConfig {/*** 声明交换机 若mq没有名为 fanoutExchange 的交换机,则创建名为 dddddddddddddddddddddddddddddddddddddddddddddddddddddd.fanout 的交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){//ExchangeBuilder.fanoutExchange("").build();return new FanoutExchange("dddddddddddddddddddddddddddddddddddddddddddddddddddddd.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){//QueueBuilder.durable("").build();return new Queue("fanoutdddddddddddddddddddddddddddddddddddddddddddddddddddddd.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanoutdddddddddddddddddddddddddddddddddddddddddddddddddddddd.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
声明队列和交换机(注解)
AnnotationDirect
@Configuration
public class AnnotationDirect {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "dudu.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "dudu.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");}
}
AnnotationTopic
@Configuration
public class AnnotationTopic {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "dudu.topic", type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "dudu.topic", type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");}
}
消息转换器
默认情况下 Spring 采用的序列化方式是 JDK 序列化,JDK 序列化存在下列问题:数据体积过大、有安全漏洞、可读性差
publisher 测试类 发一个 map 消息
@SpringBootTest
public class BaseTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessageMap() {Map<String, Object> map = new HashMap<>();map.put("name", "张三");map.put("age", 18);rabbitTemplate.convertAndSend("object.queue",map);}
}
队列就手动在 mq 创建一个 object.queue
这时候消息监听服务开着的话就会报错
在 mq 上查看发送的消息
配置 JSON 转换器
在publisher
和consumer
两个服务中都引入依赖:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId>
</dependency>
在publisher
和consumer
两个服务的启动类中添加一个 Bean 即可或者写一个配置类把 bean 注入
@Configuration
public class MessageConverterConfig {@Beanpublic MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}
}
consumer 监听 object.queue
@Component
public class RabbitMQListener {// 监听 object.queue 队列@RabbitListener(queues = "object.queue")public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {System.out.println("消费者接收到object.queue消息:【" + msg + "】");}
}
测试
发送者的可靠性
修改 publisher 配置问价
spring:rabbitmq:host: 192.168.64.100 # 你的虚拟机IPport: 5672 # 端口virtual-host: /dudu # 虚拟主机username: dudu # 用户名password: 123456 # 密码# 生产者重试机制connection-timeout: 1s #设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制 SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。# 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数# 生产者确认机制# publisher-confirm-type`有三种模式可选:-# `none`:关闭confirm机制,simple`:同步阻塞等待MQ的回执,correlated`:MQ异步回调返回回执#一般我们推荐使用`correlated`,回调机制。publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制
定义 ReturnCallback
每个RabbitTemplate
只能配置一个ReturnCallback
,因此我们可以在配置类中统一设置。我们在 publisher 模块定义一个配置类:MqConfig
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(@Nonnull ReturnedMessage returnedMessage) {System.out.println("收到ReturnsCallback===========================");System.out.println("消息未进入队列"+returnedMessage.getMessage());System.out.println("交换机:"+returnedMessage.getExchange());System.out.println("路由键:"+returnedMessage.getRoutingKey());System.out.println("replyCode:"+returnedMessage.getReplyCode());System.out.println("replyText:"+returnedMessage.getReplyText());}});rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {System.out.println("收到ConfirmCallback===========================");System.out.println("是否到交换机:"+correlationData);System.out.println("ack:"+ack);System.out.println("原因:"+cause);if (!ack){System.out.println("消息发送失败"+cause);}});}
}
也可以这样写
@Configuration
@AllArgsConstructor
public class MqConfig {@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) {// 开启消息进入Broker确认factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);// 开启消息未进入队列确认factory.setPublisherReturns(true);RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);// 进入Broker时触发回调rabbitTemplate.setConfirmCallback((correlationData, b, s) -> {System.out.println("是否到交换机:"+correlationData);System.out.println("ack:"+b);System.out.println("原因:"+s);if (b) {System.out.println("消息进入Broker成功");} else {System.out.println("消息进入Broker失败");}});// Mandatory:为true时,消息通过交换器无法匹配到队列会返回给生产者 并触发MessageReturn,为false时,匹配不到会直接被丢弃rabbitTemplate.setMandatory(true);// 消息未进入队列时触发回调rabbitTemplate.setReturnsCallback(returnedMessage -> {System.out.println("消息未进入队列"+returnedMessage.getMessage());System.out.println("交换机:"+returnedMessage.getExchange());System.out.println("路由键:"+returnedMessage.getRoutingKey());System.out.println("replyCode:"+returnedMessage.getReplyCode());System.out.println("replyText:"+returnedMessage.getReplyText());});return rabbitTemplate;}
}
新建测试、并且添加 ConfirmCallback
:
@Test
void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData();// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
新的版本好像没有这个 addCallback()方法了下面这个倒是可能也可以吧
@Testvoid testPublisherConfirm() throws InterruptedException {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData();cd.getFuture().whenComplete((confirm, throwable) -> {System.out.println("confirm: " + confirm + " throwable: " + throwable);if (confirm.isAck()) {System.out.println("消息发送成功,收到ack"+confirm.getReason());}else {System.out.println("消息发送失败,收到nack"+confirm.getReason());}});rabbitTemplate.convertAndSend("hmall.11direct", "blu1e", "hello",cd);Thread.sleep(2000);}
测试
总结
开启生产者确认比较消耗 MQ 性能,一般不建议开启。而且大家思考一下触发确认的几种情况:
- 路由失败:一般是因为 RoutingKey 错误导致,往往是编程导致
- 交换机名称错误:同样是编程错误导致
- MQ 内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启 ConfirmCallback 处理 nack 就可以了。
MQ 的可靠性
说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么 MQ 会在消息持久化以后才发送 ACK 回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少 IO 次数,发送到 MQ 的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在 100 毫秒左右,这就会导致 ACK 有一定的延迟,因此建议生产者确认全部采用异步方式。
交换机持久化
队列持久化
消息持久化
在控制台发消息时可以指定参数
代码实现
@Testpublic void testSendMessage(){//队列名称String queueName = "base.queue";//消息String message = "基本消息模型测试";//发送消息//设置消息持久化rabbitTemplate.setMandatory(true);rabbitTemplate.convertAndSend(queueName,message);}
LazyQueue
在 3.12 版本之后,LazyQueue 已经成为所有队列的默认格式。因此官方推荐升级 MQ 为 3.12 版本或者所有队列都设置为 LazyQueue 模式。
在添加队列的时候,添加x-queue-mod=lazy
参数即可设置队列为 Lazy 模式:
代码
@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() // 开启Lazy模式.build();
}
注解方式
@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}
更新已有队列为 lazy 模式
命令
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解读:
rabbitmqctl
:RabbitMQ 的命令行工具set_policy
:添加一个策略Lazy
:策略名称,可以自定义"^lazy-queue$"
:用正则表达式匹配队列的名字'{"queue-mode":"lazy"}'
:设置队列模式为 lazy 模式--apply-to queues
:策略的作用对象,是所有的队列
消费者的可靠性
消费者确认机制
当消费者处理消息结束后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息处理状态。回执有三种可选值:
- ack:成功处理消息,RabbitMQ 从队列中删除该消息
- nack:消息处理失败,RabbitMQ 需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息
一般 reject 方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch
机制捕获,消息处理成功时返回 ack,处理失败时返回 nack.
由于消息回执的处理代码比较统一,因此 SpringAMQP 帮我们实现了消息确认。并允许我们通过配置文件设置 ACK 处理方式,有三种模式:
-
none
:不处理。即消息投递给消费者后立刻 ack,消息会立刻从 MQ 删除。非常不安全,不建议使用 -
manual
:手动模式。需要自己在业务代码中调用 api,发送ack
或reject
,存在业务入侵,但更灵活 -
auto
:自动模式。SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack
. 当业务出现异常时,根据异常判断返回不同结果:- 如果是业务异常,会自动返回
nack
; - 如果是消息处理或校验异常,自动返回
reject
;
- 如果是业务异常,会自动返回
修改 consumer 的 yml 文件
spring:rabbitmq:host: 192.168.64.100 # 你的虚拟机IPport: 5672 # 端口virtual-host: /dudu # 虚拟主机username: dudu # 用户名password: 123456 # 密码listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息# 确认模式# none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用# manual:手动确认。即消费者处理完消息后,需要手动ack。# auto:自动确认。即消费者处理完消息后,自动ack,消息会从MQ删除。如果是业务异常,会自动返回`nack` 消息处理或校验异常,自动返回`reject`消息不会从MQ删除acknowledge-mode: auto
失败重试机制
修改 consumer 的配置
spring:rabbitmq:host: 192.168.64.100 # 你的虚拟机IPport: 5672 # 端口virtual-host: /dudu # 虚拟主机username: dudu # 用户名password: 123456 # 密码listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息# 确认模式# none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用# manual:手动确认。即消费者处理完消息后,需要手动ack,消息不会从MQ删除。# auto:自动确认。即消费者处理完消息后,自动ack,消息会从MQ删除。如果是业务异常,会自动返回`nack` 消息处理或校验异常,自动返回`reject`acknowledge-mode: auto# 失败重试机制retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
- 消费者在失败后消息没有重新回到 MQ 无限重新投递,而是在本地重试了 3 次
- 本地重试 3 次以后,抛出了
AmqpRejectAndDontRequeueException
异常。查看 RabbitMQ 控制台,发现消息被删除了,说明最后 SpringAMQP 返回的是reject
结论:
- 开启本地重试时,消息处理过程中抛出异常,不会 requeue 到队列,而是在消费者本地重试
- 重试达到最大次数后,Spring 会返回 reject,消息会被丢弃
失败处理策略
本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此 Spring 允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery
接口来定义的,它有 3 个不同实现:
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject
,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack
,消息重新入队RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer
,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
ErrorMessageConfig 配置类
@Configuration
// 开启重试机制 这个配置类才会生效
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
延迟消息
死信交换机
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用
basic.reject
或basic.nack
声明消费失败,并且消息的requeue
参数设置为 false - 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
如果一个队列中的消息已经成为死信,并且这个队列通过**dead-letter-exchange**
属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
死信交换机有什么作用呢?
- 收集那些因处理失败而被拒绝的消息
- 收集那些因队列满了而被拒绝的消息
- 收集因 TTL(有效期)到期的消息
注意:
RabbitMQ 的消息过期是基于追溯方式来实现的,也就是说当一个消息的 TTL 到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的 TTL 时间不一定准确。
DelayExchange 插件
安装
基于 Docker 安装,所以需要先查看 RabbitMQ 的插件目录对应的数据卷
docker volume inspect mq-plugins
结果如下
[{"CreatedAt": "2024-06-19T09:22:59+08:00","Driver": "local","Labels": null,"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data","Name": "mq-plugins","Options": null,"Scope": "local"}
]
插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data
这个目录,我们上传插件到该目录下。
接下来执行命令,安装插件:
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
声明延迟交换机
基于注解方式:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}", msg);
}
基于@Bean
的方式:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DelayExchangeConfig {@Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct") // 指定交换机类型和名称.delayed() // 设置delay的属性为true.durable(true) // 持久化.build();}@Beanpublic Queue delayedQueue(){return new Queue("delay.queue");}@Beanpublic Binding delayQueueBinding(){return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");}
}
发送延迟消息
@SpringBootTest
public class BaseTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testPublisherDelayMessage() {// 1.创建消息String message = "hello, delayed message";// 2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延迟消息属性 这里的.setHeader("x-delay", 10000)替代了setDelay(10000)message.getMessageProperties().setHeader("x-delay", 10000);return message;}});System.out.println("消息发送成功"+ LocalDateTime.now());}
}
消息发送十秒后,消费者接收到消息
注意: 延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的 CPU 开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息。
假如订单超时支付时间为 30 分钟,理论上说我们应该在下单时发送一条延迟消息,延迟时间为 30 分钟。这样就可以在接收到消息时检验订单支付状态,关闭未支付订单。
但是大多数情况下用户支付都会在 1 分钟内完成,我们发送的消息却要在 MQ 中停留 30 分钟,额外消耗了 MQ 的资源。因此,我们最好多检测几次订单支付状态,而不是在最后第 30 分钟才检测。
例如:我们在用户下单后的第 10 秒、20 秒、30 秒、45 秒、60 秒、1 分 30 秒、2 分、…30 分分别设置延迟消息,如果提前发现订单已经支付,则后续的检测取消即可。
这样就可以有效避免对 MQ 资源的浪费了。