MQ,RabbitMQ,SpringAMQP的原理与实操

MQ

同步通信

image-20240202103233412

image-20240202105021949

image-20240202105123170

异步通信

image-20240202111930461

事件驱动优势:

  • 服务解耦

  • 性能提升,吞吐量提高

    image-20240202141023030

  • 服务没有强依赖,不担心级联失败问题

    image-20240202141137606

  • 流量消峰

    image-20240202141435355

​ 小结: 大多情况对时效性要求较高,所有大多数时间用同步。而如果不需要对方的结果,且吞吐量,并发量较高则需要使用异步通信

image-20240202141921703

MQ常见框架

MQ(MessageQueue),消息队列,字面来看就是存放消息的队列,也就是事件驱动架构中的Broker

消息:就是事件,比如支付成功了这个事件,在MQ中就是一个消息

image-20240202144211395

RabbitMQ,RocketMQ 适合处理业务(若需要优化定制则选Rocket,因为用Java写的)

Kafka 适合处理日志(海量数据且对数据安全性要求不高的场景),ActiveMQ用的较少

RabbitMQ

RabbitMQ概述与安装

RabbitMQ是基于Erlang语言(面向并发的语言,天生为分布式系统而设计的)开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/

参考课前资料(链接:https://pan.baidu.com/s/1JuVKKFpUXg8TFxa_FoV3Gg
提取码:1468) 来安装RabbitMQ

image-20240202144811905

之后在浏览器输入:http://192.168.83.130:15672/ 进入RabbitMQ管理页面,按docker run中设置的账号密码进行登录

结果如下

image-20240204101726227

mq整体架构

image-20240204103735587

小结

image-20240204103835121

常见消息模型

image-20240204105108372

HelloWorld 案例

image-20240204105310538

动手实践

案例: 完成官方Demo中的hello world案例(链接:https://pan.baidu.com/s/1JuVKKFpUXg8TFxa_FoV3Gg
提取码:1468)

image-20240204105416259

打开项目,将ip调成自己的rabbitmq使用虚拟机(或电脑)的ip,再运行一次PublisherTest中的 testSendMessage() 方法

发送一条消息。再运行ConsumerTest 中main方法来接收消息。

image-20240204112803673

小结

image-20240204135103572

SpringAMQP

AMOP(Advanced Message Queuing Protocol)高级消息队列协议,大大简化消息发送和接收的代码量,且与语言无关

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

image-20240204145954201

image-20240204140927498

AMQP依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>    <groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在配置文件中添加mq连接信息

spring:rabbitmq:host: 192.168.83.130 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机 username: itcast # 用户名password: 123321 # 密码

Basic Queue 简单队列模型

案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能

流程如下:

1.在父工程中引入spring-amqp的依赖,以及在publisher服务中编写配置

2.在publisher服务中利用RabbitTemplate的convertAndSend方法,发送消息到simple.queue这个队列

image-20240204145734357

SpringAMQP发送消息步骤:引入依赖和设置配置---->利用RabbitTemplate的convertAndSend方法

3.在consumer中编写代码,接收消息

image-20240204151638720

SpringAMQP接收消息步骤:引入依赖和设置配置—》定义类,添加Component注解,类中声明方法添加@RabbitListener注解

Work Queue 工作队列模型

Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积

比如队列 一秒来50条消息 一个消费者一秒处理40条消息,那么需要两个消费者才能使得队列中消息被处理不丢失

image-20240204153355750

案例:实现一个队列绑定多个消费者

image-20240204153947098

问题:rabbitMQ消息预取,会将50条消息平均分给消费者1和消费者2,但消费者2处理速度慢,因此在1s内处理不完publisher发过来的50条消息

解决方案:让能者多劳,设置preFetch,控制预取消息的上限

image-20240204160513742

小结image-20240204161439493

发布、订阅模型-Fanout

image-20240204161952605

注意:exchange负责消息路由,而不是存储(queue负责存储),路由失败则消息丢失

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue(广播)

案例:利用SpringAMQP演示FanoutExchange的使用

image-20240204163804072

step1 在consumer服务中声明Exchange、Queue、Binding(绑定关系)

image-20240204163828992

image-20240204164305716

step2 在consumer服务声明两个消费者

在consumer服务的SpringRabbitListener类中,添加两个方法,分别监听fanout.queue1和fanout.queue2:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2") 
public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

step3 在publisher服务发送消息到FanoutExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testFanoutExchange() {// 队列名称  String exchangeName = "itcast.fanout"; // 消息String message = "hello, everyone!";// 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息		rabbitTemplate.convertAndSend(exchangeName, "", message);
}

小结

image-20240205092228233

发布、订阅模型-Direct

image-20240205092356181

案例:利用SpringAMQP演示DirectExchange的使用

image-20240205092544599

步骤一 在consumer服务声明Exchange、Queue

1.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2,

2.并利用@RabbitListener声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red","blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1........接收到路由消息:【" + msg + "】" + LocalTime.now());
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red","yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2........接收到消路由息:【" + msg + "】" + LocalTime.now());
}

步骤二 在publisher服务发送消息到DirectExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testDirectExchange() {//交换机名字String exchangeName = "itcast.direct";//消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";//发送消息,参数依次为:交换机名称,RoutingKey,消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}

从blue->yellow->red 运行三次,得到结果如下

image-20240205104021565

小结

image-20240205104321850

发布、订阅模型-Topic

image-20240205104559605

案例 利用SpringAMQP演示TopicExchange的使用

image-20240205104825731

步骤一:在consumer服务声明Exchange、Queue

1.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2,

2.并利用@RabbitListener声明Exchange、Queue、RoutingKey

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者1........接收到路由消息:【" + msg + "】" + LocalTime.now());
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者2........接收到消路由息:【" + msg + "】" + LocalTime.now());
}

步骤二:在publisher服务发送消息到TopicExchange

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testTopicExchange() {//交换机名字String exchangeName = "itcast.topic";//消息String message = "喜报!孙悟空大战哥斯拉,胜!";//发送消息,参数依次为:交换机名称,RoutingKey,消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

小结

image-20240205105655795

消息转化器

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:

​ 在publisher服务引入依赖

<dependency>   <groupId>com.fasterxml.jackson.core</groupId>   <artifactId>jackson-databind</artifactId>
</dependency>

​ 在publisher服务声明MessageConverter。(原本应该放到配置类中,但启动类也是配置类,所以可以放启动类中)

@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter(); 
}

image-20240205111950238

案例 测试发送Object类型消息

image-20240205111336946

结果如下(没有更改JDK序列化方式)

image-20240205111231469

使用json序列化器之后

image-20240205111303797

consumer接收消息过程

step1:加jackson依赖,依赖上面已经放父工程中,就不用做了

step2: 将pulisher中相同的MessageConverter放入consumer 启动类中(发送方与接收方必须相同)

@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter(); 
}

step3: 定义一个消费者,监听object.queue队列并消费消息

 @RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String,Object> msg){System.out.println("消费者........接收到对象消息:【" + msg + "】" + LocalTime.now());
}

image-20240205135854654

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/252731.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

20240202在Ubuntu20.04.6下使用whisper.cpp的CPU模式

20240202在Ubuntu20.04.6下使用whisper.cpp的CPU模式 2024/2/2 14:15 rootrootrootroot-X99-Turbo:~/whisper.cpp$ ./main -l zh -osrt -m models/ggml-medium.bin chs.wav 在纯CPU模式下&#xff0c;使用medium中等模型&#xff0c;7分钟的中文视频需要851829.69 ms&#xf…

代码随想录 Leetcode51. N 皇后

题目&#xff1a; 代码(首刷看解析 2024年2月6日&#xff09;&#xff1a; class Solution { private:vector<vector<string>> res;void backtracking(int n, int row, vector<string>& chessboard) {if (row n) {res.push_back(chessboard);return;}f…

SpringBoot activemq收发消息、配置及原理

SpringBoot集成消息处理框架 Spring framework提供了对JMS和AMQP消息框架的无缝集成&#xff0c;为Spring项目使用消息处理框架提供了极大的便利。 与Spring framework相比&#xff0c;Spring Boot更近了一步&#xff0c;通过auto-configuration机制实现了对jms及amqp主流框架…

C++ 动态规划 状态压缩DP 最短Hamilton路径

给定一张 n 个点的带权无向图&#xff0c;点从 0∼n−1 标号&#xff0c;求起点 0 到终点 n−1 的最短 Hamilton 路径。 Hamilton 路径的定义是从 0 到 n−1 不重不漏地经过每个点恰好一次。 输入格式 第一行输入整数 n 。 接下来 n 行每行 n 个整数&#xff0c;其中第 i 行…

什么是TCP三次握手、四次挥手?

&#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是小徐&#x1f947;☁️博客首页&#xff1a;CSDN主页小徐的博客&#x1f304;每日一句&#xff1a;好学而不勤非真好学者 &#x1f4dc; 欢迎大家关注&#xff01; ❤️ 1、三次握手 你(客户端)给一个朋友(服务器)打电…

ERR_SSL_VERSION_OR_CIPHER_MISMATCH

我在namesilo买的域名&#xff0c;coludflare做的解析&#xff0c;华为云的SSL&#xff0c;用宝塔部署的SSL&#xff0c;访问https报错&#xff0c;http却正常&#xff1a; 报错&#xff1a;此网站无法提供安全连接www.hongkong.ioyunxin.top 使用了不受支持的协议。 ERR_SSL_…

机器学习 | 探索朴素贝叶斯算法的应用

朴素贝叶斯算法是一种基于贝叶斯定理和特征条件独立假设的分类算法。它被广泛应用于文本分类、垃圾邮件过滤、情感分析等领域&#xff0c;并且在实际应用中表现出色。 朴素贝叶斯法是基于贝叶斯定理与特征条件独立假设的分类方法&#xff1a; 1&#xff09;对于给定的待分类项r…

字节AIGC场景发力,云雀大模型+扣子智能体,快速构建专属AI应用

之前给大家推荐过类似的编排工具dify,今天给大家推荐的字节的这个产品&#xff0c;总体体验还是不错的。 第一步登录扣子 点击创建BOT-编排 创建插件 插件能够让 Bot 调用外部 API&#xff0c;例如搜索信息、浏览网页、生成图片等&#xff0c;扩展 Bot 的能力和使用场景。 我…

【宝藏系列】嵌入式入门概念大全

【宝藏系列】嵌入式入门概念大全 0️⃣1️⃣操作系统&#xff08;Operating System&#xff0c;OS&#xff09; 是管理计算机硬件与软件资源的系统软件&#xff0c;同时也是计算机系统的内核与基石。操作系统需要处理管理与配置内存、决定系统资源供需的优先次序、控制输入与输…

【Java程序设计】【C00246】基于Springboot的留守儿童爱心网站(有论文)

基于Springboot的留守儿童爱心网站&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的留守儿童爱心网站 本系统分为系统功能模块、管理员功能模块以及用户功能模块。 系统功能模块&#xff1a;系统首页的主要功能展…

我在项目中使用Redis的几个场景

目录 缓存 会话存储 分布式锁 消息队列 位统计 计数器 排行榜 缓存 缓存的目的是为了提高系统响应速度、减少数据库等资源的压力&#xff0c;redis作为键值对形式的内存数 据库&#xff0c;可以提供非常快速的读取速度&#xff0c;使得它成为存储热点数据或频繁访问数…

使用STM32 HAL库配置和控制外设接口

使用STM32 HAL库配置和控制外设接口非常简单&#xff0c;以下是一个示例&#xff0c;演示如何使用STM32 HAL库配置和控制USART外设接口。 ✅作者简介&#xff1a;热爱科研的嵌入式开发者&#xff0c;修心和技术同步精进 ❤欢迎关注我的知乎&#xff1a;对error视而不见 代码获取…

小白水平理解面试经典题目_二维数组类LeetCode 2966 Divide Array【排序算法实现】

2966 将数组划分为具有最大差值的数组 小白渣翻译&#xff1a; 给定一个大小为 n 的整数数组 nums 和一个正整数 k 。 将数组分成一个或多个大小为 3 的数组&#xff0c;满足以下条件&#xff1a; nums 的每个元素都应该位于一个数组中。一个数组中任意两个元素之间的差异小…

【力扣】两数之和,暴力枚举+哈希表

两数之和原题地址 方法一&#xff1a;暴力枚举 首先&#xff0c;我们需要枚举数组中所有可能的下标对组合&#xff0c;对于n个数的数组&#xff0c;从中选2个下标&#xff0c;有种可能。做法很简单&#xff0c;遍历数组中的所有元素&#xff0c;对于每一个元素&#xff0c;遍…

【数据开发】pyspark入门与RDD编程

【数据开发】pyspark入门与RDD编程 文章目录 1、pyspark介绍2、RDD与基础概念3、RDD编程3.1 Transformation/Action3.2 数据开发流程与环节 1、pyspark介绍 pyspark的用途 机器学习专有的数据分析。数据科学使用Python和支持性库的大数据。 spark与pyspark的关系 spark是一…

Golang 学习(一)基础知识

面向对象 Golang 也支持面向对象编程(OOP)&#xff0c;但是和传统的面向对象编程有区别&#xff0c;并不是纯粹的面向对象语言。 Golang 没有类(class)&#xff0c;Go 语言的结构体(struct)和其它编程语言的类(class)有同等的地位&#xff0c;Golang 是基于 struct 来实现 OOP…

使用 Python、Elasticsearch 和 Kibana 分析波士顿凯尔特人队

作者&#xff1a;来自 Jessica Garson 大约一年前&#xff0c;我经历了一段压力很大的时期&#xff0c;最后参加了一场篮球比赛。 在整个过程中&#xff0c;我可以以一种我以前无法做到的方式断开连接并找到焦点。 我加入的第一支球队是波士顿凯尔特人队。 波士顿凯尔特人队是…

新书速览|Kubernetes从入门到DevOps企业应用实战

从0到1&#xff0c;从零开始全面精通Kubernetes&#xff0c;助力企业DevOps应用实践 本书内容 《Kubernetes从入门到DevOps企业应用实战》以实战为主&#xff0c;内容涵盖容器技术、Kubernetes核心资源以及基于Kubernetes的企业级实践。从容器基础知识开始&#xff0c;由浅入深…

2024 年十大 Vue.js UI 库

Vue.js 是一个流行的 JavaScript 框架&#xff0c;它在前端开发者中越来越受欢迎&#xff0c;以其简单、灵活和易用性而闻名。 Vue.js 如此受欢迎的原因之一是它拥有庞大的 UI 库生态系统。 这些库为开发人员提供了预构建的组件和工具&#xff0c;帮助他们快速高效地构建漂亮…

003集—三调数据库添加三大类字段——arcgis

在国土管理日常统计工作中经常需要用到三大类数据&#xff08;农用地、建设用地、未利用地&#xff09;&#xff0c;而三调数据库中无三大类字段&#xff0c;因此需要手工录入三大类字段&#xff0c;并根据二级地类代码录入相关三大类名称。本代码可一键录入海量三大类名称统计…