RabbitMQ 高级特性——发送方确认

在这里插入图片描述

文章目录

  • 前言
  • 发送方确认
    • confirm 确认模式
    • return 退回模式
  • 常见面试题

前言

前面我们学习了 RabbitMQ 中交换机、队列和消息的持久化,这样能够保证存储在 RabbitMQ Broker 中的交换机和队列中的消息实现持久化,就算 RabbitMQ 服务发生了重启或者是宕机,也不会导致交换机和消息的丢失。那么这个机制是保证存储在 RabbitMQ Broker 中的可靠性,但是对于生产者发送的消息如果都到达不了 RabbitMQ 的话,那么这些持久化操作也就没有意义了,那么对于生产者发送的消息,生产者如何知道消息是否已经成功到达 RabbitMQ Broker 了呢?这里就需要用到 RabbitMQ 发送发确认这个特性了,前面我们大概的讲了一下 RabbitMQ Java Client 中的 Publisher/confirm 发送方确认,那么这篇文章我们将学习在 SpringBoot 中如何实现发送方确认。

发送方确认

其实对于上面的问题,RabbitMQ 为我们提供了两种解决方案:

  1. 通过事务机制实现
  2. 通过发送方确认机制实现

因为使用事务机制的话比较消耗性能,在实际工作中使用的不多,所以我们就主要介绍发送方确认的机制来实现发送方的确认。并且对于发送方确认的机制 RabbitMQ 也为我们提供了两个方式来控制消息的可靠性。

  1. confirm 确认模式
  2. return 退回模式

confirm 模式是确认消息是否到达指定的 Exchange 交换机的,而 return 退回模式则是确认消息是否到达指定队列的。

confirm 确认模式

Producer 在发送消息的时候,对发送端设置一个 ConfirmCallback 的监听,无论消息是否到达 Exchange,这个监听都会被执行,如果 Exchange 成功收到,ACK (Ackonwledge character 确认字符)为 true,如果没有收到消息,ACK 就为 false。

那么下面我们就来看看在 SpringBoot 中如何实现 confirm 确认模式:

首先在配置文件中配置信息:

spring:rabbitmq:publisher-confirm-type: correlated #消息发送确认

然后设置确认回调函数的内容并且发送消息:

无论消息是否成功送到,都会执行这个回调函数,确认消息是否成功送达的判断依据就是 ACK 的值:

public class Constants {public static final String CONFIRM_EXCHANGE = "confirm.exchange";public static final String CONFIRM_QUEUE = "confirm.queue";
}
@Configuration
public class RabbitConfig {@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();}@Bean("confirmExchange")public DirectExchange confirmExchange() {return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).build();}@Bean("confirmBinding")public Binding confirmBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("confirm");}@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory factory) {//创建新的RabbitTemplate对象,并且设置confirm回调函数RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {System.out.printf("消息接收成功,id:%s\n",correlationData.getId());}else {System.out.printf("消息接受失败,id:%s,cause:%s",correlationData.getId(),cause);}}});return rabbitTemplate;}
}
@RequestMapping("/producer")
@RestController
public class ProducerController {@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","rabbitmq confirm",correlationData);return "消息确认成功";}
}

在这里插入图片描述
然后我们指定交换机的时候,如果指定一个不存在的交换机,也就是消息无法到达指定的交换机,那么看看时候会执行确认回调函数:

rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE + 1,"confirm","rabbitmq confirm",correlationData);

2024-08-13 14:47:52.646 ERROR 11252 — [3.57.1.114:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘confirm.exchange1’ in vhost ‘test’, class-id=60, method-id=40)
消息接受失败,id:1,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘confirm.exchange1’ in vhost ‘test’, class-id=60, method-id=40)

可以看到,如果消息没有到达指定的交换机,那么也是会执行相应的回调函数的。

public interface ConfirmCallback {
/**
* 确认回调
* @param correlationData: 发送消息时的附加信息, 通常⽤于在确认回调中识别特定的消
息
* @param ack: 交换机是否收到消息, 收到为true, 未收到为false
* @param cause: 当消息确认失败时,这个字符串参数将提供失败的原因.这个原因可以⽤于调
试和错误处理.
* 成功时, cause为null
*/
void confirm(@Nullable CorrelationData correlationData, boolean ack,
@Nullable String cause);
}

RabbitTemplate.ConfirmCallback 和 ConfirmListener 的区别:

  • RabbitTemplate.ConfirmCallback:这是Spring AMQP库提供的一个回调接口,主要用于在使用RabbitTemplate发送消息时,接收来自RabbitMQ服务器的确认信息。这些确认信息表明消息是否已成功发送到RabbitMQ的交换机(Exchange)。
  • ConfirmListener:这个接口或功能更多是直接与RabbitMQ的Channel相关,而不是直接通过Spring AMQP的RabbitTemplate来使用的。它用于监听RabbitMQ Channel上的消息确认事件,包括消息的ACK(确认)和NACK(不确认)。这种方式通常需要更底层的操作,直接处理RabbitMQ的Channel和连接。

return 退回模式

当消息成功到达 Exchange 交换机的时候,交换机会根据路由规则匹配对应的队列,将消息路由到指定的队列,在消息从 Exchange 到 Queue 的过程中,如果一条消息无法被任何队列消费(即没有队列与消息的 Routing Key 匹配或者队列不存在等),可以选择把消息退回给发送者,消息退回给发送者时,我们可以设置一个返回回调方法,对消息进行处理。

那么使用 SpringBoot 如何实现 return 退回模式呢?

首先还是需要进行配置,配置和上面的 confirm 模式是一样的:

spring:rabbitmq:publisher-returns: true #设置回退

设置返回回调逻辑并发送消息:

@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory factory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {System.out.printf("消息接收成功,id:%s\n",correlationData.getId());}else {System.out.printf("消息接受失败,id:%s,cause:%s",correlationData.getId(),cause);}}});rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.printf("消息被退回:%s",returnedMessage);}});return rabbitTemplate;
}

setConfirmCallback() 和 setReturnCallback() 方法可以同时存在也可以单独设置。

rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm1","rabbitmq confirm",correlationData);

发送消息的时候,我们的 Routing Key 设置为没有 Binding Key 与之匹配的,然后来看看这个 returnCallback 是否会被执行:

消息被退回:ReturnedMessage [message=(Body:‘rabbitmq confirm’ MessageProperties [headers={spring_returned_message_correlation=1}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=confirm.exchange, routingKey=confirm1]
消息接收成功,id:1

消息成功到达了 Exhcange,但是没有到达指定的队列,所以执行了 returnCallback 方法。

public class ReturnedMessage {//返回的消息对象,包含了消息体和消息属性private final Message message;//由Broker提供的回复码, 表⽰消息⽆法路由的原因. 通常是⼀个数字代码,每个数字代表不同的含义.private final int replyCode;//⼀个⽂本字符串, 提供了⽆法路由消息的额外信息或错误描述.private final String replyText;//消息被发送到的交换机名称private final String exchange;//消息的路由键,即发送消息时指定的键private final String routingKey;
}

常见面试题

如何保证 RabbitMQ 消息的可靠传输:

在这里插入图片描述

从这个图中可以看出,消息可能丢失的场景以及解决方案:

  1. 生产者将消息发送到RabbitMQ失败
    a. 可能原因: 网络问题等
    b. 解决办法: 参考本章节[发送方确认-confirm确认模式]

  2. 消息在交换机中无法路由到指定队列:
    a. 可能原因: 代码或者配置层面错误,导致消息路由失败
    b. 解决办法: 参考本章节[发送方确认-return模式]

  3. 消息队列自身数据丢失
    a. 可能原因: 消息到达RabbitMQ之后,RabbitMQ Server宕机导致消息丢失。
    b. 解决办法: 参考本章节[持久性]。开启RabbitMQ持久化,就是消息写入之后会持久化到磁盘,如果RabbitMQ挂了,恢复之后会自动读取之前存储的数据。(极端情况下,RabbitMQ还未持久化就挂了,可能导致少量数据丢失,这个概率极低,也可以通过集群的方式提高可靠性)

  4. 消费者异常,导致消息丢失
    a. 可能原因: 消息到达消费者,还没来得及消费,消费者宕机。消费者逻辑有问题。
    b. 解决办法: 参考本章节[消息确认]。RabbitMQ提供了消费者应答机制来使RabbitMQ能够感知到消费者是否消费成功消息。默认情况下消费者应答机制是自动应答的,可以开启手动确认,当消费者确认消费成功后才会删除消息,从而避免消息丢失。除此之外,也可以配置重试机制(参考下一章节),当消息消费异常时,通过消息重试确保消息的可靠性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/428188.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

中间件知识点-消息中间件(Rabbitmq)一

消息中间件介绍 MQ的作用(优点)主要有以下三个方面&#xff1a; a.异步 b.解耦 c.削峰 MQ的作用(缺点)主要有以下三个方面&#xff1a; a.系统可用性降低 b.系统复杂度提高 c.存在消息一致性问题需要解决 备注&#xff1a; 引入MQ后系统的复杂度会大大提高。 以前服务之间可以…

【软件基础知识】什么是 API,详细解读

想象一下,你正在使用智能手机上的天气应用。你打开应用,瞬间就能看到实时天气、未来预报,甚至是空气质量指数。但你有没有想过,这些数据是如何神奇地出现在你的屏幕上的?答案就在三个字母中:API。 API,全称Application Programming Interface(应用程序编程接口),是现代软件世…

计算机网络 --- 初识协议

序言 上一篇文章中 &#xff08;&#x1f449;点击查看&#xff09;&#xff0c;我们简单的了解了怎么寻找目标计算机&#xff0c;需要通过交换机&#xff0c;路由器等设备跨越多个网络来不断的转发我们需要传输的数据&#xff0c;直至到达目标计算机。  那我们设备之间数据是…

重回极简:华为如何走向全面智能化?

“人类发现地球只是宇宙一员的时候&#xff0c;也是我们距离群星最遥远的时候。” 这个来自天文领域的喟叹&#xff0c;今天同样出现在行业与企业的智能化之路上。在这个时代坐标上&#xff0c;AI大模型技术极速成熟&#xff0c;AIGC和AI Agent等应用受到了各个行业的巨大期待。…

昇腾大模型推理解决方案MindIE部署

MindIE大模型推理套件 MindIE&#xff08;Mind Inference Engine&#xff0c;昇腾推理引擎&#xff09;是华为公司针对AI全场景推出的整体解决方案&#xff0c;包含丰富的推理加速套件。通过开放各层次AI能力&#xff0c;支撑客户多样化的AI业务需求&#xff0c;使能百模千态&a…

存储 NFS

目录 1.存储的应用场景 2.存储分类 3.NFS服务组成 4.环境说明 ​编辑 5.服务端部署 6.NFS服务端的配置 7.NFS服务端本地进行测试 1.存储的应用场景 存储一般用于上传网站数据&#xff08;内容&#xff09;&#xff0c;一般用于在网站集群中。使用存储的话用户上传的…

成型的程序

加一个提示信息 加上python 常用的包 整个程序打包完 250M 安装 960MB matplot numpy pandas scapy pysearial 常用的包 (pyvisa)… … 啥都有 Python 解释器组件构建 要比 lua 容易的多 &#xff08;C/Rust 的组件库)

JavaSE--集合总览02:单列集合Collection的体系之一:List

Collection体系的特点 分为 list 和set集合&#xff0c;这篇文章主要讲述List&#xff0c;下篇讲述Set。 简单认识单列集合collection集合的特点 : list集合的特点&#xff1a; 有序 可重复 有索引 set集合的特点&#xff1a;无序 不重复 无索引 其中LinkedHashSet有序 TreeS…

微服务架构陷阱与挑战

微服务架构6大陷阱 现在微服务的基础设施还是越来越完善了&#xff0c;现在基础设施缺乏的问题逐渐被解决了。 拆分粒度太细&#xff0c;服务关系复杂 拆分降低了服务的内部复杂度&#xff0c;但是提升了系统的外部复杂度&#xff0c;服务越多&#xff0c;服务和服务之间的连接…

from tqdm.auto import tqdm用法详细介绍

tqdm 是一个 Python 库&#xff0c;用于在长时间运行的任务中显示进度条。tqdm.auto 是 tqdm 的一个版本&#xff0c;能够自动适配输出环境&#xff08;如 Jupyter Notebook、命令行等&#xff09;&#xff0c;以确保进度条在各种环境下显示正确。下面是 tqdm.auto 的详细用法介…

每天五分钟计算机视觉:将人脸识别问题转换为二分类问题

本文重点 在前面的课程中,我们学习了两种人脸识别的网络模型,这两种人脸识别网络不能算是基于距离或者Triplet loss等等完成的神经网络参数的学习。我们比较熟悉的是分类任务,那么人脸识别是否可以转变为分类任务呢? 本节课程我们将介绍一种全新的方法来学习神经网络的参…

用友U8二次开发工具KK-FULL-*****-EFWeb使用方法

1、安装: 下一步&#xff0c;下一步即可。弹出黑框不要关闭&#xff0c;让其自动执行并关闭。 2、服务配置&#xff1a; 输入服务器IP地址&#xff0c;选择U8数据源&#xff0c;输入U8用户名及账号&#xff0c;U8登录日期勾选系统日期。测试参数有效性&#xff0c;提示测试通过…

利用 FastAPI 和 Jinja2 模板引擎快速构建 Web 应用

FastAPI 是一个现代、快速&#xff08;高性能&#xff09;的 Web 框架&#xff0c;用于构建 API&#xff0c;它基于标准 Python 类型提示。FastAPI 支持异步编程&#xff0c;使得开发高性能的 Web 应用变得简单快捷。在本文中&#xff0c;我们将探讨如何使用 FastAPI 结合 Jinj…

VS code 查看 ${workspaceFolder} 目录指代路径

VS code 查看 ${workspaceFolder} 目录指代路径 引言正文 引言 在 VS code 创建与运行 task.json 文件 一文中我们已经介绍了如何创建属于自己的 .json 文件。在 VS code 中&#xff0c;有时候我们需要添加一些文件路径供我们导入自定义包使用&#xff0c;此时&#xff0c;我们…

OpenCV系列教程二:基本图像增强(数值运算)、滤波器(去噪、边缘检测)

文章目录 一、基本图像增强&#xff08;数值运算&#xff09;1.1 加法 &#xff08;cv2.add&#xff09;1.1.1 图像与标量相加&#xff08;调节亮度&#xff09;1.1.2 图像与图像相加&#xff08;两个图像shape要相同&#xff09;1.1.3 图像的加权加法&#xff08;渐变切换&…

基于SpringBoot+Vue+MySQL的网上租赁系统

系统展示 用户前台界面 管理员后台界面 系统背景 在当前共享经济蓬勃发展的背景下&#xff0c;网上租赁系统作为连接租赁双方的重要平台&#xff0c;正逐步改变着人们的消费观念和生活方式。通过构建一个基于SpringBoot、Vue.js与MySQL的网上租赁系统&#xff0c;我们旨在为用户…

python线程(python threading模块、python多线程)(守护线程与非守护线程)

文章目录 Python多线程入门1. Python多线程概述2. threading模块基础- Thread 类: 这是一个代表线程的类。可以通过创建Thread类的实例来新建一个线程。- Lock 类: 在多线程环境中&#xff0c;为了防止数据错乱&#xff0c;通常需要用到锁机制。Lock类提供了基本的锁功能&#…

本地搭建我的世界服务器(JAVA)简单记录

网上参考教程挺多的&#xff0c;踩了不少坑&#xff0c;简单记录一下&#xff0c;我做的是一个私人服务器&#xff0c;就是和朋友3、4个人玩。 笨蛋 MC 开服教程 先放一个比较系统和完整的教程&#xff0c;萌新可用&#xff0c;这个教程很详细&#xff0c;我只是记录一下自己的…

【C++】list容器的基本使用

一、list是什么 list的底层结构是带头双向循环链表。 相较于 vector 的连续线性空间&#xff0c;list 就显得复杂很多&#xff0c;它是由一个个结点构成&#xff0c;每个结点申请的空间并不是连续的&#xff0c;它的好处是每次插入或删除一个数据&#xff0c;就配置或释放一个…

禁忌搜索算法(TS算法)求解实例---旅行商问题 (TSP)

目录 一、采用TS求解 TSP二、 旅行商问题2.1 实际例子&#xff1a;求解 6 个城市的 TSP2.2 **求解该问题的代码**2.3 代码运行过程截屏2.4 代码运行结果截屏&#xff08;后续和其他算法进行对比&#xff09; 三、 如何修改代码&#xff1f;3.1 减少城市坐标&#xff0c;如下&am…