如何保证 RabbitMQ 的消息可靠性?

项目开发中经常会使用消息队列来完成异步处理、应用解耦、流量控制等功能。虽然消息队列的出现解决了一些场景下的问题,但是同时也引出了一些问题,其中使用消息队列时如何保证消息的可靠性就是一个常见的问题。如果在项目中遇到需要保证消息一定被消费的场景时,如何保证消息不丢失,如何保证消息的可靠性?

先放一张 RabbitMQ 是如何消息传递的图:
在这里插入图片描述
生产者Producer 将消息发送到指定的 交换机Exchange,交换机根据路由规则路由到绑定的 队列Queue 中,最后和消费者建立连接后,将消息推送给 消费者Consumer

那么消息会在哪些环节丢失呢,列出可能出现消息丢失的场景有:

生产者将消息发送到 RabbitMQ Server 异常: 可能因为网络问题造成 RabbitMQ 服务端无法收到消息,造成生产者发送消息丢失场景。

RabbitMQ Server 中消息在交换机中无法路由到指定队列: 可能由于代码层面或配置层面错误导致消息路由到指定队列失败,造成生产者发送消息丢失场景。

RabbitMQ Server 中存储的消息丢失:可能因为 RabbitMQ Server 宕机导致消息未完全持久化或队列丢失导致消息丢失等持久化问题,造成 RabbitMQ Server 存储的消息丢失场景。

消费者消费消息异常: 可能在消费者接收到消息后,还没来得及消费消息,消费者宕机或故障等问题,造成消费者无法消费消息导致消息丢失的场景。

以上就是 RabbitMQ 可能出现消息丢失的场景,接下来将依次讲解如何避免这些消息丢失的场景问题。

1. 保证生产者发送消息到 RabbitMQ Server

为了避免因为网络故障或闪断问题导致消息无法正常发送到 RabbitMQ Server 的情况,RabbitMQ 提供了两种方案让生产者可以感知到消息是否正确无误的发送到 RabbitMQ Server中,这两种方案分别是 事务机制发送方确认机制。下面分别介绍一下这两种机制如何实现。

事务机制

先说配置和使用:

1.配置类中配置事务管理器

/*** 消息队列配置类** @author 单程车票*/
@Configuration
public class RabbitMQConfig {/*** 配置事务管理器*/@Beanpublic RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}
}

2.通过添加事务注解 + 开启事务实现事务机制

/*** 消息业务实现类** @author 单程车票*/
@Service
public class RabbitMQServiceImpl {@Autowiredprivate RabbitTemplate rabbitTemplate;@Transactional // 事务注解public void sendMessage() {// 开启事务rabbitTemplate.setChannelTransacted(true);// 发送消息rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);}
}

通过上面的配置即可实现事务机制,执行流程为:在生产者发送消息之前,开启事务,而后发送消息,如果消息发送至 RabbitMQ Server 失败后,进行事务回滚,重新发送。如果 RabbitMQ Server 接收到消息,则提交事务。

可以发现事务机制其实是同步操作,存在阻塞生产者的情况直到 RabbitMQ Server 应答,这样其实会很大程度上降低发送消息的性能,所以一般不会使用事务机制来保证生产者的消息可靠性,而是使用发送方确认机制。

发送方确认机制

先说配置和使用:

配置文件

spring:rabbitmq:publisher-confirm-type: correlated  # 开启发送方确认机制

配置属性有三种分别为:

这里一般使用 correlated 开启发送方确认机制即可,至于 simple 的 waitForConfirms() 方法调用是指串行确认方法,即生产者发送消息后,调用该方法等待 RabbitMQ Server 确认,如果返回 false 或超时未返回则进行消息重传。由于串行性能较差,这里一般都是用异步

confirm 模式。

none:表示禁用发送方确认机制

correlated:表示开启发送方确认机制

simple:表示开启发送方确认机制,并支持 waitForConfirms() 和 waitForConfirmsOrDie() 的调用。

通过调用 setConfirmCallback() 实现异步 confirm 模式感知消息发送结果

/*** 消息业务实现类** @author 单程车票*/
@Service
public class RabbitMQServiceImpl {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic void sendMessage() {// 发送消息rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);// 设置消息确认回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** MQ确认回调方法* @param correlationData 消息的唯一标识* @param ack 消息是否成功收到* @param cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// 记录日志log.info("ConfirmCallback...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");if (!ack) {// 出错处理...}}});}
}

生产者发送消息后通过调用 setConfirmCallback() 可以将信道设置为 confirm 模式,所有消息会被指派一个消息唯一标识,当消息被发送到 RabbitMQ Server 后,Server 确认消息后生产者会回调设置的方法,从而实现生产者可以感知到消息是否正确无误的投递,从而实现发送方确认机制。并且该模式是异步的,发送消息的吞吐量会得到很大提升。

上面就是发送放确认机制的配置和使用,使用这种机制可以保证生产者的消息可靠性投递,并且性能较好。

2. 保证消息能从交换机路由到指定队列
在确保生产者能将消息投递到交换机的前提下,RabbitMQ 同样提供了消息投递失败的策略配置来确保消息的可靠性,接下来通过配置来介绍一下消息投递失败的策略。

先说配置:

spring:rabbitmq:publisher-confirm-type: correlated  # 开启发送方确认机制publisher-returns: true   # 开启消息返回template:mandatory: true     # 消息投递失败返回客户端

mandatory 分为 true 失败后返回客户端 和 false 失败后自动删除两种策略。显然设置为 false 无法保证消息的可靠性。

到这里的配置是可以保证生产者发送消息的可靠性投递。

通过调用 setReturnCallback() 方法设置路由失败后的回调方法:


/*** 消息业务实现类** @author 单程车票*/
@Service
public class RabbitMQServiceImpl {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic void sendMessage() {// 发送消息rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);// 设置消息确认回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** MQ确认回调方法* @param correlationData 消息的唯一标识* @param ack 消息是否成功收到* @param cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// 记录日志log.info("ConfirmCallback...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");if (!ack) {// 出错处理...}}});// 设置路由失败回调方法rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** MQ没有将消息投递给指定的队列回调方法* @param message 投递失败的消息详细信息* @param replyCode 回复的状态码* @param replyText 回复的文本内容* @param exchange 消息发给哪个交换机* @param routingKey 消息用哪个路邮键*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {// 记录日志log.info("Fail Message["+message+"]==>replyCode["+replyCode+"]" +"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");// 出错处理...}});}
}

通过调用 setReturnCallback() 方法即可实现当交换机路由到指定队列失败后回调方法,拿到被退回的消息信息,进行相应的处理如记录日志或重传等等。

3. 保证消息在 RabbitMQ Server 中的持久化
对于消息的持久化,只需要在发送消息时将消息持久化,并且在创建交换机和队列时也保证持久化即可。

配置如下:

/*** 消息队列*/
@Bean
public Queue queue() {// 四个参数:name(队列名)、durable(持久化)、 exclusive(独占)、autoDelete(自动删除)return new Queue(MESSAGE_QUEUE, true);
}/*** 直接交换机*/
@Bean
public DirectExchange exchange() {// 四个参数:name(交换机名)、durable(持久化)、autoDelete(自动删除)、arguments(额外参数)return new DirectExchange(Direct_Exchange, true, false);
}

在创建交换机和队列时通过构造方法将持久化的参数都设置为 true 即可实现交换机和队列的持久化。

@Override
public void sendMessage() {// 构造消息(将消息持久化)Message message = MessageBuilder.withBody("单程车票".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 向MQ发送消息(消息内容都为消息表记录的id)rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);
}

在发送消息前通过调用 MessageBuilder 的 setDeliveryMode(MessageDeliveryMode.PERSISTENT) 在构造消息时设置消息持久化(MessageDeliveryMode.PERSISTENT)即可实现对消息的持久化。

通过确保消息、交换机、队列的持久化操作可以保证消息的在 RabbitMQ Server 中不丢失,从而保证可靠性,其实除了持久化之外还需要保证 RabbitMQ 的高可用性,否则 MQ 都宕机或磁盘受损都无法确保消息的可靠性,关于高可用性这里就不作过多说明,有兴趣的可以去了解一下。

4. 保证消费者消费的消息不丢失
在保证发送方和 RabbitMQ Server 的消息可靠性的前提下,只需要保证消费者在消费消息时异常消息不丢失即可保证消息的可靠性。

RabbitMQ 提供了 消费者应答机制 来使 RabbitMQ 能够感知到消费者是否消费成功消息,默认情况下,消费者应答机制是自动应答的,也就是RabbitMQ 将消息推送给消费者,便会从队列删除该消息,如果消费者在消费过程失败时,消息就存在丢失的情况。所以需要将消费者应答机制设置为手动应答,只有消费者确认消费成功后才会删除消息,从而避免消息的丢失。

下面来看看如何配置消费者手动应答:

spring:rabbitmq:publisher-confirm-type: correlated  # 开启发送方确认机制publisher-returns: true   # 开启消息返回template:mandatory: true     # 消息投递失败返回客户端listener:simple:acknowledge-mode: manual  # 开启手动确认消费机制

通过 listener.simple.acknowledge-mode = manual 即可将消费者应答机制设置为手动应答。

之后只需要在消费消息时,通过调用 channel.basicAck() 与 channel.basicNack() 来根据业务的执行成功选择是手动确认消费还是手动丢弃消息。

/*** 监听消费队列的消息*/
@RabbitListener(queues = RabbitMQConfig.MESSAGE_QUEUE)
public void onMessage(Message message, Channel channel) {// 获取消息索引long index = message.getMessageProperties().getDeliveryTag();// 解析消息byte[] body = message.getBody();...try {// 业务处理...// 业务执行成功则手动确认channel.basicAck(index, false);}catch (Exception e) {// 记录日志log.info("出现异常:{}", e.getMessage());try {// 手动丢弃信息channel.basicNack(index, false, false);} catch (IOException ex) {log.info("丢弃消息异常");}}
}

这里说明一下 basicAck() 与 basicNack() 的参数说明:

void basicAck(long deliveryTag, boolean multiple) 方法(会抛异常):deliveryTag:该消息的indexmultiple:是否批量处理(true 表示将一次性ack所有小于deliveryTag的消息)void basicNack(long deliveryTag, boolean multiple, boolean requeue) 方法(会抛异常):deliveryTag:该消息的indexmultiple:是否批量处理(true 表示将一次性ack所有小于deliveryTag的消息)requeue:被拒绝的是否重新入队列(true 表示添加在队列的末端;false 表示丢弃)

通过设置手动确认消费者应答机制即可保证消费者在消费信息时的消息可靠性。

Spring Boot 提供的消息重试机制

除了消费者应答机制外,Spring Boot也提供了一种重试机制,只需要通过配置即可实现消息重试从而确保消息的可靠性,这里简单介绍一下:

spring:rabbitmq:listener:simple:acknowledge-mode: auto  # 开启自动确认消费机制retry:enabled: true # 开启消费者失败重试initial-interval: 5000ms # 初始失败等待时长为5秒multiplier: 1  # 失败的等待时长倍数(下次等待时长 = multiplier * 上次等待时间)max-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态(如果业务中包含事务,这里改为false

通过配置在消费者的方法上如果执行失败或执行异常只需要抛出异常(一定要出现异常才会触发重试,注意:不要捕获异常) 即可实现消息重试,这样也可以保证消息的可靠性。

上面就是我在项目中关于如何保证 RabbitMQ 的消息可靠性的配置和实现方案了。下面想聊聊我在实际使用消息队列实现消息可靠性时遇到的问题。

消费者消费消息需要保证幂等性

由于实现了消息可靠性导致消息重发或消息重试造成消费者可能会存在消息被重复消费的情况,这种情况就需要保证消息不被重复消费,也就是消息保证幂等性。

实现幂等性的方法有很多:借助数据库的乐观锁或悲观锁、借助 redis 的分布式锁、借助 redis 实现 token 机制等等都可以很好的保证消息的幂等性。

使用消息队列很难做到 100% 的消息可靠性

我在项目实际开发中使用 RabbitMQ 实现消息可靠性,实践后的感受是消息队列很难能做到 100% 的消息可靠性,上面的实现方案中 RabbitMQ 提供的机制做到的是尽可能地减小消息丢失的几率。

大多数情况下消息丢失都是因为代码出现错误,那么这样无论进行多少次重发都是无法解决问题的,这样只会增加 CPU 的开销,所以我认为更好的解决办法是通过记录日志的方式等待后续回溯时更好的发现问题并解决问题。对于一些不是很需要保证百分百可靠性的场景,都可以通过记录日志的方式来保证消息可靠性即可。

我在项目中采用的是消息落库的方式,先将消息落库,而后生产者将消息发送给 MQ,使用数据库记录消息的消费情况,对于重试多次仍然无法消费成功的消息,后续通过定时任务调度的方式对这些无法消费成功的消息进行补偿。我认为这样可以尽可能地保证消息的可靠性。但是同样这样也带来了问题就是消息落库需要数据库磁盘IO的开销,增大数据库压力同时降低了性能。

总之,在实现消息的可靠性时,应该根据项目的需求来考虑如何处理。对于消息要求可靠性低的只需要在出错时记录日志方便后续回溯解决出错问题即可,对于消息可靠性要求高的则可以采用消息落库 + 定时任务的方式尽可能保证百分百的可靠性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/152883.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Leetcode---114双周赛

题目列表 2869. 收集元素的最少操作次数 2870. 使数组为空的最少操作次数 2871. 将数组分割成最多数目的子数组 2872. 可以被 K 整除连通块的最大数目 一、收集元素的最小操作次数 直接模拟,倒序遍历即可,代码如下 class Solution { public:int mi…

docker搭建jenkins

1.拉取镜像 docker pull jenkinsci/blueocean 2.启动容器 docker run -d -u root -p 8666:8080 -p 50000:50000 -v /var/jenkins_home:/var/jenkins_home -v /etc/localtime:/etc/localtime --name MyJenkins jenkinsci/blueocean 3.访问ip:port,就能访问了 4.docker logs 容器…

Go 团队发布组织 / 构建 Go module 的官方指南

导读Go 团队发布了一份官方指南,帮助开发者更规范地组织 / 构建 Go module。 刚接触 Go 的开发者常见问题之一是,“就文件和文件夹的组织布局而言,如何组织我的 Go 项目?”。这份指南就是提供建议来帮助回答这个问题。其中包括针对…

基于Softmax回归的多分类任务

Logistic回归可以有效地解决二分类问题,但在分类任务中,还有一类多分类问题,即类别数C大于2 的分类问题。Softmax回归就是Logistic回归在多分类问题上的推广。 使用Softmax回归模型对一个简单的数据集进行多分类实验。 首先给大家看一下需要的…

多路彩灯控制器led流水灯VHDL速度可调仿真图视频、源代码

名称:多路彩灯控制器led流水灯VHDL速度可调 软件:Quartus 语言:VHDL 代码功能: 使用VHDL设计彩灯控制器,共24个led灯,分为5种不同的花样,可以通过按键切换花样的变化速度。 代码下载&#…

VBox启动失败、Genymotion启动失败、Vagrant迁移

VBox启动失败、Genymotion启动失败、Vagrant迁移 2023.10.9 最新版本vbox7.0.10、Genymotion3.5.0 Vbox启动失败 1、查看日志 Error -610 in supR3HardenedMainInitRuntime! (enmWhat4) Failed to locate ‘vcruntime140.dll’ 日志信息查看方法->找到虚拟机所在位置->…

如何开始学习量子机器学习

一、关于量子计算 这是我关于量子机器学习(QML)的第二篇文章,这是第一篇,关于为什么你应该开始学习QML。 开始研究量子机器学习很困难,因为我不知道我需要了解多少量子力学和计算知识。我在101年上大学时上了量子力学2…

抽象轻松的java-mybatis简单入门

第一步:用IDEA新建一个java包 第二步:在IDEA中添加数据库(ps:自己百度) 点击数据库 第二步,新建数据库 选择你使用的数据库 用户与密码根据自己的设置进行配置 为了更方便的查看数据库,可以像图…

【kubernetes】带你了解k8s中PV和PVC的由来

文章目录 1 为什么需要卷(Volume)2 卷的挂载2.1 k8s集群中可以直接使用2.2 需要额外的存储组件2.3 公有云 2 PV(Persistent Volume)3 SC(Storage Class) 和 PVC(Persistent Volume Claim)4 总结 1 为什么需要卷(Volume) Pod是由一个或者多个容器组成的,在启动Pod中…

Linux免密登录

目标: 192.168.233.31 ton-pc-003 192.168.233.32 ton-pc-004 在ton-pc-004(以下简称004)免密登录ton-pc-003(以下简称003) 具体流程和说明: 1、在004(客户机)中生成秘钥对 ssh…

【计算机视觉|人脸建模】学习从4D扫描中获取的面部形状和表情的模型

本系列博文为深度学习/计算机视觉论文笔记,转载请注明出处 标题:Learning a model of facial shape and expression from 4D scans 链接:Learning a model of facial shape and expression from 4D scans | ACM Transactions on Graphics Pe…

解决方案:AI赋能工业生产3.0,从工业“制造”到“智造”

视频监控技术是一种既成熟又广泛应用于工业制造领域的先进技术。它可以通过安装各种摄像头和传感器来监测整个生产流程,包括原材料的采购、加工、装配和物流等环节,从而实现对生产过程的实时监控和管理,以及对异常事件的及时预警和响应。 在…

【SV中的多线程fork...join/join_any/join_none】

SV中fork_join/fork_join_any/fork_join_none 1 一目了然1.1 fork...join1.2 fork...join_any1.3 fork...join_none 2 总结 SV中fork_join和fork_join_any和fork_join_none; Note: fork_join在Verilog中也有,只有其他的两个是SV中独有的; 1 一目了然 1.…

FreeRTOS自我救赎3之USB虚拟串口

任何项目的功能都从需求出发,在这里我用的是斥侯蜂的一块STM32F407ZGT6 在开发一个项目的过程中,免不了串口调试,而这块板子板载的mircousb不是直接连的引脚而是一个OTGUSB

SpringCloud Alibaba - Seata 部署 TC 服务,并集成微服务

目录 一、Seata 架构 1.1、Seata 架构重要角色 1.2、部署 TC 服务 1.2.1、前言 1.2.2、下载 seata-server 包,解压 1.2.3、修改配置 1.2.4、在 nacos 中添加配置 1.2.5、创建数据库表 1.2.6、启动 TC 服务 1.3、微服务集成 Seata 1.3.1、引入依赖 1.3.2、…

SpringBoot 实现数据脱敏

SpringBoot 实现数据脱敏 前言Hutool 实现数据脱敏引入依赖脱敏工具类代码实现 使用注解的方式定义枚举自定义序列化类定义注解测试 前言 数据脱敏是指对某些敏感信息通过脱敏规则进行数据的变形,实现敏感隐私数据的可靠保护。 数据脱敏常用规则有替换、重排、加密…

C++对象模型(8)-- 数据语义学:this指针

1、this指针的认识 this 是 C 中的一个关键字,也是一个 const 指针 ,它指向当前对象,通过它可以访问当前对象的所有成员。所谓当前对象,是指正在使用的对象。 假如有这么一个类: class Base { public:int b_i;int b…

[NewStarCTF 2023 公开赛道] week1 Crypto

brainfuck 题目描述&#xff1a; [>>>>>>>>>>>>>>>><<<<<<<<<<<<<<<<-]>>>>>>>.>----.<-----.>-----.>-----.<<<-.>>..…

论文解析——异构多芯粒神经网络加速器

作者 朱郭益, 马胜&#xff0c;张春元, 王波&#xff08;国防科技大学计算机学院&#xff09; 摘要 随着神经网络技术的快速发展, 出于安全性等方面考虑, 大量边缘计算设备被应用于智能计算领域。首先&#xff0c;设计了可应用于边缘计算的异构多芯粒神经网络加速器其基本结构…

TCP相关面试题

TCP相关面试题 题目1 介绍一下TCP三次握手的过程 介绍TCP三次握手应该从3个方面进行回答&#xff0c;分别是数据包名称&#xff0c;客户端与服务端的状态变化&#xff0c;数据包的序号变化。而不能只是简单回答发送的数据包名称。 TCP三次握手的过程如下&#xff1a; 从数据…