RabbitMQ08_保证消息可靠性

保证消息可靠性

        • 一、生产者可靠性
          • 1、生产者重连机制(防止网络波动)
          • 2、生产者确认机制
            • Publisher Return 确认机制
            • Publisher Confirm 确认机制
        • 二、MQ 可靠性
          • 1、数据持久化
            • 交换机、队列持久化
            • 消息持久化
          • 2、Lazy Queue 惰性队列
        • 三、消费者可靠性
          • 1、消费者确认机制
          • 2、失败重试机制
          • 3、业务幂等性

一、生产者可靠性
1、生产者重连机制(防止网络波动)
spring:rabbitmq:connection-timeout: 1s #设置MQ的连接超时时间template:retry:enabled: true #开启超时重试机制(默认是false)initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次的等待时长倍数,下次等待时长= initial-interval * multipliermax-attempts: 3 #最大重试次数
2、生产者确认机制
Publisher Return 确认机制

消息投递到MQ但是MQ路由失败,MQ返回路由失败原因

spring:rabbitmq:publisher-returns: true # 开启publisher return机制
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
Publisher Confirm 确认机制

临时消息投递到了MQ且入队成功,返回ACK
持久消息投递到了MQ且入队完成持久化,返回ACK
消息投递异常,返回NACK

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型

publisher-confirm-type 的三种类型

  • none 关闭confirm机制
  • simple 同步阻塞等待MQ回执消息
  • correlated MQ异步回调返回回执消息
@Test
void testPublisherConfirm() throws InterruptedException {CorrelationData cd = new CorrelationData();cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {        @Overridepublic void onFailure(Throwable ex) {// Future发生异常时的处理逻辑,基本不会触发log.error("handle message ack fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ log.debug("发送消息成功,收到 ack!");} else {log.error("发送消息失败,收到 nack, reason : {}", result.getReason());//TODO 重试发送}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "red1", "hello", cd);
}
二、MQ 可靠性
1、数据持久化
交换机、队列持久化

默认创建时就是持久化的(Durability = Durable)

在这里插入图片描述

消息持久化

RabbitTemplate 的 convertAndSend() 方法发送的消息默认就是持久化的(delivery mode = 2)

如果非要发送一个非持久化的消息,需要在调用 rabbitTemplate.convertAndSend() 方法时,显式地设置消息的 MessageProperties,并将 deliveryMode 设置为 1 (非持久化)

2、Lazy Queue 惰性队列

Lazy Queue是一种以惰性模式运行的队列,它尽可能地将消息存储在磁盘上,而不是内存中。只有当消费者需要消费消息时,这些消息才会被加载到内存中,效率比传统队列高。

3.12版本后,所有队列都是Lazy Queue模式,无法更改。

三、消费者可靠性
1、消费者确认机制

消费者回执消息类型

  • ack 消费者处理成功,RabbitMQ 将从队列中删除消息
  • nack 消费者处理失败,RabbitMQ 需再次投递消息
  • reject 消费者拒绝处理,RabbitMQ 将从队列中删除消息

SpringAMQP 消息监听器的三种确认模式

  • none 不处理。即消费者收到消息后立刻返回ack,消息会丢失,非常不安全。
  • manual 手动模式。业务代码手动调用api发送 ack 或 reject,存在业务入侵,但更灵活。
  • auto 自动模式(默认)。通过 AOP 对消息处理方法做环绕增强,正常返回ack,出现业务异常返回nack,出现消息处理或校验异常返回reject
spring:rabbitmq:listener:simple:acknowledge-mode: auto
2、失败重试机制

消费者处理消息出现异常时利用本地重试,而不是无限的requeue到mq,让mq重新投递给消费者

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试(默认是关闭的)initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现。

失败消息处理策略

  • RejectAndDontRequeueRecoverer:默认实现,重试耗尽后直接reject,丢弃消息。
  • ImmediateRequeueMessageRecoverer:重试耗尽后返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后将失败消息投递到指定的交换机

以第三种失败消息处理策略为例,配置方式如下:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
3、业务幂等性

由于存在各种确认和重试机制,消费者有重复消费消息的可能性,因此要保证业务的幂等性。
保证业务幂等性的方式如下:

  • 方案一:发送消息时生成唯一消息ID,投递给消费者,消费者接收到消息,业务处理成功后将消息ID保存到数据库,下次根据消息ID去数据库查询判断是否已处理,如果已处理则放弃处理。
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}
  • 方案二:结合业务逻辑,基于业务本身做判断。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/428996.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Springboot 文件上传下载相关问题

文章目录 关于Springboot 文件上传下载问题解决方案注意事项文件上传文件下载文件删除文件在线打开在写练习的时候&#xff0c;发现了一些小小的问题&#xff0c;已经在 上述代码中体现。① 代码路径碰到中文的时候&#xff0c;会有乱码&#xff0c;需要转换&#xff08;内容中…

华润电力最新校招社招润择认知能力测评:逻辑推理数字计算语言理解高分攻略

​ 尊敬的求职者们&#xff0c; 在您准备加入华润电力这个大家庭之前&#xff0c;了解其招聘测评的详细流程和要求是至关重要的。以下是我们为您整理的测评系统核心内容&#xff0c;希望对您的求职之旅有所帮助。 测评系统概览 华润电力的招聘测评系统旨在全面评估求职者的认…

【WEB】序列一下

1、 2、反序列化 <?phpclass Polar{public $url polarctf.com;public $ltsystem;public $bls /;function __destruct(){$a $this->lt;$a($this->b);} }$a new Polar(); echo serialize($a); ?>###O:5:"Polar":3:{s:3:"url";s:12:"…

CSS 布局三大样式简单学习

目录 1. css 浮动 1.1 效果1 1.2 效果2 1.3 效果3 1.4 效果4 2. css 定位 2.1 absolute 2.2 relative 2.3 fixed 3. css 盒子模型 3.1 效果1 3.2 效果2 3.3 效果3 3.4 效果4 1. css 浮动 1.1 效果1 1.2 效果2 1.3 效果3 1.4 效果4 2. css 定位 2.1 absolute 2.2 …

AI 时代的网络危机沟通计划

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

吉首大学--23级题目讲解

7-1 单链表基本操作 在 C/C 中&#xff0c;.&#xff08;点&#xff09;和 ->&#xff08;箭头&#xff09;运算符用于访问结构体或类的成员&#xff0c;但它们的使用场景不同。 1. . 运算符 . 运算符用于访问结构体或类的成员&#xff0c;通过对象或结构体变量直接访问。…

51单片机——独立按键

一、独立按键对应单片机P3管脚&#xff0c;如图 二、按键点亮LED灯 #include <STC89C5xRC.H> void main() { while(1) { if(P300) { P200; } else { P201; } } } 当按键为0时&#xff0c;代表按下&#xff0c;所以当P30按下时&#xff0c;让P20&#xff1d;0&#…

[产品管理-32]:NPDP新产品开发 - 30 - 文化、团队与领导力 - 领导力与团队的可持续发展

目录 一、团队领导的领导力 1.1 领导力 1、领导力的定义 2、领导力的重要性 3、领导力的构成要素 4、如何提升领导力 1.2 情商 二、虚拟团队 1、团队定义与特征 2、团队优势 3、团队挑战与应对策略 三、可持续发展 四、团队管理和领导力中的度量指标 4.1 激励创新…

XXL-JOB分片概念讲解

3. 分片功能讲解 3.1 案例需求&#xff1a; 1.我们现在实现这样的需求&#xff0c;在指定节假日&#xff0c;需要给平台的所有用户去发送祝福的短信 3.2.编码实现&#xff1a; a.初始化数据 1.在数据库中导入xxl_job_demo.sql数据 b.集成Druid&MyBatis 1.添加依赖 &…

[Python数据拟合与可视化]:使用线性、多项式、指数和高斯模型拟合数据

引言 在数据分析和机器学习领域&#xff0c;选择合适的模型对数据进行拟合是至关重要的。本文将通过一个实际的Python编程案例&#xff0c;比较线性、多项式、指数和高斯模型在数据拟合方面的性能。通过生成模拟数据&#xff0c;我们将使用这些模型进行拟合&#xff0c;并评估…

安捷伦Agilent/keysight 53220A参数资料 通用频率计 计数器

Agilent 53220A&#xff0c;Keysight 53220A&#xff0c;通用频率计数器/计时器&#xff0c;350 MHz&#xff0c;12 位&#xff0c;100 ps 53220A 350 MHz 通用频率计数器/计时器是一款双通道频率计数器&#xff0c;能够执行所需的全部频率和时间间隔测量。它可以添加可选的射…

Linux--守护进程与会话

进程组 概念 进程组就是一个或多个进程的集合。 一个进程组可以包含多个进程。 下面我们通过一句简单的命令行来展示&#xff1a; 为什么会有进程组&#xff1f; 批量操作&#xff1a;进程组允许将多个进程组织在一起&#xff0c;形成一个逻辑上的整体。当需要对多个进程…

【计算机网络】详解UDP套接字网络字节序IP地址端口号

一、网络字节序 我们已经知道, 内存中的多字节数据相对于内存地址有大端和小端之分, 磁盘文件中的多字节数据相对于文件中的偏移地址也有大端小端之分, 网络数据流同样有大端小端之分. 发送主机通常将发送缓冲区中的数据按内存地址从低到高的顺序发出; 接收主机把从网络上接到…

2023北华大学程序设计新生赛部分题解

时光如流水般逝去&#xff0c;我已在校园中奋战大二&#xff01;(≧▽≦) 今天&#xff0c;静静回顾去年的新生赛&#xff0c;心中涌起无尽感慨&#xff0c;仿佛那段青春岁月如烟花般绚烂。✧&#xff61;(≧▽≦)&#xff61;✧ 青春就像一场燃烧的盛宴&#xff0c;激情澎湃&…

DOS(Disk Operating System,磁盘操作系统)常用指令

目录 背景: 早期探索: DOS之父&#xff1a; 发展历程&#xff1a; 常用指令&#xff1a; 进入命令&#xff1a; 操作1.进入和回退&#xff1a; 操作2.增、删&#xff1a; 操作3.其它&#xff1a; 总结: 背景: 早期探索: DOS(Disk Operating System,磁盘操作系统)在…

数据结构——二叉搜索树

目录 二叉搜索树 概念性质 性能分析 实现代码 前置准备 插入 查找 删除&#xff08;重点&#xff09; ​编辑 key和key/value的使用场景 key/value二叉搜索树代码实现 二叉搜索树 概念性质 二叉搜索树&#xff08;Binary Search Tree&#xff0c;简称BST&#xff0…

主流卷积神经网络CNN总结

ResNet&#xff08;2015&#xff09;残差神经网络 残差结构 ResNet50具体卷积结构图 ResNeXt&#xff08;2016&#xff09;加入了分组卷积的思想&#xff0c;将原ResNet网络中的block替换成由group分组的block&#xff0c;两者得到的feature map一致&#xff0c;只是参数量更少…

2024年华为杯-研赛F题论文问题一二讲解+代码分享

X射线脉冲星光子到达时间建模 摘要 脉冲星是一类高速自转的中子星&#xff0c;其自转形成规律性脉冲信号&#xff0c;类似于“宇宙中的灯塔”&#xff0c;因此被认为是极为精确的时钟。X射线脉冲星导航利用脉冲星信号为航天器提供时间和空间参考。通过比较脉冲信号到达航天器…

Vue3.0组合式API:使用reactive()、ref()创建响应式代理对象

Vue3.0组合式API系列文章&#xff1a; 《Vue3.0组合式API&#xff1a;setup()函数》 《Vue3.0组合式API&#xff1a;使用reactive()、ref()创建响应式代理对象》 《Vue3.0组合式API&#xff1a;computed计算属性、watch监听器、watchEffect高级监听器》 《Vue3.0组合式API&…

内网渗透之中间人欺骗攻击-ARP攻击

ARP攻击 ARP协议简介 ARP全称为Address Resolution Protocol&#xff0c;即地址解析协议&#xff0c;它是一个根据IP地址获取物理地址的TCP/IP协议&#xff0c;主机发送信息时将包含目标IP地址的ARP请求广播到网络上的所有主机&#xff0c;并接收返回消息&#xff0c;以此确定…