Spring RabbitMQ那些事(3-消息可靠传输和订阅)

目录

  • 一、序言
  • 二、生产者确保消息发送成功
    • 1、为什么需要Publisher Confirms
    • 2、哪些消息会被确认处理成功
  • 三、消费者保证消息被处理
  • 四、Spring RabbitMQ支持代码示例
    • 1、 application.yml
    • 2、RabbigtMQ配置
    • 3、可靠生产者配置
    • 4、可靠消费者配置
    • 5、测试用例

一、序言

在有些业务场景中,消息是不能丢的,比如分布式事务资金动账,出账方扣款,那么入账方就一定要收款。以前写了一篇分布式事务的文章,里面的跨地区转账就是一个实际案例。

消息是有可能丢的,比如生产者在发送消息时broker服务挂了,消息没有来得及落盘,这时消息就彻底丢了。

保证MQ消息可靠传输主要有两个方面,一方面是消息生产者确保消息一定发送成功,另一方面是消费者确保消息一定被处理。


二、生产者确保消息发送成功

1、为什么需要Publisher Confirms

在Spring AMQP中AmqpTemplate的实现RabbitTemplate已经支持 Publisher Confirms and Returns,所谓的publisher confirms意思就是消息发布者确认消息是否已经被发送。

在RabbitMQ官方文档描述中,持久化的消息在Broker重启时也是应该存活的,这里的词用的是应该,因为消息有可能在落地磁盘前Broker就挂了,导致消息丢失。

最直接的解决方案是通过事务,但是通过事务有两个问题:

  • 事务阻塞:发布者必须等待Broker处理完每条消息。
  • 事务很重:每次提交都会要求触发fsync(),强制磁盘,这个过程需要花很长的时间。

备注:在RabbitMQ官方测试中,通过事务去保证,发布10000条消息需要花至少4分钟的时间。

在这里插入图片描述

而通过Publisher Confirm机制,一旦Broker处理完就会确认消息,而且这个过程是异步的,生产者可以流式发布消息,不需要等待Broker,并且Broker会批量高效将消息落盘。

2、哪些消息会被确认处理成功

当Broker确认消息时,会通知消息发布者消息是否被成功处理,成功处理的基本规则如下:

  • 无法路由的mandatory(必须有符合条件的队列)和immediate(必须有消费者在线)类型在被basic.return后会被确认。
  • 非持久化消息在入队时会被确认。
  • 持久化消息当持久化到磁盘或者被消费者消费时会被确认。

三、消费者保证消息被处理

消费者端确保消息消费很简单,关闭消息自动确认就好,开启消息手动确认。当然有些场景消息只能被处理一次,可以通过分布式锁来实现。


四、Spring RabbitMQ支持代码示例

1、 application.yml

server:port: 8080
spring:rabbitmq:addresses: localhost:5672username: adminpassword: adminvirtual-host: /publisher-returns: truepublisher-confirm-type: correlatedlistener:type: simplesimple:acknowledge-mode: manualconcurrency: 5max-concurrency: 20prefetch: 5template:mandatory: true

备注:

  • 这里一定要设置spring.rabbitmq.publisher-returnstrue,并且设置spring.rabbitmq.publisher-confirm-typecorrelated,同时设置spring.rabbitmq.template.mandatorytrue
  • 上面我们将消费者的确认模式改为了手动确认

2、RabbigtMQ配置

@Configuration
public class RabbitReliableTransportConfig {/*** RabbitTemplate消息转换器配置,自动将对象转换为json字符串** @return*/@Beanpublic MessageConverter jackson2JsonMessageConverter() {Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();messageConverter.setClassMapper(new DefaultJackson2JavaTypeMapper());return messageConverter;}@Beanpublic Queue reliableQueue() {return QueueBuilder.durable("reliable-queue").build();}
}

3、可靠生产者配置

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqReliableProducer {private final RabbitTemplate rabbitTemplate;public void sendReliableMsg(String body) {// 发送可靠消息ReliableMsgDTO reliableMsgDTO = ReliableMsgDTO.builder().body(body).build();CorrelationData correlationData = new CorrelationData();rabbitTemplate.convertAndSend("reliable-queue", reliableMsgDTO, correlationData);// 发送确认逻辑CompletableFuture<Confirm> future = correlationData.getFuture().completable();future.whenComplete((confirm, throwable) -> {if (confirm.isAck()) {log.info("消息已经被成功发送, 消息内容:{}", JSON.toJSONString(reliableMsgDTO));return;}log.warn("消息发送未成功发送, 原因:{}, 消息内容:{}", confirm.getReason(), JSON.toJSONString(reliableMsgDTO), throwable);// 5秒后再发送LockSupport.parkNanos(5 * 1000 * 1000 * 1000L);rabbitTemplate.convertSendAndReceive(reliableMsgDTO, correlationData);});}
}

4、可靠消费者配置

@Slf4j
@Component
public class RabbitMQReliableConsumer {@RabbitListener(queues = "reliable-queue")public void handleMsgFromQueue(ReliableMsgDTO reliableMsgDTO, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {channel.basicAck(tag, false);// channel.basicNack(tag, false, false);log.info("Message received from queue, message body: {}", JSON.toJSONString(reliableMsgDTO));}
}

备注:这里我们开启了消息的手动确认,如果消息处理失败没有确认,那么消息将会在下次消费者参加连接时再次被投递。

5、测试用例

测试结果如下,每当消息发送至Broker成功后会触发回调,如果消息发送失败将会触发重新发送。

2024-01-20 18:13:11.399  INFO 12316 --- [78.107.127:5672] c.u.r.i.p.RabbitMqReliableProducer       : 消息已经被成功发送, 消息内容:{"body":"hello"}
2024-01-20 18:13:11.399  INFO 12316 --- [ntContainer#0-5] c.u.r.i.c.RabbitMQReliableConsumer       : Message received from queue, message body: {"body":"hello"}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/244258.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

告别无法访问的Github

告别无法访问的Github 最近在使用github的时候又登不上去了&#xff0c;挂着VPN都没用 但是自己很多项目都存在github&#xff0c;登不上去那不得损失很大 所以一行必须整点儿特殊手段来访问&#xff0c;顺便分享一下 1.加速器 网上很多解决方案都是在分享各种加速器来登陆…

大型语言模型 (LLM)全解读

一、大型语言模型&#xff08;Large Language Model&#xff09;定义 大型语言模型 是一种深度学习算法&#xff0c;可以执行各种自然语言处理 (NLP) 任务。 大型语言模型底层使用多个转换器模型&#xff0c; 底层转换器是一组神经网络。 大型语言模型是使用海量数据集进行训练…

【Linux】Linux中的日志查询方法

文章目录 linux日志与日志的查询方法更多journalctl用法journalctl用法案例部分日志路径说明推荐阅读 linux日志与日志的查询方法 在Linux系统中&#xff0c;日志文件用于记录系统的各种运行信息和错误消息。常见的日志文件包括但不限于/var/log/下的各种日志&#xff0c;如me…

Armv8-M的TrustZone技术之SAU寄存器总结

每个SAU寄存器是32位宽。下表显示了SAU寄存器概要。 5.1 SAU_CTRL register SAU_CTRL寄存器的特征如下图和表所示&#xff1a; 5.2 SAU_TYPE register 5.3 SAU_RNR register 5.4 SAU_RBAR register 5.5 SAU_RLAR register 5.6 SAU区域配置 当SAU启用时&#xff0c;未由已启用…

亚信安慧AntDB:AntDB-M元数据锁之对象锁(四)

l 对象锁 (per-object locks) 除了IX锁&#xff0c;其他类型都可以用于其他命名空间&#xff0c;这部分是最常用的锁类型。主要用于对数据库的某个具体元数据的并发控制。这类锁对象会比较多&#xff0c;对其有独特的管理&#xff0c;本文不再展开说明。 5.3 两种锁类型 根据…

桌面型物联网智能机器人设计(预告)

相关资料 桌面级群控机器人CoCube探索-2022--CSDN博客 视频&#xff1a; 能&#xff01;有&#xff01;多&#xff01;酷&#xff01;CoCube桌面级群控机器人 让我看看谁在SJTU里划水… 简要介绍 设计一个桌面型物联网智能机器人&#xff0c;以ESP32芯片为核心&#xff0c;配…

网络安全的使命:守护数字世界的稳定和信任

在数字化时代&#xff0c;网络安全的角色不仅仅是技术系统的守护者&#xff0c;更是数字社会的信任保卫者。网络安全的使命是保护、维护和巩固数字世界的稳定性、可靠性以及人们对互联网的信任。本文将深入探讨网络安全是如何履行这一使命的。 第一部分&#xff1a;信息资产的…

【Linux】常见指令(一)

前言: Linux有许多的指令&#xff0c;通过学习这些指令&#xff0c;可以对目录及文件进行操作。 文章目录 一、基础指令1. ls—列出目录内容2. pwd—显示当前目录3. cd—切换目录重新认识指令4. touch—创建文件等5. mkdir—创建目录6. rmdir指令 && rm 指令7. man—显…

学会使用ubuntu——ubuntu22.04使用WebCatlog

Ubuntu22.04使用WebCatlog WebCatlog是适用于Gnu / Linux&#xff0c;Windows或Mac OS X系统的桌面程序。 引擎基于铬&#xff0c;它用于在我们的桌面上处理Web服务。简单点就是把网页单独一个窗口出来显示&#xff0c;当一个app用。本文就是利用WebCatlog安装后的notion编写的…

5G-A:“繁花”盛开在2024

2019年&#xff0c;我国正式发牌5G&#xff0c;开启5G商用新时代。通信技术十年一代&#xff0c;五年过去了&#xff0c;5G是否要进入“半代更迭”阶段&#xff1f; 2024年被视为5G-A商用元年&#xff0c;是5G走向6G的关键一跃。5G-A以R18为演进起点&#xff0c;在连接速率、网…

macOS跨进程通信: Unix Domain Socket 创建实例

macOS跨进程通信: Unix Domain Socket 创建实例 一&#xff1a; 简介 Socket 是 网络传输的抽象概念。 一般我们常用的有Tcp Socket和 UDP Scoket&#xff0c; 和类Unix 系统&#xff08;包括Mac&#xff09;独有的 Unix Domain Socket&#xff08;UDX&#xff09;。 Tcp So…

数据的存储

目录 1 -> 数据类型的介绍 1.1 -> 类型的基本归类 2 -> 整型在内存中的存储 2.1 -> 原码、反码、补码 2.2 -> 大小端介绍 3 -> 浮点型在内存中的存储 3.1 -> 浮点数存储规则 1 -> 数据类型的介绍 基本内置类型有&#xff1a; char /…

小程序技术实践:快速开发适配鸿蒙的App

今年&#xff0c;在中国&#xff0c;被各大媒体和开发者称为“鸿蒙元年”。 在2023年底就有业内人士透露&#xff0c;华为明年将推出不兼容安卓的鸿蒙版本&#xff0c;未来IOS、鸿蒙、安卓将成为三个各自独立的系统。 果不其然&#xff0c;执行力超强的华为&#xff0c;与202…

黑马程序员JavaWeb开发|Maven高级

一、分模块设计与开发 分模块设计&#xff1a; 将项目按照功能拆分成若干个子模块&#xff0c;方便项目的管理维护、扩展&#xff0c;也方便模块间的相互调用&#xff0c;资源共享。 注意&#xff1a;分模块开发需要先对模块功能进行设计&#xff0c;再进行编码。不会先将工…

C++设计模式之迭代器模式

【声明】本题目来源于卡码网&#xff08;https://kamacoder.com/&#xff09; 【提示&#xff1a;如果不想看文字介绍&#xff0c;可以直接跳转到C编码部分】 【设计模式大纲】 【简介】 --什么是迭代器模式&#xff08;第19种设计模式&#xff09; 迭代器模式是⼀种行为设计模…

消息中间件之Kafka(二)

1.Kafka线上常见问题 1.1 为什么要对topic下数据进行分区存储? 1.commit log文件会受到所在机器的文件系统大小的限制&#xff0c;分区之后可以将不同的分区放在不同的机器上&#xff0c; 相当于对数据做了分布式存储&#xff0c;理论上一个topic可以处理任意数量的数据2.提…

Hbas简介:数据模型和概念、物理视图

文章目录 说明零 BigTable一 Hbase简介二 HBase 访问接口简介三 行式&列式存储四 HBase 数据模型4.1 HBase 列族数据模型4.2 数据模型的相关概念4.3 数据坐标 五 概念&物理视图 说明 本文参考自林子雨老师的大数据技术原理与应用(第三版)教材内容&#xff0c;仅供学习…

MySQL---多表分组查询综合练习

创建dept表 CREATE TABLE dept ( deptno INT(2) NOT NULL COMMENT 部门编号, dname VARCHAR (15) COMMENT 部门名称, loc VARCHAR (20) COMMENT 地理位置 ); 添加dept表主键 mysql> alter table dept add primary key(deptno); Query OK, 0 rows affected (0.02 s…

Spring5系列学习文章分享---第三篇(AOP概念+原理+动态代理+术语+Aspect+操作案例(注解与配置方式))

目录 AOP概念AOP底层原理AOP(JDK动态代理)使用 JDK 动态代理&#xff0c;使用 Proxy 类里面的方法创建代理对象**编写** **JDK** 动态代理代码 AOP(术语)AOP操作&#xff08;准备工作&#xff09;**AOP** **操作&#xff08;**AspectJ注解)**AOP** **操作&#xff08;**AspectJ…

GitHub无法完成推送 的设置选项

GitHub无法完成推送 的设置选项 系统设置 VS中控制台设置【指令】 控制台调出方法 以下为VS控制台指令 git config --global --unset http.proxy git config --global --unset https.proxygit config --global http.proxy 127.0.0.1:7890 git config --global https.proxy …