RabbitMQ基本原理

一、基本结构

工作原理
所有中间件技术都是基于 TCP/IP 协议基础之上进行构建新的协议规范,RabbitMQ遵循的是AMQP协议(Advanced Message Queuing Protocol - 高级消息队列协议)。
生产者发送消息流程:

  • 1、生产者和Broker建立TCP连接;
  • 2、生产者和Broker建立通道;
  • 3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发;
  • 4、Exchange将消息转发到指定的Queue(队列)。

【详细】

1、消息生产者连接到`RabbitMQ Broker`,建立链接(Connection),在链接(Connection)上开启一个信道(Channel);
2、声明一个交换机(Exchange),并设置相关属性,比如交换机类型、是否持久化等;
3、声明一个队列(Queue),并设置相关属性,比如是否排他、是否持久化、是否自动删除等;
4、使用路由键(RoutingKey)将队列(Queue)和交换机(Exchange)绑定起来;
5、生产者发送消息至 RabbitMQ Broker,其中包含路由键、交换器等信息,根据路由键(RoutingKey)发送消息到交换机(Exchange);
6、相应的交换器(Exchange)根据接收到的路由键(RoutingKey)查找相匹配的队列如果找到 ,则将从生产者发送过来的消息存入相应的队列中;
7、如果没有找到 ,则根据生产者配置的属性选择丢弃还是回退给生产者;
8、关闭信道(Channel);
9、关闭链接(Connection);

消费者接收消息流程:

  • 1、消费者和Broker建立TCP连接;
  • 2、消费者和Broker建立通道;
  • 3、消费者监听指定的Queue(队列);
  • 4、当有消息到达QueueBroker默认将消息推送给消费者;
  • 5、消费者接收到消息;
  • 6、ack回复。

【详细】

- 1、建立链接(Connection);
- 2、在链接(Connection)上开启一个信道(Channel);
- 3、请求消费指定队列(Queue)的消息,并设置回调函数(onMessage);
- 4、[MQ]将消息推送给消费者,消费者接收消息;
- 5、消费者发送消息确定(Ack[acknowledge]);
- 6、[MQ]删除被确认的消息;
- 7、关闭信道(Channel);
- 8、关闭链接(Connection);

MQ消费消息分发原理

1)一种是Pull模式,对应的方法是basicGet。
消息存放在服务端,只有消费者主动获取才能拿到消息。如果每搁一段时间获取一次消息,消息的实时性会降低。
但是好处是可以根据自己的消费能力决定消息的频率。2)另一种是push,对应的方法是BasicConsume,只要生产者发消息到服务器,就马上推送给消费者,
消息保存客户端,实时性很高,如果消费不过来有可能会造成消息积压。Spring AMQP是push方式,
通过事件机制对队列进行监听,只要有消息到达队列,就会触发消费消息的方法。

二、RabbitMQ组成部分说明

  • Producer: 消息生产者,即生产方客户端,生产方客户端将消息发送;

  • Connection: TCP连接,生产者或消费者与消息队列RabbitMQ版间的物理TCP连接;

    1)Connection会执行认证、IP解析、路由等底层网络任务。
    2)应用与消息队列RabbitMQ版完成Connection建立大约需要15个TCP报文交互,因而会消耗大量的网络资源和消息队列RabbitMQ版资源。
    3)一个进程对应一个Connection,一个进程中的多个线程则分别对应一个Connection中的多个Channel。
    4)Producer和Consumer分别使用不同的Connection进行消息发送和消费;

  • Channel: 在客户端的每个物理TCP连接里,可建立多个Channel,每个Channel代表一个会话任务。

    1)Channel是物理TCP连接中的虚拟连接。
    2)当应用通过Connection与消息队列RabbitMQ版建立连接后,所有的AMQP协议操作(例如创建队列、发送消息、接收消息等)都会通过Connection中的Channel完成。
    3) Channel可以复用Connection,即一个Connection下可以建立多个Channel。
    4) Channel不能脱离Connection独立存在,而必须存活在Connection中。
    5) 当某个Connection断开时,该Connection下的所有Channel都会断开。

  • Broker: 消息队列服务进程,此进程包括两个部分:Exchange和Queue;

  • Exchange(交换器): 生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中。Exchange根据消息的属性或内容路由消息。

  • Queue: 消息队列,存储消息的队列,每个消息都会被投入到一个或多个Queue里;

  • Consumer: 消息消费者,即消费方客户端,接收MQ转发的消息;

  • Routing Key(路由键): 生产者在向Exchange发送消息时,需要指定一个Routing Key来设定该消息的路由规则。 Routing Key需要与Exchange类型及Binding Key联合使用才能生效。一般情况下,生产者在向Exchange发送消息时,可以通过指定Routing Key来决定消息被路由到哪个或哪些Queue;

  • Binding: 一套绑定规则,用于告诉Exchange消息应该被存储到哪个Queue。它的作用是把Exchange和Queue按照路由规则绑定起来。

  • Binding Key(绑定键): 用于告知Exchange应该将消息投递到哪些Queue中(生产者将消息发送给哪个Exchange是需要由RoutingKey决定的,生产者需要将Exchange与哪个队列绑定时需要由BindingKey决定的);

  • Virtual Host: 虚拟主机,本质上是一个mini版的RabbitMQ服务器,拥有自己的队列、交换机、绑定和权限机制,vhost是共享相同的身份认证和加密环境的独立服务器域。vhost是AMQP的基础,必须在连接时指定,RabbitMQ默认的vhost是/。

三、交换模式

Direct Exchange(直连模式)

【路由规则】 Direct Exchange根据Binding Key和Routing Key完全匹配的规则路由消息。
【使用场景】 Direct Exchange适用于通过简单字符标识符区分消息的场景。
Direct Exchange常用于单播路由。
Direct

Fanout Exchange(广播模式)

【路由规则】 Fanout Exchange忽略Routing Key和Binding Key的匹配规则,将消息路由到所有绑定的Queue。
【使用场景】 Fanout Exchange适用于广播消息的场景。例如,分发系统使用Fanout Exchange来广播各种状态和配置更新。
广播模式

Topic Exchange(主题模式)

【路由规则】 Topic Exchange根据Binding Key和Routing Key通配符匹配的规则路由消息。
Topic Exchange支持的通配符包括星号(*)和井号(#)。 星号(*)代表一个英文单词(例如cn)。 井号(#)代表零个、一个或多个英文单词,英文单词间通过英文句号(.)分隔,例如cn.zj.hz。
【使用场景】 Topic Exchange适用于通过通配符区分消息的场景。
Topic Exchange常用于多播路由。例如,使用Topic Exchange分发有关于特定地理位置的数据。
主题模式

Headers Exchange (头部交换机)

【路由规则】 Headers Exchange可以被视为Direct Exchange的另一种表现形式。
Headers Exchange可以像Direct Exchange一样工作,不同之处在于Headers Exchange使用Headers属性代替Routing Key进行路由匹配。
在绑定Headers Exchange和Queue时,可以设置绑定属性的键值对。然后,在向Headers Exchange发送消息时,设置消息的Headers属性键值对。
Headers Exchange将根据消息Headers属性键值对和绑定属性键值对的匹配情况路由消息。
匹配算法由一个特殊的绑定属性键值对控制。该属性为x-match,只有以下两种取值:

  • 1)all:所有除x-match以外的绑定属性键值对必须和消息Headers属性键值对匹配才会路由消息。
  • 2)any:只要有一组除x-match以外的绑定属性键值对和消息Headers属性键值对匹配就会路由消息。
    以下两种情况下,认为消息Headers属性键值对和绑定属性键值对匹配:
    • 1、 消息Headers属性的键和值与绑定属性的键和值完全相同;
    • 2、 消息Headers属性的键和绑定属性的键完全相同,但绑定属性的值为空。

【使用场景】 Headers Exchange适用于通过多组Headers属性区分消息的场景。Headers Exchange常用于多播路由。例如,涉及到分类或者标签的新闻更新。

生产者确认机制

1、确认原理
生产者将消息发送到exchange,exchange根据路由规则将消息投递到了queue。

  • 1)Confirm确认:生产者发送消息到交换机时会存在消息丢失的情景,开启事务会导致吞吐量下降,Confirm机制就是消息发送到交换机(Exchange)时会触发Confirm回调。通过 publisher confirm (发送方确认机制)可以确定消息是否被成功路由到MQ broker从而选择是否重发等步骤。当生产者开启 publisher confirm 消息发送到MQ端之后,MQ会回一个ack给生产者,ack是个boolean值,为true消息成功发送到MQ。反之发送失败。
  • 2)Return确认:从交换机到队列也有可能出现路由失败导致消息丢失情景(可能是MQ出问题导致queue和exchange绑定丢失,或者失误删除了绑定关系等),Return机制可解决这个问题,路由失败时可以通过Return回调来将路由失败的消息记录下来。

消费者确认机制

1、消费者确认原理
消费者确认是指当一条消息投递到消费者处理后,消费者发送给MQ broker的确认
(通俗的说就是 告知服务器这条消息已经被我消费了,可以在队列删掉 ,这样以后就不会再发了, 否则消息服务器以为这条消息没处理掉 重启应用后还会在发)。

有auto和manual两种

  • 1)auto则由broker自行选择时机,一般可认为消息发送到消费者后就直接被ack,也即消息会被从队列中移除掉而不顾消息的处理逻辑是否成功;

  • 2)manual则是需要消费者显式的去手动ack后消息才会被从队列中移除掉,通过这个机制可以限制在消息处理完之后再Ack或者nack; 开启手动确认模式,即由消费方自行决定何时应该ack,通过设置autoAck=false开启手动确认模式;

消息持久化

消息发送并保存到队列之后如果不做特殊处理是保存在内存中,当节点宕机重启或者内存故障等,会导致消息丢失,通过对消息进行持久化到磁盘可以降低这种风险, 除了对消息进行持久化还是不够,还需要对queue、exchange进行持久化。

RabbitMQ解决消息丢失问题

消息确认机制

RabbitMQ提供了消息确认机制,即生产者在发送消息后,可以等待RabbitMQ服务器返回确认信息,以确保消息已经被正确地接收和处理。如果RabbitMQ服务器没有返回确认信息,生产者可以选择重新发送消息或者采取其他的补救措施。

生产者确认消息和重试
  1. 使用缓存:在confirmCallback中,将ackfalse的消息存到缓存中。然后,可以使用另外的线程或者定时任务来处理这些失败的消息,进行重试。

  2. 设置重试次数:为了避免无限重试,我们可以设置一个重试次数的上限。当达到这个上限后,我们可以选择将消息发送到死信队列,或者进行其他的错误处理。

  3. 使用死信队列:在RabbitMQ中,我们可以设置一个死信队列来存储那些无法被正常处理的消息。当消息在主队列中被拒绝或者过期后,它们会被发送到死信队列。然后,我们可以对死信队列中的消息进行人工处理,或者在一段时间后再次进行处理。

事务机制

1.首先需要配置一个事务管理器

 @BeanPlatformTransactionManager platformTransactionManager(ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}

2.然后在生产者上添加事务注解以及设置通信通道为事务模式。

@Transactional

  1. 开启事务机制就三步:
  • 配置事务管理器
  • 使用 @Transactional 注解开启事务
  • 调用 setChannelTransacted 方法设置消息通道为事务模式,即设置为 true

4.当我们开启事务模式之后,RabbitMQ 生产者发送消息会有这样几个步骤:

  • (1) 客服端发出请求,将通信管道设置为事务模式
  • (2) 服务端给出回复,同意将通信管道设置为事务模式
  • (3) 客户端发送消息
  • (4) 客户端提交事务
  • (5) 服务端给出响应,确认事务提交

6.两步 RabbitMQ 都有提供解决的方案。那么,如果确保消息成功到达 RabbitMQ 呢?

  • (1) 开启事务机制

  • (2) 发送方确认机制
    注意:这是两种不同的方案,不可以同时开启,只能二选其一。如果同时开启,则会报错

消息持久化

RabbitMQ支持将消息持久化到磁盘,即使RabbitMQ服务器宕机或重启,消息也不会丢失。在发布消息时,可以设置消息的持久化标志,这样消息就会被写入磁盘中,而不是仅仅保存在内存中。
其中我们交换机和队列都要设置对应的持久化,在创建时,我们会设置持久化参数。同时为了避免单点故障,RabbitMQ应该做成集群模式,以免一台机器损坏,出现数据丢失的问题。

消费端确认和重试

对于消费者来说,该配置不仅起到了连接作用,同时也启动了重试机制,默认重试 2 次。

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.listener.simple.retry.enabled=true # 开启消费者重试机制
spring.rabbitmq.listener.simple.retry.max-attempts=3 # 最大重试次数
spring.rabbitmq.listener.simple.retry.initial-interval=3000 # 重试时间间隔

确认的话需要我们做签收操作

 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

死信队列配置

被拒收的消息,或者是过期的消息,或者是队列已经满了的消息,都会进入死信队列,死信队列有一个默认的生效时间,如果没有做任务配置,到了时间会自动删除消息。
java中配置死信队列

    /*** 延迟队列,又叫死信队列 “** @return*/@Beanpublic Queue delayQueue() {HashMap<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", "test_ex");arguments.put("x-dead-letter-routing-key", "test_ex.dead");// 消息过期时间 2分钟arguments.put("x-message-ttl", 60000);return new Queue("delayQueue", true, false, false, arguments);}

以上参数说明:

  • x-dead-letter-exchange:死信队列过期以后往指定交换机发
  • x-dead-letter-routing-key:死信队列过期指定路由键
  • x-message-ttl: 死信队列过期时间,单位是毫秒

RabbitMQ解决消息积压问题

RabbitMQ消息积压问题通常是由于消费者无法及时消费消息或消费速度过慢或发送者流量太大导致的。以下是一些解决方法:

1.增加消费者数量: 可以通过增加消费者的数量来提高消费速度,减少消息积压。可以通过添加更多的消费者进程或者增加消费者的线程数来实现。

2.调整消费者的QoS参数: 消费者的QoS参数可以控制消费者每次从RabbitMQ服务器获取的消息数量,以及未确认消息的最大数量。可以适当调整这些参数,以减少消息积压。

3.设置消费者的超时时间: 可以设置消费者的超时时间,如果消费者在指定的时间内没有消费消息,就将消息重新投递到队列中,以便其他消费者消费。

4.增加队列的容量: 可以增加队列的容量,以便存储更多的消息。但是,如果队列容量过大,可能会导致内存占用过高,影响系统的性能。

5.使用死信队列: 可以将未能及时消费的消息转移到死信队列中,以便后续处理。可以设置死信队列的超时时间,以便在一定时间内处理这些消息。

6.监控和调整: 可以使用RabbitMQ的监控工具来监控队列的状态和消费者的消费速度,及时发现并解决消息积压问题。

RabbitMQ解决消息重复消费问题

RabbitMQ提供了消息去重的机制来解决消息重复消费的问题。具体来说,可以使用以下两种方式来实现:

1.消息去重插件
RabbitMQ提供了一个消息去重插件,可以通过在RabbitMQ节点上安装该插件来实现消息去重。该插件会在消息传输之前对消息进行唯一性校验,如果消息已经被消费过,那么该消息将被丢弃。该插件的实现原理是将已经消费过的消息ID保存在内存中,当新消息到达时,会检查该消息ID是否已经存在,如果存在则丢弃该消息。

2.消息幂等性设计
消息幂等性是指对于同一条消息,无论消费多少次,最终的结果都是一致的。因此,可以通过在消息的生产者或消费者端实现消息幂等性来解决消息重复消费的问题。具体实现方式包括:

  • 在消息生产者端,为每条消息生成唯一的ID,将该ID与消息一起发送到RabbitMQ,消费者在消费消息时根据该ID进行幂等性校验;
  • 在消息消费者端,记录已经消费过的消息ID,当重复消费同一条消息时,直接忽略该消息。

需要注意的是,实现消息幂等性需要考虑业务逻辑的复杂性和消息处理的性能。如果业务逻辑比较简单,可以通过对消息进行去重来解决问题;如果业务逻辑比较复杂,可以通过实现消息幂等性来保证消息的正确性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/436453.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring之生成Bean

Bean的生命周期&#xff1a;实例化->属性填充->初始化->销毁 核心入口方法&#xff1a;finishBeanFactoryInitialization-->preInstantiateSingletons DefaultListableBeanFactory#preInstantiateSingletons用于实例化非懒加载的bean。 1.preInstantiateSinglet…

Azure Data Box 80 TB 现已在中国区正式发布

我们非常高兴地宣布&#xff0c;Azure Data Box 80 TB SKU现已在 Azure 中国区正式发布。Azure Data Box 是 Azure 的离线数据传输解决方案&#xff0c;允许您以快速、经济且可靠的方式将 PB 级数据从 Azure 存储中导入或导出。通过硬件传输设备可加速数据的安全传输&#xff0…

NVIDIA G-Assist 项目:您的游戏和应用程序AI助手

NVIDIA G-Assist 是一个革命性的人工智能助手项目&#xff0c;旨在通过先进的AI技术提升玩家的游戏体验和系统性能。这个项目在2024年Computex上首次亮相&#xff0c;展示了其在游戏和应用程序中的潜在应用。 喜好儿网 G-Assist 的核心功能是提供上下文感知的帮助。它能够接收…

用示波器测动态滞回线

大学物理&#xff08;下&#xff09;实验-中南民族大学通信工程2022级 手动逐个处理数据较为麻烦且还要绘图&#xff0c;故想到用pythonmatplotlib来计算结果并数据可视化。 代码实现 import matplotlib.pyplot as plt# 样品一磁化曲线 X [0, 0.2, 0.4, 0.6, 0.8, 1, 1.5, 2.…

云计算:MySQL

第一周第一天-MySQL的SQL语句解析 数据库的介绍 什么是数据库 数据库是存储和管理数据的系统或集合&#xff0c;通常用于支持软件系统的高效数据处理和查询。它能够以结构化的方式组织数据&#xff0c;使用户可以快速存储、更新、查询和删除数据。数据库不仅保存数据&#xff0…

【数学分析笔记】第4章第2节 导数的意义和性质(1)

4. 微分 4.2 导数的意义与性质 4.2.1 导数在物理中的背景 物体在OS方向上运动&#xff0c;位移函数为 s s ( t ) ss(t) ss(t)&#xff0c;求时刻 t t t的瞬时速度&#xff0c;找一个区间 [ t , t △ t ] [t,t\bigtriangleup t] [t,t△t]&#xff0c;从时刻 t t t变到时刻 t…

2024年9月26日--- Spring-AOP

SpringAOP 在学习编程过程中&#xff0c;我们对于公共方法的处理应该是这样的一个过程&#xff0c;初期阶段如下 f1(){Date now new Date();System.out.println("功能执行之前的各种前置工作"now)//...功能代码//...功能代码System.out.println("功能执行之前…

vue3使用Teleport 控制台报警告:Invalid Teleport target on mount: null (object)

Failed to locate Teleport target with selector “.demon”. Note the target element must exist before the component is mounted - i.e. the target cannot be rendered by the component itself, and ideally should be outside of the entire Vue component tree main.…

OpenStack Yoga版安装笔记(十五)Horizon安装

1、官方文档 OpenStack Installation Guidehttps://docs.openstack.org/install-guide/ 本次安装是在Ubuntu 22.04上进行&#xff0c;基本按照OpenStack Installation Guide顺序执行&#xff0c;主要内容包括&#xff1a; 环境安装 &#xff08;已完成&#xff09;OpenStack…

ndb9300public-ndb2excel简介

1 引言 ndb9300是一个自己定义的机载导航数据库劳作&#xff08;不敢称为项目&#xff09;代号&#xff0c;其中3表示是第3种数据库。 多年前&#xff0c;对在役民航客机中的某型机载导航数据库的二进制文件进行分析&#xff0c;弄明白它的数据结构后做了几个工具&#xff0c…

仿真设计|基于51单片机的土壤温湿度监测及自动浇花系统仿真

目录 具体实现功能 设计介绍 51单片机简介 资料内容 仿真实现&#xff08;protues8.7&#xff09; 程序&#xff08;Keil5&#xff09; 全部内容 资料获取 具体实现功能 &#xff08;1&#xff09;DS18B20实时检测环境温度&#xff0c;LCD1602实时显示土壤温湿度&…

<使用生成式AI对四种冒泡排序实现形式分析解释的探讨整理>

<使用生成式AI对四种冒泡排序实现形式分析解释的探讨整理> 文章目录 <使用生成式AI对四种冒泡排序实现形式分析解释的探讨整理>1.冒泡排序实现形式总结1.1关于冒泡排序实现形式1的来源&#xff1a;1.2对四种排序实现形式使用AI进行无引导分析&#xff1a;1.3AI&…

正交阵的概念、性质与应用

正交阵是线性代数中一种重要的特殊矩阵&#xff0c;它在很多领域都有广泛的应用。 1. 概念 一个实数方阵 Q 被称为正交阵&#xff0c;如果它的转置等于它的逆矩阵&#xff1a; 这意味着&#xff1a; 其中&#xff0c;Q T 表示矩阵 Q 的转置&#xff0c;I 表示单位矩阵。 2…

Linux:磁盘管理

一、静态分区管理 静态的分区方法不可以动态的增加或减少分区的容量。 1、磁盘分区-fdisk 该命令是用于查看磁盘分区情况&#xff0c;和分区管理的命令 命令格式&#xff1a;fdisk [选项] 设备文件名常用命令&#xff1a; -h&#xff1a;查看分区信息 fdisk系统常用命令&…

GIT安装及集成到IDEA中操作步骤

最近深感GIT使用技能太差&#xff0c;我只会些皮毛&#xff0c;还是得看官网&#xff0c;总结一下常用的操作方法吧。 GIT环境配置到IDEA中安装 一、GIt的基本的安装 这个不在这里赘述了&#xff0c;自己装一个git吧 二、给IDEA指定本地GIT的安装路径 1、下图这个是我本地的…

05-函数传值VS传引用

函数传值 一、没法改变值的方式&#xff1a; 一个变量拷贝到另一个变量, 这种形式的函数调用被称为: 传值调用 局部变量的生命周期在函数的运行期间会一直存在. void Increment(int a)//假设一个 x(只是为了验证实参会被映射到形参这件事情),a的值会被拷贝到x {a a 1; //1…

vscode开发uniapp安装插件指南

安装vuets的相关插件 首先是vue的相关插件&#xff0c;目前2024年9月应该是vue-offical 安装uniapp开发插件 uni-create-view &#xff1a;快速创建 uni-app 页面 安装uni-create-view之后修改插件拓展设置 勾选第一个选择创建视图时创建同名文件夹 选择第二个创建文件夹中生…

【RockyLinux 9.4】安装新版 QQ for Linux(不再是 QQ2008 那种老款了!)

总览 还记得两年之前的时候&#xff0c;当初用的还是那种 QQ2008 一样的 LinuxQQ 啥也干不了&#xff0c;还不如 QQ2008 最近寻思自己装个服务器玩&#xff0c;想下载一个 QQ 用来文件传输&#xff0c;没想到现在的 QQ Linux 这么棒&#xff01; 一、下载 1.下载网址 https…

神经网络激活函数

神经网络的激活函数&#xff08;Activation Function&#xff09; 神经网络可以用在分类问题和回归问题上&#xff0c;不过需要根据情况改变输出层的激活函数。一般而言&#xff0c;回归问题用恒等函数&#xff0c;分类问题用softmax函数。 神经网络的激活函数必须使用非线性函…

Trilium Notes笔记本地化部署与简单使用指南打造个人知识库

文章目录 前言1. 安装docker与docker-compose2. 启动容器运行镜像3. 本地访问测试4.安装内网穿透5. 创建公网地址6. 创建固定公网地址 前言 今天和大家分享一款在G站获得了26K的强大的开源在线协作笔记软件&#xff0c;Trilium Notes的中文版如何在Linux环境使用docker本地部署…