【RabbitMQ】死信队列、延迟队列

死信队列

死信,简单理解就是因为种种原因,无法被消费的消息。

有死信,自然就有死信队列。当一个消息在一个队列中变成死信消息之后,就会被重新发送到另一个交换器中,这个交换器就是DLX(Dead Letter Exchange),绑定该交换器的队列,就被称为死信队列DLQ(Dead Letter Queue)。

消息变成死信消息一般是由于以下几条:

  • 队列达到最大长度
  • 消息过期
  • 消息被拒绝,即消息确认机中手动确认的两种拒绝情况,并且不允许重新入队

队列达到最大长度

spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-test
@Configuration
public class DeadConfig {// 正常队列,当正常队列中的消息出现一些不确定情况时,消息就会进入死信交换机中@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DEAD_EXCHANGE) // 设置死信交换机.deadLetterRoutingKey("dead") // 设置死信队列的路由键为dead.maxLength(3) // 设置队列的最大长度为3.build();}@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).durable(true).build();}@Bean("normalQueueBind")public Binding normalQueueBind(@Qualifier("normalQueue") Queue queue,@Qualifier("normalExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}// 死信队列@Bean("deadQueue")public Queue deadQueue() {return QueueBuilder.durable(Constants.DEAD_QUEUE).build();}@Bean("deadExchange")public Exchange deadExchange() {return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();}@Bean("deadQueueBind")public Binding deadQueueBind(@Qualifier("deadQueue") Queue queue,@Qualifier("deadExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("dead").noargs();}}
@RestController
@RequestMapping("/dead")
public class DeadController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void deadQueue() {this.rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "hello 死信");System.out.println("正常队列发送消息成功");}}
@Configuration
public class DeadListener {@RabbitListener(queues = Constants.DEAD_QUEUE)public void deadListener(String msg) {System.out.println("死信队列接收到消息:" + msg);}}

在上述代码中,主要内容是声明了正常队列、交换机和绑定关系以及声明死信队列、死信交换机以及其绑定关系、正常队列的生产者代码、死亡队列的消费者代码。

队列达到最大长度和死信消息要转发到的DLX和路由键都是由正常队列在声明时进行绑定的。

启动上述程序之后,当正常队列存在三条消息之时,假设再来消息,那么消息就要进入死信交换机,从而路由到死信队列了。如下图可以看出,当发送第四条消息之后,死信队列的消费者就消费了一条消息:

在上述图片中,D表示队列是持久化的,Lim表示队列有最大长度,DLX表示队列存在死信交换机、DLK表示队列存在路由键。把鼠标放在这些字母上方,详细的消息都会表示。

在下述代码中,主要是对上述代码改进之后地方的指出,并没有把所有的代码全部给出。

消息过期

消息过期分为两种,一种是设置队列过期时间让消息过期,另一种是设置消息过期时间让消息过期,都可以进行测试。

设置队列过期时间

    @Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DEAD_EXCHANGE) // 设置死信交换机.deadLetterRoutingKey("dead") // 设置死信队列的路由键为dead
//                .maxLength(3) // 设置队列的最大长度为3.ttl(5 * 1000) // 设置队列的过期时间为5秒.build();}

 由上图以及结合代码可以看出,将消息由正常生产者发送给Broker之后,大概5秒钟之后,消息过期。此时消息就会发送给死信交换机,从而交给其对应的消费者消费。

设置消息的过期时间

@Slf4j
@RestController
@RequestMapping("/dead")
public class DeadController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void deadQueue() {// 设置消息的过期时间MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");return message;}};this.rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "hello 死信", messagePostProcessor);log.info("死信队列发送成功");}}

同样,结合上图和代码来说,19秒的时候消息发送功,24秒的时候死信消费者消费消息成功。

消息被拒绝

spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testlistener:simple:acknowledge-mode: manual # 消息确认机制,手动确认
@Slf4j
@Configuration
public class DeadListener {// 正常队列接收消息@RabbitListener(queues = Constants.NORMAL_QUEUE)public void normalListener(Channel channel, String msg, Message message) throws IOException {try {log.info("正常队列监听器接收消息:{}", msg);int num = 3 / 0;channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);} catch (Exception e) {log.error("正常队列监听器接收消息异常:{}", e.getMessage());channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}}// 死信队列接收消息@RabbitListener(queues = Constants.DEAD_QUEUE)public void deadListener(String msg, Channel channel, Message message) throws IOException {try {log.info("死信队列监听器接收消息:{}", msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);} catch (Exception e) {channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}}

由上图以及代码可以看到,当消息的确认机制是手动确认时,当出现异常并且拒绝消息重新入队以后,消息就会来到死信队列中。

使用场景

用户支付订单之后,支付系统会给订单系统返回当前订单的支付状态。为了保证支付信息不丢失,需要使用到死信队列机制。当消息消费异常时,将消息投入到死信队列,由订单系统的其他消费者来监听这个队列,并对数据进行处理(比如发送工单等,进行人工确认)。

消息重试:将死信消息发送到原队列或另一个队列进行重试处理。

消息丢弃:直接丢弃这些无法处理的消息,避免占用系统资源。

日志收集:将死信消息做为日志收集起来,用户后续分析和问题定位。

延迟队列

概念

延迟队列就是消息发送之后,并不想让消费者立即拿到消息,而是在等待特定时间之后,消费者才能拿到消息进行消费

应用场景

  1. 用户发起退款后,24小时内商家未处理,默认退款
  2. 用户注册成功后,三天后发送短信,提高用户活跃度
  3. 预定会议后,在会议开始前15分钟提醒众人参加会议
  4. 用户通过手机远程遥控家里的智能设备在指定时间进行工作,这就可以使用延迟队列。用户发送消息到延迟队列,当指定时间到了再将指令推送到智能设备。

实现方法

  1. RabbitMQ本身并没有实现延迟队列,因此可以使用TTL + 死信队列的方式来实现延迟队列。
  2. 安装延迟队列插件来实现延迟队列。

TTL + 死信队列

@Configuration
public class MockDelayConfig {@Bean("mockDelayNormalQueue")public Queue mockDelayNormalQueue() {return QueueBuilder.durable(Constants.MOCk_DELAY_NORMAL_QUEUE).ttl(5000 * 10) // 设置消息过期时间为50秒.deadLetterExchange(Constants.MOCK_DELAY_DEAD_EXCHANGE) // 设置死信交换机.deadLetterRoutingKey("mock.delay.dead") // 设置死信路由键.build();}@Bean("mockDelayNormalExchange")public Exchange mockDelayNormalExchange() {return ExchangeBuilder.directExchange(Constants.MOCk_DELAY_NORMAL_EXCHANGE).durable(true).build();}@Bean("mockDelayNormalQueueBind")public Binding mockDelayNormalQueueBind(@Qualifier("mockDelayNormalQueue") Queue queue,@Qualifier("mockDelayNormalExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("mock.delay.normal").noargs();}@Bean("mockDelayDeadQueue")public Queue mockDelayDeadQueue() {return QueueBuilder.durable(Constants.MOCK_DELAY_DEAD_QUEUE).build();}@Bean("mockDelayDeadExchange")public Exchange mockDelayDeadExchange() {return ExchangeBuilder.directExchange(Constants.MOCK_DELAY_DEAD_EXCHANGE).durable(true).build();}@Bean("mockDelayDeadQueueBind")public Binding mockDelayDeadQueueBind(@Qualifier("mockDelayDeadQueue") Queue queue,@Qualifier("mockDelayDeadExchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("mock.delay.dead").noargs();}}
@Slf4j
@RestController
@RequestMapping("/mockDelay")
public class MockDelayController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void mockDelayQueue() {this.rabbitTemplate.convertAndSend(Constants.MOCk_DELAY_NORMAL_EXCHANGE,"mock.delay.normal", "hello 延迟队列");log.info("延迟队列生产者发送成功");}}
@Slf4j
@Configuration
public class MockDelayListener {@RabbitListener(queues = Constants.MOCK_DELAY_DEAD_QUEUE)public void mockDelayListener(String msg) {log.info("模拟延迟队列消费者接收到消息:" + msg);}}

在上述代码中,实现的功能是生产者发送消息后,消费者在50秒之后获得消息,对消息进行消费:

在TTL一文中,已经说明了RabbitMQ只会检查队首消息是否过期,不会扫描整个队列。因此如果想要放在模拟延迟队列中的消息过期时间不一致,那就会出现死信消息无法被及时处理的情况。因此,我们想要模拟实现延迟队列,就要确保队列中所有消息的过期时间是一致的。如果存在时间不一致的情况,我们就可以使用不同的模拟延迟队列来实现。

延迟队列插件

下载插件:官方网站进行下载(注意版本对应关系)

启动插件

rabbitma-plusins list // 查看插件列表rabbitmq-plugins enable rabbitmq_delayed_message_exchange // 启动插件service rabbitmq-server restart # 重启服务

如下图,当交换机中有了x-delayed-message就表示延迟插件安装成功 

代码测试

@Configuration
public class DelayConfig {@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constants.DELAY_QUEUE).build();}@Bean("delayExchange")public Exchange delayExchange() {return ExchangeBuilder.directExchange(Constants.DELAY_EXCHANGE).delayed() // 延迟交换机.durable(true) // 持久化.build();}@Bean("delayQueueBind")public Binding delayQueueBind(@Qualifier("delayQueue") Queue delayQueue,@Qualifier("delayExchange") Exchange delayExchange) {return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay").noargs();}}
@Slf4j
@RestController
@RequestMapping("/delay")
public class DelayController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMappingpublic void delayQueue() {for(int i = 0; i < 5; i++) {// 随机生成延迟时间Random random = new Random();int time = random.nextInt(20);// 消息处理器,设置延迟时间MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelayLong((long) (time * 1000)); // 设置延迟时间return message;}};// 发送消息到延迟队列this.rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, "delay", "hello 延迟队列 " + i, messagePostProcessor);log.info("发送延迟队列第" + i + "消息成功,延迟时间为:" + time);}}}
@Slf4j
@Configuration
public class DelayListener {@RabbitListener(queues = Constants.DELAY_QUEUE)public void delayListener(String msg) {log.info("延迟队列监听器,接收到的消息:{}", msg);}}

本质上,延迟插件就是让消息停留在交换机中,等到延迟时间结束之后,再发送到对应的队列中去。 

两者对比

使用TTL + 死信队列的好处是不需要额外安装插件。缺点是受消息的延迟时间影响,同一个队列中的消息必须延迟时间相同。

使用延迟队列插件的好处是不受延迟时间影响,同一队列中的所有消息延迟时间可以不同,额外的插件使得延迟队列的实现比较容易。缺点是需要依赖特定的插件,并且插件的版本必须和对应的RabbitMQ相对应。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/427977.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++入门 之 类和对象(下)

目录 一、初始化列表 二、隐式类型转换与explict 三、静态成员——static 四、友元 五、内部类 六、匿名对象 七.对象拷贝时的编译器优化 一、初始化列表 之前我们实现构造函数时&#xff0c;初始化成员变量主要使用函数体内赋值&#xff0c;构造函数初始化还有一种方式&…

信奥初赛解析:1-3-计算机软件系统

知识要点 软件系统是计算机的灵魂。没有安装软件的计算机称为“裸机”&#xff0c;无法完成任何工作硬件为软件提供运行平台。软件和硬件相互关联,两者之间可以相互转化&#xff0c;互为补充 计算机软件系统按其功能可分为系统软件和应用软件两大类 一、系统软件 系统软件是指…

SpringCloud微服务实现服务降级的最佳实践

Spring Cloud是一种用于快速构建分布式系统的框架&#xff0c;它提供了许多有用的功能&#xff0c;其中包括服务降级。 服务降级是一种保护机制&#xff0c;它可以在面临高并发或故障时保持服务的稳定性。当系统资源不足或服务出现故障时&#xff0c;服务降级可以通过关闭一些功…

react 组件化开发_生命周期_表单处理

组件基本介绍 我们从上面可以清楚地看到&#xff0c;组件本质上就是类和函数&#xff0c;但是与常规的类和函数不同的是&#xff0c;组件承载了渲染视图的 UI 和更新视图的 setState 、 useState 等方法。React 在底层逻辑上会像正常实例化类和正常执行函数那样处理的组件。 因…

Unsupervised Deep Representation Learning for Real-Time Tracking

摘要 我们的无监督学习的动机是稳健的跟踪器应该在双向跟踪中有效。具体来说&#xff0c;跟踪器能够在连续帧中前向定位目标对象&#xff0c;并回溯到其在第一帧中的初始位置。基于这样的动机&#xff0c;在训练过程中&#xff0c;我们测量前向和后向轨迹之间的一致性&#xf…

95、k8s之rancher可视化

一、ranker 图形化界面 图形化界面进行k8s集群的管理 rancher自带监控----普罗米修斯 [rootmaster01 opt]# docker load -i rancher.tar ##所有节点 [rootmaster01 opt]# docker pull rancher/rancher:v2.5.7 ##主节点[rootmaster01 opt]# vim /etc/docker/daemon.jso…

C++初阶学习——探索STL奥秘——反向迭代器

适配器模式是 STL 中的重要组成部分&#xff0c;除了容器适配器外&#xff0c;还有 选代器适配器&#xff0c;借助 选代器适配器 &#xff0c;可以轻松将各种容器中的普通迭代器转变为反向迭代器&#xff0c;这正是适配器的核心思想 注:库中的反向迭代器在设计时&#xff0c;为…

HashMap线程不安全|Hashtable|ConcurrentHashMap

文章目录 常见集合线程安全性HashMap为什么线程不安全&#xff1f;怎么保证HashMap线程安全 HashtableConcurrentHashMap 引入细粒度锁代码中分析总结 小结 常见集合线程安全性 ArrayList、LinkedList、TreeSet、HashSet、HashMap、TreeMap等都是线程不安全的。 HashTable是线…

【Python报错已解决】To update, run: python.exe -m pip install --upgrade pip

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 专栏介绍 在软件开发和日常使用中&#xff0c;BUG是不可避免的。本专栏致力于为广大开发者和技术爱好者提供一个关于BUG解决的经…

【C++篇】~类和对象(中)

类和对象&#xff08;中&#xff09; 1.类的默认成员函数​ 默认成员函数就是用户没有显式实现&#xff0c;编译器会自动生成的成员函数称为默认成员函数。一个类&#xff0c;我们不写的情况下编译器会默认生成以下6个默认成员函数&#xff0c;需要注意的是这6个中最重要的是前…

【LeetCode每日一题】——401.二进制手表

文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 回溯 二【题目难度】 简单 三【题目编号】 401.二进制手表 四【题目描述】 二进制手表顶部…

arcgisPro地理配准

1、添加图像 2、在【影像】选项卡中&#xff0c;点击【地理配准】 3、 点击添加控制点 4、选择影像左上角格点&#xff0c;然后右击填入目标点的投影坐标 5、依次输入四个格角点的坐标 6、点击【变换】按钮&#xff0c;选择【一阶多项式&#xff08;仿射&#xff09;】变换 7…

1.Seata 1.5.2 seata-server搭建

一&#xff1a;Seata基本介绍 Seata是一款开源的分布式事务解决方案&#xff0c;致力于在微服务架构下提供高性能和简单易用的分布式事务服务。 详见官网链接&#xff1a;https://seata.apache.org/zh-cn/ 1.历史项目里的使用经验&#xff1a; 之前公司里的oem用户对应的App…

多重指针变量(n重指针变量)实例分析

0 前言 指针之于C语言&#xff0c;就像子弹于枪械。没了子弹的枪械虽然可以用来肉搏&#xff0c;却失去了迅速解决、优雅解决战斗的能力。但上了膛的枪械也非常危险&#xff0c;时刻要注意是否上了保险&#xff0c;使用C语言的指针也是如此&#xff0c;要万分小心&#xff0c;…

【VUE3.0】动手做一套像素风的前端UI组件库---先导篇

系列文章目录 【VUE3.0】动手做一套像素风的前端UI组件库—Button 目录 系列文章目录引言准备素材字体鼠标手势图 创建vue3项目构建项目1. 根据命令行提示选择如下&#xff1a;2. 进入项目根目录下载依赖并启动。3. 设置项目src路径别名&#xff0c;方便后期应用路径。4. 将素…

solana项目counter,测试过程中执行报错记录分享

跟随HackQuest部署counter项目&#xff0c;使用 Solana 官方提供的 playgroud 。这个平台让我们的部署和测试过程变得更加简便高效。 合约代码 lib.rs中复制以下代码 use anchor_lang::prelude::*; use std::ops::DerefMut;declare_id!("CVQCRMyzWNr8MbNhzjbfPu9YVvr97…

Amoco:一款针对二进制源码的安全分析工具

关于Amoco Amoco是一款功能强大的二进制源码静态分析工具&#xff0c;该工具基于Python 3.8开发&#xff0c;可以帮助广大研究人员轻松对二进制程序执行静态符号分析。 工具特性 1、一个通用的指令解码框架&#xff0c;旨在减少实现对新架构的支持所需的时间。例如&#xff0c…

通过springcloud gateway优雅的进行springcloud oauth2认证和权限控制

代码地址 如果对你有帮助请给个start&#xff0c;本项目会持续更新&#xff0c;目标是做一个可用的快速微服务开发平台&#xff0c;成为接私活&#xff0c;毕设的开发神器&#xff0c; 欢迎大神们多提意见和建议 使用的都是spring官方最新的版本&#xff0c;版本如下&#xff1…

F12抓包11:UI自动化 - Recoder(记录器)

课程大纲 使用场景&#xff08;导入和导出&#xff09;: ① 测试的重复性工作&#xff0c;本浏览器录制并进行replay&#xff1b; ② 导入/导出录制脚本&#xff0c;移植后replay&#xff1b; ③ 导出给开发进行replay复现bug&#xff1b; ④ 进行前端性能分析。 1、录制脚…

Virtuoso服务在centos中自动停止的原因分析及解决方案

目录 前言1. 问题背景2. 原因分析2.1 终端关闭导致信号12.2 nohup命令的局限性 3. 解决方案3.1 使用 screen 命令保持会话3.2 使用 tmux 作为替代方案3.3 使用系统服务&#xff08;systemd&#xff09; 4. 其他注意事项4.1 网络配置4.2 日志监控 结语 前言 在使用Virtuoso作为…