RabbitMq--消息可靠性

12.消息可靠性

1.消息丢失的情况

  1. 生产者向消息代理传递消息的过程中,消息丢失了
  2. 消息代理( RabbitMQ )把消息弄丢了
  3. 消费者把消息弄丢了

image-20250310192901333

那怎么保证消息的可靠性呢,我们可以从消息丢失的情况入手——从生产者、消息代理( RabbitMQ )、消费者三个方面来保证消息的可靠性

2.生产者的可靠性

1.生产者重连

由于网络问题,可能会出现客户端连接 RabbitMQ 失败的情况,我们可以通过配置开启连接 RabbitMQ 失败后的重连机制

application.yml(将 host 更改为部署 RabbitMQ 的服务器的地址)

spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /blogusername: CaiXuKunpassword: T1rhFXMGXIOYCoyiconnection-timeout: 1s # 连接超时时间template:retry:enabled: true # 开启连接超时重试机制initial-interval: 1000ms # 连接失败后的初始等待时间multiplier: 1 # 连接失败后的等待时长倍数,下次等待时长 = (initial-interval) * multipliermax-attempts: 3 # 最大重试次数

注意事项: 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率,但 SpringAMOP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,线程会被阻塞,影响业务性能
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长(比如 200 ms)和重试次数,也可以考虑使用异步线程来执行发送消息的代码

2.生产者确认

RabbitMQ 提供了 Publisher ConfirmPublisher Return 两种确认机制。开启确机制认后,如果 MQ 成功收到消息后,会返回确认消息给生产者,返回的结果有以下几种情况

  • 消息投递到了 MQ,但是路由失败(业务原因),此时会通过 PublisherReturn 机制返回路由异常的原因,然后返回 ACK,告知生产者消息投递成功

  • 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知生产者消息投递成功

  • 持久消息投递到了MQ,并且入队完成持久化,返回 ACK,告知生产者消息投递成功

  • 其它情况都会返回 NACK,告知生产者消息投递失败

image-20250310194409045

生产者确认机制有关的配置信息( application.yml 文件)

spring:rabbitmq:publisher-returns: truepublisher-confirm-type: correlated

publisher-confirm-type 有三种模式:

  1. none:关闭 confirm 机制
  2. simple:以同步阻塞等待的方式返回 MQ 的回执消息
  3. correlated:以异步回调方式的方式返回 MQ 的回执消息

每个 RabbitTemplate 只能配置一个 ReturnCallback

新增一个名为 RabbitMQConfig 的配置类,并让该类实现 ApplicationContextAware 接口

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置回调rabbitTemplate.setReturnsCallback((returnedMessage) -> {System.out.println("收到消息的return callback, " +"exchange = " + returnedMessage.getExchange() + ", " +"routingKey = " + returnedMessage.getRoutingKey() + ", " +"replyCode = " + returnedMessage.getReplyCode() + ", " +"replyText = " + returnedMessage.getReplyText() + ", " +"message = " + returnedMessage.getMessage());});}}

添加一个测试类,测试 ReturnCallback 的效果

@Test
void testConfirmCallback() throws InterruptedException {CorrelationData correlationData = new CorrelationData();correlationData.getFuture().whenCompleteAsync((confirm, throwable) -> {if (confirm.isAck()) {// 消息发送成功System.out.println("消息发送成功,收到ack");} else {// 消息发送失败System.err.println("消息发送失败,收到nack,原因是" + confirm.getReason());}if (throwable != null) {// 消息回调失败System.err.println("消息回调失败");}});rabbitTemplate.convertAndSend("blog.direct", "red", "Hello, confirm callback", correlationData);// 测试方法执行结束后程序就结束了,所以这里需要阻塞线程,否则程序看不到回调结果Thread.sleep(2000);
}

如何看待和处理生产者的确认信息

  • 生产者确认需要额外的网络开销和系统资源开销,尽量不要使用
  • 如果一定要使用,无需开启 Publisher-Return 机制,因为路由失败一般是业务出了问题
  • 对于返回 nack 的消息,可以尝试重新投递,如果依然失败,则记录异常消息

3.消息代理(RabbitMQ)的可靠性

在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟,这样会导致两个问题:

  • 一旦 RabbitMQ 宕机,内存中的消息会丢失
  • 内存空间是有限的,当消费者处理过慢或者消费者出现故障或时,会导致消息积压,引发 MQ 阻塞( Paged Out 现象

**MQ 阻塞:**当队列的空间被消息占满了之后,RabbitMQ 会先把老旧的信息存到磁盘,为新消息腾出空间,在这个过程中,整个 MQ 是被阻塞的,也就是说,在 MQ 完成这一系列工作之前,无法处理已有的消息和接收新的消息

1.数据持久化

RabbitMQ 实现数据持久化包括 3 个方面:

  1. 交换机持久化
  2. 队列持久化
  3. 消息持久化

注意事项:利用 SpringAMQP 创建的交换机、队列、消息,默认都是持久化的
在 RabbitMQ 控制台创建的交换机、队列默认是持久化的,而消息默认是存在内存中( 3.12 版本之前默认存放在内存,3.12 版本及之后默认先存放在磁盘,消费者处理消息时才会将消息取出来放到内存中)

2. LazyQueue( 3.12 版本后所有队列都是 Lazy Queue 模式)

从 RabbitMQ 的 3.6.0 版本开始,增加了 Lazy Queue 的概念,也就是惰性队列,惰性队列的特征如下:

  1. 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认 2048条 )
  2. 消费者要处理消息时才会从磁盘中读取并加载到内存
  3. 支持数百万条的消息存储,在 3.12 版本后,所有队列都是 Lazy Queue 模式,无法更改

开启持久化和生产者确认时,RabbitMQ 只有在消息持久化完成后才会给生产者返回 ACK 回执

  • 在 RabbitMQ 控制台中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可

    image-20250310200323362

  • 在 Java 代码中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可

    //注解式创建
    @RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue(name = "lazy.queue2",durable = "true",arguments = @Argument(name = "x-queue-mode",value = "lazy")
    ))
    public void listenLazeQueue(String message) {System.out.println("消费者收到了 laze.queue2的消息: " + message);
    }
    //编程式创建@Bean
    public org.springframework.amqp.core.Queue lazeQueue() {return QueueBuilder.durable("lazy.queue1").lazy().build();
    }
    

4.消费者的可靠性

1. 消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。处理消息后,消费者应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 消息的处理状态,回执有三种可选值:

  1. ack:成功处理消息,RabbitMQ 从队列中删除该消息
  2. nack:消息处理失败,RabbitMQ 需要再次投递消息
  3. reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息

SpringAMQP 已经实现了消息确认功能,并允许我们通过配置文件选择 ACK 的处理方式,有三种方式:

  • none:不处理,即消息投递给消费者后立刻 ack,消息会会立刻从 MQ 中删除,非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject ,存在业务入侵,但更灵活

  • auto:自动模式,SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack,当业务出现异常时,会根据异常的类型返回不同结果:

    • 如果是业务异常,会自动返回 nack
    • 如果是消息处理或校验异常,自动返回 reject

开启消息确认机制,需要在 application.yml 文件中编写相关的配置

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: none
2.失败重试机制

当消费者出现异常后,消息会不断重新入队,重新发送给消费者,然后再次发生异常,再次 requeue(重新入队),陷入 无限循环,给 RabbitMQ 带来不必要的压力

我们可以利用 Spring 提供的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制地重新入队

在 application.yml 配置文件中开启失败重试机制

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: autoretry:enabled: true # 开启消息消费失败重试机制initial-interval: 1000ms # 消息消费失败后的初始等待时间multiplier: 1 # 消息消费失败后的等待时长倍数,下次等待时长 = (initial-interval) * multipliermax-attempts: 3 # 最大重试次数stateless: true # true表示无状态,false表示有状态,如果业务中包含事务,需要设置为false

在达到最大重试次数后,消息会丢失!!!怎样解决?

3. 失败消息的处理策略

开启重试模式后,如果重试次数耗尽后消息依然处理失败,则需要由 MessageRecoverer 接口来处理, MessageRecoverer 有三个实现类:

  • RejectAndDontRequeueRecoverer:重试次数耗尽后,直接 reject,丢弃消息,默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试次数耗尽后,返回 nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

image-20250310203318152

我们来演示一下使用 RepublishMessageRecoverer 类的情况

  • 第一步:定义一个名为 blog.error 的交换机、一个名为 error.queue 的队列,并将队列和交换机进行绑定

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.retry.MessageRecoverer;
    import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;@Configuration
    @ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
    public class ErrorConfiguration {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.direct", true, false);}@Beanpublic Queue errorQueue() {return new Queue("error.queue", true, false, false);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}}
  • 第二步:将失败处理策略改为 RepublishMessageRecoverer (开起了消费者重试机制才会生效)

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
  • 在控制台中可以看到,消息的重试次数耗尽后,消息被放入了 error.queue 队列

    image-20250310203613628

  • 在 RabbitMQ 的控制塔也可以看到, error.direct 交换机 和 error.queue 队列成功创建,消息也成功放入了 error.queue 队列

    image-20250310203633853

总结:消费者如何保证消息一定被消费?

  • 开启消费者确认机制为 auto ,由 Spring 帮我们确认,消息处理成功后返回 ack,异常时返回 nack
  • 开启消费者失败重试机制,并设置 MessageRecoverer ,多次重试失败后将消息投递到异常交换机,交由人工处理
4. 业务幂等性

在程序开发中,幂等是指同一个业务,执行一次或多次对业务状态的影响是一致的

image-20250310203936288

那么有什么方法能够确保业务的幂等性呢

方案一:为每条消息设置一个唯一的 id

给每个消息都设置一个唯一的 id,利用 id 区分是否是重复消息:

  1. 为每条消息都生成一个唯一的 id,与消息一起投递给消费者
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息 id 保存到数据库
  3. 如果消费者下次又收到相同消息,先去数据库查询该消息对应的 id 是否存在,如果存在则为重复消息,放弃处理

可以在指定 MessageConverter 的具体类型时,同时为 MessageConverter 设置自动创建一个 messageId

@Bean
public MessageConverter jacksonMessageConvertor() {Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

发送消息后,在 RabbitMQ 的控制台可以看到,消息的 properties 属性附带了 messageId 信息

image-20250310204104246

但这种方式对业务有一定的侵入性

方案二:结合业务判断 – 结合实例
兜底的解决方案

我们可以在交易服务设置定时任务,定期查询订单支付状态,这样即便 MQ 通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/31487.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2019年蓝桥杯第十届CC++大学B组真题及代码

目录 1A:组队(填空5分_手算) 2B:年号字符(填空5分_进制) 3C:数列求值(填空10分_枚举) 4D:数的分解(填空10分) 5E:迷宫…

UI-APP---基于HBuilder X的微信小程序

目录 概要 Uni-app 和 HBuilderX 的关系 技术名词解释 ui-app: 概念: 核心特点: 技术细节 基本步骤: 开发流程 项目功能分析: ①首页包括公共头部、导航栏、轮播图、视频列表区域。 ②视频详情页包括公共头部区域、视频详情区域、…

多宠识别:基于计算机视觉的智能宠物管理系统架构解析

一、行业痛点与技术方案演进 在多宠家庭场景中,传统方案面临三大技术瓶颈: 1. 生物特征混淆:同品种/毛色宠物识别准确率低于65% 2. 动态场景适应:进食/奔跑状态下的误检率达30% 3. 数据孤岛问题:离线设备无法实现持续…

论文阅读分享——UMDF(AAAI-24)

概述 题目:A Unified Self-Distillation Framework for Multimodal Sentiment Analysis with Uncertain Missing Modalities 发表:The Thirty-Eighth AAAI Conference on Artificial Intelligence (AAAI-24) 年份:2024 Github:暂…

Unity组件大全之 Layout 组件 |(27)Content Size Fitter 内容大小适配器

📂 Unity 开发资源汇总 | 插件 | 模型 | 源码 💓 欢迎访问 Unity 打怪升级大本营 在 Unity 的 UI 系统中,Content Size Fitter 是一个重要的布局组件,能够根据内容动态调整 UI 元素的尺寸。它通过自动检测内容的大小来改变元素的宽…

<3D建模>.max文件转换为.fbx文件

今天在使用unity3D开发软件时,下载了.max文件。大家知道.max文件是3DMax生成的文件,然而我的电脑中也没有3DMax,而unity中的场景文件通常要用到.fbx文件,这可怎么办呢?难道要去下载一个3DMax软件吗?其实并不…

【js逆向】iwencai国内某金融网站实战

地址:aHR0cHM6Ly93d3cuaXdlbmNhaS5jb20vdW5pZmllZHdhcC9ob21lL2luZGV4 在搜索框中随便输入关键词 查看请求标头,请求头中有一个特殊的 Hexin-V,它是加密过的;响应数据包中全是明文。搞清楚Hexin-V的值是怎么生成的,这个值和cooki…

国内免费使用 Claude 3.7 Sonnt,GPT-4o,DeepSeek-R1联网极速响应

地址我放在下面了!打开即用,也支持在ChatBox、Cursor、Dify、VSCode这些平台调用API令牌 注册登录进来之后,系统会自动赠送免费额度,可以使用Open、Claude、DeepSeek这些系统支持的全部模型。 我将经常使用到的模型型号都列在下…

面向高质量视频生成的扩散模型方法-算法、架构与实现【附核心代码】

目录 算法原理 架构 代码示例 算法原理 正向扩散过程:从真实的视频数据开始,逐步向其中添加噪声,随着时间步 t 的增加,噪声添加得越来越多,最终将原始视频数据变成纯噪声。数学上,t 时刻的视频数据与 t…

使用免费IP数据库离线查询IP归属地

一、准备工作 1.下载免费IP数据库 首先,访问 MaxMind官网(https://www.maxmind.com/en/home)如果你还没有MaxMind账号,可以通过此链接地址(https://www.maxmind.com/en/geolite2/signup)进行账号注册&…

Python----数据可视化(Seaborn二:绘图一)

常见方法 barplot方法 单独绘制条形图 catplot方法 可以条形图、散点图、盒图、小提亲图、等 countplot方法 统计数量 一、柱状图 seaborn.barplot(dataNone, xNone, yNone, hueNone, colorNone, paletteNone) 函数描述data用于绘图的数据集。x用于绘制长格式数据的输入。…

C/C++中使用CopyFile、CopyFileEx原理、用法、区别及分别在哪些场景使用

文章目录 1. CopyFile原理函数原型返回值用法示例适用场景 2. CopyFileEx原理函数原型返回值用法示例适用场景 3. 核心区别4. 选择建议5. 常见问题6.区别 在Windows系统编程中,CopyFile和CopyFileEx是用于文件复制的两个API函数。它们的核心区别在于功能扩展性和控制…

SpringBoot 如何调用 WebService 接口

前言 调用WebService接口的方式有很多&#xff0c;今天记录一下&#xff0c;使用 Spring Web Services 调用 SOAP WebService接口 一.导入依赖 <!-- Spring Boot Web依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId…

tomcat单机多实例部署

一、部署方法 多实例可以运行多个不同的应用&#xff0c;也可以运行相同的应用&#xff0c;类似于虚拟主机&#xff0c;但是他可以做负载均衡。 方式一&#xff1a; 把tomcat的主目录挨个复制&#xff0c;然后把每台主机的端口给改掉就行了。 优点是最简单最直接&#xff0c;…

计算机视觉算法实战——老虎个体识别(主页有源码)

✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连✨ ​ ​​​ 1. 领域介绍 老虎个体识别是计算机视觉中的一个重要应用领域&#xff0c;旨在通过分析老虎的独特条纹图案&#xff0c;自动识别和区…

【数据结构】初识集合框架及背后的数据结构(简单了解)

目录 前言 如何学好数据结构 1. 什么是集合框架 2. 集合框架的重要性 3. 背后所涉及的数据结构以及算法 3.1 什么是数据结构 3.2 容器背后对应的数据结构 3.3 相关java知识 3.4 什么是算法 3.5 基本关系说明&#xff08;重要&#xff0c;简单了解&#xff09; 前言 …

Hadoop命令行语句

一、前言 1、启动虚拟机 2、连接工具 3、启动Hadoop并查询确保进程为51 start-all.shjps练习完请一定 stop-all.sh 关掉hadoop进程 关掉虚拟机 再关机电脑 二、Hadoop命令行主命令 1、进入Hadoop安装目录的bin路径 cd /training/hadoop-3.3.0/bin/2、查看低下的执行文…

TypeScript系列07-类型声明文件

在现代前端开发中&#xff0c;TypeScript已成为提升代码质量和开发体验的利器。对于React和React Native项目&#xff0c;合理利用类型声明文件不仅能提供更好的智能提示和类型检查&#xff0c;还能显著减少运行时错误。本文将深入探讨类型声明文件的编写与使用。 1. 声明文件…

迎接AI智能体新时代,推动新质生产力加快发展

随着人工智能技术的飞速发展&#xff0c;AI智能体正逐步成为推动新质生产力加快发展的重要力量。2025年&#xff0c;被业界普遍认为是AI智能体的爆发元年&#xff0c;这一技术范式的深刻变革&#xff0c;正重塑着人机关系&#xff0c;为各行各业带来前所未有的机遇与挑战。本文…

python: DDD using postgeSQL and SQL Server

postgreSQL 注意&#xff1a; # psycopg 2 驱动的连接字符串 #engine create_engine(postgresql://post:geovindulocalhost:5433/TechnologyGame) #Session sessionmaker(bindengine)# 使用 psycopg3 驱动的连接字符串 #engine create_engine(postgresqlpsycopg://user:g…