【RabbitMQ】一文详解消息可靠性

目录:

1.前言

2.生产者

3.数据持久化

4.消费者

5.死信队列

1.前言

RabbitMQ 是一款高性能、高可靠性的消息中间件,广泛应用于分布式系统中。它允许系统中的各个模块进行异步通信,提供了高度的灵活性和可伸缩性。然而,这种通信模式也带来了一些挑战,其中最重要的之一是确保消息的可靠性

影响消息可靠性的因素主要有以下几点:

  • 发送消息时连接RabbitMQ失败
  • 发送时丢失:
    • 生产者发送的消息未送达交换机;
    • 消息到达交换机后未到达队列;
  • MQ 宕机,队列中的消息会丢失;
  • 消费者接收到消息后未消费就宕机了。

2.生产者

2.1.生产者重连机制

生产者发送消息时,出现了网络故障,导致与MQ的连接中断。为了解决这个问题,RabbitMQ提供的消息发送时的重连机制。即:当RabbitTemplate与MQ连接超时后,多次重试。

在生产者yml文件添加配置开启重连机制

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。但是RabbitMQ提供的重试机制是阻塞式的重试。 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,就需要合理配置等待时长和重试次数,或者使用异步线程来执行发送消息的代码

2.2.生产者确认机制

RabbitMQ的生产者确认机制(Publisher Confirm)是一种确保消息从生产者发送到MQ过程中不丢失的机制。当消息发送到 RabbitMQ 后,系统会返回一个结果给消息的发送者,表明消息的处理状态。这个结果有两种可能的值:

返回结果有两种方式:

  • publisher-confirm(发送者确认)
    • 消息成功投递到交换机,返回ACK。
    • 消息未投递到交换机,返回NACK。(可能是由于网络波动未能连接到RabbitMQ,可利用生产者重连机制解决)
  • publisher-return(发送者回执)
    • 消息投递到交换机了,但是没有路由到队列。返回ACK和路由失败原因。(这种问题一般是因为路由键设置错误,可以人为规避)

通过这种机制,生产者在发送消息后获取返回的回执结果,从而采取对应的策略,如消息重发或记录失败信息。

3.数据持久化

3.1.配置持久化

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题

  1. RabbitMQ宕机,存在内存中的消息会丢失。
  2. 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞。

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。RabbitMQ可以通过配置数据持久化,从而将消息保存在磁盘,包括:

  • 交换机持久化(确保RabbitMQ重启后交换机仍然存在)
  • 队列持久化(确保RabbitMQ重启后队列仍然存在)
  • 消息持久化(确保RabbitMQ重启后队列中的消息仍然存在)

由于Spring会在创建队列时默认将交换机和队列设置为持久化,发送消息时也默认指定消息为持久化消息,因此不需要额外配置。

// 将消息指定为持久化消息
Message message = MessageBuilder.withBody("hello".getBytes(standardcharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
// 给队列发送消息
rabbitTemplate.convertAndSend("simple.queue", message);

3.2.惰性队列

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列

在3.12版本后,所有队列都是Lazy Queue模式,无法更改。

惰性队列的特点如下:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)

  • 消费者要消费消息时才会从磁盘中读取并加载到内存

  • 支持数百万条的消息存储

对于低于3.12版本的情况,可以使用注解的arguments来指定

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "grade.queue", durable = "true"),exchange = @Exchange(name = "intel.topic", type = ExchangeTypes.TOPIC),key = "intel.grade",arguments = @Argument(name = "x-queue-mode", value = "lazy")))

3.3.为什么需要数据持久化?

数据持久化在 RabbitMQ 中有以下重要作用:

队列和交换机的持久化:

  • 防止重启后丢失:将队列和交换机设置为持久化,可以防止 RabbitMQ 服务器重启后丢失这些队列和交换机,确保它们的存在和绑定关系保持不变。

消息的持久化:

  1. 安全性
    • 防止数据丢失:消息持久化后,可以防止 RabbitMQ 服务器重启或宕机时数据丢失,方便数据恢复,保证消息的可靠性和耐久性。
  2. 性能
    • 内存管理:未持久化的临时消息默认存储在内存中。内存空间有限,大量消息涌入时会导致内存占满,系统需要进行 page out 操作将消息写入磁盘。频繁的 page out 操作会严重影响性能。
    • 预防内存溢出:通过持久化消息,可以缓解内存压力,防止因内存溢出导致的系统性能问题和崩溃。

4.消费者

4.1.消费者确认机制

为了确认消费者是否正确处理了消息,RabbitMQ提供了消费者确认机制。当消费者处理消息后,会返回回执信息给RabbitMQ。回执有三种值:

  • ack:消息处理成功,RabbitMQ从队列中删除消息。
  • nack:消息处理失败,RabbitMQ需要再次投递消息。
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除消息。

在SpringBoot项目中,我们可以通过配置文件选择回执信息的处理方式,一共有三种处理方式:

  • none:不处理。RabbitMQ 假定消费者获取消息后会一定会成功处理,因此消息投递后立即返回ack,将消息从队列中删除。

  • manual:手动模式。需要在业务代码结束后,调用SpringAMQP提供的API发送ackreject,存在代码侵入问题,但比较灵活。

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑进行了环绕增强,返回结果如下:

    • 如果消费者正常处理消息,自动返回ack并删除队列的消息。

    • 如果消费者消息处理失败,自动返回nack并重新向消费者投递消息。

    • 如果消息校验异常,自动返回reject并删除队列中的消息。

注意: 手动模式返回回执消息时通常需要显式指定requeue参数,当requeue=true时,表明消息需要重新入队;当requeue=false时,RabbitMQ将从队列删除消息。

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto,自动 ack

4.2.消息失败重试机制

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue无限循环,导致mq的消息处理飙升,带来不必要的压力。

可以通过设置yml文件开启失败重试机制,在消息异常时利用本地重试,而不是无限制的进行requeue操作。

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false 有状态。如果业务中包含事务,这里改为 false

4.3.消息失败处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecoverer 接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试次数耗尽后,直接reject,丢弃消息,这是默认采取的方式;
  • ImmediateRequeueMessageRecoverer:重试次数耗尽后,返回nack,消息重新入队;
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。

5.死信队列

尽管通过以上设置可以确保消息在生产者、消息队列和消费者之间的传递过程中不会丢失,但在某些情况下,消费者仍可能无法成功处理消息(如消息重试次数耗尽后仍无法被消费)。这时候,我们需要一个机制来妥善处理这些无法被正常消费的消息。死信队列便是用于解决这一问题的兜底机制。

5.1.死信

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消息被拒绝: 当消费者明确拒绝一个消息并且设置不再重新入队(requeue=false)时,这个消息会被标记为死信。
  • 消息过期: 每个消息或队列可以设置一个TTL(Time-To-Live),即消息的存活时间。如果消息在队列中停留的时间超过了这个TTL,消息会被认为过期,并被转移到死信队列。
  • 队列达到最大长度: 如果队列设置了最大长度并且达到了这个限制,那么新进入的消息会被转移到死信队列中。

5.2.创建死信队列

5.2.1.创建死信交换机和死信队列

正常使用注解,创建交换机和队列即可

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dead.queue", durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")),exchange = @Exchange(name = "dead.exchange", type = ExchangeTypes.TOPIC),key = "dead.key"
))
public void deadLetterQueue(String msg) {System.out.println("您的消息已经死亡:" + msg);
}
5.2.2.绑定死信交换机

如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

可以通过@Argument注解指定死信交互机和路由键,如下。

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "simple.queue", durable = "true",arguments = {@Argument(name = "x-queue-mode", value = "lazy"),@Argument(name = "x-dead-letter-exchange", value = "dead.exchange"),@Argument(name = "x-dead-letter-routing-key", value = "dead.key")}),exchange = @Exchange(name = "simple.topic",type = ExchangeTypes.TOPIC),key = "simple.key"))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/378725.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用adb指令把文件拷贝到Android模拟器

不解释太多,科学上网从youtube看了一个视频得来的 跳转到视频 首先必须要运行你要拷贝文件的目标Android模拟器,你关闭他的话,你是找不到这个设备的 管理员权限运行vs studio,在vs studio下打开Andriod的设备管理器 运行你要拷…

.net core appsettings.json 配置 http 无法访问

1、在appsettings.json中配置"urls": "http://0.0.0.0:8188" 2、但是网页无法打开 3、解决办法,在Program.cs增加下列语句 app.UseAntiforgery();

构建gitlab远端服务器(check->build->test->deploy)

系列文章目录 提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加 TODO:写完再整理 文章目录 系列文章目录前言构建gitlab远端服务器一、步骤一:搭建gitlab的运行服务器【运维】1. 第一步:硬件服务器准备工作(1)选择合适的硬件和操作系统linux(2)安装必…

k8s集群 安装配置 Prometheus+grafana+alertmanager

k8s集群 安装配置 Prometheusgrafanaalertmanager k8s环境如下:机器规划: node-exporter组件安装和配置安装node-exporter通过node-exporter采集数据显示192.168.40.180主机cpu的使用情况显示192.168.40.180主机负载使用情况 Prometheus server安装和配置…

CentOS 7 安装MySQL 5.7.30

CentOS 7 安装MySQL卸载(离线安装) 安装配置MySQL之前先查询是否存在,如存在先卸载再安装 rpm -qa|grep -i mysql rpm -qa|grep -i mariadb rpm -e --nodeps mariadb-libs-5.5.68-1.el7.x86_64如下命令找到直接 rm -rf 删除(删除…

电脑系统重装数据被格式化,那些文件还有办法恢复吗?

在日常使用电脑的过程中,系统重装或格式化操作是常见的维护手段,尤其是在遇到系统崩溃、病毒感染或需要升级系统时。然而,这一操作往往伴随着数据丢失的风险,尤其是当C盘(系统盘)和D盘(或其他数…

数学建模(1)

论文:做流程图 论文查重不能高于30% 论文 分模块备战 摘要不能超过一页的四分之三 数学建模的六个步骤: 【写作】---学术语言 团队练题

docker搭建普罗米修斯监控gpu

ip8的服务器监控ip110和ip111的服务器 被监控的服务器110和111只需要安装node-export和nvidia-container-toolkit 下载镜像包 docker pull prom/node-exporter docker pull prom/prometheus docker pull grafana/grafana新建目录 mkdir /opt/prometheus cd /opt/prometheus/…

用了6年git,不知道cherry-pick是啥意思

背景 可能是测试开发角色原因,平时很少有代码冲突或多人协同的编码场景。今天有个协同项目,需要提交自己的代码到其它业务的代码库中,这个代码库是分支开发分支上线模式,同时会有多个同事提交代码,然后模块负责的同学…

【python】多种回归算法对比气温预测

目录 引言 决策树回归(Decision Tree Regression) 线性回归(Linear Regression) 随机森林回归(Random Forest Regression) 气温预测对比实例 数据集 预测值与实际值对比图 模型评价指标 代码实现 …

微信小程序 vant-weapp的 SwipeCell 滑动单元格 van-swipe-cell 滑动单元格不显示 和 样式问题 滑动后删除样式不显示

在微信小程序开发过程中 遇到个坑 此处引用 swipeCell 组件 刚开始是组件不显示 然后又遇到样式不生效 首先排除问题 是否在.json文件中引入了组件 {"usingComponents": {"van-swipe-cell": "vant/weapp/swipe-cell/index","van-cell-gro…

【JavaEE】synchronized原理详解

本文使用的是JDK1.8 目录 引言 Java对象在JVM的结构 对象头 Mark Word Monitor Owner EntryList WaitSet 加锁过程 锁消除 偏向锁 偏向锁使用 重偏向 撤销偏向 轻量级锁 重量级锁 自旋优化 引言 对于synchronized原理讲解之前,我们需要知道Java对象…

FATE Flow 源码解析 - 日志输出机制

背景介绍 在 之前的文章 中介绍了 FATE 的作业处理流程,在实际的使用过程中,为了查找执行中的异常,需要借助运行生成的日志,但是 FATE-Flow 包含的流程比较复杂,对应的日志也很多,而且分散在不同的文件中&…

转移C盘中的conda环境(包括.condarc文件修改,environment.txt文件修改,conda报错)

conda环境一般是默认安装到C盘的,若建立多个虚拟环境,时间长了,容易让本不富裕的C盘更加雪上加霜,下面给出将conda环境从C盘转移到D盘的方法。 目录 电脑软硬件转移方法查看当前conda目录转移操作第一步:.condarc文件修…

走进NoSql

一、引入 1.1什么是NoSql NoSQL(Not Only SQL)是一组非关系型数据库(或称为非SQL数据库)的统称,它们提供了与传统的关系型数据库不同的数据存储和检索方式。NoSQL数据库通常用于处理大量的、分布式的、非结构化或半结…

美式键盘 QWERTY 布局的来历

注:机翻,未校对。 The QWERTY Keyboard Is Tech’s Biggest Unsolved Mystery QWERTY 键盘是科技界最大的未解之谜 It’s on your computer keyboard and your smartphone screen: QWERTY, the first six letters of the top row of the standard keybo…

数据湖表格式 Hudi/Iceberg/DeltaLake/Paimon TPCDS 性能对比(Spark 引擎)

当前,业界流行的集中数据湖表格式 Hudi/Iceberg/DeltaLake,和最近出现并且在国内比较火的 Paimon。我们现在看到的很多是针对流处理场景的读写性能测试,那么本篇文章我们将回归到大数据最基础的场景,对海量数据的批处理查询。本文…

dp or 数学问题

看一下数据量&#xff0c;只有一千&#xff0c;说明这个不是数学问题 #include<bits/stdc.h> using namespace std;#define int long long const int mo 100000007; int n, s, a, b; const int N 1005;// 2 -3 // 1 3 5 2 -1 // 1 -2 -5 -3 -1 int dp[N][N]; int fun…

泛微Ecology8明细表对主表赋值

文章目录 [toc]1.需求及效果1.1 需求1.2 效果2.思路与实现3.结语 1.需求及效果 1.1 需求 在明细表中的项目经理&#xff0c;可以将值赋值给主表中的项目经理来作为审批人员 1.2 效果 在申请人保存或者提交后将明细表中的人名赋值给主表中对应的值2.思路与实现 在通过js测…

生成树(STP)协议

一、生成树的技术背景 1、交换机单线路上链,存在单点故障,上行线路及设备都不具备冗余性,一旦链路或上行设备发生故障,网络将面临断网。 总结:以下网络不够健壮,不具备冗余性。 2、因此引入如下网络拓扑结构: 上述冗余拓扑能够解决单点故障问题,但同时冗拓扑也带来了…