RabbitMQ 死信队列应用

1. 概念

死信队列(Dead Letter Queue)是在消息队列系统中的一种特殊队列,用于存储无法被消费的消息。消息可能会因为多种原因变成“死信”,例如消息过期、消息被拒绝、消息队列长度超过限制等。当消息变成“死信”时,它们会被路由到死信队列中,以便进行进一步处理或分析。 死信队列能够帮助系统进行消息跟踪、监控和处理异常情况,是消息队列系统中的重要组成部分。

2. 应用场景

死信队列在消息队列系统中有多种应用场景,包括但不限于以下几个方面:

  • 延迟消息处理:实现延迟消息投递,例如实现消息的定时投递、消息重试机制等。

  • 任务调度:用于实现任务调度系统,例如延迟执行任务、失败重试任务等。

  • 异常处理:处理消息消费失败或超时的情况,对异常消息进行统一处理。

  • 业务流程控制:实现业务流程中的状态控制和超时处理,例如订单超时取消、支付超时处理等。

  • 监控和统计:对异常消息进行统计和分析,用于系统性能监控和问题排查。

这些应用场景展示了死信队列的灵活性和实用性,在实际系统开发中具有广泛的应用价值。

3. 造成消息进入死信队列的原因

消息成为死信的原因有以下几种:

  • 消息被拒绝(basic.reject或basic.nack),并且requeue标志被设置为false。若参数requeue为true,则表示还可以将此跳消息重新塞回普通队列,若为false则消息被拒绝后直接进入死信队列。

  • 消息过期。在生产者设置生产时设置,若消费者未在过期时间内消费消息,则消息被转发到死信队列中。("x-message-ttl")

  • 队列达到最大长度。当普通队列中消息堆积数量长度达到了maxLength,则会将新接收的消息转发到死信队列中去,从而避免消息丢失。

4. 死信队列工作流程图

5. 代码示例

5.1 引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.7.15</version>
</dependency>

5.2 RabbitMQ配置

@Configuration
public class RabbitConfig {/*** 死信队列消息模型构建----------------------------------------------------------------------------------**/// 创建普通队列@Beanpublic Queue basicQueue() {Map<String, Object> params = new HashMap<>(8);// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,params.put("x-dead-letter-exchange", Exchange.DEMO_DEAD_LETTER_EXCHANGE);// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。params.put("x-dead-letter-routing-key", RoutingKey.DEMO_DEAD_ROUTING_KEY);// 注意这里是毫秒单位,这里我们给10秒params.put("x-message-ttl", 10*1000);return new Queue(MyQueue.DEMO_CONSUMER_QUEUE, true, false, false, params);}//创建“基本消息模型”的基本交换机,面向生产者@Beanpublic TopicExchange basicExchange() {//创建并返回基本交换机实例return new TopicExchange(Exchange.DEMO_BASIC_NORMAL_EXCHANGE, true, false);}//创建“基本消息模型”的基本绑定(基本交换机+基本路由),面向生产者@Beanpublic Binding basicBinding() {//创建并返回基本消息模型中的基本绑定(注意这里是正常交换机跟死信队列绑定在一定,不叫死信路由)return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(RoutingKey.DEMO_ROUTING_KEY);}// 创建死信交换机@Beanpublic TopicExchange deadLetterExchange() {//创建并返回死信交换机实例return new TopicExchange(Exchange.DEMO_DEAD_LETTER_EXCHANGE, true, false);}// 创建第二个中转站// 创建死信队列@Beanpublic Queue deadLetterQueue() {return new Queue(MyQueue.DEMO_DEAD_LETTER_QUEUE, true);}// 创建死信路由及其绑定@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(RoutingKey.DEMO_DEAD_ROUTING_KEY);}public static class Exchange {public static final String DEMO_BASIC_NORMAL_EXCHANGE = "demo.basic.exchange";public static final String DEMO_DEAD_LETTER_EXCHANGE = "demo.dead.letter.exchange";}public static class RoutingKey {//交换机与报表队列绑定的RoutingKeypublic static final String DEMO_ROUTING_KEY = "demo.basic.routing.key";public static final String DEMO_DEAD_ROUTING_KEY = "demo.dead.routing.key";}/*** 队列名称* @author peng.zhang* @date 2024/01/30*/public static class MyQueue {//报表队列名称public static final String DEMO_CONSUMER_QUEUE = "demo.basic.queue";//死信队列名称public static final String DEMO_DEAD_LETTER_QUEUE = "demo.dead.letter.queue";}
}

5.3 消息生产者

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 发送消息到死信队列*/@PostMapping("/testDeadQueue")public String testDeadQueue() {// 设置生产者到交换机的确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {log.info("correlationData:{}, ack:{}, cause:{}", JSON.toJSONString(correlationData), ack, cause);});// 设置消息未被队列接收时的返回回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, ex, routing) -> {log.info("message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}", JSON.toJSONString(message),replyCode, replyText, ex, routing);});// 生成关联数据并发送消息到交换机CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 消息内容String messageBody = StrUtil.format("this message send at {}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"));rabbitTemplate.convertAndSend(RabbitConfig.Exchange.DEMO_BASIC_NORMAL_EXCHANGE, RabbitConfig.RoutingKey.DEMO_ROUTING_KEY, messageBody, correlationData);log.info(">>>>>{}, 发送消息:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), messageBody);return "OK";}}

5.4 消息消费者

@Component
@Slf4j
public class DeadLetterConsumer {/*** 监听 DEMO_CONSUMER_QUEUE 并处理传入的消息。* 为测试目的抛出 IOException 以模拟异常。** @param messageBody 消息负载* @param headers     消息头* @param channel     用于消息确认的通道* @throws IOException 如果抛出异常*/@RabbitListener(queues = RabbitConfig.MyQueue.DEMO_CONSUMER_QUEUE)@RabbitHandlerpublic void testBasicQueueAndThrowsException(@Payload String messageBody, @Headers Map<String, Object> headers, Channel channel) throws IOException {/*** Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,* 以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。* RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。*/Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);log.info(">>>>>{} 普通队列消费, tag = {}, 消息内容:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), tag, messageBody);/***  multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认*  如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认*/// ACK,确认一条消息已经被消费
//        channel.basicAck(deliveryTag, false);// 对应的业务操作。。。。。// doBusiness();// 模拟消息拒绝channel.basicNack(tag, false, false);}/*** 处理业务逻辑*/private void doBusiness() {System.out.println("here do some business code");}/*** 监听死信队列并处理消息。** @param data    消息内容* @param tag     消息标签* @param channel 通道*/@RabbitListener(queues = RabbitConfig.MyQueue.DEMO_DEAD_LETTER_QUEUE)@RabbitHandlerpublic void fromDeadLetter(@Payload String data, @Header(AmqpHeaders.DELIVERY_TAG) long tag, Channel channel) {log.info(">>>>>{} 死信队列消费, tag = {}, 消息内容:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), tag, data);// 对应的业务操作。。。。。}
}

5.5 YML配置

spring:rabbitmq:username: rabbitmqpassword: rabbitmqport: 5672host: 127.0.0.1#publisher-confirm-type参数有三个可选值:#SIMPLE:会触发回调方法,相当于单个确认(发一条确认一条)。#CORRELATED:消息从生产者发送到交换机后触发回调方法。#NONE(默认):关闭发布确认模式。publisher-confirm-type: correlatedtemplate:receive-timeout: 1800000reply-timeout: 1800000retry:enabled: falselistener:direct:retry:enabled: truedefault-requeue-rejected: falsesimple:retry:# 是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)enabled: true# 最大重试次数max-attempts: 1# 重试间隔时间(单位毫秒)initial-interval: 10000# 重试最大时间间隔(单位毫秒)max-interval: 300000# 应用于前一重试间隔的乘法器multiplier: 5default-requeue-rejected: false

5.6 控制台输出

从控制台可以看出,消息被拒绝后,大概10秒后死信队列消息被消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/249596.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

喝汽水问题

答案&#xff1a; #include <stdio.h> int main() {int num 0; //可以喝汽水的次数int mon 20; //钱int cup 0; //瓶子数for (mon 20; mon > 0; mon--) //每次花1元钱买汽水喝{num; //可以喝汽水的次数加1cup; //瓶子数加1if (cup 2) //如果瓶子…

cuda基础教程(一)

文章目录 0. CURA Runtime API1. CUDA人工智能编程1.1. CUDA介绍1.2. 课程内容 2. 异构计算和并行计算2.1. 什么是并行计算2.2. 什么是异构计算 3. CUDA介绍3.1. GPU的性能指标3.2. 什么是CUDA3.3. 如何学习CUDA 4. 系统GPU查询5. Linux系统6. CUDA安装7. 查询GPU信息8. CUDA编…

故障诊断 | 一文解决,GRU门控循环单元故障诊断(Matlab)

文章目录 效果一览文章概述专栏介绍模型描述源码设计参考资料效果一览 文章概述 故障诊断 | 一文解决,GRU门控循环单元故障诊断(Matlab) 专栏介绍 订阅【故障诊断】专栏,不定期更新机器学习和深度学习在故障诊断中的应用;订阅

Java:搭建eladmin复习mvn、springboot、vue等

目录 1.源码平台后端&#xff1a; 2.源码平台前端&#xff1a; 3.操作系统&#xff1a;centos7.9 4.mysql:5.7.x 安装 5.redis:5.0.X 6.maven&#xff1a;3.8 7.java:1.8&#xff1a; 8.nodejs:16.x 9.通过mvn打包eladmin后端 10.npm打包前端项目进行部署 11.访问测试…

永磁同步电机速度环闭环控制

文章目录 1、速度环分析2、电机参数3、PI计算4、模型仿真4.1 模型总览4.2 实际转速与参考转速对比4.3 转矩波形4.4 相电流采样波形 模型下载地址&#xff1a; 链接: 速度闭环模型&#xff08;速度电流双闭环&#xff09; 1、速度环分析 2、电机参数 Udc24 V Rs0.6 LdLq1.4e-3…

二、人工智能之提示工程(Prompt Engineering)

黑8说 岁月如流水匆匆过&#xff0c;哭一哭笑一笑不用说。 黑8自那次和主任谈话后&#xff0c;对这个“妖怪”继续研究&#xff0c;开始学习OpenAI API&#xff01;关注到了提示工程(Prompt Engineering)的重要性&#xff0c;它包括明确的角色定义、自然语言理解&#xff08;…

Redis 布隆过滤器

布隆过滤器 这一篇文章主要是记录布隆过滤器的使用和认识 主要参考了如下的blog https://blog.csdn.net/weixin_42972832/article/details/131211665 他讲的还不错 简单的来说,布隆过滤器,实际上就像是一个集合,拿redis的key来举例来说,布隆过滤器的设置就是去过滤不属于redi…

mybatisplus-多数据源配置

1. 流程 pom文件yml配置多数据源具体服务添加注解DS(“***”) 1.pom文件 <!--mybatis plus 起步依赖--><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.0</vers…

VS之调用程序对DLL中全局变量的使用

接上篇《VS生成C动态链接库DLL》&#xff0c;能够生成DLL&#xff0c;且能调用后&#xff0c;遇到一个问题&#xff0c;即在DLL程序中定义了一些全局变量&#xff0c;应用程序需要使用&#xff0c;本以为可以直接使用&#xff0c;没想到&#xff0c;还是需要设置才可以&#xf…

【Node.js基础】Node.js的介绍与安装

文章目录 前言一、什么是Node.js&#xff1f;二、安装Node.js2.1 Windows系统2.2 macOS系统2.3 Linux系统 三、运行js代码总结 前言 随着互联网技术的不断发展&#xff0c;构建高性能、实时应用的需求日益增长。Node.js作为一种服务器端运行时环境&#xff0c;以其事件驱动、非…

leetcode-704.二分查找

题目 给定一个 n 个元素有序的&#xff08;升序&#xff09;整型数组 nums 和一个目标值 target &#xff0c;写一个函数搜索 nums 中的 target&#xff0c;如果目标值存在返回下标&#xff0c;否则返回 -1。 示例 1: 输入: nums [-1,0,3,5,9,12], target 9输出: 4 解释: 9 …

05MARL经典算法 基于联合动作价值函数

文章目录 前言一、动态规划值迭代算法二、TD差分联合动作学习1.Nash Q-learning2.Correlated Q-Learning 三、JAL的限制总结 前言 用于记录MARL当中的经典算法 基础的MARL算法有三种类型&#xff1a;学习联合动作价值函数、学习智能体的显示模型根据过去的动作预测未来的动作、…

uniapp瀑布流实现

1. 图片瀑布流&#xff1a; 不依赖任何插件&#xff0c;复制即可见效&#xff1a; <template><view class"page"><view class"left" ref"left"><image class"image" v-for"(item,i) in leftList" :k…

运动编辑学习笔记

目录 跳舞重建&#xff1a; 深度运动重定向 Motion Preprocessing Tool anim_utils MotionBuilder 跳舞重建&#xff1a; https://github.com/Shimingyi/MotioNet 深度运动重定向 https://github.com/DeepMotionEditing/deep-motion-editin 游锋生/deep-motion-editin…

红队打靶练习:INFOSEC PREP: OSCP

目录 信息收集 1、arp 2、nmap WEB 信息收集 wpscan dirsearch ssh登录 提权 信息收集 1、arp ┌──(root㉿ru)-[~/kali] └─# arp-scan -l Interface: eth0, type: EN10MB, MAC: 00:0c:29:69:c7:bf, IPv4: 192.168.110.128 Starting arp-scan 1.10.0 with 256 ho…

Ajax入门与使用

目录 ◆ AJAX 概念和 axios 使用 什么是 AJAX&#xff1f; 怎么发送 AJAX 请求&#xff1f; 如何使用axios axios 函数的基本结构 axios 函数的使用场景 1 没有参数的情况 2 使用params参数传参的情况 3 使用data参数来处理请求体的数据 4 上传图片等二进制的情况…

【Python基础 机器学习】Python环境搭建(适合新手阅读的超详细教程)

&#x1f680;个人主页&#xff1a;为梦而生~ 关注我一起学习吧&#xff01; &#x1f4a1;重要专栏&#xff1a; 机器学习 &#xff1a;相对完整的机器学习基础教学&#xff01; 机器学习python实战&#xff1a;用python带你感受真实的机器学习 深度学习&#xff1a;现代人工智…

【leetcode】20. 有效的括号

有效的括号 题目链接 // 栈结构 typedef char valuetype; typedef struct {valuetype* arr;int top;int capacity; } Stack;void Init(Stack* stack);void Push(Stack* stack, valuetype value); void Pop(Stack* stack);valuetype Top(Stack* stack); int Size(Stack* stack…

Elasticsearch:构建自定义分析器指南

在本博客中&#xff0c;我们将介绍不同的内置字符过滤器、分词器和分词过滤器&#xff0c;以及如何创建适合我们需求的自定义分析器。更多关于分析器的知识&#xff0c;请详细阅读文章&#xff1a; 开始使用 Elasticsearch &#xff08;3&#xff09; Elasticsearch: analyzer…

jenkins部署(docker)

docker部署&#xff0c;避免安装tomcat 1.拉镜像 docker pull jenkins/jenkins2.宿主机创建文件夹 mkdir -p /lzp/jenkins_home chmod 777 /lzp/jenkins_home/3.启动容器 docker run -d -p 49001:8080 -p 49000:50000 --privilegedtrue -v /lzp/jenkins_home:/var/jenkins_…