22.备份交换机-处理无法投递的消息

前面提到当消息发送给交换机,交换机出故障,或者队列出现故障,会反馈给生产者。

如果交换机备份,将无法投递的消息发送给备份交换机,再由备份交换机给备份队列和告警队列的思路,来防止消息不丢失。

小提示

之前如果创建了confirm.exhange这个交换机,需要删除后重新创建。

 代码

配置文件

spring:rabbitmq:host: 192.168.171.128username: adminpassword: 123port: 5672publisher-confirm-type: correlatedpublisher-returns: true

消费者

确认队列的消费者

package com.xkj.org.listener;import com.rabbitmq.client.Channel;
import com.xkj.org.config.BarkExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class ConfirmQueueConsumer {@RabbitListener(queues = BarkExchangeConfig.CONFIRM_QUEUE)public void receiveMsg(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), "UTF-8");log.info("收到队列的消息:{}",  msg);}
}

告警队列的消费者

package com.xkj.org.listener;import com.rabbitmq.client.Channel;
import com.xkj.org.config.BarkExchangeConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class WarningQueueConsumer {@RabbitListener(queues = BarkExchangeConfig.WARNING_QUEUE)public void receiveWarningMsg(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), "UTF-8");log.error("报警发现不可路由消息:{}", msg);}
}

生产者

@ApiOperation("测试发布确认发消息")@GetMapping("/sendMessage/{msg}")public void sendMessage(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message) {CorrelationData correlation = new CorrelationData("1");rabbitTemplate.convertAndSend("confirm.exchange", "key1"+"123", message, correlation);log.info("发送消息内容{}", message);}

说明:这里故意把routingkey写错,让消息发送失败。 

配置类

package com.xkj.org.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class BarkExchangeConfig {//确认交换机public static final String CONFIRM_EXCHANGE = "confirm.exchange";//备份交换机public static final String BACKUP_EXCHANGE = "backup.exchange";//确认队列public static final String CONFIRM_QUEUE = "confirm.queue";//备份队列public static final String BACKUP_QUEUE = "backup.queue";//告警队列public static final String WARNING_QUEUE = "warning.queue";@Beanpublic DirectExchange confirmExchange() {//将确认交换机与备份交换机连接起来Map<String, Object> arguments = new HashMap<>();arguments.put("alternate-exchange", BACKUP_EXCHANGE);return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).withArguments(arguments).build();}@Beanpublic FanoutExchange backupExchange() {return new FanoutExchange(BACKUP_EXCHANGE);}@Beanpublic Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE).build();}@Beanpublic Queue backupQueue() {return QueueBuilder.durable(BACKUP_QUEUE).build();}@Beanpublic Queue warningQueue() {return QueueBuilder.durable(WARNING_QUEUE).build();}@Beanpublic Binding bindingConfirmQueueToExchange(@Qualifier("confirmQueue") Queue confirmQueue,@Qualifier("confirmExchange") DirectExchange confirmExchange) {return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("key1");}@Beanpublic Binding bindingBackupQueueToExchange(@Qualifier("backupQueue") Queue backupQueue,@Qualifier("backupExchange") FanoutExchange backupExchange) {//删除交换机与队列绑定是不需要routingkey的return BindingBuilder.bind(backupQueue).to(backupExchange);}@Beanpublic Binding bindingWarningQueueToExchange(@Qualifier("warningQueue") Queue warningQueue,@Qualifier("backupExchange") FanoutExchange backupExchange) {return BindingBuilder.bind(warningQueue).to(backupExchange);}}

回调函数

package com.xkj.org.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;/*** 回调接口*/
@Slf4j
@Component // 1.第一步实例化MyCallback这个bean
public class MyCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowired // 2.第二步将rabbitTemplate实例依赖注入进来private RabbitTemplate rabbitTemplate;@PostConstructpublic void init() { //3.第三步执行此方法//将本类对象(ConfirmCallback的实现类对象)注入到RabbitTemplate中rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机确认回调方法* @param correlationData* @param ack* @param cause* 1.发消息 交换机收到 调用*  1.1 correlationData 回调消息的id及相关信息*  1.2 交换机收到消息 ack = true*  1.3 cause null* 2.发消息 交换机接收失败 回调*  2.1 correlationData 回调消息的id及相关信息*  2.2 交换机收到消息 ack = false*  2.3 cause 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId(): "";if(ack) {log.info("交换机已经接收到id为:{}的消息", id);}else {log.info("交换机还未收到id为:{}的消息,原因:{}", id, cause);}}/*** 可以在当消息传递过程中不可达目的时将消息返回给生产者* 注意此方法是消息传递失败才会调用,成功就不会执行* @param message* @param replyCode* @param replyText* @param exchange* @param routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {try {log.error("消息:{},被交换机:{},给退回了,原因:{},RoutingKey={}",new String(message.getBody(), "UTF-8"),exchange,replyText,routingKey);} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}

结论

这里mandatory参数与备份交换机一起使用的时候,如果两者同时开启,备份交换机的优先级更高。因为报警队列接收到没有路由成功的消息,但是没有输出消息被退回的错误。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/385666.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

探索LLM世界:新手小白的学习路线图

随着人工智能的发展&#xff0c;语言模型&#xff08;Language Models, LLM&#xff09;在自然语言处理&#xff08;NLP&#xff09;领域的应用越来越广泛。对于新手小白来说&#xff0c;学习LLM不仅能提升技术水平&#xff0c;还能为职业发展带来巨大的机遇。那么&#xff0c;…

BGP路由反射器

原理概述 缺省情况下&#xff0c;路由器从它的一个 IBGP对等体那里接收到的路由条目不会被该路由器再传递给其他IBGP对等体&#xff0c;这个原则称为BGP水平分割原则&#xff0c;该原则的根本作用是防止 AS内部的BGP路由环路。因此&#xff0c;在AS内部&#xff0c;一般需要每台…

SAP PP学习笔记31 - 计划运行的步骤2 - Scheduling(日程计算),BOM Explosion(BOM展开)

上一章讲了计划运行的5大步骤中的前两步&#xff0c;计算净需求和计算批量大小。 SAP PP学习笔记30 - 计划运行的步骤1 - Net requirements calculation 计算净需求(主要讲了安全库存要素)&#xff0c;Lot-size calculation 计算批量大小-CSDN博客 本章继续讲计划运行的后面几…

Golang | Leetcode Golang题解之第283题移动零

题目&#xff1a; 题解&#xff1a; func moveZeroes(nums []int) {left, right, n : 0, 0, len(nums)for right < n {if nums[right] ! 0 {nums[left], nums[right] nums[right], nums[left]left}right} }

vue3前端开发-小兔鲜项目-使用pinia插件完成token的本地存储

vue3前端开发-小兔鲜项目-使用pinia插件完成token的本地存储&#xff01;实际业务开发中&#xff0c;token是一个表示着用户登录状态的重要信息&#xff0c;它有自己的生命周期。因此&#xff0c;这个参数值必须实例化存储在本地中。不能跟着pinia。因为pinia是基于内存设计的模…

使用法国云手机进行面向法国的社媒营销

在当今数字化和全球化的时代&#xff0c;社交媒体已经成为企业营销和拓展市场的重要工具。对于想进入法国市场的企业来说&#xff0c;如何在海外社媒营销中脱颖而出、抓住更多的市场份额&#xff0c;成为了一个关键问题。法国云手机正为企业提供全新的营销工具&#xff0c;助力…

photoshop学习笔记——移动工具

移动工具&#xff0c;可以对图层进行移动&#xff0c;快捷键 V 使用的素材已经放上了&#xff0c;直接下载即可 按住ctrl 可以自动选取&#xff0c;鼠标点击哪个对象&#xff0c;自动选中哪个图层 按住 shift 校正角度&#xff08;只能沿着直线移动&#xff09; 按住 alt 拖…

Redis的分布式锁

目录 一、定义与原理 基于Redis的分布式锁 获取锁 释放锁 锁误删问题&#xff1a;因为key值一样,将别人的锁删掉了 锁误判问题二&#xff1a;判断锁和释放锁不是原子性的 Lua脚本 分布式锁&#xff1a;满足分布式系统或集群模式下多进程可见并且互斥的锁 分布式锁的优点…

Linux驱动开发——字符设备驱动开发

1 概述 1.1 说明 本文是学习rk3568开发板驱动开发的记录&#xff0c;代码依托于rk3568开发板 1.2 字符设备介绍 字符设备是 Linux 驱动中最基本的一类设备驱动&#xff0c;字符设备就是一个一个字节&#xff0c;按照字节流进行读写操作的设备&#xff0c;读写数据是分先后顺…

Navicat Charts Creator for Mac:数据可视化利器

Navicat Charts Creator for Mac是一款专为Mac用户设计的数据可视化工具&#xff0c;它将复杂的数据转化为直观、易懂的图表&#xff0c;帮助用户更好地理解和分析数据。 该软件支持连接到多种数据库&#xff0c;如MySQL、MariaDB、PostgreSQL等&#xff0c;轻松获取实时数据&…

23 PCBEditor封装创建向导介绍24 PCBEditor3D封装展示25 PCB封装库的管理与调用

23 PCBEditor封装创建向导介绍_BGA为例&&24 PCBEditor3D封装展示&&25 PCB封装库的管理与调用 第一部分 23 PCBEditor封装创建向导介绍_BGA为例一、创建焊盘二、PCBEditor创建封装 第二部分 24 PCBEditor3D封装展示第三部分 25 PCB封装库的管理与调用一、指定库…

黑马头条vue2.0项目实战(二)——登录注册功能的实现

1. 布局结构 目标 能实现登录页面的布局 能实现基本登录功能 能掌握 Vant 中 Toast 提示组件的使用 能理解 API 请求模块的封装 能理解发送验证码的实现思路 能理解 Vant Form 实现表单验证的使用 这里主要使用到三个 Vant 组件&#xff1a; NavBar 导航栏 Form 表单 F…

Linux文件恢复

很麻烦 一般还是小心最好 特别恢复的时候 可能不能选择某个文件夹去扫描恢复 所以 删除的时候 用rm -i代替rm 一定小心 以及 探索下linux的垃圾箱机制 注意 一定要恢复到不同文件夹 省的出问题 法1 系统自带工具 debugfs 但是好像不能重启&#xff1f; testdisk 1、安装 …

C++项目——高并发内存池

一、什么是内存池 内存池(Memory Pool) 是一种动态内存分配与管理技术。 通常情况下&#xff0c;程序员习惯直接使用new、delete、malloc、free 等API申请分配和释放内存&#xff0c;这样导致的后果是&#xff1a;当程序长时间运行时&#xff0c;由于所申请内存块的大小不定&…

OpenCV 图像预处理—图像金字塔

文章目录 相关概念高斯金字塔拉普拉斯金字塔应用 构建高斯金字塔为什么要对当前层进行模糊&#xff1f;1. 平滑处理2. 减少混叠&#xff08;Aliasing&#xff09;3. 多尺度表示4. 图像降采样 举个栗子创建高斯金字塔和拉普拉斯金字塔&#xff0c;并用拉普拉斯金字塔恢复图像 相…

【VUE】个人记录:父子页面数据传递

1. 父传子 在父页面中&#xff0c;调用子页面的组件位置处&#xff0c;通过“&#xff1a;”进行参数传递 <child-component :childData"parentData"></child-component>parentData对象&#xff0c;需要在父页面的data中进行初始化声明 在子页面中&am…

百易云资产管理运营系统 comfileup.php 文件上传致RCE漏洞复现(XVE-2024-18154)

0x01 产品简介 百易云资产管理运营系统,是专门针对企业不动产资产管理和运营需求而设计的一套综合解决方案。该系统能够覆盖资产的全生命周期管理,包括资产的登记、盘点、评估、处置等多个环节,同时提供强大的运营分析功能,帮助企业优化资产配置,提升运营效率。 0x02 漏…

为RTEMS Raspberrypi4 BSP添加SPI支持

为RTEMS Raspberrypi4 BSP添加SPI支持 主要参考了dev/bsps/shared/dev/spi/cadence-spi.c RTEMS 使用了基于linux的SPI框架&#xff0c;SPI总线驱动已经在内核中实现。在这个项目中我需要实习的是 RPI4的SPI主机控制器驱动 SPI在RTEMS中的实现如图&#xff1a; 首先需要将S…

Profinet从站转TCP/IP协议转化网关(功能与配置)

如何将Profinet和TCP/IP网络连接通讯起来呢?近来几天有几个朋友问到这个问题&#xff0c;那么作者在这里统一说明一下。其实有一个不错的设备产品可以很轻易地解决这个问题&#xff0c;名为JM-DNT-PN。接下来作者就从该设备的功能及配置详细说明一下。 一&#xff0c;设备主要…

Python:随机数、随机选择的应用

step1:导入 导入的random相当于是创建了random文件里的的一个对象 import random random() 产生0~1随机数 randint(a,b)产生a~b的整数 闭区间&#xff0c;可以取到a,b random.choice(touple_name)从touple_name&#xff08;数组、列表..&#xff09;中随机选择元素 import rand…