RabbitMQ如何做到不丢不重

目录

MQTT协议

如何保证消息100%不丢失

生产端可靠性投递

​编辑

RabbitMQ的Broker端投

(1)消息持久化

(2)设置集群镜像模式

(3)消息补偿机制

消费端

ACK机制改为手动

总结


MQTT协议

先来说下MQTT协议中的3种语义,这个非常重要。

在MQTT协议中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:

At most once:至多一次。消息在传递时,最多会被送达一次。也就是说,没什么消息可靠性保证,允许丢消息。
At least once:至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级 这个服务质量标准不仅适用于MQTT,对所有的消息队列都是适用的。现在常用的绝大部分消息队列提供的服务质量都是 At least once,包括RocketMQ、RabbitMQ和Kafka都是这样。也就是说,消息队列很难保证消息不重复。

At least once+幂等消费=Exactly once

如何保证消息100%不丢失

消息从生产端到消费端消费要经过3个步骤:

  1. 生产端发送消息到RabbitMQ;
  2. RabbitMQ发送消息到消费端;
  3. 消费端消费这条消息;  

所以要保证消息不丢,就得从三个方面入手,分别是生产端、RabbitMQ的Broker端、消费端。三个都保证不丢失,才能保证100%不丢。

生产端可靠性投递

事务机制:

// 设置channel开启事务
rabbitTemplate.setChannelTransacted(true);@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){return new RabbitTransactionManager(connectionFactory);}@Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
public void publishMessage(String message) throws Exception {rabbitTemplate.setMandatory(true);rabbitTemplate.convertAndSend("java",message);}

confirm消息确认机制:

# 开启发送确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送失败回退
spring.rabbitmq.publisher-returns=true	     
@Configuration
@Slf4j
public class RabbitMQConfig {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void enableConfirmCallback() {//confirm 监听,当消息成功发到交换机 ack = true,没有发送到交换机 ack = false//correlationData 可在发送时指定消息唯一 idrabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if(!ack){//记录日志、落库定时任务扫描重发,同时对开发人员进行通知}});//当消息成功发送到交换机没有路由到队列触发此监听rabbitTemplate.setReturnsCallback(returned -> {//记录日志、落库、同时对开发人员进行通知});}
}

一般不推荐事务的模式,因为是同步的会影响性能,所以都会采用异步回调的confirm模式。

RabbitMQ的Broker端投

说三点:

(1)要保证rabbitMQ不丢失消息,那么就需要开启rabbitMQ的持久化机制,即把消息持久化到硬盘上,这样即使rabbitMQ挂掉在重启后仍然可以从硬盘读取消息;

(2)如果rabbitMQ单点故障怎么办,这种情况倒不会造成消息丢失,这里就要提到rabbitMQ的3种安装模式,单机模式、普通集群模式、镜像集群模式,这里要保证rabbitMQ的高可用就要配合HAPROXY做镜像集群模式

(3)如果硬盘坏掉怎么保证消息不丢失

(1)消息持久化

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。

所以就要对消息进行持久化处理。如何持久化,下面具体说明下:

要想做到消息持久化,必须满足以下三个条件,缺一不可。

1) Exchange 设置持久化

2)Queue 设置持久化

3)Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

(2)设置集群镜像模式

我们先来介绍下RabbitMQ三种部署模式:

1)单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。

2)普通模式:消息只会存在与当前节点中,并不会同步到其他节点,当前节点宕机,有影响的业务会瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。

3)镜像模式:消息会同步到其他节点上,可以设置同步的节点个数,但吞吐量会下降。属于RabbitMQ的HA方案

为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。

如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。

下面介绍下三种HA策略模式

1)同步至所有的

2)同步最多N个机器

3)只同步至符合指定名称的nodes

命令处理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

1)为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式 rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}' rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

2)为每个以“rock.wechat.”开头的队列设置两个节点的镜像,并且设置为自动同步模式 rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

3)为每个以“node.”开头的队列分配指定的节点做镜像 rabbitmqctl set_policy ha-nodes "^nodes." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

但是:HA 镜像队列有一个很大的缺点就是:系统的吞吐量会有所下降。

(3)消息补偿机制

为什么还要消息补偿机制呢?难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,

但是作为有追求的程序员来讲,要绝对保证我的系统的稳定性,有一种危机意识。

比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?

1)生产端首先将业务数据以及消息数据入库,需要在同一个事务中,消息数据入库失败,则整体回滚

2)根据消息表中消息状态,失败则进行消息补偿措施,重新发送消息处理。

消费端
ACK机制改为手动

RabbitMQ的自动ack机制默认在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完。
我们需要进行手动消费

#开启手动ACK,消费消息的时候,就必须发送ack确认,不然消息永远还在队列中
spring.rabbitmq.listener.simple.acknowledge-mode=manual

 basicNack 方法的第三个参数代表是否重回队列,通常代码的报错并不会因为重试就能解决,所以可能这种情况:继续被消费,继续报错,重回队列,继续被消费…死循环。
一定要有重发消息次数的限制,或者干脆不入队,发送到Redis进行下记录也行。一般就不会再次入队了,而是记录并通知开发人员,进行手动处理

@RabbitHandler@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))public void process(String msg, Message message, Channel channel) {long tag = message.getMessageProperties().getDeliveryTag();Action action = Action.SUCCESS;try {System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:" + msg);if ("bad".equals(msg)) {throw new IllegalArgumentException("测试:抛出可重回队列的异常");}if ("error".equals(msg)) {throw new Exception("测试:抛出无需重回队列的异常");}} catch (IllegalArgumentException e1) {e1.printStackTrace();//根据异常的类型判断,设置action是可重试的,还是无需重试的action = Action.RETRY;} catch (Exception e2) {//打印异常e2.printStackTrace();//根据异常的类型判断,设置action是可重试的,还是无需重试的action = Action.REJECT;} finally {try {if (action == Action.SUCCESS) {//multiple 表示是否批量处理。true表示批量ack处理小于tag的所有消息。false则处理当前消息channel.basicAck(tag, false);} else if (action == Action.RETRY) {//Nack,拒绝策略,消息重回队列channel.basicNack(tag, false, true);} else {//Nack,拒绝策略,并且从队列中删除channel.basicNack(tag, false, false);}channel.close();} catch (Exception e) {e.printStackTrace();}}}
}
总结

如果需要保证消息在整条链路中不丢失,那就需要生产端、mq自身与消费端共同去保障。

生产端:对生产的消息进行状态标记,开启confirm机制,依据mq的响应来更新消息状态,使用定时任务重新投递超时的消息,多次投递失败进行报警。

mq自身:开启持久化,并在落盘后再进行ack。如果是镜像部署模式,需要在同步到多个副本之后再进行ack。

消费端:开启手动ack模式,在业务处理完成后再进行ack,并且需要保证幂等。

通过以上的处理,理论上不存在消息丢失的情况,但是系统的吞吐量以及性能有所下降。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/226203.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java设计模式学习之【备忘录模式】

文章目录 引言备忘录模式简介定义与用途实现方式 使用场景优势与劣势在Spring框架中的应用备忘录示例代码地址 引言 想象一下,你正在编辑一篇重要的文档,突然你意识到最近的一些更改实际上破坏了文档的结构。幸运的是,你的文本编辑器允许你撤…

每周一算法:区间覆盖

问题描述 给定 N N N个闭区间 [ a i , b i ] [a_i,b_i] [ai​,bi​],以及一个线段区间 [ s , t ] [s,t] [s,t],请你选择尽量少的区间,将指定线段区间完全覆盖。 输出最少区间数,如果无法完全覆盖则输出 − 1 -1 −1。 输入格式…

基于遗传算法的航线规划

MATLAB2016b可以正常运行 基于遗传算法的无人机航线规划资源-CSDN文库

搭建FTP服务器详细介绍

一.FTP简介 1.1什么是FTP 1.2FTP服务器介绍 1.3FTP服务器优缺点 二.FTP服务器的搭建与配置 2.1 开启防火墙 2.2创建组 2.3创建用户 2.4安装FTP服务器 2.5配置FTP服务器 2.&#xff…

java itext5 生成PDF并填充数据导出

java itext5 生成PDF并填充数据导出 依赖**文本勾选框****页眉**&#xff0c;**页脚****图片**实际图 主要功能有文本勾选框&#xff0c;页眉&#xff0c;页脚&#xff0c;图片等功能。肯定没有专业软件画的好看&#xff0c;只是一点儿方法。仅供参考。 依赖 <!--pdf-->&…

[Redis实战]优惠券秒杀

三、优惠券秒杀 3.1 全局唯一ID 每个店铺都可以发布优惠券&#xff1a; 当用户抢购时&#xff0c;就会生成订单并保存到tb_voucher_order这种表中&#xff0c;而订单表如果使用数据库自增ID就存在一些问题&#xff1a; id的规律性太明显受单表数据量的限制 场景分析一&…

R503S指纹识别模块的指令系统(一)

1.采集指纹图像 GetImage&#xff08;0x01&#xff09; 功能说明&#xff1a;探测手指&#xff0c;探测到后录入指纹图像存于 ImageBuffer&#xff0c;并返回录入成功确认码&#xff1b;若探测不到手指&#xff0c;直接返回无手指确认码(模块对于每一条指令都快速反应&#xf…

C++ DAY2作业

1.课堂struct练习&#xff0c;用class&#xff1b; #include <iostream>using namespace std;class Stu { private:int age;char sex;int high; public:double score;void set_values(int a,char b,int c,double d);int get_age();char get_sex();int get_high(); }; vo…

SpringBoot 3.2.0 结合Redisson接入Redis

依赖版本 JDK 17 Spring Boot 3.2.0 Redisson 3.25.0 工程源码&#xff1a;Gitee 集成Redis步骤 导入依赖 <properties><redisson.version>3.25.0</redisson.version> </properties> <dependencies><dependency><groupId>org.pr…

echart地图的小demo12.27

图形&#xff1a; DataV.GeoAtlas地理小工具系列 点击以上链接进入--》 再点击箭头---》复制坐标到文件&#xff1a; 取名为 china.json中 &#xff08;文件名自定义&#xff09; <template><div class"map" ref"chartMap">地图</div>…

案例232:基于微信小程序的学生实习与就业管理系统设计与实现

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder …

java旅游攻略管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 java Web旅游攻略管理系统是一套完善的java web信息管理系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为 TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为Mysql…

ASUS华硕ROG幻16笔记本电脑2023款GU604VI VZ VY原装出厂Windows11系统22H2

华硕玩家国度幻16笔记本原厂W11系统&#xff0c;适用型号&#xff1a;GU604VI、GU604VZ、GU604VY 链接&#xff1a;https://pan.baidu.com/s/166x6FNUFEpA3Qbzeory3Hg?pwdlwau 提取码&#xff1a;lwau 系统自带所有驱动、出厂主题壁纸、Office办公软件、MyASUS华硕电脑管…

『JavaScript』全面解析JavaScript中的防抖与节流技术及其应用场景

&#x1f4e3;读完这篇文章里你能收获到 理解防抖&#xff08;Debouncing&#xff09;和节流&#xff08;Throttling&#xff09;的概念&#xff1a;了解这两种性能优化技术如何帮助我们更有效地处理频繁触发的事件掌握防抖与节流的实现方法&#xff1a;学习如何在JavaScript中…

C# 常用数据类型及取值范围

1.常见数据类型和取值范围 序号数据类型占字节数取值范围1byte10 到 2552sbyte1-128 到 1273short 2-32,768 到 32,7674ushort20 到 65,5355int4-2,147,483,648 到 2,147,483,6476uint40 到 4,294,967,2957float41.5 x 10−45 至 3.4 x 10388double85.0 10−324 到 1.…

走进电子技术之光敏电阻、电位器、开关

同学们大家好&#xff0c;今天我们继续学习杨欣的《电子设计从零开始》&#xff0c;这本书从基本原理出发&#xff0c;知识点遍及无线电通讯、仪器设计、三极管电路、集成电路、传感器、数字电路基础、单片机及应用实例&#xff0c;可以说是全面系统地介绍了电子设计所需的知识…

关于Java并发、JVM面试题

前言 之前为了准备面试&#xff0c;收集整理了一些面试题。 本篇文章更新时间2023年12月27日。 最新的内容可以看我的原文&#xff1a;https://www.yuque.com/wfzx/ninzck/cbf0cxkrr6s1kniv 并发 进程与线程的区别 线程属于进程&#xff0c;进程可以拥有多个线程。进程独享…

BAQ压缩MATLAB仿真

本专栏目录: ​​​​​​​全球SAR卫星大盘点与回波数据处理专栏目录-CSDN博客 我们按照上一期文章的BAQ原理编写MATLAB代码,进行baq压缩与解压缩的全流程验证,并分析BAQ压缩对信号指标造成的影响。 生成3个点目标回波数据,加入高斯噪声,对回波进行BAQ压缩和解BAQ压缩,…

Android集成OpenSSL实现加解密-集成

导入so 将编译生成的 OpenSSL 动态库文件&#xff08;.so 文件&#xff09;复制到你的 Android 项目的 libs 目录中 导入头文件 将编译生成的include文件夹导入到项目中 build.gradle添加配置 defaultConfig {……testInstrumentationRunner "androidx.test.runner…

【10】ES6:Promise 对象

一、同步和异步 1、JS 是单线程语言 JavaScript 是一门单线程的语言&#xff0c;因此同一个时间只能做一件事情&#xff0c;这意味着所有任务都需要排队&#xff0c;前一个任务执行完&#xff0c;才会执行下一个任务。但是&#xff0c;如果前一个任务的执行时间很长&#xff…