【RabbitMQ】可靠性策略(幂等,消息持久化)

MQ可靠性策略

  • 发送者的可靠性问题
    • 生产者的重连
    • 生产者确认
  • MQ的可靠性
    • 数据持久化
    • Lazy Queue
  • 消费者的可靠性问题
    • 消费者确认机制
    • 消息失败处理
  • 业务幂等性
  • 简答问题

发送者的可靠性问题

生产者的重连

可能存在由于网络波动,出现的客户端连接MQ失败,我们可以通过配置文件配置解决

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled:  true #开启超时重试机制initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次的等待时长倍数,下次等待时间= initial-interval * multipliermax-attempts: 3 #最大重试次数

生产者确认

RabbitMQ提供了Publisher Confirm和Publisher Return两种确认机制,开机确认机制后,在MQ成功收到消息后会返回确认消息给生产者,返回的结果有以下几种情况:

  1. 消息投递到了MO,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  2. 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  3. 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  4. 其他情况都会返回NACK,告知投递失败

通过配置文件配置生产者的消息类型:

spring:rabbitmq:publisher-confirm-type: correlated #开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

这里的publisher-confirm-type 有三种模式可选:
none:关闭confirm机制
simple: 同步阻塞等待MQ的回执消息
correlated:MQ异步调用方式返回回执消息

异步回调方式:

我们完成一个任务将消息交由消息队列中,就进行别的任务了,当消息队列返回异常问题,在过来进行对应的处理

我们需要调用ReturnCallback函数完成消息失败后的操作:
在使用之前需要配置ReturnCallback,每个RabbitTemplate只能配置一个ReturnCallback

@Configuration
public class CommonConfig implements ApplicationContextAware{@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException{//获取RabbitTemplateRabbitTemplate rabbittemplate =applicationContext.getBean(RabbitTemplate.class);//设置ReturnCallbackrabbitTemplate.setReturnCallback(message,replyCode,replyText,exchange,routingKey)->{//处理操作}}
}

通过ConfirmCallback来处理消息失败:
每一个消息指定一个ConfirmCallback

void test() throws InterruptedException{CorrelationData cd= new CorrelationData();cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.confirm>(){@Overridepublic void onFailure(Throwable ex){//Future 发生异常是的逻辑处理,基本不会触发}@Overridepublic void onSuccess(CorrelationData.confirm result){//Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ //result.isack,boolean类型,true代表ack回执,false表示nack回执//处理逻辑}else{//异常处理}}});
}
rabbittemplate.convertAndSend("","",cd);

MQ的可靠性

在默认情况下,Rabbitmq会将接收到的数据保存在内存中以降低消息收发的延迟,这样会有问题:

  1. 一旦MQ宕机,内存的消息会丢失
  2. 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞(消息对队列将消息保存到磁盘,此时MQ阻塞)

数据持久化

RabbitMQ的数据持久化包括:

  1. 交换机持久化(Durable 永久的,Transient临时的)
  2. 队列持久化(Durable 永久的,Transient临时的)
  3. 消息持久化

消息的持久化:

void test(){Message message =MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF-8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
}

会将消息在过程中持久化到磁盘不会导致MQ阻塞

消息的持久化性能不是很高,可以通过Lazy Queue进行消息的持久化

Lazy Queue

惰性队列
特征:

  1. 接收到消息后直接存入磁盘而非内存(内存只保留最近消息,默认2048条)
  2. 消费者要消费消息时才会从磁盘中读取并加载到内存
  3. 支持百万条数据的消息存储

在3.12 版本后,所有的队列都是Lazy Queue模式,无法更改
在这里插入图片描述
在java中要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy:

@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() //开启lazy模式.build();
}

通过注解也可以实现

@RabbitListener(queuesToDeclare= @Queue(name="lazy.queue",durable="true",//持久化arguments =@Argument(name="x-queue-mode",value="lazy")))
public void listenLazyQueue(String msg){//消费处理
}

消费者的可靠性问题

消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制,当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
ack:成功处理消息,RabbitMQ从队列中删除该信息
nack:消息处理失败,RabbitMQ需要再次投递信息
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该信息
那么如何实现呢:
SprinaAMOP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式
有三种方式:
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
auto:自动模式。SprinGAMOP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.当业务出现异常时:
如果是业务异常,则会自动返回nack
如果是消息处理或校验异常,自动返回reject

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: none #none,关闭

消息失败处理

如果消费者返回nack,那就会重复进行,这样大大影响效率
我们可以利用Spring的retry机制,在消费者出现异常的时利用本地重试:

spring:rabbitmq:listener:simple:prefetch: 1retry:enabled: true  #开启消费者失败重试initial-interval: 1000ms #初始的失败等待时间multiplier: 1 #下次失败的等待时长的倍数max-attempts: 3 # 最大重试次数stateless: true # true无状态,false有状态,如果业务中包含事务,这里改为false

在开启重试模式之后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息,默认这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
在这里插入图片描述

将失败消息重新投递到error交换机中,可以绑定error消息队列,将来发送信息给开发人员等操作消息

将失败策略改为RepublishMessageRecoverer:

  1. 首先,定义接收失败消息的交换机,队列绑定
  2. 定义RepublishMessageRecoverer
@Bean
public MessageRecoverer test(RabbitTemplate rabbittemplate){return new RepublishMessageRecoverer(rabbitTemplate,"交换机名称","key值")
}

业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x)),在程序开发中,则指同一个业务,执行一次或多次对业务状态的影响是一致的
幂等的使用场景:防止某一数据被重复进行修改
幂等业务:根据id的查询业务,根据id的删除业务等
非幂等:用户下单,扣减库存等

如何实现幂等:
方案一唯一消息id
给每一个消息都设置一个唯一id,利用id区分是否重复消息:

  1. 每一条消息都生成一个唯一id,与消息一起投递给消费者
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息id保存到数据库
  3. 如果下次又收到相同的消息,去数据库查询判断是否存在,存在则为重复消息放弃处理
@Bean
public MessageConverter messageConverter(){//定义消息转换器Jackson2JsonMessageConverter jjmc= new jackson2JsonMessageConverter();//配置自动创建消息id,用于识别不同消息,也可以在业务中基于id判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}

但是这样会造成额外的操作冗余,比如还需要写数据库等等
方案二:结合业务逻辑,基于业务本身做判断

简答问题

如何保证支付服务与交易服务之间的订单状态一致性:

  1. 首先,支付服务会在正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步
  2. 其次,为了保证消息的可靠性,我们采取了生产者确认机制,消费者确认,消费者失败重试等策略,确保消息投递和处理的可靠性,同时可开启了MQ的持久化,避免因服务宕机导致消息丢失
  3. 最后,我们还会交易服务更新订单状态时作业业务幂等判断,避免因消息重复导致订单异常

如果交易服务处理失败,还有什么方案:
在交易服务设置定时任务,定期查询订单生产状态,这样即使MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/319346.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

10G MAC层设计系列-(4)MAC TX模块

一、前言 MAC TX模块就是要将IP层传输过来的数据封装前导码、MAC地址、帧类型以及进行CRC校验&#xff0c;并与CRC值一块组成以太网帧。 二、模块设计 首先对输入的数据进行缓存&#xff0c;原因是在之后要进行封装MAC帧头&#xff0c;所以需要控制数据流的流动 FIFO_DATA_6…

neo4j 的插入速度为什么越来越慢,可能是使用了过多图谱查询操作

文章目录 背景描述分析解决代码参考neo4j 工具类Neo4jDriver知识图谱构建效果GuihuaNeo4jClass 背景描述 使用 tqdm 显示&#xff0c;处理的速度&#xff1b; 笔者使用 py2neo库&#xff0c;调用 neo4j 的API 完成节点插入&#xff1b; 有80万条数据需要插入到neo4j图数据中&am…

手机恢复出厂设置ip地址会变吗

当我们对手机进行恢复出厂设置时&#xff0c;很多人会担心手机的IP地址是否会发生变化。IP地址对于手机的网络连接至关重要&#xff0c;它决定了手机在网络中的身份和位置。那么&#xff0c;手机恢复出厂设置后&#xff0c;IP地址到底会不会发生变化呢&#xff1f;虎观代理小二…

华为鸿蒙系统(Huawei HarmonyOS)

华为鸿蒙系统&#xff08;华为技术有限公司开发的分布式操作系统&#xff09; 华为鸿蒙系统&#xff08;HUAWEI HarmonyOS&#xff09;&#xff0c;是华为公司在2019年8月9日于东莞举行的华为开发者大会&#xff08;HDC.2019&#xff09;上正式发布的分布式操作系统。 华为鸿蒙…

进程控制【Linux】

文章目录 进程终止进程等待 创建一批子进程 #include <stdio.h> #include <unistd.h> #include <stdlib.h> #define N 5void runChild() {int cnt 10;while (cnt ! 0){printf("i am a child : %d , ppid:%d\n", getpid(), getppid());sleep(1);c…

MySQL:飞腾2000+Centos7.6 aarch64 部署MySQL8.0.36

目录 1.硬件环境 2.MySQL选择 Bundle版本【全部文件】​编辑 3.下载并安装 4.安装完成后检查mysql 5.初始化MySQL 6.那就问了&#xff0c;都初始化了啥&#xff1f; 7.尝试启动MySQL 8.给mysql文件授权 9.再次尝试启动正常 10.mysql初始化目录出现了mysql.sock 11.找…

buuctf-misc-30.被劫持的神秘礼物1

30.被劫持的神秘礼物1 题目&#xff1a;http数据流追踪&#xff0c;MD5哈希一下账户名和密码 MD5在线加密/解密/破解—MD5在线 (sojson.com)

C语言 | Leetcode C语言题解之第61题旋转链表

题目&#xff1a; 题解&#xff1a; struct ListNode* rotateRight(struct ListNode* head, int k) {if (k 0 || head NULL || head->next NULL) {return head;}int n 1;struct ListNode* iter head;while (iter->next ! NULL) {iter iter->next;n;}int add n…

NASA数据集——NOAA 气溶胶和海洋科学考察数据(AEROSE)

Saharan Dust AERosols and Ocean Science Expeditions 简介 NOAA 气溶胶和海洋科学考察&#xff08;AEROSE&#xff09;是一种基于测量的综合方法&#xff0c;用于了解热带海洋上空气溶胶长程飘移的影响&#xff08;Morris 等人&#xff0c;2006 年&#xff1b;Nalli 等人&a…

GitHub显示无法在此仓库中合并不相关的历史记录

你好,我是Qiuner. 为记录自己编程学习过程和帮助别人少走弯路而写博客 这是我的 github gitee 如果本篇文章帮到了你 不妨点个赞吧~ 我会很高兴的 &#x1f604; (^ ~ ^) 想看更多 那就点个关注吧 我会尽力带来有趣的内容 GitHub显示无法在此仓库中合并不相关的历史记录 场景&…

C++初阶之模板初阶

一、泛型编程 如何实现一个通用的交换函数呢&#xff1f; void Swap(int& left, int& right) {int temp left;left right;right temp; } void Swap(double& left, double& right) {double temp left;left right;right temp; } void Swap(char& left,…

分类规则挖掘(一)

目录 一、分类问题概述&#xff08;一&#xff09;分类规则挖掘&#xff08;二&#xff09;分类规则评估&#xff08;三&#xff09;分类规则应用 二、k-最近邻分类法 一、分类问题概述 动物分类&#xff1a;设有动物学家陪小朋友林中散步&#xff0c;若有动物突然从小朋友身边…

详解SDRAM基本原理以及FPGA实现读写控制(一)

文章目录 一、SDRAM简介二、SDRAM存取结构以及原理2.1 BANK以及存储单元结构2.2 功能框图2.3 SDRAM速度等级以及容量计算 三、SDRAM操作命令3.1 禁止命令&#xff1a; 4b1xxx3.2 空操作命令&#xff1a;4b01113.3 激活命令&#xff1a;4b00113.4 读命令&#xff1a;4b01013.5 写…

llama_index微调BGE模型

微调模型是为了让模型在特殊领域表现良好,帮助其学习到专业术语等。 本文采用llama_index框架微调BGE模型,跑通整个流程,并学习模型微调的方法。 已开源:https://github.com/stay-leave/enhance_llm 一、环境准备 Linux环境,GPU L20 48G,Python3.8.10。 pip该库即可。…

新型直膨式光伏光热热泵/动力热管复合循环系统

太阳能光伏光热热泵&#xff08;即PVT热泵&#xff09;技术是建筑领域内实现碳中和的有效技术手段&#xff0c;该技术具有优越的热电冷联产能力。然而&#xff0c;现有的PVT热泵在良好的室外工况下能耗较高。为了解决这一问题&#xff0c;本文提出了一种新型的DX-PVT热泵/动力热…

【c++】模板编程解密:C++中的特化、实例化和分离编译

&#x1f525;个人主页&#xff1a;Quitecoder &#x1f525;专栏&#xff1a;c笔记仓 朋友们大家好&#xff0c;本篇文章我们来学习模版的进阶部分 目录 1.非类型模版参数按需实例化 2.模版的特化函数模版特化函数模版的特化类模版全特化偏特化 3.分离编译模版分离编译 1.非类…

ubuntu搭建kms服务器

1.下载kms开源包(如果提示找不到wget命令的话:apt install wget): wget https://github.com/Wind4/vlmcsd/releases/download/svn1111/binaries.tar.gz2.解压: tar -xzvf binaries.tar.gz接着cd 进入 Linux/intel/static/ 文件夹下: 3.选择对应的文件&#xff0c;这里我们选…

力扣每日一题104:二叉树的最大深度

题目 给定一个二叉树 root &#xff0c;返回其最大深度。 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;3示例 2&#xff1a; 输入&#xff1a;root [1,null,2…

OpenCV(二)—— 车牌定位

从本篇文章开始我们进入 OpenCV 的 Demo 实战。首先&#xff0c;我们会用接下来的三篇文章介绍车牌识别 Demo。 1、概述 识别图片中的车牌号码需要经过三步&#xff1a; 车牌定位&#xff1a;从整张图片中识别出牌照&#xff0c;主要操作包括对原图进行预处理、把车牌从整图…

飞书API(7):MySQL 入库通用版本

一、引入 在上一篇介绍了如何使用 pandas 处理飞书接口返回的数据&#xff0c;并将处理好的数据入库。最终的代码拓展性太差&#xff0c;本篇来探讨下如何使得上一篇的最终代码拓展性更好&#xff01;为什么上一篇的代码拓展性太差呢&#xff1f;我总结了几点&#xff1a; 列…