RabbitMQ 消息队列

1. 消息队列是什么?

当用户注册成功后,就发送邮件。当邮件发送成功了,接口才会提示注册成功信息。但由于发送邮件,依赖于其他厂商的服务,有可能他们的接口会非常耗时。那么用户就一直要等着邮件发送成功了,才提示注册成功。因为耗时很长,这样就会造成很糟糕的体验。

然后大家想一想,我们到底有没有必要等着邮件发送完成呢?就好比大家去邮局寄信,当你把信丢入邮筒里,你需要一直等着信送到别人手里,然后你再离开吗?

当然不需要,只要将信丢入邮筒,我们人就可以离开了。因为你知道,在将来的某个时候,你的信就会被寄到收件人手里。

消息队列,就好比是这里寄信的过程!消息队列由三部分构成:

在这里插入图片描述

  • 生产者(Producer):创建并发送消息,相当于去邮局寄信的人。
  • 队列(Queue):用来存放消息的,将消息从生产者传递到消费者,就相当于邮局了。邮局收到你的任务后,它去处理是需要排队的。按照先进先出的原则,需要等其他先来的任务处理完了,才会处理你的任务。
  • 消费者(Consumer):接收并处理消息的人。当邮局把信送到他手里了,收到消息了,最终处理任务的人。

还是以我们注册功能为例。当用户注册成功了,我们就通知RabbitMQ,你去发个邮件,然后就直接提示用户注册成功。我们完全没有必要等待邮件发送完成。

接着再写一个程序去接收消息,当它收到消息了,就去执行发邮件任务了。

所以,生产者、消费者,都是我们要写的程序。一个负责提交任务,另一个负责处理任务。互相之间通信,就靠中间的RabbitMQ。

2. 安装

2.1. 使用 Docker

当然,我最推荐的还是用Docker,因为太方便了,大家打开docker-compose.yml。增加RabbitMQ的配置,注意缩进非常重要,一定要和之前的MySQL和Redis的配置对齐

services:mysql:# ....redis:# ....rabbitmq:image: rabbitmq:4.0-managementports:- "5672:5672"- "15672:15672"volumes:- ./data/rabbitmq:/var/lib/rabbitmq # 持久化数据environment:RABBITMQ_DEFAULT_USER: adminRABBITMQ_DEFAULT_PASS: xw
  • 我们这里使用的RabbitMQ版本号是:4.0
  • 将RabbitMQ运行的默认端口567215672,映射到本机上。
  • 其中15672RabbitMQ自带的管理后台端口号
  • 接着将RabbitMQ的用来持久化数据的备份数据映射到项目根目录的data/rabbitmq目录中。
  • 最下面的是RabbitMQ用来连接的账号密码,大家可以根据需要自己设置。
    完成后,命令行,再次运行
docker-compose up -d

这样Docker,就会自动下载,并启动好RabbitMQ。
在这里插入图片描述

2.2. 直接安装

对于部分Windows上用不了Docker的同学,那就有点麻烦了。因为既要安装Erlang,又要安装RabbitMQ。这里附上下载地址:

  • Erlang下载地址:https://www.erlang.org/downloads
    -在这里插入图片描述

  • RabbitMQ下载地址:https://www.rabbitmq.com/docs/install-windows#downloads

  • 在这里插入图片描述
    对于这种安装方式,如果在安装过程出现了各种问题,请来讨论群中和我讨论。总而言之,我还是推荐大家尽可能,想办法将Docker跑通。

2.3.1 安装完ErlangRabbitMQ后,添加环境变量

打开环境变量设置窗口: 右键点击 “此电脑”,选择 “属性”。 在弹出的窗口中,点击 “高级系统设置”。 在 “系统属性” 窗口中,点击 “环境变量” 按钮。 添加 RabbitMQ 命令路径到系统的 Path 变量: 在 “系统变量” 列表中找到 “Path” 变量,选中它并点击 “编辑” 按钮。 在弹出的 “编辑环境变量” 窗口中,点击 “新建”,然后将 RabbitMQ 的 sbin目录路径添加进去,
例如 C:\Program Files\RabbitMQServer\rabbitmq_server-<version>\sbin。注意要将 替换为你实际安装的 RabbitMQ 版本号。 点击 “确定” 保存设置,依次关闭所有窗口 具体选择你自己安装的路径>。

2.3.2 通过命令行(管理员)安装rabbitmq_management 插件

RabbitMQ 的管理界面依赖于rabbitmq_management 插件

rabbitmq-plugins enable rabbitmq_management
2.3.3 通过命令行(管理员)运行
rabbitmq-service start
2.3. RabbitMQ 管理后台

启动好后,稍等个几秒钟,用浏览器访问:http://127.0.0.1:15672/,就能进入RabbitMQ的管理界面了

以游客模式登录 账号:guest 密码:guest

2.3.2.1 新增账号密码

在这里插入图片描述

在这里插入图片描述
输入刚才docker-compose.yml里的设置账号密码,就能进去了。里面的东西很多,先不用管它,一会儿再来看。

在这里插入图片描述

3. 官方案例解析

学习任何东西之前,最好的办法,无疑就是看官方文档了。
直接看JavaScript章节的Hello World部分。很明显,用之前先要装包

npm i amqplib

amqplib官方文档

3.1. 生产者:send.js

public目录里,新建一个send.js,代码大家从讲义文档直接复制过来。

amqplib.connect(‘amqp://admin:xw@localhost’) 参数详解

字段注解
amqp://表示使用 AMQP 协议进行连接
admin连接到 RabbitMQ 服务器时使用的用户名,用于身份验证
xw是与用户名对应的密码,用于验证用户身份。用户名和密码通过冒号 : 分隔
localhost指定了 RabbitMQ 服务器所在的主机地址,这里表示服务器运行在本地机器上。如果 RabbitMQ 服务器部署在其他机器上,需要将 localhost 替换为实际的 IP 地址或域名
const amqplib = require('amqplib');(async () => {try {const connection = await amqplib.connect('amqp://admin:xw@localhost');// 创建一个通道。通道是进行通信的基本单位,通过通道可以发送和接收消息const channel = await connection.createChannel();// 队列的名字是:helloconst queue = 'hello';// 要发送的消息内容是:你好,xw!const msg = '你好,xw!';// 创建一个队列。如果队列不存在,则创建一个队列。如果队列已经存在,则不会创建新的队列// durable: 表示队列是否持久化。如果设置为true,即重启后队列不会消失await channel.assertQueue(queue, { durable: true });// 发送消息到队列// queue: 要发送消息的队列的名字// content: 要发送的消息内容// persistent: true,消息持久化,确保消息在 RabbitMQ 重启后仍然存在。channel.sendToQueue(queue, Buffer.from(msg), { persistent: true });// 打印发送提示信息console.log('[x] 发送了:%s', msg);// 500ms 后关闭连接setTimeout(() => {connection.close();process.exit(0);}, 500);} catch (error) {console.log(error);}
})();

我们一点点分析下,它这里都写了些什么东西。
1、连接 const connection = await amqplib.connect(‘amqp://admin:clwy1234@localhost’)
2、创建一个通道。通道是进行通信的基本单位,只有通过通道,才可以发送和接收消息
3、我们定义了队列的名字是:hello。并定义了要发送的消息内容是:你好,xw!
4、创建了一个队列。但如果队列已经存在,那就直接使用,不会创建新的了。这里要把队列的名字放进去,也就是hello
这里还有个参数:durable: false,它表示队列是否持久化 false:不持久化,true:队列的信息会被写入磁盘,也就是docker-compose里设置的./data/rabbitmq,当RabbitMQ服务重启后,队列会被自动恢复
5、channel.sendToQueue,很明显,这就是将消息发送到队列了。
接着做了个console.log提示,可以显示在命令行里,便于我们观察。
最后面,等500ms后,将连接断开

看完后,我们就先将它跑起来,看看到底有个什么效果。用cd命令,进入public目录中。然后直接用node运行这个文件

cd public
node send.js

打开RabbitMQ的管理后台:http://127.0.0.1:15672/

里面的菜单,依次是:

  • 概览:Overview
  • 连接:Connections
  • 通道:Channels
  • 交换机:Exchanges
  • 队列和流:Queues and Streams
  • 管理用户:Admin

由于代码里,在500ms之后,就将连接关闭了。所以连接和通道里,都看不到东西。大家点击队列这里,里面可以看到hello队列了。这正是我们刚才创建的队列名字。
在这里插入图片描述
点进去看,里面可以看到队列的一些详细内容。

  • 上面有一些统计信息。一共收到了一条信息,准备好的有几条,在内存中的又几条,占有了多少空间、多少内存什么的。
    在这里插入图片描述

  • 再往下看,注意这里有个发送消息(Publish message)获取消息(Get message)

  • 我们先点获取消息,这里就能看到刚才发送的:你好,xw

在这里插入图片描述

4.2. receive.js

刚才是生产者的代码,我们接着继续看消费者的代码。在public目录中新建receive.js

const amqplib = require('amqplib');(async () => {try {// 连接到 RabbitMQconst connection = await amqplib.connect('amqp://admin:clwy1234@localhost');// 创建一个通道。通道是进行通信的基本单位,通过通道可以发送和接收消息const channel = await connection.createChannel();// 队列的名字是:helloconst queue = 'hello';// 创建一个队列。如果队列不存在,则创建一个队列。如果队列已经存在,则不会创建新的队列// durable: 表示队列是否持久化。如果设置为true,即重启后队列不会消失await channel.assertQueue(queue, { durable: true });// 打印等待接收消息的提示信息console.log(' [*] 等待接收消息在 %s 队列中. 按 CTRL+C 退出', queue);// 当接收到消息channel.consume(queue, (msg) => {// 打印接收到的消息内容console.log('[x] 接收到了:%s', msg.content.toString());// 如果不是自动确认,需要手动确认消息// channel.ack(msg);}, {// noAck: 表示是否自动确认消息,设置为true表示自动确认,设置为false表示手动确认// 如果设置为false,需要手动确认消息,否则消息会被重复消费。例如:channel.ack(msg)noAck: true});} catch (error) {console.log(error);}
})();

我们来运行一下,

node receive.js

在这里插入图片描述
再回去刷新页面,也多刷新个几次。因为刚才的两条消息都已经处理完成了,所以现在的准备好的(Ready)和在内存中的(In memory)都是0了。
在这里插入图片描述
点击连接(Connections),因为我们这次的消费者代码,并没有关闭连接,所以一直连在RabbitMQ上
在这里插入图片描述
点击通道(Channels),里面也能看到创建的通道。
在这里插入图片描述

4. noAck 自动、手动确认消息

关于自动确认消息,大家可以改为

noAck: false
ack(确认)当消费者成功处理完一条消息后,会向 RabbitMQ 服务器发送一个确认信号(ack),告知服务器该消息已经被成功消费,服务器可以将该消息从队列中移除
nack(否定确认)当消费者处理消息失败或者遇到某些异常情况时,会向 RabbitMQ 服务器发送一个否定确认信号(nack),表示消息处理未成功。根据不同的配置,服务器会对该消息进行不同的处理,比如重新入队、丢弃等。

channel.nack(message, multiple, requeue); 参数

参数注释
message该参数代表需要进行否定确认的消息对象,也就是消费者从队列中接收到的消息。这个消息对象包含了消息的内容、属性等信息
multiple这是一个布尔类型的参数,用于指定是否批量否定确认消息。当 multiple 为 true 时,表示会将当前消费者已经接收到但还未确认(包括 ack 和 nack)的所有消息进行否定确认。这在需要一次性处理多个未确认消息时很有用,可以提高处理效率。当 multiple 为 false 时,仅对当前传入的这一条消息进行否定确认。
requeue同样是布尔类型的参数,用于控制消息在被否定确认后的去向。当 requeue 为 true 时,消息会被重新放回原队列中,通常会被放置在队列的尾部,之后可能会再次被分发给其他消费者或者当前消费者(取决于 RabbitMQ 的分发策略)。这种方式适用于消息处理失败是由于临时原因导致的情况,例如网络抖动、数据库短暂不可用等,让消息有机会被重新处理。当 requeue 为 false 时,消息不会被重新放回原队列,而是会被丢弃或者根据死信队列(Dead Letter Queue,DLQ)的配置进行处理。死信队列用于存储那些无法被正常消费的消息,方便后续分析和处理

这样就需要手动确认消息了,我们先不确认。将接收消息的终端重启一下,重新发送一次,可以看到收到信息了。
CTRL+C停止后,再次接收消息,发现还能接收到这个消息。这就错了,按道理消息被处理完成后,就不应该重复收到了。

这就是因为,由于我们没有手动确认,所以RabbitMQ,认为消息一直没有被处理成功,就会不断的发过来,让你处理。
在这里插入图片描述
加上

// 当接收到消息
channel.consume(queue, function (msg) {console.log('[x] 接收到了:%s', msg.content.toString());// 如果不是自动确认,需要手动确认消息channel.ack(msg);}, {noAck: false
});

CTRL+C停止后,重新连接,再接收一次。现在的代码里,已经将消息确认了。再次按CTRL+C停止,再连接上去,就不会收到重复的消息了。
在这里插入图片描述
那么ack设计的目的,是让你有更高的灵活度。因为有些任务的执行,有可能失败,需要重试。我这里有个代码示例,大家可以参考一下。

// 模拟处理逻辑
channel.consume(queue, function (msg) {try {// 假设处理成功,手动确认消息channel.ack(msg);} catch (error) {// 处理失败,可以选择拒绝消息,将消息从队列中删除// nack:否定确认):当消费者处理消息失败或者遇到某些异常情况时,会向 RabbitMQ 服务器发送一个否定确认信号(nack),表示消息处理未成功。根据不同的配置,服务器会对该消息进行不同的处理,比如重新入队、丢弃等。channel.nack(msg, false, false);console.error(error);}
}, {noAck: false
});

总结

1、消息队列的组成

基础的消息队列,由三部分组成:生产者、队列和消费者。

名称说明
生产者将消息发送到队列
队列临时存储消息,按照先进先出的原则,在生产者和消费者之间传递消息
消费者监听队列,收到消息后进行处理。
2、RabbitMQ 的方法
方法说明
const connection = amqplib.connect连接到 RabbitMQ
connection.close关闭连接
const channel = connection.createChannel创建通道
channel.assertQueue创建队列
channel.sendToQueue发送消息
channel.consume接收消息
channel.ack确认消息,消息被成功处理。
channel.nack拒绝消息,消息处理失败。
3、nack的参数
参数注释
message该参数代表需要进行否定确认的消息对象,也就是消费者从队列中接收到的消息。这个消息对象包含了消息的内容、属性等信息
multiple这是一个布尔类型的参数,用于指定是否批量否定确认消息。当 multiple 为 true 时,表示会将当前消费者已经接收到但还未确认(包括 ack 和 nack)的所有消息进行否定确认。这在需要一次性处理多个未确认消息时很有用,可以提高处理效率。当 multiple 为 false 时,仅对当前传入的这一条消息进行否定确认。
requeue同样是布尔类型的参数,用于控制消息在被否定确认后的去向。当 requeue 为 true 时,消息会被重新放回原队列中,通常会被放置在队列的尾部,之后可能会再次被分发给其他消费者或者当前消费者(取决于 RabbitMQ 的分发策略)。这种方式适用于消息处理失败是由于临时原因导致的情况,例如网络抖动、数据库短暂不可用等,让消息有机会被重新处理。当 requeue 为 false 时,消息不会被重新放回原队列,而是会被丢弃或者根据死信队列(Dead Letter Queue,DLQ)的配置进行处理。死信队列用于存储那些无法被正常消费的消息,方便后续分析和处理
4、 RabbitMQ 的参数
字段说明
durable: true队列持久化,队列信息将被写入磁盘,在RabbitMQ服务重启后仍然存在。但是队列已经创建过后,这个参数就没法改了,可以删掉队列,重新建一个新的。
persistent: true消息持久化,消息将会被写入磁盘,在RabbitMQ服务重启后仍然存在。
noAck: true自动确认消息,只要收到了就会自动确认。如果设置成false,则需要手动确认。

使用RabbitMQ发送邮件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/22313.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【SQL实验】触发器

下载素材文件”tsgl”、“成绩管理”,将tsgl.bak和成绩管理.bak数据库还原到库中【导入操作在之前的文章中详细讲过】 触发器 1、为图书表设置更新触发器&#xff0c;根据总编号来更新书名、作者、出版社、分类号和单价(根据总编号找到相应记录&#xff0c;然后更新书名、作者…

Win10系统Docker+DeepSeek+ragflow搭建本地知识库

文章目录 1、安装ollama1.1 下载1.2 安装1.3 cmd命令行测试安装成功1.4 拉取模型2、安装ragflow2.1 下载项目2.2 通过docker拉取镜像安装2.3 查看docker日志是否安装成功3、模型配置3.1 第一次登录需要注册3.2 模型添加4、知识库配置4.1 创建知识库4.2 上传文档4.3 解析5、聊天…

redis的应用,缓存,分布式锁

1.应用 1.1可以用作缓存 作用&#xff1a;提交数据的查询效率&#xff0c;减少对数据库的访问频率 什么数据适合放入缓存 1.查询频率高&#xff0c;修改频率低 2.对安全系数比较低 如何实现 Service public class DeptServer {Autowiredprivate DeptMapper deptMapper;Auto…

springboot整合 xxl-job

文章目录 一、xxl-job是什么二、使用步骤 1. 下载并运行管理端代码2. 访问管理页面&#xff0c;确认是否启动成功3. 配置执行器【在自己的springboot项目中配置】4. 在页面上创建执行器和任务&#xff0c;与项目中绑定 总结参考 一、xxl-job是什么 XXL-JOB 是一个分布式任务调…

Jenkins 环境搭建---基于 Docker

前期准备 提前安装jdk、maven、nodeJs&#xff08;如果需要的话&#xff09; 创建 jenkins 环境目录&#xff0c;用来当做挂载卷 /data/jenkins/ 一&#xff1a;拉取 Jenkins 镜像 docker pull jenkins/jenkins:lts 二&#xff1a;设置 Jenkins挂载目录 mkdir -p ~/jen…

小米路由器 AX3000T 降级后无法正常使用,解决办法

问题描述 买了个 AX3000T 路由器&#xff0c;想安装 OpenWRT 或者 安装 Clash 使用&#xff0c;看教程说是需要降级到 v1.0.47 版本。 结果刷机之后路由器无法打开了&#xff0c;一直黄灯亮&#xff0c;中间灭一下&#xff0c;又是黄灯长亮&#xff0c;没有 WIFI 没有连接。以…

金融学-金融机构

前言 金融机构在金融体系运行体系运营中起着不可获缺的关键作用.如规则的制定与监管-中央银行,体系的运营证券公司,体系的供贷的参与者金融中介.本章将用一种说明我们的金融体系是怎样改进经济效率的经济分析,来讲述相关金融机构 金融结构的经济学分析 世界各国的金融体系在…

公网远程家里局域网电脑过程详细记录,包含设置路由器。

由于从校内迁居小区,校内需要远程控制访问小区内个人电脑,于是早些时间刚好自己是电信宽带,可以申请公网ipv4不需要花钱,所以就打电话直接申请即可,申请成功后访问光猫设备管理界面192.168.1.1,输入用户名密码登录超管(密码是网上查下就有了)设置了光猫为桥接模式,然后…

002 SpringCloudAlibaba整合 - Feign远程调用、Loadbalancer负载均衡

前文地址&#xff1a; 001 SpringCloudAlibaba整合 - Nacos注册配置中心、Sentinel流控、Zipkin链路追踪、Admin监控 文章目录 8.Feign远程调用、loadbalancer负载均衡整合1.OpenFeign整合1.引入依赖2.启动类添加EnableFeignClients注解3.yml配置4.日志配置5.远程调用测试6.服务…

基于javaweb的SpringBoot校园二手商品系统设计和实现(源码+文档+部署讲解)

技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;免费功能设计、开题报告、任务书、中期检查PPT、系统功能实现、代码编写、论文编写和辅导、论…

国产开源PDF解析工具MinerU

前言 PDF的数据解析是一件较困难的事情&#xff0c;几乎所有商家都把PDF转WORD功能做成付费产品。 PDF是基于PostScript子集渲染的&#xff0c;PostScript是一门图灵完备的语言。而WORD需要的渲染&#xff0c;本质上是PDF能力的子集。大模型领域&#xff0c;我们的目标文件格…

stm32单片机个人学习笔记16(SPI通信协议)

前言 本篇文章属于stm32单片机&#xff08;以下简称单片机&#xff09;的学习笔记&#xff0c;来源于B站教学视频。下面是这位up主的视频链接。本文为个人学习笔记&#xff0c;只能做参考&#xff0c;细节方面建议观看视频&#xff0c;肯定受益匪浅。 STM32入门教程-2023版 细…

Springboot + Ollama + IDEA + DeepSeek 搭建本地deepseek简单调用示例

1. 版本说明 springboot 版本 3.3.8 Java 版本 17 spring-ai 版本 1.0.0-M5 deepseek 模型 deepseek-r1:7b 需要注意一下Ollama的使用版本&#xff1a; 2. springboot项目搭建 可以集成在自己的项目里&#xff0c;也可以到 spring.io 生成一个项目 生成的话&#xff0c;如下…

Ubuntu 的RabbitMQ安装

目录 1.安装Erlang 查看erlang版本 退出命令 2. 安装 RabbitMQ 3.确认安装结果 4.安装RabbitMQ管理界面 5.启动服务并访问 1.启动服务 2.查看服务状态 3.通过IP:port 访问界面 4.添加管理员用户 a&#xff09;添加用户名&#xff1a;admin&#xff0c;密码&#xff1…

Powershell Install deepseek

前言 deepseekAI助手。它具有聊天机器人功能&#xff0c;可以与用户进行自然语言交互&#xff0c;回答问题、提供建议和帮助解决问题。DeepSeek 的特点包括&#xff1a; 强大的语言理解能力&#xff1a;能够理解和生成自然语言&#xff0c;与用户进行流畅的对话。多领域知识&…

VS Code 如何搭建C/C++开发环境

目录 1.VS Code是什么 2. VS Code的下载和安装 2.1 下载和安装 2.2.1 下载 2.2.2 安装 2.2 环境的介绍 2.3 安装中文插件 3. VS Code配置C/C开发环境 3.1 下载和配置MinGW-w64编译器套件 3.1.1 下载 3.1.2 配置 3.2 安装C/C插件 3.3 重启VSCode 4. 在VSCode上编写…

vue从入门到精通(十一):条件渲染

条件渲染 1.v-if 写法: (1).v-if“表达式” (2).v-else-if“表达式” (3).v-else“表达式” 适用于:切换频率较低的场景。 特点:不展示的DOM元素直接被移除。 注意:v-if可以和:v-else-if、v-else一起使用&#xff0c;但要求结构不能被“打断” 2.v-show 写法:v-show“…

Java之——“String类”(内容较多,结合目录察看分类)

前言 在C语言中已经涉及到字符串了&#xff0c;但是在C语言中要表示字符串只能使用字符数组或者字符指针&#xff0c;可以使用标准库提供的字符串系列函数完成大部分操作&#xff0c;但是这种将数据和操作数据方法分离开的方式不符合面向对象的思想&#xff0c;而字符串应用又…

【C++篇】树影摇曳,旋转无声:探寻AVL树的平衡之道

文章目录 从结构到操作&#xff1a;手撕AVL树的实现一、AVL树介绍1.1 什么是AVL树1.2 平衡因子的定义1.3 平衡的意义1.4 AVL树的操作 二、AVL树的节点结构2.1 节点结构的定义&#xff1a; 三、插入操作3.1 插入操作概述3.2 步骤1&#xff1a;按二叉查找树规则插入节点3.3 步骤2…

限制Doris端口访问,解决REST API漏洞

方案一&#xff1a;通过Linux防火墙规则限制 目标&#xff1a;限制Doris的端口&#xff0c;只允许指定的ip访问此端口&#xff0c;其他禁止 1、设置规则 1.1、准备工作 注意&#xff1a;以上命令顺序不能错&#xff0c;先禁止后允许&#xff0c;另外此处只是临时设置。 # …