RabbitMQ 消息丢失解决 (高级发布确认、消息回退与重发、备份交换机)

目录

一、发布确认SpringBoot版本

确认机制图例:

代码实战:

代码架构图:

1.1交换机的发布确认

添加配置类

消息消费者

消息生产者发布消息后的回调接口

测试:

 1.2回退消息并重发(队列的发布确认)

修改回调接口

生产者:

测试:

二、备份交换机

实战

生产者

报警消费者:

测试:


一、发布确认SpringBoot版本

        首先发布消息后进行备份在缓存里,如果消息成功发布确认到交换机,则从缓存里删除该消息,如果没有成功发布,则设置一个定时任务,重新从缓存里获取消息发布到交换机,直到成功发布到交换机。

确认机制图例:

代码实战:

一个交换机:confirm.exchange,一个队列:confirm.queue,一个消费者:confirm.consumer

其中交换机类型时 direct,与队列关联的 routingKey 是 key1

代码架构图:

1.1交换机的发布确认

配置文件中添加:

server:port: 8888
spring:rabbitmq:host: 192.168.163.132port: 5672username: 2252631565password: 2252631565
#    高级发布确认 发布消息成功后将会触发回调方法publisher-confirm-type: correlated
  • NONE 值是禁用发布确认模式,是默认值
  • CORRELATED 值是发布消息成功到交换器后会触发回调方法
  • SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;

添加配置类

声明交换机和队列,并且将交换机和队列进行绑定:

@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE="confirm.exchange";public static final String CONFIRM_QUEUE="confirm.queue";public static final String ROUTING_KEY="key1";@Beanpublic DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE,false,false);}@Beanpublic Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE).build();}@Beanpublic Binding EAndQBind(@Qualifier("confirmExchange") DirectExchange exchange,@Qualifier("confirmQueue")Queue queue){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);}
}

消息生产者

也可以说是 Controller 层,在这里发送两条消息给两个交换机,其中一个交换机是我们设置好的,另一个交换机不存在;这样就可以清晰看出交换机应答效果。

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;//高级发布确认模式@GetMapping("/sendConfirmMsg/{message}")public void sendConfirmMsg(@PathVariable String message){log.info("发送一条时长为的消息给第一个队列内容是:{}",new Date().toString(),message);CorrelationData correlationData=new CorrelationData("1");correlationData.setReturnedMessage(new org.springframework.amqp.core.Message(message.getBytes()));rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.ROUTING_KEY,message,correlationData);//向一个不存在的交换机发送消息log.info("发送一条时长为的消息给第一个队列内容是:{}",new Date().toString(),message);CorrelationData correlationData2=new CorrelationData("2");correlationData2.setReturnedMessage(new org.springframework.amqp.core.Message(message.getBytes()));rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE+123,ConfirmConfig.ROUTING_KEY,message,correlationData2);}}

消息消费者

监听 confirm.queue 队列

@Slf4j
@Component
public class ConfirmLetterQueue {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)public void confirmConsumer(Message message, Channel channel){log.info("收到了消息:{}",new String(message.getBody()));}
}

消息生产者发布消息后的回调接口

只要生产者发布消息,交换机不管是否收到消息,都会调用该类的 confirm 方法

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){//注入rabbitTemplate.setConfirmCallback(this::confirm);}/*交换机确认回调1.交换机收到了消息 触发回调1.1 correlationData(我们在发消息的时候自己创建的) 消息的ID以及消息内容1.2 ack 交换机收到消息 true1.3 cause 交换机收到消息的原因 null---------------------------------2.交换机未收到消息 触发回调2.1 correlationData 消息的ID以及消息内容2.2 ack 交换机未收到消息 false2.3 cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){log.info("发送消息到交换机成功!消息体为:{}",new String(correlationData.getReturned().getMessage().getBody()));}else {log.info("发送消息到交换机失败!原因为:{}",cause.toString());}}
}

测试:

 效果很明显,我们配置的交换机成功收到消息并转发给队列;不存在的交换机没有接受到消息并作出反应。

 1.2回退消息并重发(队列的发布确认)

在配置文件中开启消息回退功能

server:port: 8888
spring:rabbitmq:host: 192.168.163.133port: 5672username: 2252631565password: 2252631565
#    高级发布确认 发布消息成功后将会触发回调方法publisher-confirm-type: correlated#    消息回退 当消息未路由至队列时触发publisher-returns: true

修改回调接口

实现 RabbitTemplate.ReturnsCallback 接口,并实现方法

@Slf4j
@Component
public class MyCallBack  implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){//注入rabbitTemplate.setConfirmCallback(this::confirm);rabbitTemplate.setReturnsCallback(this::returnedMessage);}/*交换机确认回调1.交换机收到了消息 触发回调1.1 correlationData(我们在发消息的时候自己创建的) 消息的ID以及消息内容1.2 ack 交换机收到消息 true1.3 cause 交换机收到消息的原因 null---------------------------------2.交换机未收到消息 触发回调2.1 correlationData 消息的ID以及消息内容2.2 ack 交换机未收到消息 false2.3 cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){log.info("时间:{}发送消息到交换机成功!",new Date());}else {log.info("发送消息到交换机失败!原因为:{}",cause.toString());}}//当消息未路由到队列时触发 只有失败时才触发 若消息发送至延迟队列则一定会触发回退 记得根据交换机名称排除延迟队列@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("消息:'{}',被交换机:{}回退,回退原因为:{},routingKey为:{}",new String(returned.getMessage().getBody()),returned.getExchange(),returned.getReplyText(),returned.getRoutingKey());//10s后消息重发try {Thread.sleep(10000);log.info("时间:{},生产者重新发消息",new Date());rabbitTemplate.convertAndSend(returned.getExchange(),ConfirmConfig.ROUTING_KEY,new String(returned.getMessage().getBody()));}catch (InterruptedException e) {throw new RuntimeException(e);}}
}

生产者:

向交换机中发送消息,指定错误的routingkey,触发队列回退消息并重发消息。

    //高级发布确认模式@GetMapping("/sendConfirmMsg/{message}")public void sendConfirmMsg(@PathVariable String message){//向一个不存在的队列发送消息log.info("时间:{}生产者发送一条的消息给第一个队列内容是:{}",new Date().toString(),message);CorrelationData correlationData2=new CorrelationData("2");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.ROUTING_KEY+123,message,correlationData2);}

测试:

回退未进入队列的消息并重新发送消息。 

二、备份交换机

        什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout (扇出),这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进 入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警并可以重发消息。

实战

需要一个备份交换机 backup.exchange,类型为 fanout,该交换机发送消息到队列 backup.queue 和 warning.queue。

 修改高级确认发布 配置类

@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE="confirm.exchange";public static final String CONFIRM_QUEUE="confirm.queue";public static final String ROUTING_KEY="key1";//备份交换机public static final String BACKUP_EXCHANGE="backup.exchange";//备份队列public static final String BACKUP_QUEUE="backup.queue";//报警队列public static final String WARNING_QUEUE="warning.queue";@Beanpublic DirectExchange confirmExchange(){//绑定确认交换机与备份交换机Map<String,Object> argument=new HashMap<>();argument.put("alternate-exchange",BACKUP_EXCHANGE);return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).withArguments(argument).build();}//备份交换机@Beanpublic FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE);}@Beanpublic Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE).build();}//备份队列@Beanpublic Queue backupQueue(){return QueueBuilder.durable(BACKUP_QUEUE).build();}//警告队列@Beanpublic Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE).build();}@Beanpublic Binding EAndQBind(@Qualifier("confirmExchange") DirectExchange exchange,@Qualifier("confirmQueue")Queue queue){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);}//绑定备份交换机与两个队列@Beanpublic Binding BAndBBing(@Qualifier("backupExchange") FanoutExchange exchange,@Qualifier("backupQueue")Queue queue){return BindingBuilder.bind(queue).to(exchange);}@Beanpublic Binding BAndWBing(@Qualifier("backupExchange") FanoutExchange exchange,@Qualifier("warningQueue")Queue queue){return BindingBuilder.bind(queue).to(exchange);}
}

生产者

        生产者发送两条消息 一个配置正确的路由,另一个是错误的路由。预期目标是正确路由正常接收消息,错误路由传输的信息由警告队列接收。

    //高级发布确认模式@GetMapping("/sendConfirmMsg/{message}")public void sendConfirmMsg(@PathVariable String message){log.info("时间:{}生产者发送两条消息队列内容是:{}",new Date().toString(),message);CorrelationData correlationData=new CorrelationData("1");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.ROUTING_KEY,message,correlationData);//向一个不存在的队列发送消息CorrelationData correlationData2=new CorrelationData("2");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE,ConfirmConfig.ROUTING_KEY+123,message,correlationData2);}

报警消费者:

接收不可路由的消息

/*** 报警消费者*/
@Slf4j
@Component
public class WarningConsumer {//监听报警消息@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE)public void receiveWarningMsg(Message message){String msg=new String(message.getBody());log.info("报警发现不可路由消息:{},重发消息",msg);}
}

测试:

         生产者发送两条消息 一个配置正确的路由,另一个是错误的路由。预期目标是正确路由正常接收消息,错误路由传输的信息由警告队列接收。

        在此案例中,也设置了消息回退的配置,但是没有触发消息回退。由此得出:备份交换机的优先级更高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/196874.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

编写程序,要求输入x的值,输出y的值。分别用(1)不嵌套的if语句(2)嵌套的if语句(3)if-else语句(4)switch语句。

编写程序&#xff0c;要求输入x的值&#xff0c;输出y的值。分别用&#xff08;1&#xff09;不嵌套的if语句&#xff08;2&#xff09;嵌套的if语句&#xff08;3&#xff09;if-else语句&#xff08;4&#xff09;switch语句。 选择结构是编程语言中常用的一种控制结构&…

适用于 Windows 的 10 个最佳视频转换器:快速转换高清视频

您是否遇到过由于格式不兼容而无法在您的设备上播放视频或电影的情况&#xff1f;您想随意播放从您的相机、GoPro 导入的视频&#xff0c;还是以最合适的格式将它们上传到媒体网站&#xff1f;您的房间里是否有一堆 DVD 光盘&#xff0c;想将它们转换为数字格式以便于播放&…

清华学霸告诉你:如何自学人工智能?

清华大学作为中国顶尖的学府之一&#xff0c;培养了许多优秀的人才&#xff0c;其中不乏在人工智能领域有所成就的学霸。通过一位清华学霸的经验分享&#xff0c;揭示如何自学人工智能&#xff0c;帮助你在这场科技浪潮中勇往直前。 一、夯实基础知识 数学基础&#xff1a;学习…

2023年首届天府杯数学建模国际大赛问题A思路详解与参考代码:大地测量数据中异常现象的特征和识别

地球变形观测是固体潮汐曲线分析和地震前体研究的重要手段&#xff0c;也是地球观测技术的重要组成部分。基于各种精密科学仪器的变形观测点主要集中在洞穴、地下井等易的自然灾害&#xff08;雷暴、强降雨、降雪等&#xff09;&#xff0c;人工维护、人工爆破等外部条件&#…

详解如何使用Jenkins一键打包部署SpringBoot项目

目录 1、Jenkins简介 2、Jenkins的安装及配置 2.1、Docker环境下的安装​编辑 2.2、Jenkins的配置 3、打包部署SpringBoot应用 3.1、在Jenkins中创建执行任务 3.2、测试结果 1、Jenkins简介 任何简单操作的背后&#xff0c;都有一套相当复杂的机制。本文将以SpringBoot应…

春秋云境靶场CVE-2022-32991漏洞复现(sql手工注入)

文章目录 前言一、CVE-2022-32991靶场简述二、找注入点三、CVE-2022-32991漏洞复现1、判断注入点2、爆显位个数3、爆显位位置4 、爆数据库名5、爆数据库表名6、爆数据库列名7、爆数据库数据 总结 前言 此文章只用于学习和反思巩固sql注入知识&#xff0c;禁止用于做非法攻击。…

电子商务、搜索引擎

电子商务 域名 网络服务 网络樱肖 搜索引擎优化

掌握深度学习利器——TensorFlow 2.x实战应用与进阶

掌握深度学习利器——TensorFlow 2.x实战应用与进阶 摘要&#xff1a;随着人工智能技术的飞速发展&#xff0c;深度学习已成为当下最热门的领域之一。作为深度学习领域的重要工具&#xff0c;TensorFlow 2.x 备受关注。本文将通过介绍TensorFlow 2.x的基本概念和特性&#xff…

duplicate复制数据库单个数据文件复制失败报错rman-03009 ora-03113

duplicate复制数据库单个数据文件复制失败报错rman-03009 ora-03113 搭建dg过程中&#xff0c;发现有一个数据文件在复制过程中没有复制过来&#xff0c;在备库数据文件目录找不到这个数据文件 处理方法&#xff1a; 第一步&#xff1a;主库备份86#数据文件 C:\Users\Admi…

低代码编辑平台后台实现

背景 之前做过一个前端低代码编辑平台&#xff0c;可以实现简单的移动端页面组件拖拽编辑&#xff1a; https://github.com/li-car-fei/react-visual-design 最近基于C的oatpp框架实现了一下后台。使用oatpp框架做web后台开发时&#xff0c;发现按照官方的示例使用的话&#…

AI Navigation导航系统_unity基础开发教程

AI Navigation导航系统 安装插件烘焙导航系统障碍物创建人物的AI导航动态障碍物 在unity编辑器中&#xff0c;有一个灰常好用的插件&#xff1a;Navigation。有了它1&#xff0c;你就可以实现人物自动走到你鼠标点击的位置&#xff0c;而且还会自动避开障碍物&#xff0c;下面就…

CronExpression

CronTrigger配置格式: 格式: [秒] [分] [小时] [日] [月] [周] [年]序号 说明 是否必填 允许填写的值 允许的通配符 1 秒 是 0-59 , - * / 2 分 是 0-59 , - * / 3 小时 是 0-23 , - * / 4 日 是 1-31 , - * ? / L W 5 月 是 1-12 or JA…

服务容错之限流之 Tomcat 限流 Tomcat 线程池的拒绝策略

在文章开头&#xff0c;先和大家抛出两个问题&#xff1a; 每次提到服务限流为什么都不考虑基于 Tomcat 来做呢&#xff1f;大家有遇到过 Tomcat 线程池触发了拒绝策略吗&#xff1f; JUC 线程池 在谈 Tomcat 的线程池前&#xff0c;先看一下 JUC 中线程池的执行流程&#x…

安全框架springSecurity+Jwt+Vue-1(vue环境搭建、动态路由、动态标签页)

一、安装vue环境&#xff0c;并新建Vue项目 ①&#xff1a;安装node.js 官网(https://nodejs.org/zh-cn/) 2.安装完成之后检查下版本信息&#xff1a; ②&#xff1a;创建vue项目 1.接下来&#xff0c;我们安装vue的环境 # 安装淘宝npm npm install -g cnpm --registryhttps:/…

如何零基础自学AI人工智能

随着人工智能&#xff08;AI&#xff09;的快速发展&#xff0c;越来越多的有志之士被其强大的潜力所吸引&#xff0c;希望投身其中。然而&#xff0c;对于许多零基础的人来说&#xff0c;如何入门AI成了一个难题。本文将为你提供一份详尽的自学AI人工智能的攻略&#xff0c;帮…

SpringCloud微服务:Ribbon负载均衡

目录 负载均衡策略&#xff1a; 负载均衡的两种方式&#xff1a; 饥饿加载 1. Ribbon负载均衡规则 规则接口是IRule 默认实现是ZoneAvoidanceRule&#xff0c;根据zone选择服务列表&#xff0c;然后轮询 2&#xff0e;负载均衡自定义方式 代码方式:配置灵活&#xff0c;但修…

OpenCV C++ 图像 批处理 (批量调整尺寸、批量重命名)

文章目录 图像 批处理(调整尺寸、重命名)图像 批处理(调整尺寸、重命名) 拿着棋盘格,对着相机变换不同的方角度,采集十张以上(以10~20张为宜);或者棋盘格放到桌上,拿着相机从不同角度一通拍摄。 以棋盘格,第一个内焦点为坐标原点,便于计算世界坐标系下三维坐标; …

Pycharm之配置python虚拟环境

最近给身边的人写了脚本&#xff0c;在自己电脑可以正常运行。分享给我身边的人&#xff0c;却运行不起来&#xff0c;然后把报错的截图给我看了&#xff0c;所以难道不会利用pycharm搭建虚拟的环境&#xff1f;记录一下配置的过程。 第一步&#xff1a;右键要打开的python的代…

利用jquery对HTML中的名字进行替代

想法&#xff1a;将网页中经常要修改的名字放在一个以jquery编写的js文件中&#xff0c;如果需要修改名字&#xff0c;直接修改js文件中的名字即可。 新建name_07.html文件&#xff0c;写入下面的代码&#xff1a; <!DOCTYPE html> <html> <head><meta …

mp4视频批量截取!!!

mp4视频批量截取&#xff01;&#xff01;&#xff01; 问题&#xff1a;如果我们想截取一个mp4视频中的多个片段&#xff0c;一个一个截会很麻烦&#xff01; 可以将想要截取的开始时间和结束时间保存到 excel表 中&#xff0c;进行批量截取。 1、对一个视频&#xff0c;记…