typescript 实现RabbitMQ死信队列和延迟队列 订单10分钟未付归还库存

Manjaro安装RabbitMQ

安装

sudo pacman -S rabbitmq rabbitmqadmin

启动管理模块

sudo rabbitmq-plugins enable rabbitmq_managementsudo rabbitmq-server

管理界面
http://127.0.0.1:15672/
默认用户名和密码都是guest。
要使用 rabbitmqctl 命令添加用户并分配权限,您可以按照以下步骤进行操作:

  1. 添加用户
rabbitmqctl add_user mingcai password

请将 password 替换为您想要设置的实际密码。

  1. 分配权限
rabbitmqctl set_permissions -p / mingcai ".*" ".*" ".*"

这个命令将用户 mingcai 授予对所有虚拟主机的所有资源的读、写和管理权限。如果您只想给予特定权限,请适当调整正则表达式 ".*",以授予适当的权限。例如,如果您只想给予读取权限,可以使用 "^amq\."

  1. 可选步骤:设置用户角色

您可以将用户分配给不同的角色,以便更好地管理权限。例如,您可以将用户添加到 administrator 角色以获取管理员权限:

rabbitmqctl set_user_tags mingcai administrator

这样,用户 mingcai 就被赋予了管理员权限。

请确保您具有适当的权限来执行这些操作,并确保替换示例中的用户名和密码为您自己的实际值。

死信队列

在这里插入图片描述

在这里插入图片描述
标题:利用RabbitMQ死信队列处理消息的三种情况

在消息队列的应用中,处理异常情况和消息的延迟成为了一项重要的任务。RabbitMQ作为一款流行的消息队列服务,提供了死信队列(Dead Letter Exchange)功能,能够有效地处理消息被拒绝、消息过期以及队列达到最大长度等情况。本文将介绍如何利用RabbitMQ的死信队列来处理这三种情况,并提供了TypeScript示例代码。

1. 消息被拒绝

当消费者无法处理某条消息时,可以选择将其标记为“被拒绝”。这种情况下,我们可以配置RabbitMQ,将被拒绝的消息发送到一个死信队列,以后再处理。

// 引入amqplib库
import * as amqp from 'amqplib';// 连接到RabbitMQ服务器
const connection = await amqp.connect('amqp://localhost');// 创建Channel
const channel = await connection.createChannel();// 定义队列
const queueName = 'my_queue';
await channel.assertQueue(queueName, {// 设置死信交换机deadLetterExchange: 'my_dead_letter_exchange'
});// 消费消息
channel.consume(queueName, (msg) => {// 处理消息if (msg) {// 处理失败,拒绝消息并将其重新放回队列// channel.reject(msg, true); // 第二个参数设为 true 表示将消息重新放回队列// 处理失败,拒绝消息channel.reject(msg, false); // 第二个参数设为 false 表示将消息投递到死信队列// or 处理失败,拒绝消息并将其重新放回死信队列channel.nack(msg, false, false); // 第二个参数设为 false 表示不将消息重新放回原队列,第三个参数设为 false 表示不拒绝当前和之前所有未确认的消息}
});

2. 消息过期

有时候我们希望消息在一定时间内被处理,如果超过了这个时间,就认为消息已经过期。RabbitMQ允许我们设置消息的过期时间,并在消息过期后将其发送到死信队列。

// 发布消息
await channel.sendToQueue(queueName, Buffer.from('Hello'), {expiration: '60000' // 设置过期时间为60秒
});

3. 队列达到最大长度

为了避免队列过载,我们可以限制队列的最大长度。当队列达到最大长度时,新的消息将被拒绝,并发送到死信队列。

// 定义队列
await channel.assertQueue(queueName, {maxLength: 100, // 设置最大队列长度为100deadLetterExchange: 'my_dead_letter_exchange'
});

通过以上配置,我们可以利用RabbitMQ的死信队列来处理消息被拒绝、消息过期以及队列达到最大长度等情况,保证消息系统的稳定性和可靠性。

以上是利用TypeScript示例代码演示了如何在RabbitMQ中使用死信队列。希望这篇文章对你有所帮助!

延时队列

什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
延时队列在项目中的应用还是比较多的,尤其像电商类平台:
1、订单成功后,在30分钟内没有支付,自动取消订单
2、外卖平台发送订餐通知,下单成功后60s给用户推送短信。
3、如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
4、淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

npm install amqplib --save
npm install @types/amqplib --save-dev

在这里插入图片描述

总结

在这里插入图片描述

rabbitmqadmin 使用入门

rabbitmqadmin 是 RabbitMQ 的命令行管理工具,可以用于执行各种管理任务,如创建队列、交换机,查看队列状态等。以下是一些基本的用法示例:

export RABBITMQ_SERVER=127.0.0.1
export RABBITMQ_PORT=5672   
export RABBITMQ_USER=mingcai     
export RABBITMQ_PASSWORD=passwordrabbitmqadmin list exchanges
  1. 查看 RabbitMQ 服务器信息
rabbitmqadmin status
  1. 列出所有交换机
rabbitmqadmin list exchanges
  1. 列出所有队列
rabbitmqadmin list queues
  1. 创建一个交换机
rabbitmqadmin declare exchange name=my_exchange type=direct
  1. 创建一个队列
rabbitmqadmin declare queue name=my_queue
  1. 绑定队列到交换机
rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key
  1. 发送消息到指定交换机
rabbitmqadmin publish exchange=my_exchange routing_key=my_routing_key payload="Hello, RabbitMQ!"
  1. 获取队列消息
rabbitmqadmin get queue=my_queue

这些命令只是一些基本用法示例,rabbitmqadmin 工具支持更多功能和选项。你可以通过运行 rabbitmqadmin help 命令来获取更详细的帮助信息,或者查看官方文档以了解更多选项和使用方法。

延时3秒和8秒全部代码

// delayProducer.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();const exchange = 'routing_exchange';// 定义 dlx-exchangeconst dlxExchangeName = 'dlx-exchange';// 声明交换机await channel.assertExchange(exchange, 'direct', {durable: true});await channel.assertExchange(dlxExchangeName, 'direct', { durable: true });//消息防止丢失const dlxqueueBindings= [{dlxQueueName: 'dlx-3_second_queue', routingKey: 'fast',},{dlxQueueName: 'dlx-8_second_queue', routingKey: 'slow'}];for (const binding of dlxqueueBindings) {// 绑定延迟死信队列await channel.assertQueue(binding.dlxQueueName );//死信交换机和死信队列绑定 Routing key fast 的消息await channel.bindQueue(binding.dlxQueueName, dlxExchangeName, binding.routingKey); // 将 dlx-queue 绑定到死信交换机}// 定义队列和路由键的映射const queueBindings = [{queue: '3_second_queue', routingKey: 'fast', arguments: {'x-message-ttl': 3000, // TTL 设置为 3 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称}},{queue: '8_second_queue', routingKey: 'slow', arguments: {'x-message-ttl': 8000, // TTL 设置为 8 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称}}];// 声明队列,并将队列绑定到交换机上for (const binding of queueBindings) {await channel.assertQueue(binding.queue, {durable: true, arguments: binding.arguments});await channel.bindQueue(binding.queue, exchange, binding.routingKey);}for (let i = 0; i < 10; i++) {await new Promise((resolve) => {setTimeout(() => {resolve(1)}, 1000)})const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log('当前中国时间:', chinaTime);// 发送消息到交换机,并设置不同的路由键await sendMessage(channel, exchange, 'fast', `[${i}] ${chinaTime} Message for the fast queue`);await sendMessage(channel, exchange, 'slow', `[${i}] ${chinaTime} Message for the slow queue`);}// 关闭连接setTimeout(async () => {await channel.close();await connection.close();}, 10000); //10 秒后关闭连接
}async function sendMessage(channel: amqp.Channel, exchange: string, routingKey: string, message: string) {channel.publish(exchange, routingKey, Buffer.from(message));console.log(`Sent message '${message}' with routing key '${routingKey}'`);
}setupRouting().catch(console.error);
//消费者 dlx-3_second_queue.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();let queue = 'dlx-3_second_queue'// 定义队列和路由键的映射await channel.consume(queue, (msg) => {if (msg !== null) {const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);channel.ack(msg); // 确认消息已被处理}});}setupRouting().catch(console.error);
//dlx-8_second_queue.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();let queue = 'dlx-8_second_queue'// 定义队列和路由键的映射await channel.consume(queue, (msg) => {if (msg !== null) {const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);channel.ack(msg); // 确认消息已被处理}});}setupRouting().catch(console.error);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/294194.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java类和对象练习题

练习一 下面代码的运行结果是&#xff08;&#xff09; public static void main(String[] args){String s;System.out.println("s"s);} 解析&#xff1a;本题中的代码不能编译通过&#xff0c;因为在Java当中局部变量必须先初始化&#xff0c;后使用。所以此处编译不…

【Python刷题】将有序数组转换为二叉搜索树

问题描述 给你一个整数数组 nums &#xff0c;其中元素已经按 升序 排列&#xff0c;请你将其转换为一棵 平衡 二叉搜索树。 高度平衡的意思是&#xff1a;二叉树是一颗满足“每个结点的左右两个子树的高度差的绝对值不超过1”的二叉树。 示例 1&#xff1a; 输入&#xf…

农村集中式生活污水分质处理及循环利用技术指南

立项单位&#xff1a;生态环境部土壤与农业农村生态环境监管技术中心、山东文远环保科技股份有限公司、北京易境创联环保有限公司、中国环境科学研究院、广东省环境科学研究院、中铁第五勘察设计院集团有限公司、中华环保联合会水环境治理专业委员会 本文件规定了集中式村镇生活…

Stable Diffusion 模型下载:epiCPhotoGasm(真实、照片)

本文收录于《AI绘画从入门到精通》专栏,专栏总目录:点这里,订阅后可阅读专栏内所有文章。 文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八下载地址模型介绍

语音克隆技术浪潮:探索OpenAI Voice Engine的奇妙之旅

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

基于SpringBoot的“校园志愿者管理系统”的设计与实现(源码+数据库+文档+PPT)

基于SpringBoot的“校园志愿者管理系统”的设计与实现&#xff08;源码数据库文档PPT) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringBoot 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 系统总体结构图 系统首页界面图 志愿者注册…

游戏引擎中的声音系统

一、声音基础 1.1 音量 声音振幅的大小 压强p&#xff1a;由声音引起的与环境大气压的局部偏差 1.2 音调 1.3 音色 1.4 降噪 1.5 人的听觉范围 1.6 电子音乐 将自然界中连续的音乐转换成离散的信号记录到内存中 采样 - 量化 - 编码 香农定理&#xff1a;采样频率是信…

探究云手机的海外原生IP优势

随着全球数字化进程的加速&#xff0c;企业越来越依赖于网络来扩展其业务。在这个数字时代&#xff0c;云手机作为一种创新的通信技术&#xff0c;已经成为了企业网络优化的重要组成部分。云手机支持海外原生IP的特性&#xff0c;为企业在国际市场上的拓展提供了全新的可能性。…

idea中 错误:找不到或无法加载主类

很神奇的就是maven打包是正常的&#xff0c;本来也是好好的&#xff0c;突然启动就报错了&#xff0c;我百度了很急&#xff0c;没什么结果&#xff0c;找了公司6年工作经验的老员工&#xff0c;还是搞了好久&#xff0c;我站了好久也是没解决。后来我也是在想maven的jar包都能…

【每日一题】2810. 故障键盘-2024.4.1

题目&#xff1a; 2810. 故障键盘 你的笔记本键盘存在故障&#xff0c;每当你在上面输入字符 i 时&#xff0c;它会反转你所写的字符串。而输入其他字符则可以正常工作。 给你一个下标从 0 开始的字符串 s &#xff0c;请你用故障键盘依次输入每个字符。 返回最终笔记本屏幕…

ROS2从入门到精通1-2:详解ROS2服务通信机制与自定义服务

目录 0 专栏介绍1 服务通信模型2 服务模型实现(C)3 服务模型实现(Python)4 自定义服务5 话题、服务通信的异同 0 专栏介绍 本专栏旨在通过对ROS2的系统学习&#xff0c;掌握ROS2底层基本分布式原理&#xff0c;并具有机器人建模和应用ROS2进行实际项目的开发和调试的工程能力。…

vscode通过ssh连接服务器(吐血总结)

一、通过ssh连接服务器 1、打开vscode&#xff0c;进入拓展&#xff08;CtrlShiftX&#xff09;&#xff0c;下载拓展Remote - SSH。 2、点击远程资源管理器选项卡&#xff0c;选择远程&#xff08;隧道/SSH&#xff09;类别。 3、点击SSH配置。 4、在中间上部分弹出的配置文件…

目标检测:数据集划分 XML数据集转YOLO标签

文章目录 1、前言&#xff1a;2、生成对应的类名3、xml转为yolo的label形式4、优化代码5、划分数据集6、画目录树7、目标检测系列文章 1、前言&#xff1a; 本文演示如何划分数据集&#xff0c;以及将VOC标注的xml数据转为YOLO标注的txt格式&#xff0c;且生成classes的txt文件…

Navicat工具使用

Navicat的本质&#xff1a; 在创立连接时提前拥有了数据库用户名和密码 双击数据库时&#xff0c;相当于建立了一个链接关系 点击运行时&#xff0c;远程执行命令&#xff0c;就像在xshell上操作Linux服务器一样&#xff0c;将图像化操作转换成SQL语句去后台执行 一、打开Navi…

文生图大模型三部曲:DDPM、LDM、SD 详细讲解!

1、引言 跨模态大模型是指能够在不同感官模态(如视觉、语言、音频等)之间进行信息转换的大规模语言模型。当前图文跨模态大模型主要有&#xff1a; 文生图大模型&#xff1a;如 Stable Diffusion系列、DALL-E系列、Imagen等 图文匹配大模型&#xff1a;如CLIP、Chinese CLIP、…

【TensorRT】TensorRT C# API 项目介绍:基于C#与TensorRT部署深度学习模型(下篇)

文章目录 4. 接口调用4.1 创建并配置C#项目4.2 添加推理代码4.3 项目演示 5. 总结 4. 接口调用 4.1 创建并配置C#项目 首先创建一个简单的C#项目&#xff0c;然后添加项目配置。 首先是添加TensorRT C# API 项目引用&#xff0c;如下图所示&#xff0c;添加上文中C#项目生成的…

Intel Arc显卡安装Stable Diffusion

StableDiffusion是一种基于深度学习的文本到图像生成模型&#xff0c;于2022年发布。它主要用于根据文本描述生成详细图像&#xff0c;也可应用于其他任务&#xff0c;如内补绘制、外补绘制和在提示词指导下生成图像翻译。通过给定文本提示词&#xff0c;该模型会输出一张匹配提…

Linux如何连接github仓库

一.创建一个github账号 如何创建一个github账号 二.在github上创建一个仓库 登录上github后出现这个界面 然后点击左上角头像&#xff0c;在按照图片位置点击&#xff1a; 继续按照图片上的位置进行点击&#xff1a; 创建成功&#xff1a; 三.云主机连接Github仓库 1.在linux中…

今日学到的小知识点:

1.快读&#xff1a;当用cin和scanf都不能满足要求的读入速度时&#xff0c;可以用getchar手写一个快读函数 C代码&#xff1a; inline int read() {int flag 1;//判断符号位int res 0;char ch getchar();while (ch < 0 || ch>9) {//若不为数字&#xff0c;则判断符号…

notepad++里安装32位和64位的16进制编辑器Hex-Editor

这个16进制编辑器确实是个好东西&#xff0c;平时工作种会经常用到&#xff0c; 这是hex-editor的官网。这个里边只能下载32位的(64位的看最下边)&#xff0c;选一个合适的版本&#xff0c;我当时选的是最新的版本 https://sourceforge.net/projects/npp-plugins/files/Hex%20E…