使用RabbitMQ

一、MQ是什么

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信,主要功能业务解耦。

二、市面上常见的MQ产品

RabbitMQ、RocketMQ(阿里的)、Kafka 、 ActiveMQ(很少用了)

三、为什么要用MQ

3.1、异步处理

场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1.串行的方式 2.并行的方式

串行化方式:将用户信息注册到数据库以后再发送邮件然后再发送短信,这三步完成以后才返回给客户端。但是邮件短信这种东西不是必须立马发送给用户的,这样就会导致我们使用串行化会很慢

并行化方式:就是用户信息注册到数据库以后,发送邮件的同时发送信息,虽然比串行化快一点,但是依旧是要等待发送完邮箱和短信才能返回给客户端,依旧不够快

使用消息队列:

使用消息队列,用户只管发送请求,而写入数据库到写入消息队列这段时间交给MQ来处理,起两个消费者关注消息队列去消费消息;这样我们发现用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),而消费消息队列是异步处理的

3.2、应用解耦

场景说明:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.

订单系统只负责写入消息,库存系统只负责消费消息,大大提高了效率并且也将我们的订单模块和库存模块解耦了,哪怕库存系统宕机了也不会影响到订单系统下单,只需要修复库存系统重新去消费消息队列的消息即可;

3.3、流量削峰

场景说明: 秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

1.可以控制活动人数,超过此一定阀值的订单直接丢弃.

2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

四、交换机类型

1、Fanout Exchange(广播交换机):

扇型(广播)交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

2、Direct Exchange(直连交换机):

直连型交换机,根据RoutingKey(路由键)路由到不同的队列

3、Topic Exchange (主题交换机):

主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。(开始计算)

简单地介绍下规则:

* (星号) 用来表示一个单词 (必须出现的)

# (井号) 用来表示任意数量(零个或多个)单词

通配的绑定键是跟队列进行绑定的,举个小例子

队列Q1 绑定键为 *.TT.* 队列Q2绑定键为 TT.#

如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;

如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;

当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。

当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。

如果只有 # ,它就实现了扇形交换机的功能。

所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能

五、springboot整合RabbitMQ

1、使用前先引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>

2、配置RabbitMQ连接

在application.properties文件里配置

3、使用直连交换机(其他两个使用方式一样,无非是路由键是否使用以及路由键的配置规则*,#等)

消费者:

@Configuration
public class DirectConsumer {//注册一个队列@Bean  //启动多次为什么不报错?启动的时候,它会根据这个名称Direct_Q01先去查找有没有这个队列,如果有什么都不做,如果没有创建一个新的public Queue queue(){return   QueueBuilder.durable("Direct_Q01").maxLength(100).build();}//注册交换机@Beanpublic DirectExchange exchange(){//1.启动的时候,它会根据这个名称Direct_E01先去查找有没有这个交换机,如果有什么都不做,如果没有创建一个新的return  ExchangeBuilder.directExchange("Direct_E01").build();}//绑定交换机与队列关系@Beanpublic Binding binding(Queue queue,DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("RK01");}//启动一个消费者@RabbitListener(queues = "Direct_Q01")public void receiveMessage(String msg){System.out.println("收到消息:"+msg);}}

注意,如果只设置了一个队列或者交换机,那么在rabbitmq的网页中是看不到的,因为需要有绑定关系,只有建立的绑定关系才能够看到相应的队列和交换机

生产者:

@Service
public class DirectProvider {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(Object message) {rabbitTemplate.convertAndSend("Direct_E01", "RK01", message);}
}

当然我们在实际开发中不可能只传递字符串,也有可能传递对象,这时候需要定义一个消息转换器

package com.by.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMqConfig {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}

这个Bean 是一个 MessageConverter 类型的对象,这个Bean 将被自动注入到任何需要 MessageConverter 的地方,以确保所有消息处理都使用这个转换器;他的作用定义了一个消息转换器

messageConverter Bean 的作用:消息序列化、消息反序列化、确保一致性(所有的消息都遵循统一的json格式)

六、交换机、队列、消费者之间的关系

一个交换机对应多个队列,每个队列对应一个消费者的时候:

一个队列对应多个消费者的时候:

七、死信交换机

什么是死信

死信就是消息在特定场景下的一种表现形式,这些场景包括:

当消息队列满的时候,我们的默认策略是丢弃旧消息到死信队列,让新的消息插入进来

1. 消息被拒绝访问,即消费者返回 basicNack 的信号时 或者拒绝basicReject

2. 消费者发生异常,超过重试次数 。 (其实spring框架调用的就是 basicNack

3. 消息的Expiration 过期时长或队列TTL过期时间。.ttl(20*1000) 进入的是 先进业务队列的数据,超时之后送给死信交换机

4. 消息队列达到最大容量 .maxLength(5)

什么是死信队列

存储死信消息的队列

当有消息变成死信了,那么这个消息就会重新被死信交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。 人工干预

死信队列的使用

他的使用方式和正常的队列一样,需要注意的是需要把创建的死信队列和死信交换机绑定到正常的业务队列上

@Slf4j
@Configuration
public class DeadConsumer {@Bean//死信队列public Queue deadQueue() {return QueueBuilder.durable("dead-q").build();}@Beanpublic Exchange deadExchange() {return ExchangeBuilder.fanoutExchange("dead-e").autoDelete().build();}@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("").noargs();}@Beanpublic Queue fanoutQueue() {return QueueBuilder.durable("fanout-q").maxLength(10)//队列容积最大为10,超过的消息将发送到死信交换机.deadLetterExchange("dead-e").ttl(5000)//五秒没消费直接发送到死信交换机.build();}@Beanpublic Exchange fanoutExchange() {return ExchangeBuilder.fanoutExchange("fanout-e").durable(true).build();}@Beanpublic Binding fanoutBinding() {return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange()).with("").noargs();}@RabbitListener(queues = "fanout-q")public void consume(Ordering ordering){log.debug("消费者->{}", JSONUtil.toJsonStr(ordering));
//消费者发生异常,不消费消息直接进入死信队列int i = 5/0;}}

八、应答模式

RabbitMQ 中的消息应答模式主要包括两种:自动应答(Automatic Acknowledgement)和手动应答(Manual Acknowledgement)。

自动应答:在这种模式下,一旦消息被传递给消费者,RabbitMQ 就认为这条消息已经被成功消费,并立即从队列中移除。这种方式简化了编程模型,因为它不需要消费者显式地发送确认。

手动应答:在这种模式下,消息不会被自动确认,而是等待消费者显式地发送一个确认信号。只有当消费者明确表示已经成功处理了消息之后,RabbitMQ 才会将这条消息从队列中移除。

三种应答方法:

方法

作用

Channel.basickAck

用于肯定确认

Channel.basickNack

用于否定确认(可以处理单个消息或多个消息)

Channel.basickReject

用于否定确认(只能处理单个消息)

自动应答实现:

# 自动应答模式
spring.rabbitmq.listener.simple.acknowledge-mode = auto
@Component
public class AutoAcknowledgementConsumer {@RabbitListener(queues = "yourQueue")public void consumeMessage(String message, Message amqpMessage) {// 处理消息...// 框架会在方法执行完成后自动发送ack确认消息}
}

手动应答实现:

# 手动应答模式
spring.rabbitmq.listener.simple.acknowledge-mode = manual

九、如何保证消息可靠性

从两方面考虑

1、消费者端

①消息确认:消费者在收到消息后默认情况下rabbitmq会自动应答(autoAck=true),为了保证消息可靠性可设置为手动应答,使得消费者在处理完消息后手动发送确认(basicAck),如果消费者在处理消息时发生异常,那么消息会被重新投递给其他消费者;缺点就是代码多,容易出现死循环

②死信队列:如果消息不能被正常消费,就将消息放入死信队列,由人工干预,后续来分析和处理

2、生产者端

①消息持久化:

当生产者发布消息时,可以选择将其标记为持久化(persistent).这意味着即使 RabbitMQ 服务器重启,消息也不会丢失,因为它们会被存储在磁盘上

②消息确认机制:

确认机制分为发布者确认机制和发布者退回机制

发布者确认机制:Publisher Confirm机制允许RabbitMQ服务器通知生产者一个消息是否已经被交换机正确接收。当publisher-confirm-type设置为CORRELATED时,RabbitMQ会向生产者发送确认或否定响应,确认消息已到达交换机,但不保证消息已被路由到至少一个队列中。

总结来说是生产者到交换机的确认

spring.rabbitmq.publisher-confirm-type = CORRELATED
//开启方式,在properties文件中配置

代码示例:

public class DirectProvider {
//在生产者端设置确认机制@PostConstructpublic void init() {//发布者确认机制的回调,肯定·要在发布之前设置回调,所以放在send方法上面//如果消息到达交换机就调用这个方法,如果没有到达也发送回调//这里只是注册一个回调方法rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack){log.debug("直连发送成功");}else {log.debug("直连发送失败");}});}@AutowiredRabbitTemplate rabbitTemplate;public void send(Ordering ordering) {CorrelationData correlData = new CorrelationData(ordering.getId()+"");rabbitTemplate.convertAndSend("direct", "key2", ordering, correlData);}
}

发布者退回:

Publisher Return机制用于当消息无法按照路由键规则路由到任何队列时,或者由于其他原因(例如队列满、消息过大等)而被交换机拒绝时,RabbitMQ将消息返回给生产者。

总结来说就是交换机到队列的消息确认(消息只要到达任何一个队列就算成功,并且只有消息没有路由到队列才会触发这个回调)

开启方式:

spring.rabbitmq.publisher-returns = true

代码示例:

public class DirectProvider {@PostConstructpublic void init() {//消息回退机制,监听消息是否到达队列rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.out.println("消息回退"+new String(message.getBody()));System.out.println("消息回退码"+replyCode);System.out.println("消息回退原因"+replyText);System.out.println("消息回退交换机"+exchange);System.out.println("消息回退路由键"+routingKey);});}@AutowiredRabbitTemplate rabbitTemplate;public void send(Ordering ordering) {CorrelationData correlData = new CorrelationData(ordering.getId()+"");rabbitTemplate.convertAndSend("direct", "key2", ordering, correlData);}
}

十、如何保证消息幂等性

在消费端做手脚,消费者消费的时候避免重复消费,加上一个BizId

BizId由业务标识加上数字组成,这样我们可以很清楚的知道是什么业务生成的BizId。

十一、Canal的使用

Canal将自己伪装成与数据库绑定的从库,每次数据更新的时候主数据库会把binlog日志发送给从库和canal,这样canal可以根据binlog日志知道对数据库那些数据进行了增删改操作。

Canal的原理:

当我们数据库发生消息变更的时候,直接通过canal去删除Redis缓存中的键,这样可以保证每次我们去查询Redis拿到的都是最新的数据。

十二、百万条消息堆积怎么办

1、增加消费者,提高消费速度

2、消费者内部开启线程池(多线程),加快消息处理速度

3、增加队列容积,启用惰性队列

什么是惰性队列?

惰性队列的特点:

1、接收到消息后直接存入磁盘而非内存

2、当消费者要消费消息的时候才会从磁盘中读取到内存中

3、支持百万条消息存储

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/496430.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大模型的实践应用33-关于大模型中的Qwen2与Llama3具体架构的差异全解析

大家好,我是微学AI,今天给大家介绍一下大模型的实践应用33-关于大模型中的Qwen2与Llama3具体架构的差异全解析。Qwen2模型与Llama3模型在架构上存在一些细微的差异,这些差异主要体现在注意力机制、模型尺寸相关参数以及嵌入层处理等方面。以下是对这些差异的详细分析。 文章…

NAT 技术如何解决 IP 地址短缺问题?

NAT 技术如何解决 IP 地址短缺问题&#xff1f; 前言 这是我在这个网站整理的笔记,有错误的地方请指出&#xff0c;关注我&#xff0c;接下来还会持续更新。 作者&#xff1a;神的孩子都在歌唱 随着互联网的普及和发展&#xff0c;IP 地址的需求量迅速增加。尤其是 IPv4 地址&…

kafka的备份策略:从备份到恢复

文章目录 一、全量备份二、增量备份三、全量恢复四、增量恢复 前言&#xff1a;Kafka的备份的单元是partition&#xff0c;也就是每个partition都都会有leader partiton和follow partiton。其中leader partition是用来进行和producer进行写交互&#xff0c;follow从leader副本进…

使用sam进行零样本、零学习的分割实践

参照&#xff1a;利用SAM实现自动标注_sam标注-CSDN博客&#xff0c;以及SAM&#xff08;分割一切模型&#xff09;的简单调用_sam使用-CSDN博客 sam简介&#xff1a; Segment Anything Model&#xff08;SAM&#xff09;是Meta公司于2023年发布的一种AI模型&#xff0c;它打破…

【Git】—— 使用git操作远程仓库(gitee)

目录 一、远程仓库常用命令 1、从远程仓库克隆项目 2、查看关联的远程仓库 3、添加关联的远程仓库 4、移除关联的远程仓库 5、将本地仓库推送到远程仓库 6、从远程仓库拉取项目 二、分支命令 1、查询分支 2、创建分支 3、切换分支 4、推送到远程分支 5、合并分支 …

攻防世界web新手第五题supersqli

这是题目&#xff0c;题目看起来像是sql注入的题&#xff0c;先试一下最常规的&#xff0c;输入1&#xff0c;回显正常 输入1‘&#xff0c;显示错误 尝试加上注释符号#或者–或者%23&#xff08;注释掉后面语句&#xff0c;使1后面的单引号与前面的单引号成功匹配就不会报错…

【MySQL】SQL 优化经验

1. 表的设计优化 参考依据&#xff1a;参考阿里开发手册嵩山版&#xff0c;其中有很多关于MySQL表设计的内容。类型选择&#xff1a;根据存储内容选择合适类型&#xff0c;如数值存储可选tinyint、bigint等&#xff0c;字符串可选varchar或text&#xff0c;根据内容长短选择合…

使用 .NET 6 或 .NET 8 上传大文件

如果您正在使用 .NET 6&#xff0c;并且它拒绝上传大文件&#xff0c;那么本文适合您。 我分享了一些处理大文件时需要牢记的建议&#xff0c;以及如何根据我们的需求配置我们的服务&#xff0c;并提供无限制的服务。 本文与 https://blog.csdn.net/hefeng_aspnet/arti…

STM32使用UART发送字符串与printf输出重定向

首先我们先看STM32F103C8T6的电路图 由图可知&#xff0c;其PA9和PA10引脚分别为UART的TX和RX(注意&#xff1a;这个电路图是错误的&#xff0c;应该是PA9是X而PA9是RX&#xff0c;我们看下图的官方文件可以看出)&#xff0c;那么接下来我们应该找到该引脚的定义是什么&#xf…

转运机器人推动制造业智能化转型升级

​在当今制造业智能化转型的浪潮中&#xff0c;技术创新成为企业脱颖而出的关键。富唯转运机器人凭借一系列先进技术&#xff0c;成为智能转型的卓越之选。 一体化 AMR 控制系统是富唯的一大亮点。它采用低代码流程搭建和配置模式&#xff0c;极大地缩短了部署时间。企业无需耗…

深度分析java 使用 proguard 如何解析混淆后的堆栈

经过proguard混淆过后&#xff0c;发生异常时堆栈也进行了混淆&#xff0c;那么如果获取的原始的堆栈呢&#xff1f;我们下面来看下 使用proguard 根据mapping文件直接解析 import proguard.obfuscate.MappingReader; import proguard.retrace.FrameInfo; import proguard.re…

基于JAVA+SpringBoot+Vue的影院订票系统

基于JAVASpringBootVue的影院订票系统 前言 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN[新星计划]导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末附源码下载链接&#x1f345; 哈喽兄…

LeetCode 83 :删除排链表中的重复元素

题目&#xff1a; 地址&#xff1a;https://leetcode.cn/problems/remove-duplicates-from-sorted-list/ 方法一&#xff1a; 方法二&#xff1a; package com.zy.leetcode.LeetCode_04;/*** Author: zy* Date: 2024-12-25-15:19* Description: 删除排链表中的里复元素* …

金仓数据库-用户与角色对象权限访问的查看

数据库用户 创建用户 创建用户且设置密码 create user user01 password 123;\du 查看用户user01&#xff0c;可以看见创建成功 创建用户设置密码和不可继承 create user02 password 123 noinherit;修改用户的属性 设置用户的连接数 设置为1个 alter user user01 connect…

理解神经网络

神经网络是一种模拟人类大脑工作方式的计算模型&#xff0c;是深度学习和机器学习领域的基础。 基本原理 神经网络的基本原理是模拟人脑神经系统的功能&#xff0c;通过多个节点&#xff08;也叫神经元&#xff09;的连接和计算&#xff0c;实现非线性模型的组合和输出。每个…

联通光猫怎么自己改桥接模式?

环境&#xff1a; 联通光猫 ZXHN F677V9 硬件版本号 V9.0 软件版本号 V9.0.0P1T3 问题描述&#xff1a; 联通光猫怎么自己改桥接模式 家里用的是ZXHN F677V9 光猫&#xff0c;最近又搞了个软路由&#xff0c;想改桥接模式 解决方案&#xff1a; 1.拿到最新超级密码&…

Matrix-Breakout 2 Morpheus(找到第一个flag)

第一步 信息收集 (1)寻找靶场真实ip arp-scan -l 靶场真实 ip 为192.168.152.154 (2)探测端口及服务 nmap -p- -sV 192.168.52.135 第二步 开始渗透 (1)访问web服务 http://192.168.152.154and http://192.168.52.135:81 发现 81 端口的页面要登录 我们使用 dirb 扫描…

【CSS in Depth 2 精译_094】16.2:CSS 变换在动效中的应用(下)——导航菜单的文本标签“飞入”特效与交错渲染效果的实现

当前内容所在位置&#xff08;可进入专栏查看其他译好的章节内容&#xff09; 第五部分 添加动效 ✔️【第 16 章 变换】 ✔️ 16.1 旋转、平移、缩放与倾斜 16.1.1 变换原点的更改16.1.2 多重变换的设置16.1.3 单个变换属性的设置 16.2 变换在动效中的应用 16.2.1 放大图标&am…

机器人C++开源库The Robotics Library (RL)使用手册(三)

进入VS工程,我们先看看这些功能函数及其依赖库的分布关系: rl命名空间下,主要有八大模块。 搞定VS后将逐个拆解。 1、编译运行 根据报错提示,配置相应错误的库(根据每个人安装位置不同而不同,我的路径如下:) 编译所有,Release版本耗时大约10分钟。 以rlPlan运动…