RabbitMQ - 如保证消息的可靠性?

目录

一、消息可靠性

1.1、生产者消息确认(生产者角度)

1.1.1、理论

1.1.2、实践

1.2、消息持久化(消息角度)

1.2.1、理论

1.3、消费者消息确认(消费者角度)

1.3.1、理论

1.3.2、实践

1.4、失败重试机制(失败后的处理机制)

1.4.1、理论


一、消息可靠性


1.1、生产者消息确认(生产者角度)

1.1.1、理论

在生产者这边,RabbitMQ 提供了 消息确认机制 来确保生产者的消息到达队列。

具体的,生产者将消息发送给 MQ 之后,会返回一个结果给生产者,表示消息是否处理成功,具体有以下两种响应:

  1. publish-confirm 响应
    1. 消息成功投递到交换机,返回 ack.
    2. 消息未投递到交换机(比如交换机不存在,或者是交换机名字写错了),返回 nack.
  2. publish-return 响应
    1. 消息投递到交换机,但是没有路由到队列(比如指定的队列名字写错了),返回 ack,以及路由失败的原因.

最后生产者这边的回调接收到响应后,根据不同的 ack 执行不同的“策略”(类似于你去买书,然后拿到书以后具体要干啥,都由你决定).

Ps:确认机制发送消息时,需要给每一个消息设置一个全局唯一的 id, 以区分不同消息,避免 ack 冲突.

1.1.2、实践

a)再 publisher 微服务的 application.yml 中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated publisher-returns: true template:mandatory: true

配置说明:

  1. publish-confirm-type:开启publisher-confirm,这里支持两种类型,
    1. simple(不推荐,类似死等,占用资源):同步等待confirm结果,直到超时.
    2. correlated(推荐):异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback.
  2. publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback.
  3. template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息.

b)每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}", replyCode, replyText, exchange, routingKey, message.toString());});}
}

Ps:ApplicationContextAware 就是 Spring 容器启动时的要执行的通知接口,通过 setApplicationContext 方法实现具体的通知.

c)生产者发送消息,指定 ID,消息  ConfirmCallback

@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {// 消息体String message = "hello, spring amqp!";// 消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 添加callbackcorrelationData.getFuture().addCallback(result -> {if(result.isAck()){ // ack,消息成功log.debug("消息发送成功, ID:{}", correlationData.getId());}else{// nack,消息失败log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()));// 发送消息rabbitTemplate.convertAndSend("amq.direct", "simple", message, correlationData);
}

1.2、消息持久化(消息角度)

1.2.1、理论

MQ 默认时内存存储消息,通过开启持久化功能(设置 durable = true),就可以将消息持久化到文件中,保证保证消息不丢失.

Ps:消息要持久化的前提是交换机(不一定,但最好是)和队列是持久化的.

1.2.2、实践

a)交换机持久化

@Bean
public DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new DirectExchange("simple.direct", true, false);
}

b)队列持久化

@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}

c)消息持久化

    public void testDurableMessage() {//1.构造一个持久的消息Message message = MessageBuilder.withBody("hello".getBytes()).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();rabbitTemplate.convertAndSend("simple.queue", message);}

Ps:delivery_mode = 2 就表示消息要持久化.

1.3、消费者消息确认(消费者角度)

1.3.1、理论

RabbitMQ 支持消费者确认机制,即:消费者处理消息后可以向 MQ 发送 ack 回执,MQ收到ack回执后才会删除该消息.

SpringAMQP 允许配置三种确认模式:

  • manual:手动ack,需要在消费者执行的消息代码结束后,调用api发送ack。
  • auto:自动ack,由 spring 监测消费者的执行的消费代码是否出现异常,没有异常则返回ack;抛出异常则返回nack,然后会将消息重新加入到队列,再发送给消费者,然后再次异常...,无限循环.
  • none:关闭ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除

1.3.2、实践

这里只需要配置以下 application.yml 文件,添加以下配置:

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: none # none,关闭ack;manual,手动ack;auto:自动ack

1.4、失败重试机制(失败后的处理机制)

1.4.1、理论

刚刚讲到,消费者消费确认,SpringAMQP 提供了三种确认模式,其中 auto 这种方式,在消费者执行消费代码遇到异常时,会重新将消息加入到队列中,然后发送给消费者,再次异常,无限循环,导致 mq 的消息处理飙升,带来不必要的压力.

假设消费任务如下:

@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {System.out.println("消费者接收到消息:" + msg);System.out.println("开始消费!");System.out.println(1/0);System.out.println("消费完成!");}
}

我们可以利用 Spring 的 retry 机制,在消费者出现异常时,利用本地重试,而不是无限制的加入到 mq 队列,只需要对消费者的配置文件进行以下配置:

spring:rabbitmq:listener:simple:prefetch: 1retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初始的失败等待时长为1秒multiplier: 3 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 4 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

在开启重试模式以后,若重试次数耗尽,并且消息依然失败,则需要有 MessageRecoverer 接口来处理,他包含三种不同的实现:

  1. RejectAndDontRequeueRecoverer(默认方式):重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  2. ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  3. RepublishMessageRecoverer(推荐方式):重试耗尽后,将失败消息投递到指定的交换机,再由交换机投递到指定的队列.
     

上述第三种方式比较推荐,如下图:

1.4.2、实践

这里就测试以下推荐方案 RepublishMessageRecoverer

a)首先要定义用来接收失败消息的交换机、队列、绑定关系,最后定义 RepublishMessageRecoverer(Bean 的方式注入,覆盖 Spring 默认的方案):

@Configuration
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange() {return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue() {return new Queue("error.queue", true);}@Beanpublic Binding errorBinding() {return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}}

b)定义消费者执行的消费任务

@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {System.out.println("消费者接收到消息:" + msg);System.out.println("开始消费!");System.out.println(1/0);System.out.println("消费完成!");}
}

c)启动消费者,如下:

d)查看失败队列中具体信息(异常栈信息和信息信息)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/129343.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ARM指令集--数据处理指令

数据处理指令:数学运算,逻辑运算 立即数 立即数的本质 就是包含在指令当中的数,属于指令的一部分 立即数的优点:取指的时候就可以将其读取到CPU,不用单独去内存读取,速度快 立即数的缺点:不…

数电课程设计——课设二:交通信号灯

一、实验内容 (1)十字路口有 x、y 方向两组交通信号灯,每组有红、黄、绿灯各一个; (2)设计一个交通灯控制电路,模拟十字路口交通灯工作情况,红灯亮 35s,黄灯亮 5s&…

JAVA设计模式第七讲:设计模式在 Spring 源码中的应用

设计模式(design pattern)是对软件设计中普遍存在的各种问题,所提出的解决方案。本文以面试题作为切入点,介绍了设计模式的常见问题。我们需要掌握各种设计模式的原理、实现、设计意图和应用场景,搞清楚能解决什么问题…

电子企业应该先实施ERP系统还是WMS仓储管理系统

电子企业应该先实施ERP系统还是WMS仓储管理系统?这是一个有争议的问题,不同的企业和管理专家有不同的看法。但是,从我个人的观点来看,电子企业应该先实施ERP系统,然后再考虑WMS仓储管理系统。 首先,ERP系统…

医疗知识图谱 neo4j

开源项目: https://github.com/liuhuanyong/QASystemOnMedicalKG 一.效果 二.需要安装: pip install pyahocorasick pip install py2neo 三.需要修改: 需要改的点: 1.改连接的方式 2.改读文件的方式 MedicalGraph 运行&am…

【C++进阶】二叉树搜索树

⭐博客主页:️CS semi主页 ⭐欢迎关注:点赞收藏留言 ⭐系列专栏:C进阶 ⭐代码仓库:C进阶 家人们更新不易,你们的点赞和关注对我而言十分重要,友友们麻烦多多点赞+关注,你们的支持是我…

一文读懂java变量类型

前言 在学习和使用Java编程语言时,理解变量类型是至关重要的基础知识。Java是一种静态类型语言,强调变量必须先声明其类型,才能进行后续操作。因此,对于初学者来说,了解Java中不同的变量类型及其特性是迈向编程成功的…

基于Alexnet深度学习网络的人员口罩识别算法matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 file_path1 test\mask\;% 图像文件夹路径 %获取测试图像文件夹下所有jpg格式的图像文件…

2023年9月NPDP产品经理国际认证报名来这里就对了

产品经理国际资格认证NPDP是新产品开发方面的认证,集理论、方法与实践为一体的全方位的知识体系,为公司组织层级进行规划、决策、执行提供良好的方法体系支撑。 【认证机构】 产品开发与管理协会(PDMA)成立于1979年,是…

Python网络爬虫库:轻松提取网页数据的利器

网络爬虫是一种自动化程序,它可以通过访问网页并提取所需的数据。Python是一种流行的编程语言,拥有许多强大的网络爬虫库。在本文中,我们将介绍几个常用的Python网络爬虫库以及它们的使用。 Requests库 Requests是一个简单而优雅的HTTP库&…

三维模型3DTile格式轻量化压缩处理工具常用几款软件介绍

三维模型3DTile格式轻量化压缩处理工具常用几款软件介绍 三维模型3DTile格式的轻量化处理旨在减少模型的存储空间和提高渲染性能。以下是一些推荐的工具软件,可以用于实现这个目的: MeshLab:MeshLab是一个开源的三维模型处理软件&#xff0c…

TensorFlow详解

TensorFlow详解 TensorFlow是一个开源的机器学习框架,由Google开发。它是一个强大、高度可扩展的计算框架,可以用于各种机器学习任务,包括图像和语音识别、自然语言处理、推荐系统等。 TensorFlow 是一种由 Google 开发的开源机器学习框架&am…

护航数字政府建设,美创科技成为“数字政府建设赋能计划”成员单位

近日,“2023软博会-软件驱动数字政府创新发展论坛”顺利召开,本次论坛由中国信息通信研究院、中国通信标准化协会承办,中国通信标准化协会云计算标准和开源推进委员会、数字政府建设赋能计划支持。 天津市工业和信息化局总经济师杨冬梅、中国…

Leetcode125. 验证回文串

力扣(LeetCode)官网 - 全球极客挚爱的技术成长平台 如果在将所有大写字符转换为小写字符、并移除所有非字母数字字符之后,短语正着读和反着读都一样。则可以认为该短语是一个 回文串 。 字母和数字都属于字母数字字符。 给你一个字符串 s&…

Cpolar+Tipas:在Ubuntu上搭建私人问答网站,为您提供专业的问题解答

文章目录 前言2.Tipask网站搭建2.1 Tipask网站下载和安装2.2 Tipask网页测试2.3 cpolar的安装和注册 3. 本地网页发布3.1 Cpolar临时数据隧道3.2 Cpolar稳定隧道(云端设置)3.3 Cpolar稳定隧道(本地设置) 4. 公网访问测试5. 结语 前…

Threejs汽车展厅

2023-09-06-16-29-40 预览:https://9kt8fy-1234.csb.app/ 源码链接

微信自动打招呼自动回复

点击蓝字 关注我们 微信无疑是我们日常生活中最常用的社交工具之一。但是,你有没有感觉到,每天都要花费大量时间去添加好友、回复简单咨询消息和打招呼,是一件很烦琐的事情呢?如果你也有这样的困扰,那么今天就给大家介…

如何注册喀麦隆商标?

想象一下,你正在喀麦隆的雨林中寻找宝藏,突然你发现了一个从未被人发现的部落。这个部落的人们用一种独特的图案作为他们的标记,来展示他们的身份和与众不同。这个图案就是喀麦隆的商标! 在商业世界中,商标就像这个独特…

数据结构 每日一练:选择 + 编程

目录 选择 编程 选择 1、 设对n(n>1)个元素的线性表的运算只有4种:删除第一个元素,删除最后一个元素,在第一个元素之前插入新元素,在最后一个元素之后插入新元素,则最好使用(&a…

IT运维:使用数据分析平台监控H3C交换机

概述 在企业日常运维中,设备种类繁多,日志格式各异,日志量巨大,大量的告警,我们面临着如何统一的存放这些日志?如何对海量的日志进行查看,分析?传统的日志设备无法满足日志格式各异的…