[RabbitMQ] 重试机制+TTL+死信队列

🌸个人主页:https://blog.csdn.net/2301_80050796?spm=1000.2115.3001.5343
🏵️热门专栏:
🧊 Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm=1001.2014.3001.5482
🍕 Collection与数据结构 (92平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm=1001.2014.3001.5482
🧀线程与网络(96平均质量分) https://blog.csdn.net/2301_80050796/category_12643370.html?spm=1001.2014.3001.5482
🍭MySql数据库(93平均质量分)https://blog.csdn.net/2301_80050796/category_12629890.html?spm=1001.2014.3001.5482
🍬算法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12676091.html?spm=1001.2014.3001.5482
🍃 Spring(97平均质量分)https://blog.csdn.net/2301_80050796/category_12724152.html?spm=1001.2014.3001.5482
🎃Redis(97平均质量分)https://blog.csdn.net/2301_80050796/category_12777129.html?spm=1001.2014.3001.5482
🐰RabbitMQ(97平均质量分) https://blog.csdn.net/2301_80050796/category_12792900.html?spm=1001.2014.3001.5482
感谢点赞与关注~~~
在这里插入图片描述

目录

  • 1. 重试机制
    • 1.1 重试配置
    • 1.2 配置交换机队列
    • 1.3 发送消息
    • 1.4 消费消息
    • 1.5 运行测试
    • 1.6 手动确认
  • 2. TTL
    • 2.1 设置消息TTL
    • 2.2 设置消息队列的TTL
    • 2.3 两者的区别
  • 3. 死信队列
    • 3.1 死信队列的概念
    • 3.2 代码示例
      • 3.2.1 声明队列和交换机
      • 3.2.2 正常队列与死信交换机的绑定
      • 3.2.3 制造死信产生的条件
      • 3.2.4 发送消息
      • 3.2.5 测试死信
    • 3.3 常见面试题

1. 重试机制

在消息传递的过程中,可能会遇到各种问题,比如网络故障,服务不可用,资源不足等,这些问题都可能会导致消息处理失败.为了解决这些问题,RabbitMQ提供了重试机制,允许消息在处理失败之后重新发送.
我们也可以对重试机制设置重试次数.超过了重试的限制次数,如果没有对队列进行死信队列的绑定,那么该消息就会发生丢失.

1.1 重试配置

  rabbitmq:host: 182.92.204.253port: 5672username: jiangruijiapassword: *****virtual-host: /listener:simple:acknowledge-mode: autoretry:initial-interval: 5000msenabled: truemax-attempts: 5
  • initial-interval: 表示初始失败的等待时间为5s
  • enabled: 表示开启消费者失败重试
  • max-attempts: 最大重试次数,包括首次发送
  • 注意: 重试机制在自动确认模式之下才会生效,如果配置为手动确认,则重试机制不会生效.

1.2 配置交换机队列

public static final String RETRY_QUEUE = "retry_queue";
public static final String RETRY_EXCHANGE = "retry_exchange";
@Bean
public DirectExchange retryExchange(){return ExchangeBuilder.directExchange(Constant.RETRY_EXCHANGE).durable(true).build();
}
@Bean
public Queue retryQueue(){return QueueBuilder.durable(Constant.RETRY_QUEUE).build();
}
@Bean
public Binding retryBinding(@Qualifier("retryExchange") DirectExchange exchange,@Qualifier("retryQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("retry");
}

1.3 发送消息

@RequestMapping("/retry")
public String retry(){rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE,"retry","retry消息");return "发送成功";
}

1.4 消费消息

@Component
public class RetryListener {@RabbitListener(queues = Constant.RETRY_QUEUE)public void retryListener(Message message) throws UnsupportedEncodingException {System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());int i = 3/0;System.out.println("处理完成");}
}

1.5 运行测试

使用Postman调用接口,观察控制台信息
在这里插入图片描述
我们看到由于消费者发生异常,消息在不停地进行重试,重试5次之后,抛出了异常.
如果我们对异常进行捕获,则不会进行重试.

@Component
public class RetryListener {@RabbitListener(queues = Constant.RETRY_QUEUE)public void retryListener(Message message) throws UnsupportedEncodingException {System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());try {int i = 3/0;}catch (Exception e){System.out.println("处理失败");}System.out.println("处理完成");}
}

在这里插入图片描述

1.6 手动确认

如果我们把确认接收机制改为手动确认机制

@Component
public class RetryListener {@RabbitListener(queues = Constant.RETRY_QUEUE)public void retryListener(Message message, Channel channel) throws IOException {System.out.printf("接收到消息:%s,deliveryTag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());try {Thread.sleep(1000);int i = 3/0;channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e){System.out.println("处理失败");channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);}System.out.println("处理完成");}
}

运行结果:
在这里插入图片描述
我们看到,在手动控制模式的时候,重试的次数不会像自动控制那样直接生效,因为是否重试以及何时重试更多地取决于应用程序和消费者的实现逻辑.

2. TTL

TTL,即过期时间,RabbitMQ可以对消息和队列设置TTL.
当消息到达存活时间之后,还没有被消费,就会被自动清除.

2.1 设置消息TTL

目前有两种方法可以设置消息的TTL.一是设置队列的TTL,队列中所有消息都有相同的过期时间,二是设置每条消息的过期时间,每条消息都可以有不同的TTL.如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准.
首先我们针对每条消息设置TTL.方法就是我们在发送消息的时候对消息的expiretion参数进行设置,单位为毫秒.
我们先来配置交换机和队列.

public static final String TTL_QUEUE = "ttl_queue";
public static final String TTL_EXCHANGE = "ttl_exchange";
@Bean
public DirectExchange ttlExchange(){return ExchangeBuilder.directExchange(Constant.TTL_EXCHANGE).durable(true).build();
}
@Bean
public Queue ttlQueue(){return QueueBuilder.durable(Constant.TTL_QUEUE).build();
}
@Bean
public Binding ttlBinding(@Qualifier("ttlExchange") DirectExchange exchange,@Qualifier("ttlQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("ttl");
}

发送消息

@RequestMapping("/ttl")
public String ttl(){String msg = "ttl消息";Message message = new Message(msg.getBytes(StandardCharsets.UTF_8));message.getMessageProperties().setExpiration("10000");//设置消息过期时间为10srabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl",message);return "发送信息";
}

运行程序,观察结果:

  1. 发送消息之后,Ready消息为1
    在这里插入图片描述
  2. 过了10s之后,发现消息被删除
    在这里插入图片描述

2.2 设置消息队列的TTL

设置队列TTL的方法是在创建队列时,加入x-message-ttl参数实现,单位为毫秒.
设置队列和绑定关系

@Bean
public Queue ttlQueue2(){Map<String,Object> map = new HashMap<>();map.put("x-message-ttl",10000);//设置10s过期return QueueBuilder.durable(Constant.TTL_QUEUE2).withArguments(map).build();
}
@Bean
public Binding ttl2Binding(@Qualifier("ttlExchange") DirectExchange exchange,@Qualifier("ttlQueue2") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("ttl2");
}

队列的声明也可以简写为:

@Bean
public Queue ttlQueue2(){return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(10000).build();
}

发送消息:

@RequestMapping("/ttl2")
public String ttl2(){rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl2","ttl2消息");return "发送成功";
}

运行程序之后观察结果:
运行之后发现,新增了一个队列,队列有一个TTL标识.
在这里插入图片描述
使用Postman调用接口发送消息:
在这里插入图片描述
10s之后发现队列中的消息被删除:
在这里插入图片描述

2.3 两者的区别

设置队列的ttl属性方法,一旦消息过期,就会立即从队列中删除.如果消息设置ttl,即使消息过期,也不会立即从队列中删除,而是在即将投递到消费者之前进行判定,如果过期了,才进行删除.
这两种方法处理过期消息的原理如下:
设置队列的过期时间,队列中已经过期的消息肯定在队列头部,RabbitMQ只需要定期扫描队头是否有过期的消息即可.
而设置消息的TTL的方式,每条消息的过期时间不同,如果定期扫描整个队列,效率是非常低的,所以不如等到此消息即将被消费时再判定是否过期.

3. 死信队列

3.1 死信队列的概念

死信简单理解就是因为种种原因,无法被消费的信息,就是死信.
有死信,就会有死信队列,当一个消息在一个队列中变为死信之后,它能被重新被发送到另一个交换机中,这个交换机就是DLX(Dead Letter Exchange),绑定DLX的队列,就被称为死信队列(DLQ,Dead Letter Queue).
在这里插入图片描述
消息变为死信一般是由于一下的几种情况:

  1. 消息被拒绝.(Basic.reject/Basic.nack),并且设置requeue参数为false.
  2. 消息过期
  3. 队列达到最大长度

3.2 代码示例

3.2.1 声明队列和交换机

交换机和队列的声明包含两部分:

  • 正常声明的交换机和队列
  • 声明死信队列和死信的交换机
    死信的交换机和队列和普通的交换机队列在声明的时候没有什么区别.
public static final String NORMAL_QUEUE = "normal_queue";
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DLX_EXCHANGE = "dlx_exchange";
public static final String DLX_QUEUE = "dlx_queue";
@Bean
public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constant.NORMAL_EXCHANGE).durable(true).build();
}
@Bean
public Queue normalQueue(){return QueueBuilder.durable(Constant.NORMAL_QUEUE).build();
}
@Bean
public Binding normalBinding(@Qualifier("normalExchange") DirectExchange exchange,@Qualifier("normalQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("normal");
}
@Bean
public DirectExchange dlxExchange(){return ExchangeBuilder.directExchange(Constant.DLX_EXCHANGE).durable(true).build();
}
@Bean
public Queue dlxQueue(){return QueueBuilder.durable(Constant.DLX_QUEUE).build();
}
@Bean
public Binding dlxBinding(@Qualifier("dlxExchange") DirectExchange exchange,@Qualifier("dlxQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("dlx");
}

3.2.2 正常队列与死信交换机的绑定

当一个队列中存在死信的时候,RabbitMQ会自动把这个消息重新发布到设置的DLX交换机上,进而被路由到另一个队列,即死信队列.
绑定的时候需要为普通队列设置x-dead-letter-exchangex-dead-letter-routing-key两个参数,第一个参数是绑定的死信交换机,第二个参数是死信队列的路由键.

@Bean
public Queue normalQueue(){Map<String,Object> map = new HashMap<>();map.put("x-dead-letter-exchange",Constant.DLX_EXCHANGE);map.put("x-dead-letter-routing-key","dlx");return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(map).build();
}

上面的map可以简写为:

@Bean
public Queue normalQueue(){return QueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DLX_EXCHANGE).deadLetterRoutingKey("dlx").build();
}

3.2.3 制造死信产生的条件

我们通过设置队列长度限制或者是设置消息的过期时间来制造死信.如果队列中的消息过期,或者是队列中的消息超过队列的长度,全部会被路由到死信队列.

@Bean
public Queue normalQueue(){return QueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DLX_EXCHANGE).deadLetterRoutingKey("dlx").ttl(10000).maxLength(10L).build();
}

3.2.4 发送消息

@RequestMapping("/normal")
public String normal(){rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","normal消息");return "发送成功";
}

3.2.5 测试死信

  1. 启动程序之后观察队列
    在这里插入图片描述
    我们发现队列normal_queue有5个标签,则会五个标签分别代表该队列的5个属性:
    • D: durable的缩写,设置持久化.
    • TTL: Time to Live,队列的TTL.
    • Lim: 队列设置了长度(x-max-length).
    • DLX: 死信队列的交换机
    • DLK: 死信队列的路由键
  2. 测试到达过期时间
    接下来我们给正常队列中发送一条消息.
    在这里插入图片描述
    10秒钟之后,队列过期,消息也会过期,所以消息就会进入死信队列.
    在这里插入图片描述
    在这里插入图片描述
  3. 测试超出队列长度
    我们给队列中一次性发送20条消息
    在这里插入图片描述
    我们发现其中的10条消息会直接进入死信队列.
  4. 测试消息被拒收
    编写消费者代码,并抛出异常,普通队列消费者确认接收机制使用手动应答.
@Component
public class NormalListener {@RabbitListener(queues = Constant.NORMAL_QUEUE)public void normalListener(Message message, Channel channel) throws IOException, InterruptedException {System.out.printf("接收到消息%s,tag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());try {System.out.println("处理消息");int i = 3/0;channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}catch (Exception e){System.out.println("消息处理失败");Thread.sleep(1000);//拒绝接收之后不放回原队列中,则会放入死信队列channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);}}
}
@Component
public class DLXListener {@RabbitListener(queues = Constant.DLX_QUEUE)public void dlxListener(Message message) throws UnsupportedEncodingException {System.out.printf("死信队列接收到消息:%s,tag:%d\n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());}
}

在这里插入图片描述

3.3 常见面试题

  1. 死信队列的概念
    死信简单理解就是因为种种原因,无法被消费的信息,就是死信.
  2. 来源
    1. 消息被拒绝.(Basic.reject/Basic.nack),并且设置requeue参数为false.
    2. 消息过期
    3. 队列达到最大长度
  3. 死信队列的应用场景
    它可以处理异常情况下,消息不能够被消费者正确消费而被置入死信队列中的情况,应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统.
    一般的应用场景还有:
    1. 消息重试:将死信消息重新发送到原队列或另⼀个队列进行重试处理.
    2. 消息丢弃:直接丢弃这些无法处理的消息,以避免它们占用系统资源.
    3. 日志收集:将死信消息作为日志收集起来,用于后续分析和问题定位.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/477660.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Prometheus结合K8s(二)使用

上一篇介绍了如何搭建 Prometheus结合K8s&#xff08;一&#xff09;搭建-CSDN博客&#xff0c;这章介绍使用 页面访问 kubectl get svc -n prom 看promeheus和granfana的端口访问页面 Prometheus 点击status—target&#xff0c;可以看到metrics的数据来源&#xff0c;即各…

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-11-05

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-11-05 目录 文章目录 计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-11-05目录1. LLM App Squatting and Cloning2. Improving Grapheme-to-Phoneme Conversion through In-Context Knowledge3. A Comp…

【SQL Server】华中农业大学空间数据库实验报告 实验三 数据操作

1.实验目的 熟悉了解掌握SQL Server软件的基本操作与使用方法&#xff0c;以及通过理论课学习与实验参考书的帮助&#xff0c;熟练掌握使用T-SQL语句和交互式方法对数据表进行插入数据、修改数据、删除数据等等的操作&#xff1b;作为后续实验的基础&#xff0c;根据实验要求重…

LDR6020驱动的Type-C接口显示器解决方案

一、引言 随着科技的飞速发展&#xff0c;Type-C接口凭借其高速数据传输、强大的电力传输能力以及便捷的正反可插设计&#xff0c;正逐渐成为现代电子设备的主流接口标准。在显示器领域&#xff0c;Type-C接口的引入不仅简化了线缆连接&#xff0c;还为用户带来了更丰富的功能…

Spring |(四)IoC/DI配置管理第三方bean

文章目录 &#x1f4da;数据源对象管理&#x1f407;环境准备&#x1f407;实现Druid管理&#x1f407;实现C3P0管理 &#x1f4da;加载properties文件&#x1f407;第三方bean属性优化&#x1f407;读取单个属性 学习来源&#xff1a;黑马程序员SSM框架教程_SpringSpringMVCMa…

三十一、构建完善微服务——API 网关

一、API 网关基础 系统拆分为微服务后&#xff0c;内部的微服务之间是互联互通的&#xff0c;相互之间的访问都是点对点的。如果外部系统想调用系统的某个功能&#xff0c;也采取点对点的方式&#xff0c;则外部系统会非常“头大”。因为在外部系统看来&#xff0c;它不需要也没…

(免费送源码)计算机毕业设计原创定制:Java+JSP+HTML+JQUERY+AJAX+MySQL springboot计算机类专业考研学习网站管理系统

摘 要 大数据时代下&#xff0c;数据呈爆炸式地增长。为了迎合信息化时代的潮流和信息化安全的要求&#xff0c;利用互联网服务于其他行业&#xff0c;促进生产&#xff0c;已经是成为一种势不可挡的趋势。在大学生在线计算机类专业考研学习网站管理的要求下&#xff0c;开发一…

社交电商专业赋能高校教育与产业协同发展:定制开发AI智能名片及2+1链动商城小程序的创新驱动

摘要&#xff1a;本文围绕社交电商有望成为高校常态专业这一趋势展开深入探讨&#xff0c;剖析国家政策认可下其学科发展前景&#xff0c;着重阐述在专业建设进程中面临的师资短缺及实践教学难题。通过引入定制开发AI智能名片与21链动商城小程序&#xff0c;探究如何借助这些新…

Linux各种并发服务器优缺点

本文旨在介绍针对“无并发C/S模型”改进的方法总结以及各种改进方法的优缺点&#xff0c;具体函数的实现并不介绍。 1. 无并发C/S模型 创建服务器流程分析&#xff1a; socket()创建服务器的监听套接字bind()将服务器给服务器的监听套接字绑定IP地址和Port端口号listen()设置…

基于AXI PCIE IP的FPGA PCIE卡示意图

创作不易&#xff0c;转载请注明出处&#xff1a;https://blog.csdn.net/csdn_gddf102384398/article/details/143926217 上图中&#xff0c;在FPGA PCIE卡示意图内&#xff0c;有2个AXI Master设备&#xff0c;即&#xff1a;PCIE到AXI4-Full-Master桥、AXI CDMA IP&#xff1…

【漏洞复现】|智互联SRM智联云采系统quickReceiptDetail SQL注入漏洞

漏洞描述 智互联(深圳)科技有限公司SRM智联云采系统针对企业供应链管理难题&#xff0c;及智能化转型升级需求&#xff0c;智联云采依托人工智能、物联网、大数据、云等技术&#xff0c;通过软硬件系统化方案&#xff0c;帮助企业实现供应商关系管理和采购线上化、移动化、智能…

el-table-column自动生成序号在序号前插入图标

实现效果&#xff1a; 代码如下&#xff1a; 在el-table里加入这个就可以了&#xff0c;需要拿到值可以用scope.$index ​​​​​​​<el-table-column type"index" label"序号" show-overflow-tooltip"true" min-width"40">…

如何利用 Puppeteer 的 Evaluate 函数操作网页数据

介绍 在现代的爬虫技术中&#xff0c;Puppeteer 因其强大的功能和灵活性而备受青睐。Puppeteer 是一个用于控制 Chromium 或 Chrome 浏览器的 Node.js 库&#xff0c;提供了丰富的 API 接口&#xff0c;能够帮助开发者高效地处理动态网页数据。本文将重点讲解 Puppeteer 的 ev…

“小浣熊家族AI办公助手”产品体验 — “人人都是数据分析师”

一、引言&#xff1a; 大家平时应该在工作中常常使用到Excel来做数据统计&#xff0c;比如临近过年时&#xff0c;公司一般会开各种复盘、年终、检讨、明年规划大会&#xff0c;势必需要准备一大堆的量化数据报表&#xff0c;用于会议上的数据汇报、分析工作&#xff0c;试想一…

Unity-添加世界坐标系辅助线

如果你想在场景中更直观地显示世界坐标系&#xff0c;可以通过编写一个简单的脚本来实现。下面是一个基本的示例脚本&#xff0c;它会在场景中绘制出世界坐标系的三个轴&#xff1a; using UnityEngine;public class WorldAxesIndicator : MonoBehaviour {public float length…

Makefile基础应用

1 使用场景 在Linux环境下&#xff0c;我们通常需要通过命令行来编译代码。例如&#xff0c;在使用gcc编译C语言代码时&#xff0c;需要使用以下命令。 gcc -o main main.c 使用这种方式编译代码非常吃力&#xff0c;每次调试代码都需要重新在命令行下重新编译&#xff0c;重复…

【tensorflow的安装步骤】

创建一个虚拟环境 conda create -n tensorflow python3.6激活虚拟环境 conda activate tensorflow使用镜像源下载 pip install tensorflow1.15.0 -i https://pypi.tuna.tsinghua.edu.cn/simple/特别特别重要的点&#xff01;&#xff01;&#xff01; 别用WiFi或者校园网下…

【Python · PyTorch】循环神经网络 RNN(基础概念)

【Python PyTorch】循环神经网络 RNN&#xff08;基础概念&#xff09; 0. 生物学相似性1. 概念2. 延时神经网络&#xff08;TDNN&#xff09;3. 简单循环神经网络&#xff08;Simple RNN&#xff09;3.1 BiRNN 双向循环神经网络3.2 特点记忆性参数共享图灵完备 3.3 网络结构3…

使用EFK收集k8s日志

首先我们使用EFK收集Kubernetes集群中的日志&#xff0c;本次实验讲解的是在Kubernetes集群中启动一个Elasticsearch集群&#xff0c;如果企业内已经有了Elasticsearch集群&#xff0c;可以直接将日志输出至已有的Elasticsearch集群。 文章目录 部署elasticsearch创建Kibana创建…

Qt入门1——认识Qt的几个常用头文件和常用函数

1.头文件 ① #include <QPushButton>——“按钮”头文件&#xff1b; ② #include <QLabel>——“标签”头文件&#xff1b; ③ #include <QFont>——“字体”头文件&#xff1b; ④#include <QDebug>——输出相关信息&#xff1b; 2. 常用函数/类的基…