RabbitMQ之旅(1)

相信自己,终会成功

目录

主流MQ产品

1.kafaka 

2.RocketMQ

3.RabbitMQ

在xshell上安装RabbitMQ

 RabbitMQ七种工作模式

1.简单模式

​编辑

2.工作队列模式

3.发布/订阅模式

4.路由模式

5.通配符模式

6.RPC模式

AMQP.BasicProperties 设置消息属性的类

7.发布确认模式

代码展示(生产者)

常量命名规范

连接工厂 (ConnectionFactory) 

Channel(通道)

channel.queueDeclare 声明队列

channel.basicPublish 发送消息

channel.exchangeDeclare 声明创建交换机

channel.queueBind() 将队列绑定到交换机

channel.basicQos设置消费者预取限制

channel.basicAck 手动确认消息

代码展示(消费者)

DefaultConsumer:

handleDelivery 方法:


主流MQ产品

1.kafaka 

  1. 特点:高吞吐量、分布式、持久化、支持分区和副本。
  2. 适用场景:日志收集、流处理、实时数据分析等大数据场景。
  3. 优势:高吞吐量和低延迟,适合处理大量数据。
  4. 缺点:配置复杂,对小型项目可能过于重量级。

2.RocketMQ

  1. 特点:分布式、高吞吐量、低延迟、支持事务消息。
  2. 适用场景:电商、金融等需要高可靠性和事务支持的场景。
  3. 优势:支持事务消息,适合金融等高可靠性要求的场景。
  4. 缺点:社区相对较小,文档和资源不如Kafka丰富。

3.RabbitMQ

  1. 特点:轻量级、支持多种消息协议、易于使用和部署。
  2. 适用场景:中小型项目、需要快速上手的场景。
  3. 优势:易于使用,支持多种消息协议,社区活跃。
  4. 缺点:在大规模高并发场景下性能不如Kafka和RocketMQ。

在xshell上安装RabbitMQ

1.更新xshell中最新的软件包列表

sudo apt-get update

2.安装erlang

sudo apt-get install erlang

输入erl,出现下图内容表示安装成功  输入halt().退出即可

 3.安装rabbitmq

sudo apt-get install rabbitmq-server

4.确认安装结果

systemctl status rabbitmq-server

 显示running即可

 5.安装RabbitMQ管理界面

rabbitmq-plugins enable rabbiting_management

6.启动服务

sudo service rabbitmq-server start

在浏览器中输入自己云服务器的端口号+15672即可访问页面

添加用户名和密码 

rabbitmqctl add_user 用户名 密码

给用户权限

rabbitmqctl set_user_tags 用户名 权限等级

 RabbitMQ七种工作模式

P:生产者        C:消费者

queue:队列   X:交换机

在使用绑定的时候为 BindingKey

在发送消息的时候为RoutingKey

官方网站:RabbitMQ Tutorials | RabbitMQ

建立连接需要的信息:

1.IP   2.端口号   3.账号   4.密码   5. 虚拟主机

消费者代码:

1.创建连接  2.创建Channel  3.声明一个队列Queue  4.消费信息  5.释放资源

1.简单模式

一个生产者,一个消费者,点到点模式

2.工作队列模式

一个生产者,多个消费者

假如有十条队列消息,C1和C2是共同消费这10条消息,消息不会重复消费

3.发布/订阅模式

4.路由模式

订阅模式的变化形式

5.通配符模式

6.RPC模式

AMQP.BasicProperties 设置消息属性的类
属性名类型说明
contentTypeString消息内容的 MIME 类型(如 text/plainapplication/json)。
contentEncodingString消息内容的编码方式(如 UTF-8)。
headersMap<String, Object>自定义消息头,用于传递额外信息。
deliveryModeInteger消息的持久化模式:1(非持久化)或 2(持久化)。
priorityInteger消息的优先级(0 到 9,数值越大优先级越高)。
correlationIdString关联 ID,通常用于 RPC 模式,匹配请求和响应。
replyToString响应队列的名称,通常用于 RPC 模式。
expirationString消息的过期时间(以毫秒为单位的字符串)。
messageIdString消息的唯一标识符。
timestampDate消息的时间戳。
typeString消息的类型标识符。
userIdString用户 ID,用于验证消息的发送者。
appIdString应用程序 ID,用于标识发送消息的应用程序。
//        AMQP.BasicProperties 是一个不可变类,因此需要通过其内部类 Builder 来创建对象。AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(correlationID).replyTo(Constants.RPC_RESPONSE_QUEUE).build();
  1. 客户端

    • 生成唯一的 correlationId

    • 设置 replyTo 为响应队列的名称。

    • 发送消息到请求队列(Constants.RPC_REQUEST_QUEUE)。

    • 监听响应队列(Constants.RPC_RESPONSE_QUEUE),等待服务器返回结果。

  2. 服务器

    • 监听请求队列(Constants.RPC_REQUEST_QUEUE)。

    • 处理请求后,将结果发送到客户端指定的响应队列(replyTo)。

    • 在响应消息中设置与请求相同的 correlationId

  3. 客户端匹配响应

    • 收到响应后,根据 correlationId 匹配对应的请求。

7.发布确认模式

是RabbitMQ消息可靠性保证的关键 


代码展示(生产者)

下图的代码Constants是自己写的 Java 常量

常量命名规范

1.常量名使用 全大写字母,并用下划线 _ 分隔单词(如 VIRTUALHOST 和 WORK_QUEUE)。

2.这是 Java 中的命名约定,便于区分常量和变量。 

//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST); //云服务器的IP地址connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUALHOST); //虚拟主机//2. 开启信道Channel channel = connection.createChannel();//3. 声明交换机   使用内置的交换机

连接工厂 (ConnectionFactory) 

是一个设计模式中的“工厂类”,它的目的是隐藏创建连接的复杂细节(比如网络配置、认证信息等)。你可以通过这个工厂对象预先设置连接参数(如服务器地址、端口、用户名、密码等),然后通过它来生成具体的连接对象

Channel(通道)

通道 是建立在连接(Connection)之上的一个轻量级逻辑连接。一个连接(Connection)可以创建多个通道,每个通道可以独立地发送和接收消息。通道的设计是为了复用连接,避免频繁创建和销毁连接的开销。创建通道后,通常会用它来声明队列、发送消息或消费消息

channel.queueDeclare 声明队列
//4.声明队列channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);/*** queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,*                                  Map<String, Object> arguments)*  参数说明:*  queue: 队列名称*  durable: 可持久化*  exclusive: 是否独占*  autoDelete: 是否自动删除*  arguments: 参数*/

channel.basicPublish 发送消息
        //5. 发送消息/*** basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)* 参数说明:* exchange: 交换机名称* routingKey: 内置交换机, routingkey和队列名称保持一致* props: 属性配置* body: 消息*/for (int i = 0; i < 10; i++) {String msg = "hello rabbitmq~"+i;channel.basicPublish("","hello", null, msg.getBytes());}

 //6. 资源释放channel.close();connection.close();
channel.exchangeDeclare 声明创建交换机
Exchange.DeclareOk exchangeDeclare(String exchange,              // 交换机名称String type,                  // 交换机类型(direct、fanout、topic、headers)boolean durable,              // 是否持久化boolean autoDelete,           // 是否自动删除boolean internal,            // 是否内部交换机Map<String, Object> arguments // 额外参数
) throws IOException;

channel.queueBind() 将队列绑定到交换机
  • queue:队列名称(如 Constants.PUBLISH_QUEUE1)。

  • exchange:交换机名称(如 Constants.PUBLISH_EXCHANGE)。

  • routingKey:路由键(如 "" 或 "key1"

channel.queueBind(Constants.PUBLISH_QUEUE1, Constants.PUBLISH_EXCHANGE, "");
channel.queueBind(Constants.PUBLISH_QUEUE2, Constants.PUBLISH_EXCHANGE, "");
//作用:将 Constants.PUBLISH_QUEUE1 和 Constants.PUBLISH_QUEUE2 绑定到 //Constants.PUBLISH_EXCHANGE。
//路由键为空字符串(""),表示所有消息都会被路由到这两个队列(前提是交换机类型支持)。
channel.basicQos设置消费者预取限制
参数名类型说明
prefetchSizeint预取消息的总大小(以字节为单位),通常设置为 0 表示不限制大小。
prefetchCountint预取消息的数量限制(即未确认消息的最大数量)。
globalboolean是否全局生效:true 表示对整个 Channel 生效,false 表示对每个消费者生效。
// 设置每个消费者最多预取 10 条未确认消息
channel.basicQos(10);// 设置整个 Channel 最多预取 100 条未确认消息
channel.basicQos(100, true);// 设置预取消息的总大小不超过 1MB,数量不超过 10 条
channel.basicQos(1024 * 1024, 10, false);
channel.basicAck 手动确认消息
参数名类型说明
deliveryTaglong消息的唯一标识符,由 RabbitMQ 分配。
multipleboolean是否批量确认:true 表示确认所有比 deliveryTag 小的消息;false 表示仅确认当前消息。

使用场景

  • 手动确认模式:当消费者从队列中拉取消息时,如果启用了手动确认模式(autoAck=false),则必须调用 basicAck 来确认消息。

  • 确保消息可靠性:通过手动确认,可以确保消息在处理成功后才会从队列中删除,避免消息丢失。

  • 批量确认:通过设置 multiple=true,可以一次性确认多条消息,提高效率。


代码展示(消费者)

public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列   使用内置的交换机//如果队列不存在, 则创建, 如果队列存在, 则不创建channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);6. 资源释放channel.close();connection.close();}

DefaultConsumer

RabbitMQ 提供的默认消费者类。

channel:消费者绑定的通道(Channel)。

匿名内部类:通过 new DefaultConsumer(channel) { ... } 创建一个匿名内部类,并写 handleDelivery 方法。

handleDelivery 方法

当队列中有消息时,RabbitMQ 会调用此方法将消息传递给消费者

RabbitMQ 支持两种消息确认模式:

  1. 自动确认

    • 在 basicConsume 方法中将第二个参数设置为 true

    • 消费者接收到消息后,RabbitMQ 会自动将消息标记为已处理。

    • 示例:

      channel.basicConsume(QUEUE_NAME, true, consumer);
  2. 手动确认

    • 在 basicConsume 方法中将第二个参数设置为 false

    • 需要在 handleDelivery 方法中手动调用 channel.basicAck() 确认消息。

    • 示例:

      channel.basicAck(envelope.getDeliveryTag(), false);


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/31873.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Matlab的人脸识别的二维PCA

一、基本原理 传统 PCA 在处理图像数据时&#xff0c;需将二维图像矩阵拉伸为一维向量&#xff0c;这使得数据维度剧增&#xff0c;引发高计算成本与存储压力。与之不同&#xff0c;2DPCA 直接基于二维图像矩阵展开运算。 它着眼于图像矩阵的列向量&#xff0c;构建协方差矩阵…

el-pagination的使用说明

<el-paginationv-model:current-page"pageNo" //当前第几页v-model:page-size"pageSize" //每页显示多少条数据:page-sizes"[10, 20, 30]" //控制每页显示的条数:small"true" //控制分页器大小:disabled&quo…

Redis Redis介绍、安装 - Redis客户端

目录 redis是什么&#xff0c;他的应用场景是什么&#xff1f; Redis的一些主要特点和应用场景&#xff1a; redis的官方网站&#xff1a;Redis redis是键值型数据库&#xff1a;&#xff08;也就是key-value模式&#xff09;&#xff08;跟python的字典很像&#xff09; …

LWIP网络模型及接口简介(DAY 01)

目录 1.网络协议分层模型 2. LWIP三种编程接口 1.网络协议分层模型 其中各层级的封装与拆封过程 2. LWIP三种编程接口 LwIP 提供了三种编程接口&#xff0c;分别为 RAW/Callback API、NETCONN API、SOCKET API。它们的易用性从左到右依次提高&#xff0c;而执行效率从左到右依…

【Python 数据结构 14.邻接表】

希望你的眼睛可以一直笑&#xff0c;想要的都得到 —— 25.3.11 一、邻接表的概念 1.邻接表的定义 邻接表是一种表示图的数据结构。邻接表的主要概念是&#xff1a;对于图中的每个顶点&#xff0c;维护一个由与其相邻的顶点组成的列表。这个列表可以用数组、链表或其他数据结构…

01 音视频知识学习(视频)

图像基础概念 ◼像素&#xff1a;像素是一个图片的基本单位&#xff0c;pix是英语单词picture的简写&#xff0c;加上英 语单词“元素element”&#xff0c;就得到了“pixel”&#xff0c;简称px&#xff0c;所以“像素”有“图像元素” 之意。 ◼ 分辨率&#xff1a;是指图像…

git文件过大导致gitea仓库镜像推送失败问题解决(push failed: context deadline exceeded)

问题描述&#xff1a; 今天发现gitea仓库推送到某个镜像仓库的操作几个月前已经报错终止推送了&#xff0c;报错如下&#xff1a; 首先翻译报错提示可知是因为git仓库大小超过1G限制。检查本地.git文件&#xff0c;发现.git文件大小已达到1.13G。确定是.git文件过大导致&…

clickhouse集群部署保姆级教程

ClickHouse安装 版本要求 23.8及之后的版本 硬件要求 三台机器 建议配置 磁盘 ssd 500G内存 32gcpu 16c 最低配置 磁盘 机械硬盘 50G内存 4gcpu 4c 容量规划 一亿条数据大约使用1TB磁盘容量 参考官方容量推荐 安装包准备 zookeeper安装 zookeeper需要java启动&…

FANformer:融合傅里叶分析网络的大语言模型基础架构

近期大语言模型(LLM)的基准测试结果引发了对现有架构扩展性的思考。尽管OpenAI推出的GPT-4.5被定位为其最强大的聊天模型&#xff0c;但在多项关键基准测试上的表现却不及某些规模较小的模型。DeepSeek-V3在AIME 2024评测中达到了39.2%的Pass1准确率&#xff0c;在SWE-bench Ve…

Electron使用WebAssembly实现CRC-32 常用标准校验

Electron使用WebAssembly实现CRC-32 常用标准校验 将C/C语言代码&#xff0c;经由WebAssembly编译为库函数&#xff0c;可以在JS语言环境进行调用。这里介绍在Electron工具环境使用WebAssembly调用CRC-32 常用标准格式校验的方式。 CRC-32 常用标准校验函数WebAssembly源文件…

MySQL数据库的相关语句

数据库的操作&#xff08;CURD&#xff09; 创建数据库&#xff08;重点&#xff09; 查看数据库&#xff08;重点&#xff09; show databases; ‐‐ 查看所有的数据库use 数据库名称;(*****) ‐‐ 使用数据库show create database 数据库名称; ‐‐ 查询数据库的创建的信息s…

Git的命令学习——适用小白版

浅要了解一下Git是什么&#xff1a; Git是目前世界上最先进的的分布式控制系统。Git 和其他版本控制系统的主要差别在于&#xff0c;Git 只关心文件数据的整体是否发生变化&#xff0c;而大多数其他系统则只关心文件内容的具体差异。Git 并不保存这些前后变化的差异数据。实际上…

充电桩快速搭建springcloud(微服务)+前后端分离(vue),客户端实现微信小程序+ios+app使用uniapp(一处编写,处处编译)

充电桩管理系统是专为中小型充电桩运营商、企业和个人开发者设计的一套高效、灵活的管理平台。系统基于Spring Cloud微服务架构开发&#xff0c;采用模块化设计&#xff0c;支持单机部署与集群部署&#xff0c;能够根据业务需求动态扩展。系统前端使用uniapp框架&#xff0c;可…

Unity光照之Halo组件

简介 Halo 组件 是一种用于在游戏中创建光晕效果的工具&#xff0c;主要用于模拟光源周围的发光区域&#xff08;如太阳、灯泡等&#xff09;或物体表面的光线反射扩散效果。 核心功能 1.光晕生成 Halo 组件会在光源或物体的周围生成一个圆形光晕&#xff0c;模拟光线在空气…

【cocos creator】热更新

一、介绍 试了官方的热更新功能&#xff0c;总结一下 主要用于安卓包热更新 参考&#xff1a; Cocos Creator 2.2.2 热更新简易教程 基于cocos creator2.4.x的热更笔记 二、使用软件 1、cocos creator v2.4.10 2、creator热更新插件&#xff1a;热更新manifest生成工具&…

深度评测阿里云操作系统控制台:功能全面,体验卓越!

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀ 阿里云操作系统控制台 操作系统控制台操作系统实践体验服务的开通创建ESC实例组件管理功能体验&#xff1a;节点健康系统诊断系统观测订阅管…

Spring Boot 解析 LocalDateTime 失败?Uniapp 传输时间变 1970 的原因与解决方案

目录 前言1. 问题分析2. 时间戳&#xff08;推荐&#xff0c;可尝试&#xff09;3. 使用 JsonDeserialize & JsonSerialize&#xff08;中立&#xff09;4. 前端传 ISO-8601 格式&#xff08;不推荐&#xff0c;可尝试&#xff09;5. 用 String&#xff08;中立&#xff09…

【vitepress】如何搭建并部署自己的博客网站

文章目录 新的改变旧的github.io地址,现在不用更新netlify托管之后为这个 一 如何搭建[1]:安装vitepress初始化Vitepress启动项目 二 如何部署[2]视频教程 [3] 新的改变 旧的github.io地址,现在不用 https://dl-hx.github.io/myBlog/ 更新netlify托管之后为这个 https://dl…

Cursor新版0.47.x发布

0.47.x - 可靠性、键盘快捷键与提前体验选项功能 本次更新主要聚焦于稳定性和性能改进&#xff0c;以确保现有功能更好地运行。 新功能与改进 键盘快捷键&#xff1a;所有键盘快捷键现在都可以在键盘快捷键菜单中找到。前往 设置 > 键盘快捷键 来修改或添加新的快捷键。 …

docker 小记

一、卸载 查看当前版本 docker -v2. 如果有&#xff0c;先停止docker systemctl stop docker如果是yum安装&#xff0c;卸载方式为 #已防版本冲突&#xff0c;直接卸载 yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-lat…