RabbitMQ-死信队列(golang)

1、概念

      死信(Dead Letter),字面上可以理解为未被消费者成功消费的信息,正常来说,生产者将消息放入到队列中,消费者从队列获取消息,并进行处理,但是由于某种原因,队列中的消息未被消费者拿到,这样的消息就会成为死信,存放死信消息的队列,也就被称为死信队列(Dead Letter Queue,简称DLQ)。

2、死信产生的原因

文心一言的回答如下:

  1. 消息被拒绝:当消费者使用basic.reject或basic.nack方法拒绝消息,并且requeue参数被设置为false时,消息会被视为死信。这意味着消费者明确表示无法或不愿意处理该消息,并且不希望该消息重新进入队列等待其他消费者处理。
  2. 消息处理失败:消费者由于代码错误、消息格式不正确、业务规则冲突等原因无法成功处理消息时,该消息也可以被标记为死信。这种情况下,尽管消费者尝试处理消息,但由于某些无法克服的错误,消息无法被成功消费。
  3. 消息过期:如果消息设置了生存时间(TTL,Time To Live),并且在这个时间内没有被消费,那么消息会过期并被视为死信。TTL是RabbitMQ中用于指定消息在队列中存活时间的参数,超过该时间的消息将被视为过期并丢弃或转发到死信队列。
  4. 队列长度限制:当队列中的消息数量超过了设置的最大长度时,新到达的消息无法进入队列,这些消息也会被视为死信。队列长度限制是RabbitMQ中用于控制队列大小的一种机制,当队列达到最大容量时,新到达的消息将无法被接收并可能被丢弃或转发到死信队列。

总结来说,主要原因就三个,消息被拒绝、消息过期、队列满

在一些重要的场景,比如支付场景,提交的订单超时未支付的,可以设计为进入死信队列。

3、死信队列使用实践

3.1 消息过期

设置正常队列ttl过期时间为5s,如果5s内消息没有被消费,则会自动放入死信队列中。

关键点:设置正常队列属性,ttl5s过期:

// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列args := amqp.Table{"x-message-ttl":             int64(5000), // 5秒TTL"x-dead-letter-exchange":    "","x-dead-letter-routing-key": dlx.Name,}// 声明正常队列q, err := ch.QueueDeclare("normal_queue", // nametrue,           // durablefalse,          // delete when unusedfalse,          // exclusivefalse,          // no-waitargs,           // arguments)

全部代码如下: 

package mainimport ("fmt"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@xxxx.xx.xx.xxx:5672/")if err != nil {fmt.Println("Failed to connect to RabbitMQ")return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Failed to open a channel")return}// 声明死信队列dlx, err := ch.QueueDeclare("dead_letter_queue", // nametrue,                // durablefalse,               // delete when unusedfalse,               // exclusivefalse,               // no-waitnil,                 // arguments)if err != nil {fmt.Println("Failed to declare a queue: dead_letter_queue,err:" + err.Error())return}err = ch.ExchangeDeclare("my_exchange", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)if err != nil {fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())return}// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列args := amqp.Table{"x-message-ttl":             int64(5000), // 5秒TTL"x-dead-letter-exchange":    "","x-dead-letter-routing-key": dlx.Name,}// 声明正常队列,注意,必须在声明队列时就要设置死信队列信息q, err := ch.QueueDeclare("normal_queue", // nametrue,           // durablefalse,          // delete when unusedfalse,          // exclusivefalse,          // no-waitargs,           // arguments)if err != nil {fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())return}// 将正常队列绑定到交换机,并设置死信交换机和路由键err = ch.QueueBind(q.Name,        // queue nameq.Name,        // routing key"my_exchange", // exchangefalse,nil,)if err != nil {fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())return}err = ch.Publish("my_exchange", "normal_queue", false, false, amqp.Publishing{Body: []byte("hello world")})if err != nil {fmt.Println("Failed to publis msg to exchange: my_exchange,err:" + err.Error())return}
}

队列信息包括绑定的死信队列信息、ttl等信息如下:

运行上方代码,会向队列发送一条信息,我们先不创建消费者,5s后,消息会被自动放入死信队列。

3.2 队列满

当mq队列由于消息量过多导致队列打满时,这个时候过来的消息,将会被自动放入到死信队列中。

设置队列长度属性代码如下:

args := amqp.Table{// "x-message-ttl":             int64(5000), // 5秒TTL"x-max-length":              2,"x-dead-letter-exchange":    "","x-dead-letter-routing-key": dlx.Name,}// 声明正常队列q, err := ch.QueueDeclare("normal_queue", // nametrue,           // durablefalse,          // delete when unusedfalse,          // exclusivefalse,          // no-waitargs,           // arguments)

队列属性如下:

发送两条信息:

 继续发送第三个:

 测试代码:

package mainimport ("fmt"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@xx.xx.xx.xx:5672/")if err != nil {fmt.Println("Failed to connect to RabbitMQ")return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Failed to open a channel")return}// 声明死信队列dlx, err := ch.QueueDeclare("dead_letter_queue", // nametrue,                // durablefalse,               // delete when unusedfalse,               // exclusivefalse,               // no-waitnil,                 // arguments)if err != nil {fmt.Println("Failed to declare a queue: dead_letter_queue,err:" + err.Error())return}err = ch.ExchangeDeclare("my_exchange", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)if err != nil {fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())return}// 设置TTL并让消息过期进入死信队列,5s不消费就为进入死信队列args := amqp.Table{// "x-message-ttl":             int64(5000), // 5秒TTL"x-max-length":              2,"x-dead-letter-exchange":    "","x-dead-letter-routing-key": dlx.Name,}// 声明正常队列q, err := ch.QueueDeclare("normal_queue", // nametrue,           // durablefalse,          // delete when unusedfalse,          // exclusivefalse,          // no-waitargs,           // arguments)if err != nil {fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())return}// 将正常队列绑定到交换机,并设置死信交换机和路由键err = ch.QueueBind(q.Name,        // queue nameq.Name,        // routing key"my_exchange", // exchangefalse,nil,)if err != nil {fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())return}err = ch.Publish("my_exchange", "normal_queue", false, false, amqp.Publishing{Body: []byte("hello world")})if err != nil {fmt.Println("Failed to publis msg to exchange: my_exchange,err:" + err.Error())return}
}

3.3 消息被拒绝

       消息被拒绝的情况,当消费者无法处理某条信息时,客户端想rabbitmq服务器发送一个【负确认】应答,表示消费者未能成功处理此条消息,并且希望RabbitMQ根据配置重新发送这条消息(例如,将其重新排队)或者将其丢弃。

客户端函数:ch.Nack,函数原型:

func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error {ch.m.Lock()defer ch.m.Unlock()return ch.send(&basicNack{DeliveryTag: tag,Multiple:    multiple,Requeue:     requeue,})
}

入参含义如下: 

tag

这是一个唯一标识符,用于标识消费者之前接收到的特定消息。当消费者调用 ch.Ackch.Nack 或 ch.Reject 时,必须提供这个标识符,以便RabbitMQ知道是对哪条消息进行确认或拒绝。

multiple

这是一个布尔值(bool),用于指示是否应该同时确认(或拒绝)多条消息。如果设置为 true,则RabbitMQ将认为从上一个被确认的消息开始(包括该消息),直到当前消息为止的所有未确认消息都被拒绝。这通常用于批量处理消息确认,但在使用 ch.Nack 时,它的作用更多是关于是否应该重新排队当前消息之后的消息(取决于RabbitMQ的配置和消息的属性)。

requeue

这也是一个布尔值(bool),用于指示被拒绝的消息是否应该被重新放入队列的末尾以便稍后重试。如果设置为 true,则消息将被重新排队;如果设置为 false,则消息将被丢弃(或者根据RabbitMQ的配置可能被发送到死信队列,如果配置了的话)。

测试过程,首先使用3.1或者3.2的代码向mq中写入几条信息:

之后使用如下代码进行消费:

package mainimport ("fmt""time"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@xx.xx.xx.xx:5672/")if err != nil {fmt.Println("Failed to connect to RabbitMQ")return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Failed to open a channel")return}err = ch.ExchangeDeclare("my_exchange", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments)if err != nil {fmt.Println("Failed to declare exchange: my_exchange,err:" + err.Error())return}// 声明正常队列// q, err := ch.QueueDeclare(// 	"normal_queue", // name// 	true,           // durable// 	false,          // delete when unused// 	false,          // exclusive// 	false,          // no-wait// 	nil,            // arguments// )// if err != nil {// 	fmt.Println("Failed to declare a queue: normal_queue,err:" + err.Error())// 	return// }// 将正常队列绑定到交换机,并设置死信交换机和路由键err = ch.QueueBind("normal_queue", // queue name"normal_queue", // routing key"my_exchange",  // exchangefalse,nil,)if err != nil {fmt.Println("Failed to bind queue to exchange: my_exchange,err:" + err.Error())return}msgs, _ := ch.Consume("normal_queue", // queue"",             // consumerfalse,          // auto-acktrue,           // exclusivefalse,          // no-localfalse,          // no-waitnil,            // args)go func() {for d := range msgs {// 模拟处理失败,全部放入死信队列ch.Nack(d.DeliveryTag, false, false)}}()time.Sleep(10 * time.Second)
}

运行代码后,3条消息全部进入到死信队列中:

 4、总结

      RabbitMQ的死信队列(Dead Letter Queue,简称DLQ)是一种用于处理消息失败或无法路由的消息的机制,死信队列中的所有消息都是无法被正常消费的死信,这使得开发者可以集中对这些消息进行管理和分析。通过分析死信队列中的消息,开发者可以了解系统的运行状态、发现潜在的问题,并进行相应的优化和改进,以提升系统的稳定性和可靠性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/473776.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Uni-APP+Vue3+鸿蒙 开发菜鸟流程

参考文档 文档中心 运行和发行 | uni-app官网 AppGallery Connect DCloud开发者中心 环境要求 Vue3jdk 17 Java Downloads | Oracle 中国 【鸿蒙开发工具内置jdk17,本地不使用17会报jdk版本不一致问题】 开发工具 HBuilderDevEco Studio【目前只下载这一个就…

Python中的with语句

with语句和上下文管理器 Python提供了 with 语句的写法,既简单又安全 文件操作的时候使用with语句可以自动调用关闭文件操作,即使出现异常也会自动关闭文件操作。 # 1、以写的方式打开文件 with open(1.txt, w) as f:# 2、读取文件内容f.write(hello wor…

SQL面试题——抖音SQL面试题 主播播出时长

主播播出时长 现有如下数据,主播id、房间号、播出的批次号,每个批次号进出房间的时间戳、分区时间: 每一次直播都有一个上播和下播,每个房间里,同一个批次号会有两条数据,分别记录了上播和下播时间,求每个主播的播出时长? 通过上面的数据,可以清晰的看出,同一个批次…

【汇编】c++游戏开发

由一起学编程创作的‘C/C项目实战:2D射击游戏开发(简易版), 440 行源码分享来啦~’: C/C项目实战:2D射击游戏开发(简易版), 440 行源码分享来啦~_射击c-CSDN博客文章浏览…

Uniapp 引入 Android aar 包 和 Android 离线打包

需求: 原生安卓 apk 要求嵌入到 uniapp 中,并通过 uniapp 前端调起 app 的相关组件。 下面手把手教你,从 apk 到 aar,以及打包冲突到如何运行,期间我所遇到的问题都会 一 一 进行说明,相关版本以我文章内为…

自动化运维(k8s):一键获取指定命名空间镜像包脚本

前言:脚本写成并非一蹴而就,需要不断的调式和修改,这里也是改到了7版本才在 生产环境 中验证成功。 该命令 和 脚本适用于以下场景:在某些项目中,由于特定的安全或政策要求,不允许连接到你的镜像仓库。然而…

Vue2+ElementUI:用计算属性实现搜索框功能

前言: 本文代码使用vue2element UI。 输入框搜索的功能,可以在前端通过计算属性过滤实现,也可以调用后端写好的接口。本文介绍的是通过计算属性对表格数据实时过滤,后附完整代码,代码中提供的是死数据,可…

机器学习(1)

一、机器学习 机器学习(Machine Learning, ML)是人工智能(Artificial Intelligence, AI)的一个分支,它致力于开发能够从数据中学习并改进性能的算法和模型。机器学习的核心思想是通过数据和经验自动优化算法&#xff…

【Linux学习】【Ubuntu入门】1-4 ubuntu终端操作与shell命令1

1.使用快捷键CtrlAltT打开命令终端,或者单击右键点击… 2.常用shell命令 目录信息查看命令:ls ls -a:显示目录所有文件及文件夹,包括隐藏文件,比如以.开头的 ls -l:显示文件的详细信息 ls -al&#xff1…

Oracle OCP认证考试考点详解082系列19

题记: 本系列主要讲解Oracle OCP认证考试考点(题目),适用于19C/21C,跟着学OCP考试必过。 91. 第91题: 题目 解析及答案: 关于 Oracle 数据库中的索引及其管理,以下哪三个陈述是正确的&#x…

智能网页内容截图工具:AI助力内容提取与可视化

我们每天都会接触到大量的网页内容。然而,如何从这些内容中快速提取关键信息,并有效地进行整理和分享,一直是困扰我们的问题。本文将介绍一款我近期完成的基于AI技术的智能网页内容截图工具,它能够自动分析网页内容,截…

基于单片机智能温室大棚监测系统

本设计以单片机为核心的智能温室大棚监测系统,用于监测大棚内的温湿度、土壤湿度、CO2浓度和光照强度。该系统以STM32F103C8T6芯片为核心控制单元,涵盖电源、按键、NB-IoT模块、显示屏模块、空气温湿度检测、土壤湿度检测、二氧化碳检测和光敏电阻等模块…

深挖C++赋值

详解赋值 const int a 10; int b a;&a 0x000000b7c6afef34 {56496} &a 0x000000b7c6afef34 {10} 3. &b 0x000000b7c6afef54 {10} 总结: int a 10 是指在内存中(栈)中创建一个int (4 byte)大小的空间…

【Golang】——Gin 框架中的模板渲染详解

Gin 框架支持动态网页开发,能够通过模板渲染结合数据生成动态页面。在这篇文章中,我们将一步步学习如何在 Gin 框架中配置模板、渲染动态数据,并结合静态资源文件创建一个功能完整的动态网站。 文章目录 1. 什么是模板渲染?1.1 概…

创建vue3项目步骤

脚手架创建项目: pnpm create vue Cd 项目名称安装依赖:Pnpm iPnpm Lint:修复所有文件风格 ,不然eslint语法警告报错要双引号Pnpm dev启动项目 拦截错误代码提交到git仓库:提交前做代码检查 pnpm dlx husky-in…

【爬虫实战】抓取某站评论

【爬虫实战】抓取某站评论 声明:本文中所有内容仅供学习交流使用,不用于其他任何目的,不提供完整代码,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关! 方式一:JS逆向request发…

OpenSSL 自签名

参考文档:unigui开发人员工作手册2021 参考文章:保姆级OpenSSL下载及安装教程-CSDN博客 下载 Win32/Win64 OpenSSL Installer for Windows - Shining Light Productions 进入后向下拉找到下载位置,建议下载二进制版本的精简版&#xff0c…

基于YOLOv8深度学习的公共卫生防护口罩佩戴检测系统(PyQt5界面+数据集+训练代码)

在全球公共卫生事件频发的背景下,防护口罩佩戴检测成为保障公众健康和控制病毒传播的重要手段之一。特别是在人员密集的公共场所,例如医院、学校、公共交通工具等地,口罩的正确佩戴对降低病毒传播风险、保护易感人群、遏制疫情扩散有着至关重…

stm32下的ADC转换(江科协 HAL版)

十二. ADC采样 文章目录 十二. ADC采样12.1 ADC的采样原理12.2 STM32的采样基本过程1.引脚与GPIO端口的对应关系2.ADC规则组的四种转换模式(**)2.2 关于转换模式与配置之间的关系 12.3 ADC的时钟12.4 代码实现(ADC单通道 & ADC多通道)1. 单通道采样2. 多通道采样 19.ADC模数…

“fc-async”提供了基本的异步处理能力

在开发中,异步处理已经成为提升系统性能和用户体验的常用方式。然而,传统的@Async注解和基础的异步处理工具在面对复杂的任务场景时,存在局限性。这些局限性包括但不限于高并发环境下的稳定性、任务失败后的恢复机制、以及任务的监控和管理。 开源项目“fc-async”提供了基…