springboot整合rabbitmq发布确认高级

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们如何才能进行 RabbitMQ 的消息可靠投递。

发布确认  

发布确认方案

 架构

 

配置文件 

在配置文件当中添加 spring.rabbitmq.publisher-confirm-type=correlated

  NONE:禁用发布确认模式,是默认值
  CORREL:ATED发布消息成功到交换器后会触发回调方法
spring.rabbitmq.host=43.139.59.23
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated

配置类

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";//声明业务 Exchange@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("key1");}
}

 生产者

import com.example.demo.component.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct;
import javax.annotation.Resource;@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyCallBack myCallBack;//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(myCallBack);}@GetMapping("sendMessage/{message}")public void sendMessage(@PathVariable String message){//指定消息 id 为 1CorrelationData correlationData1=new CorrelationData("1");String routingKey="key1";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);CorrelationData correlationData2=new CorrelationData("2");routingKey="key2";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);log.info("发送消息内容:{}",message);}
}

 回调接口

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id=correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到 id 为:{}的消息",id);}else{log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);}}
}

 消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues =CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msg=new String(message.getBody());log.info("接受到队列 confirm.queue 消息:{}",msg);}
}

 结果

可以看到,发送了两条消息,第一条消息的 RoutingKey "key1" ,第二条消息的 RoutingKey
"key2" ,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所以第二条消息被直接丢弃了。

 回退消息

Mandatory 参数 

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息 果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的 。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参 数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

 生产者

import com.example.demo.component.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.UUID;@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyCallBack myCallBack;//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(myCallBack);/*** true:* 交换机无法将消息进行路由时,会将该消息返回给生产者* false:* 如果发现消息无法进行路由,则直接丢弃*/rabbitTemplate.setMandatory(true);//设置回退消息交给谁处理rabbitTemplate.setReturnCallback(myCallBack);}@GetMapping("sendMessage")public void sendMessage(String message){//让消息绑定一个 id 值CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key1",message+"key1",correlationData1);log.info("发送消息 id 为:{}内容为{}",correlationData1.getId(),message+"key1");CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key2",message+"key2",correlationData2);log.info("发送消息 id 为:{}内容为{}",correlationData2.getId(),message+"key2");}
}

 回调接口

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id=correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到 id 为:{}的消息",id);}else{log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);}}//当消息无法路由的时候的回调方法@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, Stringexchange, String routingKey) {log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}",newString(message.getBody()),exchange,replyText,routingKey);}
}

 结果

 接收到被退回的消息

备份交换机 

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。

架构

 配置类

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";public static final String BACKUP_QUEUE_NAME = "backup.queue";public static final String WARNING_QUEUE_NAME = "warning.queue";// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("key1");}//声明备份 Exchange@Bean("backupExchange")public FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//声明确认 Exchange 交换机的备份交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){ExchangeBuilder exchangeBuilder =ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)//设置该交换机的备份交换机.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);return (DirectExchange)exchangeBuilder.build();}// 声明警告队列@Bean("warningQueue")public Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}// 声明报警队列绑定关系@Beanpublic Binding warningBinding(@Qualifier("warningQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}// 声明备份队列@Bean("backQueue")public Queue backQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}// 声明备份队列绑定关系@Beanpublic Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}
}

 报警消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues =CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msg=new String(message.getBody());log.info("接受到队列 confirm.queue 消息:{}",msg);}
}

 

结果 

mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/105286.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

npm install 安装依赖,报错 Host key verification failed

设置 git 的身份和邮箱 git config --global user.name "你的名字" > 用户名 git config --global user.email “你的邮箱" > 邮箱进入 > 用户 > [你的用户名] > .ssh文件夹下,删除 known_hosts 文件即可 进入之后有可能会看到 known_hosts…

知识图谱Neo4j安装到实践全过程

前言: Hello大家好,我是Dream。 在本次实战中,我们将一起完成知识图谱Neo4j安装到实践全过程,探索其中的关系和属性。知识图谱是一种以三元组形式存储的数据结构,由实体、关系和属性组成,能够帮助我们更好地…

SpringMVC之@RequestMapping注解

文章目录 前言一、RequestMapping介绍二、详解(末尾附源码,自行测试)1.RequestMapping注解的位置2.RequestMapping注解的value属性3.RequestMapping注解的method属性4.RequestMapping注解的params属性(了解)5.RequestM…

飞书小程序开发

1.tt.showModal后跳转页面 跳转路径要为绝对路径,相对路径跳转无响应。 2.手机息屏后将不再进入onload()生命周期,直接进入onshow()生命周期。 onLoad()在页面初始化的时候触发,一个页面只调用一次。 onShow()在切入前台时就会触发&#x…

网红景区游乐设备普乐蛙5d动感影院体验馆设备组成内容

一个5D7D动感影院体验馆的全套设备组成通常包括以下几个方面: 电影播放设备:包括主控制器、电影播放器、电影储存设备等,用于播放5D电影。 影院座椅:一般采用特殊设计的动感座椅,具备震动、摇晃、抖动等功能&#xff0…

免费的png打包plist工具CppTextu,一款把若干资源图片拼接为一张大图的免费工具

经常做游戏打包贴图的都知道,要把图片打包为一张或多张大图,要使用打包工具TexturePacker。 TexturePacker官方版可以直接导入PSD、SWF、PNG、BMP等常见的图片格式,主要用于网页、游戏和动画的制作,它可以将多个小图片汇聚成一个…

保研面试题复习

信源/信道编码的目的和种类? 这个图是每个人在学习通信原理的时候,都会遇到的图。包含了三要素:信源、信道和信宿。这个图直接可以回答最开始的问题,所谓信源编码就是针对信源编码,所谓信道编码就是针对信道编码。 有…

docker之DockerFile与网络

目录 DockerFile 构建步骤 基础知识 指令 实战:构建自己的centos 第一步:编写dockerfile文件 第二步:构建镜像文件 docker网络 原理 功能 网络模式 host模式 container模式 none模式 bridge模式 DockerFile dockerfile 是用来…

Java | IDEA中Netty运行多个client的方法

想要运行多个client但出现这种提示: 解决方法 1、打开IDEA,右上角找到下图,并点击 2、勾选

Node.js 的 Buffer 是什么?一站式了解指南

在 Node.js 中,Buffer 是一种用于处理二进制数据的机制。它允许你在不经过 JavaScript 垃圾回收机制的情况下直接操作原始内存,从而更高效地处理数据,特别是在处理网络流、文件系统操作和其他与 I/O 相关的任务时。Buffer 是一个全局对象&…

layui框架学习(37:学习laytpl基本语法)

layui中的模板引擎模块laytpl属于轻量的 JavaScript 模板引擎,支持在页面中将指定的数据按指定的模板进行展示或处理,此处的模板是指一段包含html和脚本的文本(感觉类似asp.net core中的razor标记语言,在网页中嵌入基于服务器的代…

Cent OS 中各个文件夹功能

目录 一、根目录简述 二、目录详细分级说明 (一)bin (二)sbin (三)lib (四)etc (五)dev (六)usr (七&#xff09…

vue组件的使用

一、首先要穿件组件构造器对象,或者导入组件 1..在本部分注册组件其中组件为子组件 2.在本部分注册组件 二、而后注册组件 1.在本部分注册组件其中组件为子组件 2.在本部分注册组件 三、 接着,使用组件。 1.在本部分注册组件其中组件为子组件 其中v-i…

开源硬件:下一个技术革命?

🌷🍁 博主猫头虎 带您 Go to New World.✨🍁 🦄 博客首页——猫头虎的博客🎐 🐳《面试题大全专栏》 文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺 &a…

【Git游戏】提交的技巧

修改历史的提交 rebase 通过git rebase -i 将要修改的提交提到最前端, 然后修改,再通过git commit --amend提交该记录,最后通过git rebase -i 在替换会原始的位置 (该过程中有可能会产生rebase confict) cherry-pick …

EMR电子病历系统 SaaS电子病历编辑器源码 电子病历模板编辑器

EMR(Electronic Medical Record)指的是电子病历。它是一种基于电子文档的个人医疗记录,可以包括病人的病史、诊断、治疗方案、药物处方、检查报告和护理计划等信息。EMR采用计算机化的方式来存储、管理和共享这些信息,以便医生和医…

浅谈视频汇聚平台EasyCVR视频平台在城市安全综合监测预警台风天气中的重要作用

夏日已至,台风和暴雨等极端天气频繁出现。在城市运行过程中,台风所带来的暴雨可能会导致城市内涝等次生灾害,引发交通瘫痪、地铁停运、管网泄漏爆管、路面塌陷、防洪排涝、燃气爆炸、供热安全、管廊安全、消防火灾等安全隐患,影响…

储能运行约束的Matlab建模方法

最近一段时间有很多人问我最优潮流计算中储能系统的建模方法。部分朋友的问题我回复了,有些没有回消息的,我就不再一一回复了,在这里我写一篇博客统一介绍一下。 1.储能系统介绍 首先,让【GPT】简单介绍一下储能系统:…

Qt --- 显示相关设置 窗口属性等

主界面,窗口 最小化 最大化 关闭按钮、显示状态自定义: setWindowFlags(Qt::CustomizeWindowHint); setWindowFlags(Qt::WindowCloseButtonHint); //只要关闭按钮 setWindowFlags(Qt::WindowFlags type) Qt::FrameWindowHint:没有边框的窗口 Qt::Window…

WPF基础入门-Class3-WPF数据模板

WPF基础入门 Class3&#xff1a;WPF数据模板 1、先在cs文件中定义一些数据 public partial class Class_4 : Window{public Class_4(){InitializeComponent();List<Color> test new List<Color>();test.Add(new Color() { Code "Yellow", Name &qu…