RabbitMQ简单模式和工作模式

RabbitMQ 是一个消息队列中间件,用于在分布式系统中进行消息传递。在 RabbitMQ 中,有几种工作模式,其中简单模式和工作模式是其中两种基本的模式之一。

  1. 简单模式(Simple Mode):

    • 在简单模式中,有一个生产者(Producer)将消息发送到一个队列(Queue)中,然后有一个消费者(Consumer)从队列中接收并处理消息。
    • 这是最基本的消息队列模式,适用于单个生产者和单个消费者的场景。
    • 生产者将消息发送到队列,而消费者从队列中接收并处理消息,消息的传递是单向的。
  2. 工作模式(Work Queue Mode):

    • 工作模式也被称为竞争消费者模式。在这种模式下,有多个消费者监听同一个队列,但每条消息只能被其中一个消费者接收和处理。
    • 当消息被发送到队列时,它将被发送给下一个空闲的消费者,从而实现消息的分发和并发处理。
    • 这种模式对于处理大量工作的情况很有用,可以通过增加消费者的数量来提高消息处理的速度。

在 RabbitMQ 中,简单模式和工作模式的实现通常使用一些基本的概念,包括生产者、消费者、队列和消息。生产者负责发送消息到队列,而消费者则负责从队列中接收和处理消息。

下面是一个使用 RabbitMQ 和 Node.js(使用 amqplib 库)以及 TypeScript 实现工作模式的简单示例。在这个例子中,我们将使用 amqplib 库来连接 RabbitMQ 服务器,并使用 TypeScript 来编写代码。

首先,确保你已经安装了 amqplib 库。可以使用以下命令进行安装:

npm install amqplib

接下来,创建一个生产者和一个消费者的 TypeScript 文件。以下是示例代码:

producer.ts:

import * as amqp from 'amqplib';async function produce() {const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();const queue = 'work_queue';await channel.assertQueue(queue, { durable: true });for (let i = 0; i < 10; i++) {const message = `Message ${i}`;channel.sendToQueue(queue, Buffer.from(message), { persistent: true });console.log(` [x] Sent '${message}'`);}setTimeout(() => {connection.close();process.exit(0);}, 500);
}produce();

consumer.ts:

import * as amqp from 'amqplib';async function consume() {const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();const queue = 'work_queue';await channel.assertQueue(queue, { durable: true });// 设置每次只处理一个消息[平均分配的概念,不会让一个work太忙和太闲]//这一行代码的作用是告诉 RabbitMQ 不要在消费者未确认(ack)之前向其发送新的消息await channel.prefetch(1);console.log(' [*] Waiting for messages. To exit press CTRL+C');await channel.consume(queue, async (msg) => {if (msg !== null) {const message = msg.content.toString();console.log(` [x] Received ${message}`);// Simulate some workawait new Promise(resolve => setTimeout(resolve, 1000));console.log(' [x] Done');channel.ack(msg);}});
}consume();

这个示例中,生产者将消息发送到名为 work_queue 的队列中,而消费者则监听该队列并处理消息。消费者使用 channel.prefetch(1) 来确保一次只接收一个消息,从而实现竞争消费者模式。

记得在运行前启动 RabbitMQ 服务器,并确保 TypeScript 文件已编译成 JavaScript。你可以使用以下命令进行编译:

tsc producer.ts
tsc consumer.ts

然后,分别运行 producer.jsconsumer.js。这样你就可以在 RabbitMQ 中看到消息的生产和消费过程。

RabbitMQ消息持久化和手动应答

在 RabbitMQ 中,消息持久化和手动应答是两个关键的概念,它们可以帮助确保消息的可靠传递和处理。下面简要介绍这两个概念:

  1. 消息持久化(Message Durability):

    • 默认情况下,RabbitMQ 中的消息是瞬时的,也就是说,如果 RabbitMQ 服务器停止或崩溃,所有未处理的消息都会丢失。
    • 通过将消息标记为持久化,你可以确保消息在 RabbitMQ 服务器重启后仍然可用。要实现消息持久化,需要在发送消息时设置消息的 deliveryMode 属性为 2persistent)。
    • 例如,在生产者端设置消息为持久化:
    channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
    
    • 在消费者端,你需要确保队列和消息都被声明为持久化:
    channel.assertQueue(queue, { durable: true });
    

    这样,即使 RabbitMQ 服务器重启,持久化的消息也不会丢失。

  2. 手动应答(Manual Acknowledgment):

    • 默认情况下,RabbitMQ 使用自动应答(auto-acknowledgment)模式,即一旦消息被传递给消费者,RabbitMQ 就将其标记为已处理。
    • 在某些情况下,你可能需要更细粒度的控制,以确保消息在被消费者完全处理之后才被标记为已处理。这就是手动应答的用途。
    • 在消费者端,需要将 noAck 设置为 false,表示手动应答模式:
channel.consume(queueName, async (msg: Message | null) => {if (msg) {const data: EmailTask = JSON.parse(msg.content.toString());console.log('Processing mail task:', msg.content.toString());try {//模拟邮件发送await new Promise(resolve => setTimeout(resolve, 1000));console.log(' [x] Done');channel.ack(msg);} catch (error) {console.log('error:', data);// 处理消息失败,判断是否需要进行重试if (canRetry(msg)) {// 重新入队,进行下一次尝试channel.reject(msg, true);} else {// 不进行重试,将消息从队列中移除channel.ack(msg);}}}
}, { noAck: false });//默认false 
  • 在这种情况下,消费者需要在处理完消息后显式调用 channel.ack(msg) 来确认消息已被处理。如果消费者崩溃或在处理消息时发生错误,消息将保持在队列中,直到被明确确认。
  • 在 RabbitMQ 中,channel.reject 方法用于拒绝一条消息。它的参数如下channel.reject(msg, requeue);
    msg: 要拒绝的消息对象。
    requeue: 如果设置为 true,则被拒绝的消息将被重新排队,即重新放回队列。如果设置为 false,则消息将被删除。默认为 true。

综合使用消息持久化和手动应答,可以确保在面对不同情况时,消息的可靠传递和处理。

重试间隔和次数

在这里插入图片描述
在这里插入图片描述

  1. 重新投递消息并设置头部信息:

    • 在处理消息失败时,将消息重新投递到队列,并设置一个头部信息,例如 x-redelivered-count,用来记录消息的重试次数。
    • 在消费者端,根据这个头部信息来判断是否达到重试次数的上限,如果是,则不再重新投递,可能将消息放入死信交换机。
  2. 使用外部存储记录重试次数:

    • 每次消息处理失败时,将消息的唯一标识(例如 UUID)和重试次数记录到外部存储中(例如 Redis、Memcache、MySQL)。
    • 在消费者端,在每次重新处理时,从外部存储中获取当前重试次数,并判断是否达到重试次数的上限。
  3. 自定义插件:

    • 编写一个 RabbitMQ 插件,实现自定义的消息重试逻辑,包括记录重试次数、判断是否重新投递等。
    • 这样可以更灵活地控制消息的处理流程。

需要注意的是,这些方法都是基于 RabbitMQ 不直接提供重试次数限制的情况下的一些自定义实践。在回答中也提到了关于 quorum queues 的更新,以及支持通过策略(policy)来限制重投递次数的可能性。因此,具体的实现方式可能会随着 RabbitMQ 版本的更新而有所变化。

await channel.consume(queueName, async (msg: Message | null) => {if (msg) {const data: EmailTask = JSON.parse(msg.content.toString());let retryCount = msg.properties.headers['x-retry-count'] || 0;console.log('Processing message:', data);console.log('Retry count:', retryCount);try {if (data.to.includes('recipient1@example.com')) {throw new Error('邮件发送失败...');}//发送邮件await new Promise(resolve => setTimeout(resolve, 1000));channel.ack(msg);} catch (error) {console.log('error:', data);// 增加重试次数retryCount++;// 判断是否达到最大重试次数if (retryCount < maxRetryAttempts) {// 重新发送消息到队列channel.sendToQueue(queueName, msg.content, {persistent: true,headers: {'x-retry-count': retryCount,},});} else {// 不进行重试,将消息从队列中移除channel.ack(msg);}}}
});

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/248024.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker-学习-1

docker-学习-第一天 docker-学习-第一天1.docker是什么&#xff1f;容器的好处docker现况理解docker 2.在centos7中安装docker2.1安装步骤 3.docker里边三个非常重要的概念3.1安装一个阿里云的国内的镜像加速器&#xff0c;可以到阿里云上下载镜像了 4.docker的命令4.1 docker …

最优化基础 - (最优化问题分类、凸集)

系统学习最优化理论 什么是最优化问题&#xff1f; 决策问题&#xff1a; &#xff08;1&#xff09;决策变量 &#xff08;2&#xff09;目标函数&#xff08;一个或多个&#xff09; &#xff08;3&#xff09;一个可由可行策略组成的集合&#xff08;等式约束或者不等式约束…

Python根据Excel表进行文件重命名

一、问题背景 在日常办公过程中&#xff0c;批量重命名是经常使用的操作。之前我们已经进行了初步探索&#xff0c;主要是通过批处理文件、renamer软件或者Python中的pathlib等模块对当前目录下的文件进行批量重命名。 而今天我们要使用的是PythonExcel的方法对指定目录下的文…

[远程桌面]技术支持小技巧

需求场景&#xff1a;可以连接到现场的Windows工作站&#xff0c;想要Linux桌面 win R &#xff0c;运行mstsc命令 XRDP远程桌面默认端口3389 输入用户名密码即可远程

旅游业web系统产品设计对比

一、背景 博主主要做的行业属于旅游业&#xff0c;所有今天想对比一下行业内各web系统。看看产品是怎么设计的。其实不对比也知道都差不多&#xff0c;一个行业建设起来&#xff0c;同质化程度都会很高。如果没什么创新的话&#xff0c;大家都大同小异。 二、途牛 首页跟团游自…

【ArcGIS微课1000例】0098:查询河流流经过的格网

本实验讲述,ArcGIS中查询河流流经过的格网,如黄河流经过的格网、县城、乡镇、省份等。 文章目录 一、加载数据二、空间查询三、结果导出四、注意事项一、加载数据 加载实验配套数据0098.rar中的河流(黄河)和格网数据,如下图所示: 接下来,将查询河流流经过的格网有哪些并…

iText操作pdf

最近有个任务是动态的创建pdf根据获取到的内容&#xff0c;百度到的知识点都比较零散&#xff0c;官方文档想必大家也不容易看懂。下文是我做出的汇总 public class CreatePdfUtils {public static void create(){//准备File file new File("C:\\code\\base-project-back…

Linux学习20 使用FRP进行内网穿透实现远程访问

Linux学习20 使用FRP进行内网穿透实现远程访问 一、FRP简介1. 简介2. 准备环境3. toml文件4. toml文件语法&#xff08;1&#xff09;表&#xff08;Table&#xff09;&#xff08;2&#xff09;键值对&#xff08;3&#xff09;数组&#xff08;4&#xff09;布尔值&#xff0…

高等数学:无穷小/大、极限运算/存在法则、连续性/间断点

参考课程&#xff1a;【建议收藏】同济七版《高等数学》精讲视频 | 期末考试 | 考研零基础 | 高数小白_哔哩哔哩_bilibili 仅供本人学习之用 无穷小 无穷小不是指趋近于负无穷&#xff0c;而是趋近于0 比如 这里x-2就叫做x趋近于2时的无穷小&#xff1b;1/x就叫做x趋近于无穷时…

代码随想录Day35 | 860.柠檬水找零 406.根据身高重建队列 452. 用最少数量的箭引爆气球

代码随想录Day35 | 860.柠檬水找零 406.根据身高重建队列 452. 用最少数量的箭引爆气球 860.柠檬水找零406.根据身高重建队列vector与list 452.用最少数量的箭引爆气球 860.柠檬水找零 文档讲解&#xff1a;代码随想录 视频讲解&#xff1a; 贪心算法&#xff0c;看上去复杂&am…

20240130在ubuntu20.04.6下给GTX1080安装最新的驱动和CUDA

20240130在ubuntu20.04.6下给GTX1080安装最新的驱动和CUDA 2024/1/30 12:27 缘起&#xff0c;为了在ubuntu20.4.6下使用whisper&#xff0c;以前用的是GTX1080M&#xff0c;装了535的驱动。 现在在PDD拼多多上了入手了一张二手的GTX1080&#xff0c;需要将安装最新的545的驱动程…

怎么控制Element的数据树形表格展开所有行;递归操作,打造万能数据表格折叠。

HTML <el-button type"success" size"small" click"expandStatusFun"> <span v-show"expandStatusfalse"><i class"el-icon-folder-opened"></i>展开全部</span><span v-show"expan…

为什么需要 SSL 证书?

网站需要 SSL 证书来确保用户数据的安全&#xff0c;验证网站的所有权&#xff0c;防止攻击者创建虚假网站版本&#xff0c;以及将信任传达给用户。 如果网站要求用户登录、输入个人详细信息&#xff08;例如其信用卡号&#xff09;或查看机密信息&#xff08;例如&#xff0c…

C++入门(一)— 使用VScode开发简介

文章目录 C 介绍C 擅长领域C 程序是如何开发编译器、链接器和库编译预处理编译阶段汇编阶段链接阶段 安装集成开发环境 &#xff08;IDE&#xff09;配置编译器&#xff1a;构建配置配置编译器&#xff1a;编译器扩展配置编译器&#xff1a;警告和错误级别配置编译器&#xff1…

Ubuntu 20.04 Server 使用命令行设置 IP 地址

1、编辑 /etc/netplan/ 目录下的配置文件00-installer-config.yaml (修改之前&#xff0c;把原来的文件备份) 按照对应的配置进行修改IP地址和网关 2、运行命令使其生效 sudo netplan apply 修改完成后&#xff0c;永久有效。重启后配置不会丢失

关于bypassuac的探究——基础知识

用户帐户控制(User Account Control)是Windows Vista&#xff08;及更高版本操作系统&#xff09;中一组新的基础结构技术&#xff0c;可以帮助阻止恶意程序&#xff08;有时也称为“恶意软件”&#xff09;损坏系统&#xff0c;同时也可以帮助组织部署更易于管理的平台。 使用…

【网站项目】基于SSM的204面向工程教育专业认证的毕业生跟踪调查反馈系统

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…

PyTorch的nn.Module类的详细介绍

在PyTorch中&#xff0c;nn.Module 类是构建神经网络模型的基础类&#xff0c;所有自定义的层、模块或整个神经网络架构都需要继承自这个类。nn.Module 类提供了一系列属性和方法用于管理网络的结构和训练过程中的计算。 1. PyTorch中nn.Module基类的定义 在PyTorch中&#xff…

谷达冠楠:抖音开店卖什么退货率低

在抖音开设电商店铺&#xff0c;选择合适的商品对于降低退货率至关重要。商品的质量和满足消费者需求是保证低退货率的关键因素。例如&#xff0c;日常必需品如个人护理用品、家居清洁工具等因其使用频率高和需求稳定&#xff0c;通常拥有较低的退货率。另外&#xff0c;独特性…

HiveSQL题——窗口函数(lag/lead)

目录 一、窗口函数的知识点 1.1 窗户函数的定义 1.2 窗户函数的语法 1.3 窗口函数分类 1.4 前后函数:lag/lead 二、实际案例 2.1 股票的波峰波谷 0 问题描述 1 数据准备 2 数据分析 3 小结 2.2 前后列转换&#xff08;面试题&#xff09; 0 问题描述 1 数据准备 …