在nodejs中使用RabbitMQ(五)死信队列,延迟队列

死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一种机制,用于处理无法成功消费或不能按预期处理的消息。简单来说,死信队列用于存储那些不能被正常消费或处理的消息,以便后续审查或重新处理。

死信队列用于处理以下几种情况的消息:

1、消息过期:如果消息的 TTL(存活时间)到期,消息会被认为是死信,自动转发到死信队列。

2、消息被拒绝(nack:消费者拒绝消息时,如果消息被标记为不重新排队(requeue: false),消息会被送往死信队列。

3、队列满:如果队列已经达到最大长度,且还有新消息进入,RabbitMQ 会丢弃老的消息并将其送到死信队列。

4、消息无法路由:如果消息没有匹配到任何队列的路由规则,它会被送往死信队列。

配置死信队列

RabbitMQ 提供了两种主要机制来设置死信队列:

1、死信交换机(Dead Letter Exchange,DLX): 

  • 设置一个专门的交换机(Dead Letter Exchange,DLX),并将死信消息转发到该交换机。
  • 可以为队列设置 x-dead-letter-exchange 属性来指定死信交换机。

    2、死信路由键(Dead Letter Routing Key): 

    • 可以设置死信消息的路由键,以便将消息路由到适当的死信队列。
    • 通过 x-dead-letter-routing-key 属性来配置。

    3、 消息过期时间(TTL):

    • 你可以设置队列的消息存活时间(TTL)。消息在过期后会变为死信并转发到死信队列。
    • 通过 x-message-ttl 属性来设置消息的生存时间。

    死信队列创建

    1、一个生产者1,一个exchange交换机1,一个消费者1(主队列)。

    2、一个exchange交换机2,一个消费者2(死信队列)。

    3、消费者1要配置队列参数'x-dead-letter-exchange','x-dead-letter-routing-key', 'x-message-ttl'。

    生产者1通过exchange1发送消息给消费者1,如果消息不能正常处理,会通过exchange2转发给消费者2. 

    示例

    一个生产者,将消息发送给两个消费者,消费者消息如果处理失败,会将消息转发给死信队列。

    producer.ts

    import RabbitMQ from 'amqplib';async function start() {try {const conn = await RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60");conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");return setTimeout(start, 1000);});console.log("[AMQP] connected");let channel = null;try {channel = await conn.createChannel();} catch (err) {console.error("[AMQP]", err);return setTimeout(start, 1000);}const exchangeName = 'exchange8';await channel.assertExchange(exchangeName,'direct',{durable: true,},);const deadExchangeName = 'exchange8_dead_letter';await channel.assertExchange(deadExchangeName,'direct',{durable: true,},);let routeKey = 'route.key';for (let i = 0; i < 4; ++i) {// console.log('message send!', channel.sendToQueue(//   queueName,//   Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),//   { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在//   // (err: any, ok: Replies.Empty)=>{}// ));let num = Math.ceil(Math.random() * 100000);console.log('消息发送是否成功', num, routeKey, channel.publish(exchangeName,routeKey,Buffer.from(`"发送消息, index:${i}, number:${num}, routeKey:${JSON.stringify(routeKey)}"`),{persistent: true,},));}setTimeout(() => {conn.close();process.exit(0);}, 1000);} catch (err) {console.error("[AMQP]", err);return setTimeout(start, 1000);}
    }start();
    

    dead_letter.ts

    import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {console.log('[*] waiting...');const deadExchangeName = 'exchange8_dead_letter';const deadRouteKey = 'route.key.dead.letter';// 一次只有一个未确认消息,防止消费者过载channel.prefetch(1);// 死信队列const deadQueueName = 'queue8';channel.assertQueue(deadQueueName, {durable: true,  // 保证死信队列持久化});channel.bindQueue(deadQueueName, deadExchangeName, deadRouteKey);channel.consume(deadQueueName, (msg) => {console.log(`死信队列'${deadQueueName}'接收到的消息:`, msg?.content.toString());if (msg) {channel.ack(msg);}}, {// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);noAck: false,arguments: {}});});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
    });
    

    consumer.ts

    import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {console.log('[*] waiting...');const exchangeName = 'exchange8';const routeKey = 'route.key';const deadExchangeName = 'exchange8_dead_letter';const deadRouteKey = 'route.key.dead.letter';// 一次只有一个未确认消息,防止消费者过载channel.prefetch(1);// 主队列const queueName = 'queue7';channel.assertQueue(queueName, {durable: true,  // 队列是否持久化,确保队列在 RabbitMQ 重启后仍然存在arguments: {// 对应的死信交换机(空字符串表示默认交换机)'x-dead-letter-exchange': deadExchangeName,// 死信队列的路由键'x-dead-letter-routing-key': deadRouteKey,// 队列消息过期时间1分钟'x-message-ttl': 60000,},});channel.bindQueue(queueName,exchangeName,routeKey,{},(err, ok) => {console.log(queueName, '队列绑定结果', err, ok);},);channel.consume(queueName, function (msg) {if (Math.ceil(Math.random() * 100) > 50 && msg) {console.log(`队列'${queueName}'接收到的消息`, msg?.content.toString());channel.ack(msg);} else if (msg) {console.log(`队列'${queueName}'拒绝接收消息`, msg?.content.toString());// 第二个参数,false拒绝当前消息// 第二个参数,true拒绝小于等于当前消息// 第三个参数,3false从队列中清除// 第三个参数,4true从新在队列中排队channel.nack(msg, false, false);} else {}}, {// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);noAck: false,arguments: {}}, (err: any, ok: Replies.Empty) => {console.log(err, ok);});});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
    });
    

    consumer2.ts

    import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {console.log('[*] waiting...');const exchangeName = 'exchange8';const routeKey = 'route.key';const deadExchangeName = 'exchange8_dead_letter';const deadRouteKey = 'route.key.dead.letter';// 一次只有一个未确认消息,防止消费者过载channel.prefetch(1);// 主队列const queueName = 'queue9';channel.assertQueue(queueName, {durable: true,  // 队列是否持久化,确保队列在 RabbitMQ 重启后仍然存在arguments: {// 对应的死信交换机(空字符串表示默认交换机)'x-dead-letter-exchange': deadExchangeName,// 死信队列的路由键'x-dead-letter-routing-key': deadRouteKey,// 队列消息过期时间1分钟'x-message-ttl': 60000,},});channel.bindQueue(queueName,exchangeName,routeKey,{},(err, ok) => {console.log(queueName, '队列绑定结果', err, ok);},);channel.consume(queueName, function (msg) {if (Math.ceil(Math.random() * 100) > 50 && msg) {console.log(`队列'${queueName}'接收到的消息`, msg?.content.toString());channel.ack(msg);} else if (msg) {console.log(`队列'${queueName}'拒绝接收消息`, msg?.content.toString());// 第二个参数,false拒绝当前消息// 第二个参数,true拒绝小于等于当前消息// 第三个参数,3false从队列中清除// 第三个参数,4true从新在队列中排队channel.nack(msg, false, false);} else {}}, {// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);noAck: false,arguments: {}}, (err: any, ok: Replies.Empty) => {console.log(err, ok);});});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
    });
    

     延迟队列

    方法一

    rabbitmq本身没有直接提供延迟队列,可以通过安装插件实现(注:目前支持到4.0.2)。

    rabbitmq_delayed_message_exchange

     文档:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

    下载:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

     方法二

     通过死信队列实现消息的延迟处理。主队列并不会被直接消费,而是通过设置 x-message-ttl(即消息的过期时间)来控制消息的存活时间。消息在主队列中的 TTL 到期后,它会被转发到死信队列(Dead Letter Queue)中,在死信队列中处理消息。

    相比于上述死信队列的实现,在consumer.ts中删除了channel.consume消息接收相关代码。

     producer.ts

    import RabbitMQ from 'amqplib';async function start() {try {const conn = await RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60");conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");return setTimeout(start, 1000);});console.log("[AMQP] connected");let channel = null;try {channel = await conn.createChannel();} catch (err) {console.error("[AMQP]", err);return setTimeout(start, 1000);}const exchangeName = 'exchange9';await channel.assertExchange(exchangeName,'direct',{durable: true,},);const deadExchangeName = 'exchange9_dead_letter';await channel.assertExchange(deadExchangeName,'direct',{durable: true,},);let routeKey = 'route.key';for (let i = 0; i < 4; ++i) {// console.log('message send!', channel.sendToQueue(//   queueName,//   Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),//   { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在//   // (err: any, ok: Replies.Empty)=>{}// ));let num = Math.ceil(Math.random() * 100000);console.log('消息发送是否成功', num, routeKey, channel.publish(exchangeName,routeKey,Buffer.from(`"发送消息, index:${i}, number:${num}, routeKey:${JSON.stringify(routeKey)}"`),{persistent: true,},));}setTimeout(() => {conn.close();process.exit(0);}, 1000);} catch (err) {console.error("[AMQP]", err);return setTimeout(start, 1000);}
    }start();
    

    consumer.ts

    import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {console.log('[*] waiting...');const exchangeName = 'exchange9';const routeKey = 'route.key';const deadExchangeName = 'exchange9_dead_letter';const deadRouteKey = 'route.key.dead.letter';// 一次只有一个未确认消息,防止消费者过载channel.prefetch(1);// 主队列const queueName = 'queue10';channel.assertQueue(queueName, {durable: true,  // 队列是否持久化,确保队列在 RabbitMQ 重启后仍然存在arguments: {// 对应的死信交换机(空字符串表示默认交换机)'x-dead-letter-exchange': deadExchangeName,// 死信队列的路由键'x-dead-letter-routing-key': deadRouteKey,// 队列消息过期时间10s'x-message-ttl': 10000,},});channel.bindQueue(queueName,exchangeName,routeKey,{},(err, ok) => {console.log(queueName, '队列绑定结果', err, ok);},);// channel.consume(queueName, function (msg) {//   if (Math.ceil(Math.random() * 100) > 50 && msg) {//     console.log(`队列'${queueName}'接收到的消息`, msg?.content.toString());//     channel.ack(msg);//   } else if (msg) {//     console.log(`队列'${queueName}'拒绝接收消息`, msg?.content.toString());//     // 第二个参数,false拒绝当前消息//     // 第二个参数,true拒绝小于等于当前消息//     // 第三个参数,3false从队列中清除//     // 第三个参数,4true从新在队列中排队//     channel.nack(msg, false, false);//   } else {//   }// }, {//   // noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);//   noAck: false,//   arguments: {}// }, (err: any, ok: Replies.Empty) => {//   console.log(err, ok);// });});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
    });
    

    dead_letter.ts

    import RabbitMQ, { type Replies } from 'amqplib/callback_api';RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {if (err0) {console.error(err0);return;}conn.createChannel(function (err1, channel) {console.log('[*] waiting...');const deadExchangeName = 'exchange9_dead_letter';const deadRouteKey = 'route.key.dead.letter';// 一次只有一个未确认消息,防止消费者过载channel.prefetch(1);// 死信队列const deadQueueName = 'queue11';channel.assertQueue(deadQueueName, {durable: true,  // 保证死信队列持久化});channel.bindQueue(deadQueueName, deadExchangeName, deadRouteKey);channel.consume(deadQueueName, (msg) => {console.log(`死信队列'${deadQueueName}'接收到的消息:`, msg?.content.toString());if (msg) {channel.ack(msg);}}, {// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);noAck: false,arguments: {}});});conn.on("error", function (err1) {if (err1.message !== "Connection closing") {console.error("[AMQP] conn error", err1.message);}});conn.on("close", function () {console.error("[AMQP] reconnecting");});
    });
    

     

     

    本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/18065.html

    如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

    相关文章

    ubuntu服务器部署

    关闭欢迎消息 服务器安装好 ubuntu 系统后&#xff0c;进行终端登录&#xff0c;会显示出很多的欢迎消息 通过在用户的根目录下执行 touch .hushlogin 命令&#xff0c;再次登录终端就不会出现欢迎消息 修改hostname显示 修改 /etc/hostname 文件内容为主机名&#xff0c;保…

    排序算法——人无完人

    没有哪一个排序方法是完美的&#xff0c;对于不同的需求&#xff0c;排序算法各有自己的优势。金无足赤&#xff0c;人无完人。 &#xff08;这里不再重复所讲排序算法的实现&#xff0c;网上已有很多好的教学。&#xff09; 排序方法除了依靠时间复杂度和空间复杂度来区分&am…

    3D模型可视化引擎HOOPS Visualize在桌面端的支持有哪些特点?

    在数字化转型日益加速的今天&#xff0c;工业和工程领域的专业人员面临着越来越复杂的设计和数据分析需求。3D模型可视化技术应运而生&#xff0c;并成为了加速设计和制造流程的关键工具。作为业界领先的3D可视化引擎之一&#xff0c;HOOPS Visualize提供了一系列强大的桌面端支…

    傅里叶变换推导

    基本模型 假设在二维直角坐标系中&#xff0c;可以用相互垂直的基向量和表示&#xff1a; 假设&#xff1a; 假设在上的投影为&#xff0c;那么&#xff1a; 所以&#xff1a; 用公式表达&#xff1a; 但是在实际中&#xff0c;基向量和不一定长度都是1&#xff0c;重新推导一…

    数据科学之数据管理|python for office

    现如今&#xff0c;随着计算机的逐渐普及。现代化办公成为每个职场人必备的技能&#xff0c;本文档就来介绍&#xff0c;如何使用pytohn实现自动化办公。然而&#xff0c;自动化办公有时并不能减少工作量。自动化办公更适合批量处理文档。单一的文件&#xff0c;小金不建议使用…

    【前端框架】Vue3 中 `setup` 函数的作用和使用方式

    在 Vue 3 里&#xff0c;setup 函数是组合式 API 的核心入口&#xff0c;为开发者提供了更灵活、高效的组件逻辑组织方式。以下为你详细介绍其作用和使用方式&#xff1a; 作用 1. 初始化响应式数据 在 setup 函数中&#xff0c;我们能够使用 ref 和 reactive 等函数来创建响…

    MySQL无法连接到本地localhost的解决办法2024.11.8

    问题描述&#xff1a;我的MySQL可以远程连接服务器&#xff0c;但无法连接自己的localhost。 错误提示&#xff1a; 2003 - Cant connet to MySQL server on localhost(10061 "Unknown error")查找问题原因&#xff1a; 1. 检查环境变量是否正确&#xff1a;发现没…

    STM32HAL库快速入门教程——常用外设学习(2)

    目录 一、STM32HAL库开发&#xff08;8&#xff09;——CubeMX配置DMA 1.1、什么是DMA&#xff1f; 1.2、内存内存之间的传输&#xff08;单次&#xff09; ​编辑 1.3、内存外设之间的传输&#xff08;ADC&#xff09; 二、STM32HAL库开发&#xff08;9&#xff09;——…

    LabVIEW与小众设备集成

    在LabVIEW开发中&#xff0c;当面临控制如布鲁克OPUS红外光谱仪这类小众专业设备的需求&#xff0c;而厂家虽然提供了配套软件&#xff0c;但由于系统中还需要控制其他设备且不能使用厂商的软件时&#xff0c;必须依赖特定方法通过LabVIEW实现设备的控制。开发过程中&#xff0…

    PyQt组态软件 拖拽设计界面测试

    PyQt组态软件测试 最近在研究PyQt,尝试写个拖拽设计界面的组态软件&#xff0c;目前实现的功能如下&#xff1a; 支持拖入控件&#xff0c;鼠标拖动控件位置 拖动控件边缘修改控件大小支持属性编辑器&#xff0c;修改当前选中控件的属性 拖动框选控件&#xff0c;点选控件 控…

    AI如何与DevOps集成,提升软件质量效能

    随着技术的不断演进&#xff0c;DevOps和AI的融合成为推动软件开发质量提升的重要力量。传统的DevOps已经为软件交付速度和可靠性打下了坚实的基础&#xff0c;而随着AI技术的加入&#xff0c;DevOps流程不仅能提升效率&#xff0c;还能在质量保障、缺陷预测、自动化测试等方面…

    Mac配置Flutter开发环境

    1、访问 Flutter 官网&#xff0c;下载安装Flutter SDK 2、将 Flutter 添加到 PATH 环境变量 找到用户文件夹中的.zshrc隐藏文件&#xff08;隐藏文件显示方式&#xff1a;shiftcommand.&#xff09;&#xff0c;打开.zshrc文件&#xff0c;添加Flutter SDK路径&#xff0c;注…

    Linux系统使用ollama本地安装部署DeepSeekR1 + open-webui

    Linux系统使用ollama本地安装部署DeepSeekR1 open-webui 1. 首先&#xff0c;下载安装ollama #下载安装脚本并执行 curl -fsSL https://ollama.com/install.sh | sh #安装完成后查看ollama版本 ollama --version2. 使用ollama下载deepseek #不同的参数规格对硬件有不同的要…

    【Kubernetes】常用命令全解析:从入门到实战(中)

    &#x1f407;明明跟你说过&#xff1a;个人主页 &#x1f3c5;个人专栏&#xff1a;《Kubernetes航线图&#xff1a;从船长到K8s掌舵者》 &#x1f3c5; &#x1f516;行路有良友&#xff0c;便是天堂&#x1f516; 目录 一、引言 1、什么是k8s 2、K8s的核心功能 二、资…

    [ComfyUI]腾讯开源黑科技Sonic,插件更新,更加可控啦

    一、Sonic更新介绍 大家还记得我前分享过腾讯开源的Sonic这个项目吧&#xff0c;通过照片声音就可以生成非常不错的数字人开口说话的视频。 当时我就挺满意的&#xff0c;不过那时候输出还只能输出正方形的视频&#xff0c;这点就让我留有遗憾。 今天我再去翻作者的项目官网…

    设计模式Python版 命令模式(上)

    文章目录 前言一、命令模式二、命令模式示例 前言 GOF设计模式分三大类&#xff1a; 创建型模式&#xff1a;关注对象的创建过程&#xff0c;包括单例模式、简单工厂模式、工厂方法模式、抽象工厂模式、原型模式和建造者模式。结构型模式&#xff1a;关注类和对象之间的组合&…

    微服技术栈之Spring could gateway

    0 前言 之前使用到的gateway技术栈 &#xff0c;光靠记忆可能没有记住那么多的&#xff0c;gateway当今比较主流的网关技术栈了。说到gateway&#xff0c;不得不提及Zuul&#xff0c;而Zuul已经被淘汰了。 1 概述 Could全家桶有个很重要的组件就是网关&#xff0c;在1.X版本…

    上课啦 | 2月17日软考高项【5月备考班】

    相关文章推荐 福利&#xff1a;【软考-电子书】赠送 | 信息系统项目管理师教程 软考证书以考代评评定的职称是什么&#xff1f;聘任步骤&#xff1f; 添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; 软考 高 项 课程&#xff1a;2月17日开课 | 软考-高…

    小米 R3G 路由器刷机教程(Pandavan)

    小米 R3G 路由器刷机教程&#xff08;Pandavan&#xff09; 一、前言 小米 R3G 路由器以其高性价比和稳定的性能备受用户青睐。然而&#xff0c;原厂固件的功能相对有限&#xff0c;难以满足高级用户的个性化需求。刷机不仅可以解锁路由器的潜能&#xff0c;还能通过第三方固…

    【电脑】u盘重装win7

    u盘必须8GB以上 1. CPU型号 首先查看CPU的型号看看到底能不能装win7 2. 下载光盘映像文件 网址 看电脑是多少位的机器(32位下载x86 64位下载x64) 一共是这么多个版本按需下载对应的版本 电脑小白推荐无脑下载旗舰版 将链接复制到迅雷进行下载 3. 下载软碟通 网址 下…