Rabbitmq消息不丢失

目录

  • 一、消息不丢失
    • 1.消息确认
    • 2.消息确认业务封装
      • 2.1 发送确认消息测试
      • 2.2 消息发送失败,设置重发机制

一、消息不丢失

消息的不丢失,在MQ角度考虑,一般有三种途径:
1,生产者不丢数据
2,MQ服务器不丢数据
3,消费者不丢数据
保证消息不丢失有两种实现方式:
1,开启事务模式
2,消息确认模式
说明:开启事务会大幅降低消息发送及接收效率,使用的相对较少,因此我们生产环境一般都采取消息确认模式,以下我们只是讲解消息确认模式

1.消息确认

消息持久化
如果希望RabbitMQ重启之后消息不丢失,那么需要对以下3种实体均配置持久化
Exchange
声明exchange时设置持久化(durable = true)并且不自动删除(autoDelete = false)
Queue
声明queue时设置持久化(durable = true)并且不自动删除(autoDelete = false)
message
发送消息时通过设置deliveryMode=2持久化消息

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。那么如何持久化呢,其实也很容易,就下面两步:
1、将queue的持久化标识durable设置为true,则代表是一个持久的队列
2、发送消息的时候将deliveryMode=2
这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据

发送确认
有时,业务处理成功,消息也发了,但是我们并不知道消息是否成功到达了rabbitmq,如果由于网络等原因导致业务成功而消息发送失败,那么发送方将出现不一致的问题,此时可以使用rabbitmq的发送确认功能,即要求rabbitmq显式告知我们消息是否已成功发送。

手动消费确认
有时,消息被正确投递到消费方,但是消费方处理失败,那么便会出现消费方的不一致问题。比如:订单已创建的消息发送到用户积分子系统中用于增加用户积分,但是积分消费方处理却都失败了,用户就会问:我购买了东西为什么积分并没有增加呢?
要解决这个问题,需要引入消费方确认,即只有消息被成功处理之后才告知rabbitmq以ack,否则告知rabbitmq以nack

2.消息确认业务封装

service-mq修改配置
开启rabbitmq消息确认配置,在common的配置文件中都已经配置好了!

spring:rabbitmq:host: 192.168.121.140port: 5672username: adminpassword: adminpublisher-confirms-type: correlated  #交换机的确认publisher-returns: true  #队列的确认listener:simple:acknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manualprefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发

搭建rabbit-util模块
由于消息队列是公共模块,我们把mq的相关业务封装到该模块,其他service微服务模块都可能使用,因此我们把他封装到一个单独的模块,需要使用mq的模块直接引用该模块即可
搭建方式如:
pom.xml

    <dependencies><!--rabbitmq消息队列--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--rabbitmq 协议--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId></dependency></dependencies>

4.2.4 封装发送端消息确认

/*** @Description 消息发送确认* <p>* ConfirmCallback  只确认消息是否正确到达 Exchange 中* ReturnCallback   消息没有正确到达队列时触发回调,如果正确到达队列不执行* <p>* 1. 如果消息没有到exchange,则confirm回调,ack=false* 2. 如果消息到达exchange,则confirm回调,ack=true* 3. exchange到queue成功,则不回调return* 4. exchange到queue失败,则回调return* */
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;// 修饰一个非静态的void()方法,在服务器加载Servlet的时候运行,并且只会被服务器执行一次在构造函数之后执行,init()方法之前执行。@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallbackrabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息发送成功:" + JSON.toJSONString(correlationData));} else {log.info("消息发送失败:" + cause + " 数据:" + JSON.toJSONString(correlationData));}}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {// 反序列化对象输出System.out.println("消息主体: " + new String(message.getBody()));System.out.println("应答码: " + replyCode);System.out.println("描述:" + replyText);System.out.println("消息使用的交换器 exchange : " + exchange);System.out.println("消息使用的路由键 routing : " + routingKey);}}

封装消息发送

@Service
public class RabbitService {@Autowiredprivate RabbitTemplate rabbitTemplate;/***  发送消息* @param exchange 交换机* @param routingKey 路由键* @param message 消息*/public boolean sendMessage(String exchange, String routingKey, Object message) {rabbitTemplate.convertAndSend(exchange, routingKey, message);return true;}}

2.1 发送确认消息测试

消息发送端

@RestController
@RequestMapping("/mq")
public class MqController {@Autowiredprivate RabbitService rabbitService;/*** 消息发送*///http://localhost:8282/mq/sendConfirm@GetMapping("sendConfirm")public Result sendConfirm() {rabbitService.sendMessage("exchange.confirm", "routing.confirm", "来人了,开始接客吧!");return Result.ok();}
}

消息接收端

@Component
public class ConfirmReceiver {@SneakyThrows
@RabbitListener(bindings=@QueueBinding(value = @Queue(value = "queue.confirm",autoDelete = "false"),exchange = @Exchange(value = "exchange.confirm",autoDelete = "true"),key = {"routing.confirm"}))
public void process(Message message, Channel channel){System.out.println("RabbitListener:"+new String(message.getBody()));// false 确认一个消息,true 批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}

测试:http://localhost:8282/mq/sendConfirm

2.2 消息发送失败,设置重发机制

实现思路:借助redis来实现重发机制
模块中添加依赖

<!-- redis -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency><!-- spring2.X集成redis所需common-pool2-->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
</dependency>

自定义一个实体类来接收消息

@Data
public class GmallCorrelationData extends CorrelationData {//  消息主体private Object message;//  交换机private String exchange;//  路由键private String routingKey;//  重试次数private int retryCount = 0;//  消息类型  是否是延迟消息private boolean isDelay = false;//  延迟时间private int delayTime = 10;
}

修改发送方法

//  封装一个发送消息的方法
public Boolean sendMsg(String exchange,String routingKey, Object msg){//  将发送的消息 赋值到 自定义的实体类GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();//  声明一个correlationId的变量String correlationId = UUID.randomUUID().toString().replaceAll("-","");gmallCorrelationData.setId(correlationId);gmallCorrelationData.setExchange(exchange);gmallCorrelationData.setRoutingKey(routingKey);gmallCorrelationData.setMessage(msg);//  发送消息的时候,将这个gmallCorrelationData 对象放入缓存。redisTemplate.opsForValue().set(correlationId, JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);//  调用发送消息方法//this.rabbitTemplate.convertAndSend(exchange,routingKey,msg);this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,gmallCorrelationData);//  默认返回truereturn true;
}发送失败调用重发方法  MQProducerAckConfig 类中修改
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {//  ack = true 说明消息正确发送到了交换机if (ack){System.out.println("哥们你来了.");log.info("消息发送到了交换机");}else {//  消息没有到交换机log.info("消息没发送到交换机");//  调用重试发送方法this.retrySendMsg(correlationData);}
}@Override
public void returnedMessage(Message message, int code, String codeText, String exchange, String routingKey) {System.out.println("消息主体: " + new String(message.getBody()));System.out.println("应答码: " + code);System.out.println("描述:" + codeText);System.out.println("消息使用的交换器 exchange : " + exchange);System.out.println("消息使用的路由键 routing : " + routingKey);//  获取这个CorrelationData对象的Id  spring_returned_message_correlationString correlationDataId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");//  因为在发送消息的时候,已经将数据存储到缓存,通过 correlationDataId 来获取缓存的数据String strJson = (String) this.redisTemplate.opsForValue().get(correlationDataId);//  消息没有到队列的时候,则会调用重试发送方法GmallCorrelationData gmallCorrelationData = JSON.parseObject(strJson,GmallCorrelationData.class);//  调用方法  gmallCorrelationData 这对象中,至少的有,交换机,路由键,消息等内容.this.retrySendMsg(gmallCorrelationData);
}/*** 重试发送方法* @param correlationData   父类对象  它下面还有个子类对象 GmallCorrelationData*/
private void retrySendMsg(CorrelationData correlationData) {//  数据类型转换  统一转换为子类处理GmallCorrelationData gmallCorrelationData = (GmallCorrelationData) correlationData;//  获取到重试次数 初始值 0int retryCount = gmallCorrelationData.getRetryCount();//  判断if (retryCount>=3){//  不需要重试了log.error("重试次数已到,发送消息失败:"+JSON.toJSONString(gmallCorrelationData));} else {//  变量更新retryCount+=1;//  重新赋值重试次数 第一次重试 0->1 1->2 2->3gmallCorrelationData.setRetryCount(retryCount);System.out.println("重试次数:\t"+retryCount);//  更新缓存中的数据this.redisTemplate.opsForValue().set(gmallCorrelationData.getId(),JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);//  调用发送消息方法 表示发送普通消息  发送消息的时候,不能调用 new RabbitService().sendMsg() 这个方法this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData);}
}

测试:只需修改(错误信息)
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/93335.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL索引(Index)

Index 数据库中的索引&#xff08;Index&#xff09;是一种数据结构&#xff0c;用于提高数据库查询性能和加速数据检索过程。索引可以看作是数据库表中某个或多个列的数据结构&#xff0c;类似于书中的目录&#xff0c;可以帮助数据库管理系统更快地定位和访问数据。它们是数…

Docker 容器内无法使用vim命令 解决方法

目录 1. 问题所示2. 原理分析3. 解决方法1. 问题所示 进入Docker容器后 无法使用vim编辑器,出现如下问题:bash: vim: command not found 如图所示: 想着通过apt-get 安装vim,出现如下问题: root@b9f0fd330d5b:/# apt-get install vim Reading package lists... Done B…

Layui列表表头去掉复选框改为选择

效果&#xff1a; 代码&#xff1a; // 表头复选框去掉改为选择 $(".layui-table th[data-field"0"] .layui-table-cell").html("<span>选择</span>");

svn 过滤文件

1. 右键点击&#xff0c;依次选择 TortoiseSVN -> Settings 2. 添加需要过滤的后缀/关键词【 *.iml *.idea *.jar *.class 】

在Vue中动态引入图片为什么要用require

静态资源和动态资源 静态资源 动态的添加src 动态资源 我们通过网络请求从后端获取的资源 动态的添加src会被当成静态资源 动态的添加src最终会被打包成&#xff1a; 动态的添加图片最会会被编译成一个静态的字符串&#xff0c;然后再浏览器运行中会去项目中查找这个资源…

面试热题(反转字符串中的单词)

给你一个字符串 s &#xff0c;请你反转字符串中 单词 的顺序。 单词 是由非空格字符组成的字符串。s 中使用至少一个空格将字符串中的 单词 分隔开。 返回 单词 顺序颠倒且 单词 之间用单个空格连接的结果字符串。 注意&#xff1a;输入字符串 s中可能会存在前导空格、尾随空格…

RunnerGo的相比较JMeter优势,能不能替代?

目前在性能测试领域市场jmeter占有率是非常高的&#xff0c;主要原因是相对比其他性能测试工具使用更简单&#xff08;开源、易扩展&#xff09;&#xff0c;功能更强大&#xff08;满足多种协议的接口&#xff09;&#xff0c;但是随着研发协同的升级&#xff0c;平台化的性能…

关于统一事件管理,一定有你想知道的(二)

本文部分内容来源于布博士----擎创科技资深产品专家 哈喽~又见面啦~ 上期内容我们说到了事件以及事件管理&#xff0c;戳这里一键恢复上期精彩内容&#xff1a;关于统一事件管理&#xff0c;一定有你想知道的&#xff08;一&#xff09; 这期主要带大家看看事件管理是怎么运用…

C++Qt动态增加垂直滚动条

本博文源于笔者正在工作的一个小内容&#xff0c;内容涉及到为qt动态增加垂直滚动条。文章分为三个部分&#xff0c;问题起源&#xff0c;问题解决方案&#xff0c;问题解决成功效果。思路清晰&#xff0c;文章干货满满&#xff0c;复制源码即可使用。 问题起源 qt中一个页面…

1572. 矩阵对角线元素的和

题目描述&#xff1a; 给你一个正方形矩阵 mat&#xff0c;请你返回矩阵对角线元素的和。 请你返回在矩阵主对角线上的元素和副对角线上且不在主对角线上元素的和。 示例&#xff1a; 解题思路&#xff1a; 同时求对角线和副对角线上元素的和再减去重合的元素 相关代码&#xf…

聊聊火车的发展

目录 1.火车的概念 2.火车的发展历史 3.火车对战争的影响 4.火车对人们出行造成的影响 1.火车的概念 火车是一种由机械动力驱动的陆上交通工具&#xff0c;通常用来运输人员和货物。它由一列或多列的连接在一起的车厢组成&#xff0c;有轨道作为其行驶的基础&#xff0c;并通…

Python之Qt输出UI

安装PySide2 输入pip install PySide2安装Qt for Python&#xff0c;如果安装过慢需要翻墙&#xff0c;则可以使用国内清华镜像下载&#xff0c;输入命令pip install --user -i https://pypi.tuna.tsinghua.edu.cn/simple PySide2&#xff0c;如下图&#xff0c; 示例Demo i…

深入探索JavaEE单体架构、微服务架构与云原生架构

课程链接&#xff1a; 链接: https://pan.baidu.com/s/1xSI1ofwYXfqOchfwszCZnA?pwd4s99 提取码: 4s99 复制这段内容后打开百度网盘手机App&#xff0c;操作更方便哦 --来自百度网盘超级会员v4的分享 课程介绍&#xff1a; &#x1f50d;【00】模块零&#xff1a;开营直播&a…

帆软大屏2.0企业制作

&#xfffc; 数字化观点中心 / 当前页 如何从0-1制作数据大屏&#xff0c;我用大白话给你解释清楚了 文 | 商业智能BI相关文章 阅读次数&#xff1a;18,192 次浏览 2023-06-08 11:51:49 好莱坞大片《摩天营救》中有这么一个场景&#xff1a; &#xfffc; 你可以看见反派大b…

mysql面试

基础篇 通用语法及分类 DDL: 数据定义语言&#xff0c;用来定义数据库对象&#xff08;数据库、表、字段&#xff09;DML: 数据操作语言&#xff0c;用来对数据库表中的数据进行增删改DQL: 数据查询语言&#xff0c;用来查询数据库中表的记录DCL: 数据控制语言&#xff0c;用…

Linux 5种网络IO模型

Linux IO模型 网络IO的本质是socket的读取&#xff0c;socket在linux系统被抽象为流&#xff0c;IO可以理解为对流的操作。刚才说了&#xff0c;对于一次IO访问&#xff08;以read举例&#xff09;&#xff0c;数据会先被拷贝到操作系统内核的缓冲区中&#xff0c;然后才会从操…

【STM32RT-Thread零基础入门】 3. PIN设备(GPIO)的使用

硬件&#xff1a;STM32F103ZET6、ST-LINK、usb转串口工具、4个LED灯、1个蜂鸣器、4个1k电阻、2个按键、面包板、杜邦线 文章目录 前言一、PIN设备介绍1. 引脚编号获取2. 设置引脚的输入/输出模式3. 设置引脚的电平值4. 读取引脚的电平值5. 绑定引脚中断回调函数6. 脱离引脚中断…

爬虫逆向实战(十四)--某培训平台登录

一、数据接口分析 主页地址&#xff1a;某培训平台 1、抓包 通过抓包可以发现登录是表单提交到j_spring_security_check 2、判断是否有加密参数 请求参数是否加密&#xff1f; 通过查看“载荷”模块可以发现有一个j_password加密参数 请求头是否加密&#xff1f; 无响应是…

java初级算法(杨辉三角)

java初级算法&#xff08;杨辉三角&#xff09; java初级算法&#xff08;杨辉三角&#xff09;内容&#xff1a;思路解法&#xff1a;代码实现 学习时间&#xff1a;2023/08/16 java初级算法&#xff08;杨辉三角&#xff09; 每日一算法&#xff1a;杨辉三角 内容&#xff1a…

好用画流程图软件推荐 excalidraw

作者&#xff1a;明明如月学长&#xff0c; CSDN 博客专家&#xff0c;蚂蚁集团高级 Java 工程师&#xff0c;《性能优化方法论》作者、《解锁大厂思维&#xff1a;剖析《阿里巴巴Java开发手册》》、《再学经典&#xff1a;《EffectiveJava》独家解析》专栏作者。 热门文章推荐…