RabbitMQ (Java)学习笔记

目录

一、概述

①核心组件

②工作原理

③优势

④应用场景

二、入门

1、docker 安装 MQ

2、Spring AMQP

3、代码实现

pom 依赖

配置RabbitMQ服务端信息

发送消息

接收消息

三、基础

work Queue

案例

消费者消息推送限制(解决消息堆积方案之一)

Work模型的使用

Fanout交换机

Fanout 广播模式

案例

Fanout Exchange交换机的作用是什么?

Direct 交换机


 

一、概述

        RabbitMQ是一种开源的消息代理软件,基于AMQP(高级消息队列协议)实现。它充当消息中间件的角色,允许应用程序通过消息队列进行异步通信。RabbitMQ的主要功能是接收、存储和转发消息,从而解耦应用程序组件,提高系统的可扩展性和可靠性。

①核心组件

  1. 生产者(Producer):负责创建消息并将其发送到RabbitMQ服务器。生产者可以是任何应用程序,它们将消息封装成特定的格式,然后通过网络发送到消息代理。

  2. 交换器(Exchange):交换器是消息传递的枢纽,它接收生产者发送的消息,并根据一定的规则将消息路由到一个或多个队列中。RabbitMQ提供了多种类型的交换器,如直接交换器(direct)、扇形交换器(fanout)、主题交换器(topic)等,每种交换器的路由规则不同。

    • 直接交换器:根据消息的路由键(routing key)直接将消息发送到与该路由键绑定的队列。

    • 扇形交换器:将消息广播到所有绑定的队列,不考虑路由键。

    • 主题交换器:根据路由键的模式匹配规则将消息发送到匹配的队列。

  3. 队列(Queue):队列是存储消息的容器,它是一个先进先出(FIFO)的数据结构。队列中的消息会被消费者(Consumer)依次消费。队列可以持久化,即使RabbitMQ服务器重启,队列中的消息也不会丢失。

  4. 消费者(Consumer):消费者从队列中获取消息并进行处理。消费者可以是应用程序的另一个组件,也可以是独立的进程。消费者通过订阅队列来接收消息,一旦队列中有消息到达,消费者就会按照一定的策略(如轮询、负载均衡等)进行消费。

②工作原理

  1. 消息发送过程

    • 生产者创建消息,并指定交换器和路由键。

    • 生产者将消息发送到RabbitMQ服务器。

    • 交换器根据路由键将消息路由到一个或多个队列。

    • 如果队列不存在,交换器会根据配置决定是否丢弃消息或返回错误。

  2. 消息接收过程

    • 消费者向RabbitMQ服务器发送订阅请求,指定要消费的队列。

    • 当队列中有消息到达时,RabbitMQ服务器将消息推送给消费者。

    • 消费者接收到消息后进行处理,处理完成后向服务器发送确认消息(ack),告知服务器该消息已被成功消费。如果消费者在处理消息过程中失败或崩溃,RabbitMQ服务器会将消息重新放入队列,等待其他消费者消费。

③优势

  1. 高可靠性:RabbitMQ支持消息持久化,即使服务器出现故障,消息也不会丢失。同时,它还支持镜像队列,可以在多个节点之间复制队列,提高系统的可用性。

  2. 高可用性:RabbitMQ可以部署在多个节点上,形成集群。集群中的节点可以相互备份,当某个节点出现故障时,其他节点可以接管其工作,保证系统的正常运行。

  3. 灵活的路由策略:通过不同类型的交换器和路由键,可以实现复杂的消息路由逻辑,满足各种业务场景的需求。

  4. 易于扩展:RabbitMQ支持水平扩展,可以通过增加节点来提高系统的处理能力。同时,它还支持多种编程语言的客户端库,方便开发者进行集成。

④应用场景

  1. 异步任务处理:当应用程序需要执行耗时的任务时,可以将任务封装成消息发送到RabbitMQ,然后由消费者在后台进行处理,从而提高系统的响应速度。

  2. 服务间通信:在微服务架构中,不同的服务可以通过RabbitMQ进行通信,实现服务的解耦和异步交互。

  3. 事件驱动架构:RabbitMQ可以作为事件总线,将事件消息传递给不同的消费者,实现事件驱动的业务逻辑。

  4. 日志收集:将日志消息发送到RabbitMQ,然后由专门的日志处理程序进行消费和分析,实现日志的集中管理和分析。

二、入门

        MQ技术选型参考图如下:

1、docker 安装 MQ

docker run \-e RABBITMQ_DEFAULT_USER=itheima \     # MQ 默认登录用户-e RABBITMQ_DEFAULT_PASS=123321 \      # MQ 默认登录用户的密码-v mq-plugins:/plugins \               # 默认挂载--name mq \                            # 容器名--hostname mq \                        # 主机名-p 15672:15672 \                       # MQ 访问图像化页面端口-p 5672:5672 \                         # MQ 通信端口-d \                        rabbitmq:3.8-management

2、Spring AMQP

SpringAmqp的官方地址 :SpringAMQP官方网址

        

3、代码实现

pom 依赖

        <!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

配置RabbitMQ服务端信息

在每个微服务中引入MQ服务端信息,这样微服务才能连接到RabbitMQ

spring:rabbitmq:host: localhost # 主机名port: 5672          # 端口virtual-host: /hmall # 虚拟主机username: hmall  # 用户名password: 123   # 密码

发送消息

SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。样例发送消息代码如下

@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessage2Queue() {//        队列名称String queueName = "simple.queue";
//        消息String message = "Hello,Spring amqp !";
//        发送消息rabbitTemplate.convertAndSend(queueName, message);}
}

接收消息

SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法:

@Slf4j
@Component
public class mqListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg){     // 注意这里接受的参数类型,取决于队列中发送者发送的消息类型,因为我们发送者发送字符串,所以这里接受的参数类型也是字符串log.info("消费者收到了Simple.queue的消息:【{}】",msg);}
}

三、基础

work Queue

        Workqueues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

案例

        模拟WorkQueue,实现一个队列绑定多个消费者

        基本思路如下:

  1. 在RabbitMQ的控制台创建一个队列,名为work.queue
  2. 在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
  3. 在consumer服务中定义两个消息监听者,都监听work.queue队列
  4. 消费者1每秒处理50条消息,消费者2每秒处理5条消息

通过Thread.sleep 方式 模拟性能不同的时候接受到的数据

    @RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg) throws InterruptedException {     // 注意这里接受的参数类型,取决于队列中发送者发送的消息类型,因为我们发送者发送字符串,所以这里接受的参数类型也是字符串System.out.println("消费者1 收到了 work.queue 的消息:【" + msg + "】");Thread.sleep(20);}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg) throws InterruptedException {     // 注意这里接受的参数类型,取决于队列中发送者发送的消息类型,因为我们发送者发送字符串,所以这里接受的参数类型也是字符串System.err.println("消费者2 收到了 work.queue 的消息........:【" + msg + "】");Thread.sleep(200);}

最后发现,消费者接受到的消息是默认的一人一半效果,而处理消息方面可以显示出效率比较低下

消费者消息推送限制(解决消息堆积方案之一)

        默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积

因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:

spring:rabbitmq:listener:simple:prefetch: 1   # 每次只能获取一条消息,处理完成才能获取下一个消息

我们重新发送消息可以发现

这样我们可以充分消费每一条消息,能者多劳效果。

Work模型的使用

  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

Fanout交换机

真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种:

  • Fanout : 广播
  • Direct : 定向
  • Topic : 话题

Fanout 广播模式

Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式

通过 exchange 广播消息给队列,多个消费者 处理绑定的一个队列中的不同订单消息,并行操作,提高效率。

案例

利用SpringAMQP演示FanoutExchange的使用
实现思路如下:

  1. 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
  2. 在RabbitMO控制台中,声明交换机hmall.fanout,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
  4. 在publisher中编写测试方法,向hmall.fanout发送消息.

消费者方法

    @RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg){     // 注意这里接受的参数类型,取决于队列中发送者发送的消息类型,因为我们发送者发送字符串,所以这里接受的参数类型也是字符串System.out.println("消费者1 收到了 fanout.queue1 的消息:【{"+ msg + "}】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg){     // 注意这里接受的参数类型,取决于队列中发送者发送的消息类型,因为我们发送者发送字符串,所以这里接受的参数类型也是字符串System.err.println("消费者2 收到了 fanout.queue2 的消息:【{" + msg + "}】");}

发送消息方法

    @Testvoid testSendFanout() {//        队列名称String exchangeName = "hmall.fanout";
//        消息String message = "Hello,everyone!!!";
//        发送消息rabbitTemplate.convertAndSend(exchangeName, "", message);
//      rabbitTemplate.convertAndSend(exchangeName, null, message);  // 或者 第二个参数为 null}

Fanout Exchange交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • Fanout Exchange的会将消息路由到每个绑定的队列

Direct 交换机

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

---------------------------------------------- 持续更新中----------------------------------------------------------

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/34236.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HW基本的sql流量分析和wireshark 的基本使用

前言 HW初级的主要任务就是看监控&#xff08;流量&#xff09; 这个时候就需要我们 了解各种漏洞流量数据包的信息 还有就是我们守护的是内网环境 所以很多的攻击都是 sql注入 和 webshell上传 &#xff08;我们不管对面是怎么拿到网站的最高权限的 我们是需要指出它是…

camellia redis proxy v1.3.3对redis主从进行读写分离(非写死,自动识别故障转移)

1 概述 camellia-redis-proxy是一款高性能的redis代理&#xff08;https://github.com/netease-im/camellia&#xff09;&#xff0c;使用netty4开发&#xff0c;主要特性如下&#xff1a; 支持代理到redis-standalone、redis-sentinel、redis-cluster。支持其他proxy作为后端…

贪吃蛇小游戏-简单开发版

一、需求 本项目旨在开发一个经典的贪吃蛇游戏&#xff0c;用户可以通过键盘控制蛇的移动方向&#xff0c;让蛇吃掉随机出现在游戏区域内的食物&#xff0c;每吃掉一个食物&#xff0c;蛇的身体长度就会增加&#xff0c;同时得分也会相应提高。游戏结束的条件为蛇撞到游戏区域的…

使用 Docker 部署前端项目全攻略

文章目录 1. Docker 基础概念1.1 核心组件1.2 Docker 工作流程 2. 环境准备2.1 安装 Docker2.2 验证安装 3. 项目配置3.1 项目结构3.2 创建 Dockerfile 4. 构建与运行4.1 构建镜像4.2 运行容器4.3 访问应用 5. 使用 Docker Compose5.1 创建 docker-compose.yml5.2 启动服务5.3 …

接口自动化测试用例

Post接口自动化测试用例 Post方式的接口是上传接口&#xff0c;需要对接口头部进行封装&#xff0c;所以没有办法在浏览器下直接调用&#xff0c;但是可以用Curl命令的-d参数传递接口需要的参数。当然我们还以众筹网的登录接口为例&#xff0c;讲解post方式接口的自动化测试用…

使用WireShark解密https流量

概述 https协议是在http协议的基础上&#xff0c;使用TLS协议对http数据进行了加密&#xff0c;使得网络通信更加安全。一般情况下&#xff0c;使用WireShark抓取的https流量&#xff0c;数据都是加密的&#xff0c;无法直接查看。但是可以通过以下两种方法&#xff0c;解密抓…

阿里百炼Spring AI Alibaba

文章目录 学习链接阿里百炼创建api-key查看api调用示例示例pom.xmlAQuickStartMultiChatStreamChat Spring AI Alibaba简单示例pom.xmlapplication.ymlHelloworldControllerDashScopeChatModelController图解spring AI的结构 deepseekpom.xmlapplication.ymlDeepSeekChatClient…

【模拟算法】

目录 替换所有的问号 提莫攻击 Z 字形变换 外观数列 数青蛙&#xff08;较难&#xff09; 模拟算法&#xff1a;比葫芦画瓢。思路较简单&#xff0c;考察代码能力。 1. 模拟算法流程&#xff0c;一定要在演草纸上过一遍流程 2. 把流程转化为代码 替换所有的问号 1576. 替…

【Linux】进程(1)进程概念和进程状态

&#x1f31f;&#x1f31f;作者主页&#xff1a;ephemerals__ &#x1f31f;&#x1f31f;所属专栏&#xff1a;Linux 目录 前言 一、什么是进程 二、task_struct的内容 三、Linux下进程基本操作 四、父进程和子进程 1. 用fork函数创建子进程 五、进程状态 1. 三种重…

配置blender的python环境

在blender的脚本出输入&#xff1a; import sys print(sys.executable) 2. 通过上述命令我们得到blener的python版本&#xff0c;下面我们在conda配置一个同样版本的python环境。 conda create -n blenderpy python3.11.9找到blender安装路径下的python文件夹&#xff0c;将它…

【bug日记】 编译错误

在我使用vscode的时候&#xff0c;我想用一个头文件和两个cpp文件&#xff0c;头文件是用来声明一个类的&#xff0c;一个cpp是用来类的成员函数&#xff0c;一个cpp是主函数 但是我写完编译发现会弹出找不到这个类成员函数这个cpp文件&#xff0c;爆出这样的错误 提示我找不到…

SQLAlchemy系列教程:批量插入数据

高效地批量插入数据对于应用程序的性能至关重要。SQLAlchemy为批处理操作提供了几种机制&#xff0c;可以最大限度地减少开销并加快数据库事务时间。在本指南中&#xff0c;我们将探讨如何使用SQLAlchemy执行批量插入&#xff0c;包括从基础技术到高级技术。 搭建环境 在开始之…

蓝桥杯十天冲刺-day1(getline读入空格)

getline读入带空格的字符串 解决cin或scanf无法读入空格的问题 作文标题 代码思路 主要通过这个代码体会getline函数可以输入空格的作用 用getline函数输入含空格的字符串&#xff0c;用length()函数记字符串长度 依次扫描不为空格的字符计数 #include<bits/stdc.h>…

使用py-ffmpeg批量合成视频的脚本

我有一个小米摄像头&#xff0c;用它录出来的视频全部都是3s一段3s一段的。其中有几个小时的视频我需要保存&#xff0c;当初直接把摄像头的卡文件导出来重命名掉了&#xff0c;那时候没有注意&#xff0c;之后想剪辑/发送给别人的时候发现疯了&#xff1a; 1.剪辑的话&#x…

el-table表格样式设置单元格样式方法 :cell-class-name

需求&#xff1a;是否匹配当天日期决定当天时间高亮显示 效果如图 页面代码 <el-tableref"manpowerTable":key"manpowerForUserHandle.tableKey"class"sysDictInfoTable":data"handle.manpowerTable.data"style"width: 100…

基于express+TS+mysql+sequelize的后端开发环境搭建

步骤一&#xff1a;初始化node环境 npm init -y 步骤二&#xff1a;安装 Express、TypeScript、以及相关类型的定义文件 npm install express npm install --save-dev typescript types/node types/express ts-node nodemon npm install body-parser npm install mysql2 npm in…

蓝耘MaaS平台:阿里QWQ应用拓展与调参实践

摘要&#xff1a;本文深入探讨了蓝耘MaaS平台与阿里QWQ模型的结合&#xff0c;从平台架构、模型特点到应用拓展和调参实践进行了全面分析。蓝耘平台凭借其强大的算力支持、弹性资源调度和全栈服务&#xff0c;为QWQ模型的高效部署提供了理想环境。通过细化语义描述、调整推理参…

2. qt写带有槽的登录界面(c++)

我们在1.Qt写简单的登录界面(c)_c qt 设计一个简单界面-CSDN博客中写了个简单的登录界面&#xff0c;但没有槽&#xff0c;在这里写一个带有槽的界面。 1.代码 代码目录如下&#xff1a; main.cpp的代码如下&#xff1a; #include "MainWindow.h" #include <Qt…

linux - 基础IO之操作与文件描述符全解析:从C语言到系统调用底层实现

目录 1.回顾c语言中所学的文件 2.提炼对文件的理解&#xff08;linux基础io第一阶段的学习&#xff09; a.在操作系统内部&#xff0c;一个进程和一个被打开的文件&#xff0c;他们到后面会变成两种对象之间的指针关系。 b.文件 属性 内容 c.在c语言中,以w的方式打开文件…

【A2DP】深入解读A2DP中通用访问配置文件(GAP)的互操作性要求

目录 一、模式支持要求 1.1 发现模式 1.2 连接模式 1.3 绑定模式 1.4 模式间依赖关系总结 1.5 注意事项 1.6 协议设计深层逻辑 二、安全机制&#xff08;Security Aspects&#xff09; 三、空闲模式操作&#xff08;Idle Mode Procedures&#xff09; 3.1 支持要求 …