RabbitMQ消费者的可靠性

目录

一、消费者确认

二、失败重试机制

2.1、失败处理策略

三、业务幂等性

3.1、唯一消息ID

 3.2、业务判断

3.3、兜底方案


一、消费者确认

RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack.

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,自动返回reject;

 通过下面的配置可以修改SpringAMQP的ACK处理方式:

spring:rabbitmq:host: 192.168.200.129 # 你的虚拟机IPport: 5672 # 端口virtual-host: / # 虚拟主机username: admin # 用户名password: 123 # 密码listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息acknowledge-mode: none

当使用none模式时

生产者发送一条消息

 消费者接受消息时抛异常

 

测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。

把确认机制修改为auto

spring:rabbitmq:host: 192.168.200.129 # 你的虚拟机IPport: 5672 # 端口virtual-host: / # 虚拟主机username: admin # 用户名password: 123 # 密码listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息acknowledge-mode: auto

 再次发送消息

 

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):

当我们把配置改为auto时,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。

二、失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。 极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。

为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容

spring:rabbitmq:host: 192.168.200.129 # 你的虚拟机IPport: 5672 # 端口virtual-host: / # 虚拟主机username: admin # 用户名password: 123 # 密码listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息acknowledge-mode: auto #消息确认retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃
2.1、失败处理策略

在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了

因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

在消费者里创建一个异常消息配置类

@Configuration
@Slf4j
//当配置文件中spring.rabbitmq.listener.simple.retry.enabled 属性为ture时配置类才生效
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {//消息处理失败交换机@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}//消息处理失败队列@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}//绑定关系@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}//定义一个RepublishMessageRecoverer,关联队列和交换机@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){log.error("加载RepublishMessageRecoverer");return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

 当接收消息出现异常时,会创建error.queue队列

 可以查看到异常信息

三、业务幂等性

何为幂等性? 幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。 在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行。 然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

  • 页面卡顿时频繁刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递

我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。 举例:

  1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
  2. 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
  3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
  4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

  • 唯一消息ID
  • 业务状态判断
3.1、唯一消息ID
  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。 以Jackson的消息转换器为例 

在生产者和消费者的启动类里加一个配置

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}

 测试生产者发送消息会产生一个id

 3.2、业务判断

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。 例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

相比较而言,消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

以支付修改订单的业务为例,我们需要修改OrderServiceImpl中的markOrderPaySuccess方法:

 @Overridepublic void markOrderPaySuccess(Long orderId) {// 1.查询订单Order old = getById(orderId);// 2.判断订单状态if (old == null || old.getStatus() != 1) {// 订单不存在或者订单状态不是1,放弃处理return;}// 3.尝试更新订单Order order = new Order();order.setId(orderId);order.setStatus(2);order.setPayTime(LocalDateTime.now());updateById(order);}

上述代码逻辑上符合了幂等判断的需求,但是由于判断和更新是两步动作,因此在极小概率下可能存在线程安全问题。

@Override
public void markOrderPaySuccess(Long orderId) {// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1lambdaUpdate().set(Order::getStatus, 2).set(Order::getPayTime, LocalDateTime.now()).eq(Order::getId, orderId).eq(Order::getStatus, 1).update();
}

注意看,上述代码等同于这样的SQL语句:

UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1

我们在where条件中除了判断id以外,还加上了status必须为1的条件。如果条件不符(说明订单已支付),则SQL匹配不到数据,根本不会执行。

3.3、兜底方案

我们可以在交易服务设置定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。
 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/174737.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

synchronized 同步锁的思考

经过前面的分析,我们大概对同步锁有了一些基本的认识,同步锁的本质就是实现多线程的互斥,保证同一时刻只有一个线程能够访问加了同步锁的代码,使得线程安全性得到保证。下面我们思考一下,为了达到这个目的,…

Linux 块设备驱动实验

前面我们都是在学习字符设备驱动,本章我们来学习一下块设备驱动框架,块设备驱动是 Linux 三大驱动类型之一。块设备驱动要远比字符设备驱动复杂得多,不同类型的存储设备又 对应不同的驱动子系统,本章我们重点学习一下块设备相关驱…

图像分类任务ViT与CNN谁更胜一筹?DeepMind用实验证明

精华置顶 墙裂推荐!小白如何1个月系统学习CV核心知识:链接 点击CV计算机视觉,关注更多CV干货 今天跟大家分享DeepMind发表的一篇技术报告,通过实验得出,CNN与ViT的架构之间虽然存在差异,但同等计算资源的预…

人工智能AI 全栈体系(九)

第一章 神经网络是如何实现的 如何用神经网络处理不等长文本的方法? 八、循环神经网络(RNN: Recurrent Neural Network) 处理不等长文本的神经网络 – 循环神经网络 RNN。 1. 从句子理解说起 上次讲了用词向量表示词,一句话也…

北邮22级信通院数电:Verilog-FPGA(7)第七周实验(2):BCD七段显示译码器(关注我的uu们加群咯~)

北邮22信通一枚~ 跟随课程进度更新北邮信通院数字系统设计的笔记、代码和文章 持续关注作者 迎接数电实验学习~ 获取更多文章,请访问专栏: 北邮22级信通院数电实验_青山如墨雨如画的博客-CSDN博客 关注作者的uu们可以进群啦~ 目录 一.verilog代码…

Kubernetes Label Selector

Author:rab 目录 前言一、Labels1.1 定义1.2 案例1.2.1 节点标签1.2.2 对象标签 二、Selector2.1 Node Selector2.2 Service Selector2.3 Deployment Selector2.4 StatefulSet Selector2.5 DaemonSet Selector2.6 HorizontalPodAutoscaler Selector2.7 NetworkPolic…

【C】柔性数组

柔性数组 也许你从来没有听说过柔性数组(flexible array)这个概念,但是它确实是存在的。 C99 中,结构中的最后一个元素允许是未知大小的数组,这就叫做『柔性数组』成员。 例如: 柔性数组的特点 结构中的柔性数组成员前…

ZKP7.1 Polynomial Commitments Based on Error-correcting Codes (Background)

ZKP学习笔记 ZK-Learning MOOC课程笔记 Lecture 7: Polynomial Commitments Based on Error-correcting Codes (Yupeng Zhang) Recall: common paradigm for efficient SNARK A polynomial commitment scheme A polynomial interactive oracle proof (IOP) SNARK for gene…

CAN总线通信协议

Reference video: 趋近于完美的通讯 CAN总线!4分钟看懂! CAN通信精华整理,汽车工程师必备技能,一个视频带你轻松掌握! 写在前面:CAN通信就三个要点 - 波特率的配置 - 过滤寄存器的配置与理解(…

数组与链表算法-单向链表算法

目录 数组与链表算法-单向链表算法 C代码 单向链表插入节点的算法 C代码 单向链表删除节点的算法 C代码 对单向链表进行反转的算法 C代码 单向链表串接的算法 C代码 数组与链表算法-单向链表算法 在C中,若以动态分配产生链表节点的方式,则可以…

“第五十五天”

定点数: 原码的乘法: 乘法的符号位是单独处理的(通过对被乘数和乘数的符号位进行异或实现),数值位去绝对值进行运算。这里的乘法实际上是通过多次加法实现的。 这里被乘数是放在x寄存器,乘数放在MQ寄存器…

【音视频|wav】wav音频文件格式详解

😁博客主页😁:🚀https://blog.csdn.net/wkd_007🚀 🤑博客内容🤑:🍭嵌入式开发、Linux、C语言、C、数据结构、音视频🍭 🤣本文内容🤣&a…

洛谷 B2009 计算 (a+b)/c 的值 C++代码

目录 题目描述 AC Code 切记 题目描述 题目网址&#xff1a;计算 (ab)/c 的值 - 洛谷 AC Code #include<bits/stdc.h> using namespace std; int main() {int a,b,c;cin>>a>>b>>c;cout<<(ab)/c<<endl;return 0; } 切记 不要复制题…

Netty复习:(2)IdleStateHandler的用法

一、handler定义&#xff1a; package handler;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter;public class MyChatServerHandler3 extends ChannelInboundHandlerAdapter {Overridepublic void userEventTriggered(…

Pytorch指定数据加载器使用子进程

torch.utils.data.DataLoader(train_dataset, batch_sizebatch_size, shuffleTrue,num_workers4, pin_memoryTrue) num_workers 参数是 DataLoader 类的一个参数&#xff0c;它指定了数据加载器使用的子进程数量。通过增加 num_workers 的数量&#xff0c;可以并行地读取和预处…

分布式:一文吃透分布式锁,Redis/Zookeeper/MySQL实现

目录 一、项目准备spring项目数据库 二、传统锁演示超卖现象使用JVM锁解决超卖解决方案JVM失效场景 使用一个SQL解决超卖使用mysql悲观锁解决超卖使用mysql乐观锁解决超卖四种锁比较Redis乐观锁集成Redis超卖现象redis乐观锁解决超卖 三、分布式锁概述四、Redis分布式锁实现方案…

Linux 文件系统简介

文章目录 一、磁盘简介1.1 简介1.2 机械硬盘与固态硬盘1.2.1 机械磁盘&#xff08;HDD&#xff09;1.2.2 固态磁盘&#xff08;SSD&#xff09;1.2.3 I/O操作 二、文件系统简介2.1. 简介2.2 文件系统特点2.3 Linux文件系统 三、文件数据存储方式3.1 连续存储3.2 链接表存储3.3 …

前端知识与基础应用#2

标签的分类 关于标签我们可以分为 &#xff1a; 单标签&#xff1a;img, br hr 双标签&#xff1a;a&#xff0c;h,div 按照属性可分为&#xff1a; 块儿标签&#xff08;自己独自占一行&#xff09;&#xff1a;h1-h6, p,div 行内&#xff08;内联&#xff09;标签&#xff08…

One-to-N N-to-One: Two Advanced Backdoor Attacks Against Deep Learning Models

One-to-N & N-to-One: Two Advanced Backdoor Attacks Against Deep Learning Models----《一对N和N对一&#xff1a;针对深度学习模型的两种高级后门攻击》 1对N&#xff1a; 通过控制同一后门的不同强度触发多个后门 N对1&#xff1a; 只有当所有N个后门都满足时才会触发…

3.5每日一题(求齐次方程组的特解)

1、判断类型选择方法&#xff1a;看出为齐次方程&#xff08;次幂都一样&#xff09; 2、 化为变量可分离&#xff1b;按变量可分离的方法求出通解&#xff08;此题等式两边同时除以 x &#xff09; 3、把x1&#xff0c;y0带入通解&#xff0c;定常数C&#xff0c;求出特解 …