RabbitMq使用与整合

MQ基本概念

MQ概述

MQ全称 Message Queue([kjuː])(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。

(队列是一种容器,用于存放数据的都是容器,存放消息的就是消息队列)

分布式系统的调用:

方式一:直接调用

方式二:间接调用

A将数据存放到中间一个系统,通过中间的系统发送到B

中间系统可以成为中间件MQ

MQ是用于存放消息的中间件

被调用者叫生产者 调用者是消费者

MQ的优势和劣势

优势

应用解耦:提高系统容错性和可维护性。

异步提速:提升用户体验和系统吞吐量。

削峰填谷:提高系统稳定性。

应用解耦

系统的耦合性越高,容错性就越低,可维护性就越低。

使用 MQ 使得应用间解耦,提升容错性和可维护性

异步提速

提升用户体验和系统吞吐量(单位时间内处理请求的数目)。

削峰填谷(削峰)

使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。

使用MQ后,可以提高系统稳定性。

劣势

系统可用性降低

系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?

系统复杂度提高

MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

一致性问题

A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

既然 MQ 有优势也有劣势,那么使用 MQ 需要满足什么条件呢?

消费者-->生产者

1.生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。

2.容许短暂的不一致性。

3.确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

RabbitMQ基本介绍

2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang语言由Ericson设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

RabbitMQ 基础架构

Broker 中间者 服务

procedure 和consumer都是客户端

客户端通过链接和服务端进行通信 所以需要建立起来连接 然后进行通信a

使用channel(管道)节省资源

一个rabbitmq里面有很多的虚拟机 相当于mysql里面有很多数据库,数据库里面有很多表,都是独立的。

每个虚拟机里面有很多的exchange和queue 独立分区的作用

 RabbitMQ 中的相关概念

Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker。

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等。

Connection:publisher/consumer 和 broker 之间的 TCP 连接。

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:

        direct (point-to-point)

        topic (publish-subscribe)

        fanout (multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

RabbitMQ的6 种工作模式

RabbitMQ 提供了 6 种工作模式:

简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。

官网对应模式介绍:RabbitMQ Tutorials — RabbitMQ

RabbitMQ的安装和配置

安装依赖环境

在线安装依赖环境:

yum -y install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

安装Erlang

rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm

安装RabbitMQ

#安装依赖的包

rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm

#安装rabbitmq

rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm

rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm socat-1.7.3.2-2.el7.x86_64.rpm rabbitmq-server-3.7.18-1.el7.noarch.rpm

启动RabbitMQ

systemctl start rabbitmq-server # 启动服务

systemctl stop rabbitmq-server # 停止服务

systemctl restart rabbitmq-server # 重启服务

systemctl status rabbitmq-server # 查看状态

开启管理界面及配置

# 开启管理界面

rabbitmq-plugins enable rabbitmq_management

# 修改默认配置信息

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/ebin/rabbit.app

# 比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest

修改之后重启一下rabbitmq

入门实例

1.添加虚拟主机

2.添加用户

3.重新设置权限

点击虚拟主机设置权限

4.idea连接

项目结构搭建

mq 导入依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version>
</dependency>

生产者

public static void main( String[] args ) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setHost("192.168.229.16");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/test");
//        建立连接Connection connection = connectionFactory.newConnection();
//        管道Channel channel = connection.createChannel();
//        创建队列/** String queue 队列的名称* boolean durable 持久化* boolean exclusive 是否独占* boolean autoDelete 是否自动删除* Map<String,Object> arguments 参数* */channel.queueDeclare("test01",false,false,false,null);
//        发布消息channel.basicPublish("","test01",null,"第一次发送".getBytes());}

消费者

{public static void main( String[] args ) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setUsername("root");connectionFactory.setPassword("root");connectionFactory.setHost("192.168.229.16");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/test");
//        建立连接Connection connection = connectionFactory.newConnection();
//        管道Channel channel = connection.createChannel();
//        消费信息Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println(s);}};channel.basicConsume("test01",true,consumer);}

同时消费后消息组内为 0

RabbitMQ工作模式

Work queues工作队列模式

Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

应用场景对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

代码

Work Queues与入门程序的简单模式的代码是几乎一样的;可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。

1.复制一个消费者

2.先运行起两个消费者

3.在生产者中多发布几条消息

for (int i = 0; i < 10; i++) {channel.basicPublish("","test01",null,("第"+i+"次发送").getBytes());
}

4.两个消费者会采用轮询的方式拿到消息

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系

订阅模式类型

而在订阅模型中,多了一个exchange角色,而且过程略有变化:

生产者发消息给交换机,交换机将消息路由分发给队列,消费者监听队列接收信息

Exchange:交换机。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

Fanout:广播,将消息交给所有绑定到交换机的队列

Direct:定向,把消息交给符合指定routing key 的队列

Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

代码实现

生产者更改代码逻辑为:

//        创建交换机channel.exchangeDeclare("Exchange", BuiltinExchangeType.FANOUT,false);
//        创建队列channel.queueDeclare("test01Ex",false,false,false,null);channel.queueDeclare("test02Ex",false,false,false,null);
//        队列绑定交换机channel.queueBind("test01Ex","Exchange","");channel.queueBind("test02Ex","Exchange","");
//        发布消息for (int i = 0; i < 10; i++) {channel.basicPublish("Exchange","",null,("第"+i+"次发送").getBytes());}

消费者分别从两个队列获取消息

Routing路由模式(Direct:定向)

队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

消息的发送方在 向Exchange发送消息时,也必须指定消息的RoutingKey

Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的 RoutingKey完全一致,才会接收到消息

 channel.queueBind(queuename,exchangename,"error"); // errorchannel.queueBind(queuename2,exchangename,"error");// error info channel.queueBind(queuename2,exchangename,"info");// 发送消息//String exchange, String routingKey, BasicProperties props, byte[] bodychannel.basicPublish(exchangename,"info",null,"hello".getBytes());

Topics通配符模式

Topic与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符

RoutingKey一般都是有一个或多个单词组成,多个单词之间以”.”分割

#:匹配一个或多个词 

*:匹配不多不少恰好1个词   test.* test.insert

channel.queueBind(queuename,exchangename,"order.*");channel.queueBind(queuename,exchangename,"*.error");channel.queueBind(queuename2,exchangename,"*.*");//发送消息//String exchange, String routingKey, BasicProperties props, byte[] bodychannel.basicPublish(exchangename,"goods.info",null,"hello".getBytes());

模式总结

RabbitMQ工作模式:

简单模式 HelloWorld

一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

工作队列模式 Work Queue

一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

发布订阅模式 Publish/subscribe

需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列

路由模式 Routing

需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

通配符模式 Topic

需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列

SpringBoot整合Mq

在Spring项目中,可以使用Spring-Rabbit去操作RabbitMQ

尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。

创建工程结构

添加依赖(sys-mq

<parent><groupId>com.example</groupId><artifactId>mqdemo02</artifactId><version>0.0.1-SNAPSHOT</version></parent><artifactId>sys-mq</artifactId><packaging>pom</packaging><name>sys-mq</name><url>http://maven.apache.org</url><modules><module>mq-product</module><module>mq-consumer</module></modules><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency>
<!--        rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>

sys-mq里面两个子模块的 application.yml

server

     port: 记得修改

spring:rabbitmq:username: rootpassword: roothost: 192.168.229.16virtual-host: /testport: 5672
server:port: 8086

生产者

@Configuration
public class RabbitMqConfig {
//    设置交换机的名称和队列的名字public static final String EXCHANGE_NAME="exchange_topic-test";public static final String QUEUE_NAME="queue_topic-test";public static final String QUEUE_NAME2="queue_topic-test2";
//    创建交换机 将交换机作为bean注入到spring中@Bean("topicExchange")public Exchange topicExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(false).build();}
//    队列@Bean("topicQueue")public Queue topicQueue(){return QueueBuilder.durable(QUEUE_NAME).build();}@Bean("topicQueue2")public Queue topicQueue2(){return QueueBuilder.durable(QUEUE_NAME2).build();}
//    将队列与交换机进行绑定@Beanpublic Binding exchangeWithQueue(@Qualifier("topicExchange") Exchange exchange,@Qualifier("topicQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("*.test").noargs();}@Beanpublic Binding exchangeWithQueue2(@Qualifier("topicExchange") Exchange exchange,@Qualifier("topicQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("test.#").noargs();}
}
@SpringBootTest
public class MyTest {public static final String EXCHANGE_NAME="exchange_topic-test";@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMsg() {rabbitTemplate.convertAndSend(EXCHANGE_NAME,"success.test","测试整合");}
}

消费者

@Component
public class MyListener {
//    监听队列的消息@RabbitListener(queues = "queue_topic-test")public void listenQueue(Message message) {byte[] body = message.getBody();System.out.println(new String(body));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/207457.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【3D程序软件】SideFX与上海道宁一直为设计师提供程序化 3D动画和视觉效果工具,旨在创造高质量的电影效果

Houdini是一个 从头开始构建的程序系统 使艺术家能够自由工作 创建多次迭代 并与同事快速共享工作流程 Houdini FX为 视觉特效艺术家创作故事片 广告或视频游戏 凭借其基于程序节点的工作流程 Houdini FX可让 您更快地创建更多内容 从而缩短时间并 在所有创意任务中…

SpringBoot——自定义start

优质博文&#xff1a;IT-BLOG-CN 一、Mybatis 实现 start 的原理 首先在写一个自定义的start之前&#xff0c;我们先参考下Mybatis是如何整合SpringBoot&#xff1a;mybatis-spring-boot-autoconfigure依赖包&#xff1a; <dependency><groupId>org.mybatis.spr…

中国移动联合中国华电完成基于ZETA物联网技术的风电机组主辅智能控制系统试点应用

2023年11月17日&#xff0c;中国移动联合中国华电研发的“基于ZETA物联网技术的风电机组主辅智能控制系统与风电机组叶片巡检系统”在甘肃省酒泉华电黑崖子风电场成功投运。中移物联网有限公司相关人员主导参与了本次试点。 ZETA技术是一种基于UNB的低功耗广域网&#xff08;LP…

JVM的小知识总结

加载时jvm做了这三件事&#xff1a; 1&#xff09;通过一个类的全限定名来获取该类的二进制字节流 什么是全限定类名&#xff1f; 就是类名全称&#xff0c;带包路径的用点隔开&#xff0c;例如: java.lang.String。 即全限定名 包名类型 非限定类名也叫短名&#xff0c;就…

近期知识点随笔

菜单查询&#xff08;编写权限时的细节&#xff09; 菜单查询list为了侧边框展示更完整&#xff08;不报空指针&#xff09; 登录时&#xff08;用户名&#xff09;查询出多个结果&#xff08;保证用户名唯一&#xff09; 文件上传 前端 对权限与菜单绑定的修改&#xff08;实…

【数据结构】树的概念以及二叉树

目录 1 树概念及结构 1.1 树的概念 1.3 树的存储 2 二叉树的概念及结构 2.1 概念 2.2 特殊的二叉树 2.3 二叉树的性质 2.4 二叉树的存储结构 1 树概念及结构 1.1 树的概念 树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组…

04 # 第一个 TypeScript 程序

初始化项目以及安装依赖 新建 ts_in_action 文件夾 npm init -y安装好 typescript&#xff0c;就可以执行下面命令查看帮助信息 npm i typescript -g tsc -h创建配置文件&#xff0c;执行下面命令就会生成一个 tsconfig.json 文件 tsc --init使用 tsc 编译一个 js 文件 新…

解决:AttributeError: ‘NoneType’ object has no attribute ‘shape’

解决&#xff1a;AttributeError: ‘NoneType’ object has no attribute ‘shape’ 文章目录 解决&#xff1a;AttributeError: NoneType object has no attribute shape背景报错问题报错翻译报错位置代码报错原因解决方法今天的分享就到此结束了 背景 在使用之前的代码时&…

Vue3集成ThreeJS实现3D效果,threejs+Vite+Vue3+TypeScript 实战课程【一篇文章精通系列】

Vue3集成ThreeJS实现3D效果&#xff0c;threejsViteVue3TypeScript 实战课程【一篇文章精通系列】 项目简介一、项目初始化1、添加一些依赖项 二、创建3D【基础搭建】1、绘制板子&#xff0c;立方体&#xff0c;球体2、材质和光照3、材质和光照和动画4、性能监控5、交互控制6、…

pathlib --- 面向对象的文件系统路径

目录 基础使用 纯路径 通用性质 运算符 访问个别部分 方法和特征属性 具体路径 方法 对应的 os 模块的工具 3.4 新版功能. 源代码 Lib/pathlib.py 该模块提供表示文件系统路径的类&#xff0c;其语义适用于不同的操作系统。路径类被分为提供纯计算操作而没有 I/O 的 …

在Spring Boot中隔离@Async异步任务的线程池

在异步任务执行的时候&#xff0c;我们知道其背后都有一个线程池来执行任务&#xff0c;但是为了控制异步任务的并发不影响到应用的正常运作&#xff0c;我们需要对线程池做好相关的配置&#xff0c;以防资源过度使用。这个时候我们就考虑将线程池进行隔离了。 那么我们为啥要…

FIORI /N/UI2/FLP 始终在IE浏览器中打开 无法在缺省浏览器中打开

在使用/N/UI2/FLP 打开fiori 启动面板的时候&#xff0c;总是会在IE浏览器中打开&#xff0c;无法在缺省浏览器打开 并且URL中包含myssocntl 无法正常打开 启动面板 这种情况可以取消激活ICF节点/sap/public/myssocntl

SpringBoot项目打成jar包后,上传的静态资源(图片等)如何存储和访问

1.问题描述&#xff1a; 使用springboot开发一个项目&#xff0c;开发文件上传的时候&#xff0c;通常会将上传的文件存储到资源目录下的static里面&#xff0c;然后在本地测试上传文件功能没有问题&#xff0c;但是将项目打成jar包放到服务器上运行的时候就会报错&#xff0c…

IDC MarketScape2023年分布式数据库报告:OceanBase位列“领导者”类别,产品能力突出

12 月 1 日&#xff0c;全球领先的IT市场研究和咨询公司 IDC 发布《IDC MarketScape:中国分布式关系型数据库2023年厂商评估》&#xff08;Document number:# CHC50734323&#xff09;。报告认为&#xff0c;头部厂商的优势正在扩大&#xff0c;OceanBase 位列“领导者”类别。…

STM32 定时器TIM

单片机学习 目录 文章目录 前言 一、TIM简介 二、STM32的三种定时器 2.1基本定时器 2.1.1定时中断功能 1. 时钟源 2. 预分频器 3. 计数器 4. 自动重装寄存器 5.更新中断和更新事件 2.1.2主模式触发DAC功能 2.2 计数模式 2.2通用定时器 2.2.1 时钟源 外部时钟模式2 外部时钟模式…

Java中的异常你了解多少?

目录 一.认识异常二.异常分类三.异常的分类1.编译时异常2.运行时异常 四.异常的处理1.LYBL&#xff1a;事前防御型2.EAFP&#xff1a;事后认错型 五.异常的抛出Throw注意事项 六.异常的捕获1.异常的捕获2.异常声明throws3.try-catch捕获并处理 七.自定义异常 一.认识异常 在Jav…

一文带你了解网络安全简史

网络安全简史 1. 上古时代1.1 计算机病毒的理论原型1.2 早期计算机病毒1.3 主要特点 2. 黑客时代2.1 计算机病毒的大流行2.2 知名计算机病毒2.3 主要特点 3. 黑产时代3.1 网络威胁持续升级3.2 代表性事件3.3 主要特点 4 高级威胁时代4.1 高级威胁时代到来4.2 著名的APT组织4.3 …

基于A*的网格地图最短路径问题求解

基于A*的网格地图最短路径问题求解 一、A*算法介绍、原理及步骤二、Dijkstra算法和A*的区别三、A*算法应用场景四、启发函数五、距离六、基于A*的网格地图最短路径问题求解实例分析完整代码 七、A*算法的改进思路 一、A*算法介绍、原理及步骤 A*搜索算法&#xff08;A star al…

Echarts大屏可视化_03 定制柱状图

柱状图模块引入 1.找到合适的图表 在echarts中寻找与目标样式相近的图表 Examples - Apache ECharts 2. 引入柱状图 使用立即执行函数构建&#xff0c;防止变量全局污染 实例化对象 将官网中提供的option复制到代码中&#xff0c;并且构建图表 // 柱状图模块1 (function () {/…

WEB渗透—反序列化(十)

Web渗透—反序列化 课程学习分享&#xff08;课程非本人制作&#xff0c;仅提供学习分享&#xff09; 靶场下载地址&#xff1a;GitHub - mcc0624/php_ser_Class: php反序列化靶场课程&#xff0c;基于课程制作的靶场 课程地址&#xff1a;PHP反序列化漏洞学习_哔哩哔_…