RabbitMQ 的高阶应用及可靠性保证

       

目录

一、RabbitMQ 高阶应用        

     1.1 消息何去何从

        1.2 过期时间

        1.3 死信队列

        1.4 延迟队列

        1.5 优先级队列

        1.6 消费质量保证(QOS)

二、持久化

三、生产者确认

四、消息可靠性和重复消费

        4.1 消息可靠性

        4.2 重复消费问题


上篇文章介绍了 RabbitMQ 的基本概念和使用,这篇文章就来介绍下其高阶应用和可靠性保证。

一、RabbitMQ 高阶应用        

        RabbitMQ 还提供了诸多高级特性,比如:过期时间、交换器备份、死信队列、延迟队列、优先级队列、持久化、消费端消息分发等等,下面介绍几个重要特性。

     1.1 消息何去何从

        mandatory 参数,当设置为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当设置为 false 时,出现上述情形,则消息直接被丢弃。那么生产者如何获取到未被路由到合适队列的消息呢?需要实现 listener,SpringBoot 中需要实现 ReturnCallback。

        immediate 参数,为 true 时,如果交换机将消息路由到队列时发现队列上并不存在消费者,那么这条消息将不会被存入队列中。当与路由键匹配的队列都没有消费者时,该消息会 return 给生产者。

        概括来说,mandatory 参数告诉服务器至少将消息路由到一个队列中,否则将消息返回给生产者。immediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递,如果所有匹配的队列上无消费者,则将消息返回给生产者,不用将消息存入队列等待消费者。

        RabbitMQ 3.0 版本去掉了 immediat 参数的支持,官方解释是:会影响镜像队列的性能,增加代码的复杂性,建议采用 TTL 和 DLX 的方法替换。

        1.2 过期时间

        RabbitMQ 可以对消息和队列设置 TTL。

        设置消息的TTL。方法一:通过队列的属性设置,队列中的所有消息都有相同的过期时间,一旦消息过期,就会立即从队列中抹去。方法二:对消息单独设置,每条消息的TTL可以不同,即使消息过期也不会立即从队列中抹去,在投递前判定。如果两者一同使用,则以最小的那个为准,消息的生存时间一旦超过了设置的TTL,就会变成“死信”,消费者则无法收到该消息。设置过期时间的方法如下:

Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 6000);
Queue queue = new Queue(vodQueue, true, false, false, args);

        1.3 死信队列

        DLX,全称为 Dead-Letter-Exchange,死信交换器。当一个消息在队列中变成死信之后,它能被发送到另一个交换器中,这个交换器即是 DLX,绑定死信交换器的队列称为死信队列。消息变为死信的情况:

  • 消息被拒绝,并且设置的 requeue 为 false
  • 消息过期
  • 队列达到最大长度

        DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动的将消息重新发布到设置的 DLX 上去,进而被路由到另一个队列中,即死信队列。可以监听这个队列中的消息进行相应的处理。设置死信队列的方法:

Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx-exchange");
Queue queue = new Queue(vodQueue, true, false, false, args);

        对于 RabbitMQ 来说,DLX 是一个非常有用的特性。他可以处理异常情况,消息不能被消费者正确消费而被至于死信队列中的情况后去分析程序可以通过这个死信队列中的内容来分析当时所遇到的一场情况。进而可优化改善系统。

        1.4 延迟队列

        所谓延迟队列是指当消息被发出后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到消息。

        在AMQP协议中,或者 RabbitMQ 本身并没有直接支持延迟队列的功能,但可以通过 DLX 和TTL 来实现。        

        1.5 优先级队列

         优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。设置优先级队列

Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10);
Queue queue = new Queue(vodQueue, true, false, false, args);

        在发送消息时设置当前的优先级,默认最低为0,最高为队列设置的最大优先级。优先级高的消息可以被优先消费,这个是有前提的:如果在消费者消费速度大于生产者的生产速度且broker 中没有消息堆积的情况,对发送的消息设置优先级也就没有什么实际意义了。因为生产者刚发送完一条消息就被消费了,那么就意味着 broker 中至多有一条消息,对于单条消息来说优先级是没意义的。       

        1.6 消费质量保证(QOS)

        当 RabbitMQ 队列拥有多个消费者时,队列收到的消息将以轮询的方式分发给消费者。每条消息只会发送给一个消费者。这种方式非常适合扩展,而且是专门为并发程序设计的。如果现在的负载加重,只需要创建更多的消费者即可。

        这种方式不那么优雅,分发中不管消费者的消息是否处理完了,试想一下,某些消费者的任务繁重,来不及处理消息并确认,而某些消费者由于某些原因很快处理完了所分配的消息,进而进程空闲,这样会造成总体的吞吐量下降。该如何处理这种情况呢?引入Qos,他会告诉 broker 我没消费完当前消息前,不要给我新消息了,这就保证了消费质量。Qos对于拉模式是无效的。                设置方法如下:

// prefetchSize和prefetchCount设置为0,说明无限制
// prefetchSize: 指定消费者可以接收的最大内容量(单位通常是字节)。如果设置了非零值,RabbitMQ 会阻止发布者发送更多的消息,直到消费者发送了足够多的确认来释放足够的容量。默认情况下,RabbitMQ 并不实现 prefetchSize 参数,所以通常设置为0,表示不对此做限制。
// prefetchCount: 更常用的一个参数,表示消费者最多可以接收多少个未确认的消息。当达到这个数量后,RabbitMQ 将暂停向该消费者推送更多消息,直到消费者确认了部分消息,腾出了“槽位”。例如,设置为1意味着消费者每次处理完一个消息并发送确认之后,才能接收下一个消息。
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

二、持久化

        持久化可以提高 RabbitMQ 的可靠性,防止在异常情况下消息的丢失。RabbitMQ 的持久化分成三部分:交换器的持久化、队列的持久化和消息的持久化。

        交换器的持久化可以在声明交换器的时候设置,如果交换器不设置持久化,RabbitMQ 重启后,交换器的元数据会丢失,不过消息不会丢失,只是不能将消息发送给这个交换器了。

        队列的持久化在声明队列的时候设置,如果不设置队列的持久化,RabbitMQ 服务重启后,队列的元数据会丢失,此时数据也会丢失。

        将交换器、队列、消息都设置成持久化后能保证数据不丢失吗?答案是否定的。

  1. 从消费者的角度来看,将 autoAck 设置成 true,那么当消费者接受到相关消息后,还没来得及处理就宕机了,这样也算数据丢失。
  2. 在持久化的消息正确存入 RabbitMQ 之后,还需要一段时间才能存入磁盘。RabbitMQ 不会为每条消息都进行同步存盘(调用内核的fsync)的处理,可能仅保存在操作系统缓存之中而不是物理磁盘之中。如果在这段时间内,RabbitMQ 发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。

        上面的问题可以通过镜像队列机制来解决。相当于配置了副本,如果主节点在此特殊时期挂掉了,可以自动切换到从节点,这样有效的保证了高可用性,除非整个集群都挂掉。虽然这样也不能完全保证不丢失,但这样已经好很多。

三、生产者确认

        生产者将消息发送出去后,消息到底有没有到达服务器呢?如果不进行配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下,生产者不知道消息有没有正确的到达服务器。如果到达服务器之前就丢失了,持久化操作也解决不了问题,因为还没到达服务器,何谈持久化?针对这个问题提供了两种解决方式:

  • 通过事务机制实现
  • 通过发送确认机制实现

        开启事务多了几个环节,只有消息成功被 RabbitMQ 接收,事务才能提交,否则便可在捕获异常之后进行处理。但事务会严重影响 RabbitMQ 的性能,大大降低吞吐量。

        发送方确认是一种轻量级的机制,生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID,一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包括消息的唯一id),这就使得生产者知晓消息已经正确到达目的地。如果消息和队列是持久化的,那么确认消息会在消息写入磁盘之后发出。

        事务机制在一条消息发出后会使发送端阻塞,以等待rabbitmq的回应,之后才能发送下一条消息。相比消息确认机制,发送方确认机制是异步的。事务机制和确认机制是互斥的不能共存。

四、消息可靠性和重复消费

        只要涉及到消息中间件,消息可靠性和重复消费就是无可避免的话题,那 RabbitMQ 是如何设计的呢?

        4.1 消息可靠性

  1. 持久化设置,这在上文已经介绍,通过持久化队列、交换器和消息来保存消息。
  2. 事务和确认机制:上文已经介绍了生产者的确认机制,通过这个机制来保证生产者发送的消息不回丢失。
  3. 消费者消息确认:可以通过消息的手动ack来保证消息能消费完成
  4. 消息镜像队列:设置队列为镜像队列,可以将消息复制到多个节点,即使某个节点宕机,消息仍可以从其他节点获取。

        通过以上措施的组合使用,可以大大提高 RabbitMQ 消息传递的可靠性,尽可能减少消息丢失的风险。然而,即使采取了所有措施,也不能完全保证100%的消息不丢失,因为消息在传输过程中可能还受到网络、硬件故障等因素的影响。在实际应用中,需要根据业务场景权衡消息的可靠性、性能和成本。

        4.2 重复消费问题

        在 RabbitMQ 中,重复消费指的是同一个消息被多个消费者或者同一个消费者消费多次的现象。这种问题可能会导致数据不一致或者业务逻辑错误。造成重复消费的原因可能有:

  1. 消费者ACK确认失败:消费者接收到消息并开始处理,但是在处理完毕并发送 ACK 确认之前断开了连接,比如网络抖动或消费者进程异常退出,导致 RabbitMQ 未收到ACK确认,于是消息重新入队等待再次被消费。
  2. 消息重回队列:在有死信交换机(Dead Letter Exchange, DLX)或者消息TTL(Time To Live)到期后重新投递的情况下,消息可能被重新发送到原来的队列或另一个队列,从而被再次消费。
  3. 消费者超时设置不当:如果消费者的超时设置过短,可能会在消息处理未完成时就已经被认为超时,消息会被重新放回队列。

        那如何解决重复消费问题呢?

  1. 消息确认机制:确保消费者正确使用手动确认模式(Manual Acknowledgments),只有当消息处理成功后才发送 ACK 确认给 RabbitMQ,否则在遇到异常时可以重新消费。
  2. 幂等性设计:消费者的业务逻辑应当设计为幂等的,即使同一条消息被消费多次,处理结果也是相同的,不影响业务状态。例如,通过消息ID或业务流水号来判断消息是否已经处理过。
  3. 防重ID:在消息体中携带一个全局唯一的ID,消费者在处理消息前,先检查这个ID是否已经被处理过,如果已经处理过,则直接丢弃消息。

        总之,避免重复消费的关键在于消息确认机制、幂等性设计以及合理的重试和补偿策略。同时,完善的日志记录和监控也是非常重要的,以便在出现问题时能够快速定位和修复。

往期经典推荐

探秘 RabbitMQ 的设计理念与核心技术要点-CSDN博客

走进 Mybatis 内核世界:理解原理,释放更多生产力-CSDN博客

深入浅出 Kafka 消费者:解密分布式消息流的幕后英雄_kafka消费-CSDN博客

深入剖析Kafka生产者:揭秘消息从发送到落地的全过程-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/285432.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QT----给程序添加上任务栏托盘图标和退出

让我们的程序拥有任务栏托盘图标&#xff0c;实现程序后台运行&#xff0c;退出等功能 1、关闭程序保持后台 重写关闭事件,忽略点击窗口关闭 void MainWindow::closeEvent(QCloseEvent *event) {// 隐藏窗口&#xff0c;而不是真正关闭setVisible(false);// 忽略关闭事件&am…

由浅到深认识Java语言(26):阶段性练习

该文章Github地址&#xff1a;https://github.com/AntonyCheng/java-notes 在此介绍一下作者开源的SpringBoot项目初始化模板&#xff08;Github仓库地址&#xff1a;https://github.com/AntonyCheng/spring-boot-init-template & CSDN文章地址&#xff1a;https://blog.c…

后端如何返回404地址

当我们网站输入不存在的地址&#xff0c;经常会出现404的页面&#xff0c;这是如何做到的 1.添加配置 spring:mvc:view:prefix: /templates/suffix: .html 2.resources下添加templates目录&#xff0c;下面放404的网站 3.添加依赖&#xff0c;版本在主pom里面配置好了&#x…

一个开源的分布式在线教育系统

项目介绍 roncoo-education —— 一个分布式在线教育系统。目前主要功能有课程点播功能&#xff0c;支持多家视频云的接入&#xff0c;课程附件管理功能&#xff0c;支持多家存储云的接入&#xff0c;可以帮助个人或者企业快速搭建一个轻量级的在线教育平台。 系统分为后台、前…

CHAT~(持续更新)

CHAT&#xff08;持续更新&#xff09; 实现一个ChatGPT创建API设计页面布局业务操作技术架构 编码其他 实现一个ChatGPT 创建API 最简单也最需要信息的一步 继续往下做的前提 此处省略&#xff0c;想要获取接口创建方式联系 设计 页面布局 按照官网布局 业务操作 注册登…

Linux Ncurses库部分函数使用说明

目录 1. initscr&#xff08;&#xff09;函数 2. endwin&#xff08;&#xff09;函数 3. curs_set()函数 4.noecho()函数 5. keypad()函数 6. start_color()函数 7.init_pair()函数 8.getch()函数 9.move()函数 10.addch()函数 11. refresh()函数 12.inch()函数…

网络协议栈--网络层--IP协议

目录 本节重点网络层IP协议一、 基本概念二、 IP协议报头格式三、网段划分(重要)四、特殊的IP地址五、IP地址的数量限制六、私有IP地址和公网IP地址七、路由八、IP协议全部内容一览图 本节重点 1、理解网络层的作用, 深入理解IP协议的基本原理 2、对整个TCP/IP协议有系统的理解…

llvm后端

SelectionDAGBuilder是LLVM&#xff08;Low Level Virtual Machine&#xff09;编译器中的一个重要组件&#xff0c;它负责将LLVM中间表示&#xff08;Intermediate Representation&#xff0c;IR&#xff09;转换为SelectionDAG&#xff08;选择有向无环图&#xff09;的形式。…

JavaScript进阶

1. 作用域 1.1 局部作用域 1.2 全局作用域 1.3 作用域链 1.4 JS垃圾回收机制&#xff08;闭包做铺垫&#xff09; 1.4.1 什么是垃圾回收机制 1.4.2 内存的声明周期 1.4.3 垃圾回收的算法说明 1.4.3.1 引用计数法 1.4.3.2 标记清除法 1.5 闭包 <!DOCTYPE html> <html …

QTabWidget的tabbar不同方向显示 文字方向设置 图标跟随变化 实现方式 qt控件绘制原理

先来看结果图&#xff1a;&#xff08;参考博客&#xff1a;QTabWidget中tab页文本水平或垂直设置_pyqt tab_widget.settabposition(qtabwidget.west) 字体-CSDN博客&#xff09; 从图中可知&#xff0c;"普通"是qt自己的样式&#xff0c;但是很明显&#xff0c;在垂…

【linux】进程的地址空间

1.代码看现象引入 #include<stdio.h>#include<unistd.h>#include<string.h> #include<stdlib.h>int val100;int main (){ printf("i am father,pid:%d,ppid:%d,val:%d&#xff0c;&val:%p\n",getpid(),getppid(),val,&val);size_t…

Linux:点命令source

相关阅读 Linuxhttps://blog.csdn.net/weixin_45791458/category_12234591.html?spm1001.2014.3001.5482 source命令用于读取一个文件的内容并在当前Shell环境&#xff08;包括交互式Shell或是非交互式Shell&#xff09;执行里面的命令。它被称为点命令是因为命令名source也可…

IIS7/iis8/iis10安装II6兼容模块 以windows2022为例

因安全狗的提示 安全狗防护引|擎安装失败 可能原因是: IIS7及以上版本末安装1IS6兼容模块! .所以操作解决 如下. 在开始菜单中,找到服务器管理器.找到下图的IIS,右键添加角色和功能,找到web服务器的管理工具选项,iis6管理兼容性 打钩并安装. 如下图

自然拼读-组合音(上篇)

自然拼读-组合音 1、元音字母A的二声发音组合 组合音at 注意&#xff1a;“t”是气音&#xff0c;不要太重。 Eg&#xff1a;cat&#xff08;猫&#xff09; fat&#xff08;肥的&#xff09; bat&#xff08;蝙蝠&#xff09; 组合音ap 注意&#xff1a;“p”在尾巴发音…

MRC是谁?- 媒体评级委员会 Media Rating Council

在在线广告的世界里&#xff0c;有许多不同的技术和实践用于提供和衡量广告。对于广告商、出版商和营销人员来说&#xff0c;了解这些技术是如何工作的以及如何有效使用这些技术很重要。在这方面发挥关键作用的一个组织是媒体评级委员会&#xff08;MRC&#xff09;。 1. 了解…

蓝桥杯基础练习详细讲解二(具体代码、解题思路、Python)

试题 基础练习 回文数 提交此题 评测记录 资源限制 内存限制&#xff1a;512.0MB C/C时间限制&#xff1a;1.0s Java时间限制&#xff1a;3.0s Python时间限制&#xff1a;5.0s 问题描述 1221是一个非常特殊的数&#xff0c;它从左边读和从右边读是一样的&#x…

mysql基础3索引

存储引擎 存储引擎就是存储数据、建立索引、更新/查询数据等技术的实现方式 。存储引擎是基于表的&#xff0c;而不是 基于库的&#xff0c;所以存储引擎也可被称为表类型。 1). 建表时指定存储引擎 CREATE TABLE 表名(字段1 字段1类型 [ COMMENT 字段1注释 ] ,......字段n…

Docker进阶:Docker-compose 实现服务弹性伸缩

Docker进阶&#xff1a;Docker-compose 实现服务弹性伸缩 一、Docker Compose基础概念1.1 Docker Compose简介1.2 Docker Compose文件结构 二、弹性伸缩的原理和实现步骤2.1 弹性伸缩原理2.2 实现步骤 三、技术实践案例3.1 场景描述3.2 配置Docker Compose文件3.3 使用 docker-…

【项目管理后台】Vue3+Ts+Sass实战框架搭建二

Vue3TsSass搭建 git cz的配置mock 数据配置viteMockServe 建立mock/user.ts文件夹测试一下mock是否配置成功 axios二次封装解决env报错问题&#xff0c;ImportMeta”上不存在属性“env” 统一管理相关接口新建api/index.js 路由的配置建立router/index.ts将路由进行集中封装&am…

AIGC实战——Transformer模型

AIGC实战——Transformer模型 0. 前言1. T52. GPT-3 和 GPT-43. ChatGPT小结系列链接 0. 前言 我们在 GPT (Generative Pre-trained Transformer) 一节所构建的 GPT 模型是一个解码器 Transformer&#xff0c;它逐字符地生成文本字符串&#xff0c;并使用因果掩码只关注输入字…