分布式消息队列:RabbitMQ(1)

目录

一:中间件

二:分布式消息队列 

2.1:是消息队列

2.1.1:消息队列的优势

2.1.1.1:异步处理化

2.1.1.2:削峰填谷

2.2:分布式消息队列

2.2.1:分布式消息队列的优势

2.2.1.1:数据的持久化

2.2.1.2:可扩展性

2.2.1.3:应用解耦

2.2.1.4:发送订阅 

2.2.2:分布式消息队列的应用场景 

三:Rabbitmq

3.1:基本概念

3.2:快速入门 

3.2.1:引入消息队列Java客户端

3.2.2:单消费开发生产者和消费者

 3.2.3:多消费开发生产者和消费者

3.3.3:交换机

3.3.3.1:交换机的类别

a):fanout


一:中间件

连接多个系统,帮助多个系统紧密协作的技术(组件)

二:分布式消息队列 

2.1:是消息队列

概念:存储消息的队列

关键词:存储,消息,队列

存储:存储数据

消息:某种数据结构,比如l字符串,对象,二进制数据,json等

队列:先进先出的数据结构

作用:在不同的系统下,应用之间实现消息的传输,不需要考虑传输应用的编程语言,系统和,框架等等,实现应用解耦的作用。

        eg:可以让Java开发的应用发消息,让php开发的应用收消息。

 针对生产者来说:不需要关心消费者什么时候接受消息,什么时候消费,我只需要把我的工作完成就好了。生产者和消费者之间实现了解耦。

针对上图,同样我们会发现,当小李要别的书籍的时候,小王也可以将别的书籍放到消息队列中。生产者和消费者从某一种程度上实现了解耦合。

2.1.1:消息队列的优势

2.1.1.1:异步处理化

生产者发送消息之后,可以继续去忙别的,消费者什么时候消费都可以,不产生阻塞。

2.1.1.2:削峰填谷

先把用户的请求放到消息队列种,消费者(实际执行操作的应用)可以按照自己的需求,慢慢去取。

举个栗子:

原本:

12点时来了10万个请求,原本情况下,10万个请求都在系统内部立刻处理,很快系统压力过大宕机。

现在:

把10万个请求放到消息队列中,处理系统以自己的恒定速率(比如每秒1个)慢慢执行,稳定处理。

2.2:分布式消息队列

2.2.1:分布式消息队列的优势

分布式消息队列继承于消息队列的优势,并进行了一部分的拓展。

2.2.1.1:数据的持久化

把消息集中存储在硬盘当中,服务器重启就不会丢失。

2.2.1.2:可扩展性

可以根据需求,随时增加(或减少)节点,继续保持稳定的服务。

2.2.1.3:应用解耦

可以连接不同语言(Java,PHP),框架开发的系统,让这些系统读取数据。

示例:

以前的项目:

加了分布式消息队列之后的项目: 

1:一个系统挂了,不影响另一个系统。

2:系统挂了之后并恢复,仍然可以从消息队列中取消息

3:只要发送消息到队列,就可以立即进行返回,不用同步调用所有系统,性能更高

2.2.1.4:发送订阅 

假设情景:当QQ进行了一部分改革之后,其他使用QQ的APP也应该处理
这部分改革。
QQ做了一个情景,要让其他系统知道,比如公告消息。如果QQ一次性给这些应用发消息,所引出的问题如下:
1.每次发通知都要调用很多系统,很麻烦,很可能失败
2.不知道哪个系统需要这些QQ的改革。
解决方案:大的核心系统始终往消息队列发消息,其他的系统都去订阅这个消息队列的消息,用的时候进行取就OK。

2.2.2:分布式消息队列的应用场景 

1:耗时场景。

2:高并发场景。

3:分布式系统的协作。(跨团队,跨业务合作,应用解耦)

4:强稳定的场景(金融业务,持久化,可靠性,削锋填谷)  

三:Rabbitmq

特点:生态好,易学习,易于理解,时效性强,支持不同语言的客户端,扩展性,可用性都很不错。

3.1:基本概念

AMPQ协议:Rabbitmq是遵循AMPQ协议的一种消息中间件。

生产者:发消息到交换机

消费者:收消息的,从某个队列中取消息

交换机(exchange):负责把消息转发到对应的队列

队列(Queue):存储消息的

路由(Rountes):转发,怎么把一个消息从一个地方转发到另一个地方(比如生产者转发到某个队列)

Rabbitmq:端口占用   5672:程序连接的端口 15672:管理界面端口

Rabbitmq的安装:https://blog.csdn.net/qq_25919879/article/details/113055350

管理器页面打不开:http://t.csdnimg.cn/6FqZl

3.2:快速入门 

3.2.1:引入消息队列Java客户端

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.17.0</version></dependency>

3.2.2:单消费开发生产者和消费者

生产者端代码:

public class SingeProducer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();//频道相当于客户端(jdbcClient,redisClient),提供了和消队列server建立通信,程序通过channel进行发送消息Channel channel = connection.createChannel()) {//创建消息队列,第二个参数(durable):是否开启持久化,第三个参数exclusiove:是否允许当前这个创建消息队列的//连接操作消息队列 第四个参数:没有人使用队列,是否需要删除channel.queueDeclare(QUEUE_NAME, false, false, false, null);//发送消息String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] Sent '" + message + "'");}}
}

消费者代码:

public class SingeConsumer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//创建频道,提供通信Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//如何处理消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}

 3.2.3:多消费开发生产者和消费者

场景:一个生产者给队列里面发了一条消息,多个消费者进行消费。适用于多个机器同时去接收并处理任务(每个机器处理任务有限)

队列持久化:

durable:

参数设置为true,服务器队列不丢失

 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

 消息持久化:

 指定MessageProperties.PERSISTENY_TEXT_PLAIN参数

    channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));

生产者端代码:

public class MultiProducer {private static final String TASK_QUEUE_NAME = "multi_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);Scanner scanner=new Scanner(System.in);while(scanner.hasNext()){String message = scanner.nextLine();channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
}

消费者代码: 

在消费者代码中,如何测验一个消费者只能取一个任务,我们利用for循环来进行解决。

指定确认某条消息:

第一个参数:获取消息的信息

第二个参数:如果是true,把所有的历史消息全都确认了。如果为false,取出当前的消息。

   //第二个参数:是否一次性取所有的消息。如果为true,则要取所有的挤压在消息队列中的消息//如果为false,则为一次性取一个消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

指定拒绝某条消息

第一个参数:获取消息的信息

第二个参数:如果是true,则代表是否要拒绝所有的历史消息。

第三个参数:如果是false, 则代表失败的任务是否要重新入队。

  channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);
public class MultiConsumer {private static final String TASK_QUEUE_NAME = "multi_queue";public static void main(String[] argv) throws Exception {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");final Connection connection = factory.newConnection();for (int i = 0; i <= 2; i++) {final Channel channel = connection.createChannel();int finalI=i;//声明队列channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//控制单个消费者的任务积压数:每个消费者最多处理一个任务,每个消费者智能处理一个任务channel.basicQos(1);//处理从队列中取的的消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");try {//处理工作System.out.println(" [x] Received '" +"编号:"+finalI+ message + "'");//停20秒模拟一个机器处理工作能力有限Thread.sleep(20000);//第二个参数:是否一次性取所有的消息。如果为true,则要取所有的挤压在消息队列中的消息//如果为false,则为一次性取一个消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {System.out.println(" [x] Done");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};//开启消费监听channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });}}}

3.3.3:交换机

一个生产者给多个队列发消息,一个生产者对多个队列。交换机:转发功能,怎么把消息转发到不同的队列上。

3.3.3.1:交换机的类别
a):fanout

场景:很适用于发布订阅的场景。

特点:消息会被转发到所有绑定到交换机的队列。

生产者代码:当生产者发送消息后,由交换机放到消息队列中,消费者从消息队列中取。

public class FonoutProducer {private static final String EXCHANGE_NAME = "1";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {//创建交换机channel.exchangeDeclare(EXCHANGE_NAME, "fanout");Scanner scanner=new Scanner(System.in);while(scanner.hasNext()){String message = scanner.nextLine();channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}}

消费者代码:

public class FonoutConsumer {private static final String EXCHANGE_NAME = "1";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();Channel channel2= connection.createChannel();//声明交换机//创建队列,随机分配一个队列名称channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName="xiaowang";channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName, EXCHANGE_NAME, "");channel2.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName2="xiaoli";channel2.queueDeclare(queueName2,true,false,false,null);channel2.queueBind(queueName2,EXCHANGE_NAME,"");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [小王] Received '" + message + "'");};DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [小李] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });}}

运行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/172304.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

生成树协议:监控 STP 端口和交换机

什么是生成树协议 生成树协议 &#xff08;STP&#xff09; 用于网络交换机&#xff0c;以防止循环和广播风暴。在局域网 &#xff08;LAN&#xff09; 中&#xff0c;两条或多条冗余路径可以连接到同一网段。当交换机或网桥从所有可用端口传输帧时&#xff0c;这些帧开始在网…

1818_ChibiOS的计数信号量

全部学习汇总&#xff1a; GreyZhang/g_ChibiOS: I found a new RTOS called ChibiOS and it seems interesting! (github.com) 之前见过计数信号量&#xff0c;也是在FreeRTOS中看到的。也看到过这样的功能在驱动设计中的应用&#xff0c;但是当时没有理解这个使用的方式。 1.…

【STM32】标准库的引入

一、为什么要会有标志外设库 1、传统单片机软件开发方式 (1)芯片厂商提供数据手册、示例代码、开发环境 (2)单片机软件工程师面向产品功能&#xff0c;查阅数据手册&#xff0c;参考官方示例代码进行开发 (3)硬件操作的方式是用C语言对寄存器进行读写以操作硬件 (4)主要工作量…

指针仪表读数YOLOV8NANO

指针仪表读数YOLOV8 NANO 采用YOLOV8 NANO训练&#xff0c;标记&#xff0c;然后判断角度&#xff0c;得出角度&#xff0c;可以通过角度&#xff0c;换算成数据

Table-GPT:让大语言模型理解表格数据

llm对文本指令非常有用&#xff0c;但是如果我们尝试向模型提供某种文本格式的表格数据和该表格上的问题&#xff0c;LLM更有可能产生不准确的响应。 在这篇文章中&#xff0c;我们将介绍微软发表的一篇研究论文&#xff0c;“Table-GPT: Table- tuning GPT for Diverse Table…

Linux系统之watch命令的基本使用

Linux系统之watch命令的基本使用 一、watch命令介绍二、watch命令的使用帮助2.1 watch命令的help帮助2.2 watch命令的语法解释 三、watch命令的基本使用3.1 使用默认的2秒时间间隔执行ls命令3.2 每隔10秒执行一次ps命令3.3 每隔1秒输出一次磁盘使用情况3.4 高亮显示grep命令的输…

Springboot 使用JavaMailSender发送邮件 + Excel附件

目录 1.生成Excel表格 1.依赖设置 2.代码&#xff1a; 2.邮件发送 1.邮件发送功能实现-带附件 2.踩过的坑 1.附件名中文乱码问题 3.参考文章&#xff1a; 需求描述&#xff1a;项目审批完毕后&#xff0c;需要发送邮件通知相关人员&#xff0c;并且要附带数据库表生成的…

京东平台数据分析:2023年9月京东空气净化器行业品牌销售排行榜

鲸参谋监测的京东平台9月份空气净化器市场销售数据已出炉&#xff01; 9月份&#xff0c;空气净化器的销售同比上年增长。根据鲸参谋平台的数据显示&#xff0c;今年9月&#xff0c;京东平台空气净化器的销量将近15万&#xff0c;同比增长约1%&#xff1b;销售额将近2亿元&…

C++多态(超级详细版)

目录 一、什么是多态 二、多态的定义及实现 1.多态构成条件 2.虚函数的重写和协变 虚函数重写的两个例外&#xff1a; 2.1协变 2.2析构函数的重写 &#xff08;析构函数名统一处理成destructor&#xff09; 3.重载、覆盖(重写)、隐藏(重定义)的对比 4.final 和 overr…

【计算机网络笔记】DNS报文格式

DNS 提供域名到主机IP地址的映射  域名服务的三大要素&#xff1a;  域&#xff08;Domain&#xff09;和域名(Domain name)&#xff1a; 域指由地 理位置或业务类型而联系在一起的一组计算机构 成。  主机&#xff1a;由域名来标识。域名是由字符和&#xff08;或&a…

如何在linux服务器上安装Anaconda与pytorch,以及pytorch卸载

如何在linux服务器上安装Anaconda与pytorch&#xff0c;以及pytorch卸载 1&#xff0c;安装anaconda1.1 下载anaconda安装包1.2 安装anaconda1.3 设计环境变量1.4 安装完成验证 2 Anaconda安装pytorch2.1 创建虚拟环境2.2 查看现存环境2.3 激活环境2.4 选择合适的pytorch版本下…

Python:实现日历到excel文档

背景 日历是一种常见的工具,用于记录事件和显示日期。在编程中,可以使用Python编码来制作日历。 Python提供了一些内置的模块和函数,使得制作日历变得更加简单。 在本文,我们将探讨如何使用Python制作日历,并将日历输出到excel文档中。 效果展示 实现 在代码中会用到cale…

FFmpeg5.1.3编译动态库踩坑之旅(基于Linux虚拟机)

准备工作 环境准备 1.Windows安装Oracle VM VirtualBox 7.0.10&#xff0c;安装ubuntu-22.04.3。 坑一&#xff1a;无法往虚拟机里拖放复制文件&#xff0c;解决办法&#xff1a;登录Ubuntu虚拟机时切换到xorg方式登录&#xff0c;参考地址&#xff1a;Ubuntu Desktop 22.04…

软考系统架构之案例篇(架构设计相关概念)

案例篇-架构设计相关概念 1. 架构风格的概念2. 五大架构风格有哪些3. MVC架构含义4. 云计算架构5. 云原生架构设计原则6. ESB的主要功能包括7. 质量属性的含义及其设计策略8. EJB中的 Bean 分三种类型9. 风险点、敏感点、权衡点的含义10. REST 的5个原则 其它相关推荐&#xff…

Generative AI 新世界 | Falcon 40B 开源大模型的部署方式分析

在上期文章&#xff0c;我们探讨了如何在自定义数据集上来微调&#xff08;fine-tuned&#xff09;模型。本期文章&#xff0c;我们将重新回到文本生成的大模型部署场景&#xff0c;探讨如何在 Amazon SageMaker 上部署具有 400 亿参数的 Falcon 40B 开源大模型。 亚马逊云科技…

ICLR 2023丨3DSQA:3D 场景中的情景问答

来源&#xff1a;投稿 作者&#xff1a;橡皮 编辑&#xff1a;学姐 论文链接&#xff1a;https://arxiv.org/pdf/2210.07474.pdf 主页链接&#xff1a;http://sqa3d.github.io 图 1&#xff1a;3D 场景中情景问答 (SQA3D) 的任务图示。给定场景上下文 S&#xff08;例如&#…

并发编程 - 并发可见性,原子性,有序性 与 JMM内存模型

1. 并发三大特性 并发编程Bug的源头&#xff1a; 原子性 、 可见性 和 有序性 问题 1.1 原子性 一个或多个操作&#xff0c;要么全部执行且在执行过程中不被任何因素打断&#xff0c;要么全部不执行。 在 Java 中&#xff0c;对基本数据类型的变量的读取和赋值操作是原子性操…

常用字符串函数拓展

文章目录 字符串拓展函数strncpystrncatstrncmpstrstrstrtokstrerrormemcpymemmovememcmpmemset 库函数模拟实现memmoveqsort 我们在学习C语言时已经学习了一些常见的字符串函数&#xff0c;但这还不能满足我们的需求&#xff0c;为此我们拓展了几个常用的字符串函数。 字符串拓…

Maven项目转为SpringBoot项目

Maven项目转为SpringBoot项目 前言创建一个maven项目前的软件的一些通用设置Maven仓库的设置其他的设置字符编码编译器注解支持 创建的Maven项目修改为Spring Boot项目修改pom.xml文件修改启动类-Main新建WAR包所需的类 添加核心配置文件 测试的控制器最后整个项目的目录结构![…