SpringBoot集成多个rabbitmq

1、pom文件

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.4.9</version>
</dependency>

2、rabbitmq的连接配置文件

spring:rabbitmq:mq1:host: xxx.xxx.xxx.xxxport: 5672username: xxxxpassword: xxxxxenable: truemq2:host: xxx.xxx.xxx.xxxport: 5672username: xxxxxpassword: xxxxxenable: true

3、mq1的相关代码  MQ1RabbitConfiguration.java

package com.pojo.config;import lombok.Data;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;@Data
@Component("mq1RabbitmqConfig")
@ConfigurationProperties(prefix = "spring.rabbitmq.mq1") //读取mq1的配置信息
@ConditionalOnProperty(name = "spring.rabbitmq.mq1.enable", havingValue = "true") //是否启用
public class MQ1RabbitConfiguration {private String host;private Integer port;private String username;private String password;@Autowiredprivate ReturnCallBack1 returnCallBack1;@Autowiredprivate ConfirmCallBack1 confirmCallBack1;@Bean(name = "mq1ConnectionFactory")//命名mq1的ConnectionFactory,如果项目中只有一个mq则不必如此@Primarypublic ConnectionFactory createConnectionFactory() {//消息队列1的连接CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);//开启发送到交换机和队列的回调connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);return connectionFactory;}@Bean(name = "mq1RabbitTemplate")//命名mq1的RabbitTemplate,如果项目中只有一个mq则不必如此@Primarypublic RabbitTemplate brainRabbitTemplate(@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {//消息生产RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//发送消息时设置强制标志,仅当提供了returnCallback时才适用rabbitTemplate.setMandatory(true);//确保消息是否发送到交换机,成功与失败都会触发rabbitTemplate.setConfirmCallback(confirmCallBack1);//确保消息是否发送到队列,成功发送不触发,失败触发rabbitTemplate.setReturnsCallback(returnCallBack1);return rabbitTemplate;}@Bean(name = "simpleRabbitListenerContainerFactory1")@Primarypublic SimpleRabbitListenerContainerFactory firstFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();configurer.configure(factory, connectionFactory);return factory;}@Bean(name = "subQueue01")public Queue firstQueue() {return new Queue("subQueue01");}@Bean(name = "subQueue02")public Queue secondQueue() {return new Queue("subQueue02");}@Bean(name = "subQueue03")public Queue thirdQueue() {return new Queue("subQueue03", true);}@Bean(name = "subQueue04")public Queue fourQueue() {return new Queue("subQueue04", true);}@Bean(name = "topicExchangeOne")public TopicExchange topicExchange() {
//        Direct exchange(直连交换机)
//        Fanout exchange(扇型交换机)
//        Topic exchange(主题交换机)
//        Headers exchange(头交换机)
//        Dead Letter Exchange(死信交换机)return new TopicExchange("topicExchangeOne");}@Bean(name = "binding1")public Binding binding1(@Qualifier("subQueue01") Queue queue, TopicExchange exchange) {//绑定队列1到TopicExchange  routingKey是队列1的队列名return BindingBuilder.bind(queue).to(exchange).with("subQueue01");}@Bean(name = "fanoutExchangeOne")public FanoutExchange fanoutExchange() {
//        Direct exchange(直连交换机)
//        Fanout exchange(扇型交换机)
//        Topic exchange(主题交换机)
//        Headers exchange(头交换机)
//        Dead Letter Exchange(死信交换机)return new FanoutExchange("fanoutExchangeOne");}@Bean(name = "binding3")public Binding binding3(@Qualifier("subQueue03") Queue queue, FanoutExchange exchange) {//绑定队列3到fanoutExchange  队列3和队列4都能消费fanoutExchange的消息return BindingBuilder.bind(queue).to(exchange);}@Bean(name = "binding4")public Binding binding4(@Qualifier("subQueue04") Queue queue, FanoutExchange exchange) {//绑定队列4到fanoutExchange  队列3和队列4都能消费fanoutExchange的消息return BindingBuilder.bind(queue).to(exchange);}}

ConfirmCallBack1 .java

package com.pojo.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmCallBack1 implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {if (!ack) {log.info("ConfirmCallBack1消息发送交换机失败:{}", s);} else {log.info("ConfirmCallBack1消息发送交换机成功");}}
}
ReturnCallBack1.java
package com.pojo.config;import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ReturnCallBack1 implements RabbitTemplate.ReturnsCallback {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("ReturnCallBack1消息发送队列失败:{}", JSON.toJSON(returnedMessage));}
}

4、mq2的相关代码

  MQ2RabbitConfiguration.java

package com.pojo.config;import lombok.Data;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Data
@Component("mq2RabbitmqConfig")
@ConfigurationProperties(prefix = "spring.rabbitmq.mq2") //读取mq1的配置信息
@ConditionalOnProperty(name = "spring.rabbitmq.mq2.enable", havingValue = "true") //是否启用
public class MQ2RabbitConfiguration {private String host;private Integer port;private String username;private String password;@Autowiredprivate ReturnCallBack2 returnCallBack2;@Autowiredprivate ConfirmCallBack2 confirmCallBack2;@Bean(name = "mq2ConnectionFactory")   //命名mq1的ConnectionFactory,如果项目中只有一个mq则不必如此public ConnectionFactory createConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);//开启发送到交换机和队列的回调connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);return connectionFactory;}@Bean(name = "mq2RabbitTemplate") //命名mq1的RabbitTemplate,如果项目中只有一个mq则不必如此public RabbitTemplate brainRabbitTemplate(@Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//发送消息时设置强制标志,仅当提供了returnCallback时才适用rabbitTemplate.setMandatory(true);//确保消息是否发送到交换机,成功与失败都会触发rabbitTemplate.setConfirmCallback(confirmCallBack2);//确保消息是否发送到队列,成功发送不触发,失败触发rabbitTemplate.setReturnsCallback(returnCallBack2);return rabbitTemplate;}@Bean(name = "simpleRabbitListenerContainerFactory2")public SimpleRabbitListenerContainerFactory secondFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();configurer.configure(factory, connectionFactory);return factory;}}

ConfirmCallBack2.java

package com.pojo.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmCallBack2 implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {if (!ack) {log.info("ConfirmCallBack2消息发送交换机失败:{}", s);} else {log.info("ConfirmCallBack2消息发送交换机成功");}}
}

ReturnCallBack2.java

package com.pojo.config;import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ReturnCallBack2 implements RabbitTemplate.ReturnsCallback {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("ReturnCallBack2消息发送队列失败:{}", JSON.toJSON(returnedMessage));}
}

5、消息生产者

package com.pojo.prj.controller;import com.pojo.common.anno.NoNeedLogin;
import com.pojo.common.base.ApplicationContextUtils;
import com.pojo.common.base.BaseController;
import com.pojo.util.ResponseResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;/*** <p>* 项目表 前端控制器* </p>** @author zhushangjin* @menu 项目管理* @since 2022-11-14*/
@RestController
@Slf4j
public class ProjectController extends BaseController {@Resource(name = "mq1RabbitTemplate")//初始化mq1的RabbitTemplate对象private RabbitTemplate mq1RabbitTemplate;@Resource(name = "mq2RabbitTemplate")//初始化mq1的RabbitTemplate对象private RabbitTemplate mq2RabbitTemplate;/*** 获取项目下拉列表** @return* @status done*/@GetMapping("/prj/project/list")@NoNeedLoginpublic ResponseResult<String> list() {String active = ApplicationContextUtils.getActiveProfile();logger.error(ApplicationContextUtils.getActiveProfile());return ResponseResult.ok("ReturnCallBack2");}@GetMapping("/prj/project/test1")public ResponseResult test1() {//发送到topicExchangeOne类型的交换机,根据routekey去找发送到哪个队列里,// 只有这一个队列才能收到这条消息String str = "test1test1test1test1test1";mq1RabbitTemplate.convertAndSend("topicExchangeOne","subQueue01", str);return buildResponseResult(true);}@GetMapping("/prj/project/test2")public ResponseResult test2() {//发送到direct类型的交换机,根据routekey去找发送到哪个队列里,//只有这一个队列才能收到这条消息mq2RabbitTemplate.convertAndSend("subQueue02", "test2test2test2test2test2");return buildResponseResult(true);}@GetMapping("/prj/project/test3")public ResponseResult test3() {//发送到fanout类型的交换机,跟这个交换机绑定的队列都会收到这一条消息,// 故第二个参数routekey无需填写mq1RabbitTemplate.convertAndSend("fanoutExchangeOne", null, "test3test3test3test3test3");return buildResponseResult(true);}}

6、消息消费者

Receiver1.java

package com.pojo.config;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "subQueue01", containerFactory = "simpleRabbitListenerContainerFactory1")
public class Receiver1 {@RabbitHandler(isDefault = true)public void process(String hello) {System.out.println("Receiver1: " + hello);}}

Receiver2.java

package com.pojo.config;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "subQueue02", containerFactory = "simpleRabbitListenerContainerFactory2")
public class Receiver2 {@RabbitHandler(isDefault = true)public void process(String hello) {System.out.println("Receiver2: " + hello);}}

Receiver3.java

package com.pojo.config;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "subQueue03", containerFactory = "simpleRabbitListenerContainerFactory1")
public class Receiver3 {@RabbitHandler(isDefault = true)public void process(String hello) {System.out.println("Receiver3 : " + hello);}}

Receiver4.java

package com.pojo.config;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "subQueue04", containerFactory = "simpleRabbitListenerContainerFactory1")
public class Receiver4 {@RabbitHandler(isDefault = true)public void process(String hello) {System.out.println("Receiver4 : " + hello);}}

创建队列

@Bean(name = "uavTopicQueue")public Queue topicQueue() {Map<String, Object> argsMap = new HashMap<String, Object>();argsMap.put("x-max-priority", 5);Queue queue = new Queue(UAV_QUEUE, true, false, false, argsMap);return queue;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/475836.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

利用 TensorFlow Profiler:在 AMD GPU 上优化 TensorFlow 模型

TensorFlow Profiler in practice: Optimizing TensorFlow models on AMD GPUs — ROCm Blogs 简介 TensorFlow Profiler 是一组旨在衡量 TensorFlow 模型执行期间资源利用率和性能的工具。它提供了关于模型如何与硬件资源交互的深入见解&#xff0c;包括执行时间和内存使用情…

游戏引擎学习第15天

视频参考:https://www.bilibili.com/video/BV1mbUBY7E24 关于游戏中文件输入输出&#xff08;IO&#xff09;操作的讨论。主要分为两类&#xff1a; 只读资产的加载 这部分主要涉及游戏中用于展示和运行的只读资源&#xff0c;例如音乐、音效、美术资源&#xff08;如 3D 模型和…

(二)Ubuntu22.04+Stable-Diffusion-webui AI绘画 中英双语插件安装

一、说明 看情况添加 二、双语安装 双语插件 https://github.com/journey-ad/sd-webui-bilingual-localization 中文语言包 https://github.com/dtlnor/stable-diffusion-webui-localization-zh_CN 先装中文语言包 错误&#xff1a;AssertionError: extension access disable…

UE5 DownloadImage加载jpg失败的解决方法

DownloadImage加载jpg失败的解决方法 现象解决方案具体方法 现象 用UE自带的 DownloadImage 无法下载成功&#xff0c;从 failure 引脚出来。 接入一个由监控器自动保存起的图像&#xff0c;有些可以正常加载成功&#xff0c;有些无法加载成功。 经调查问题出现在&#xff0c;…

Elasticsearch 中的热点以及如何使用 AutoOps 解决它们

作者&#xff1a;来自 Elastic Sachin Frayne 探索 Elasticsearch 中的热点以及如何使用 AutoOps 解决它。 Elasticsearch 集群中出现热点的方式有很多种。有些我们可以控制&#xff0c;比如吵闹的邻居&#xff0c;有些我们控制得较差&#xff0c;比如 Elasticsearch 中的分片分…

Statsmodels之OLS回归

目录 Statsmodels基本介绍OLS 回归实战实战1&#xff1a;实战2&#xff1a; Statsmodels基本介绍 Statsmodels 是 Python 中一个强大的统计分析包&#xff0c;包含了回归分析、时间序列分析、假设检验等等的功能。Statsmodels 在计量的简便性上是远远不及 Stata 等软件的&…

【接口封装】—— 1、加载样式表

函数定义 static void loadStyleSheet(QWidget* widget, const QString &fileName,const QString& otherStyleQString());&#xff08;头文件&#xff09;&#xff1a; #include <qfile.h> #include <QWidget> 源文件: void CommonUtils::loadStyleSheet(…

AI、VR与空间计算:教育和文旅领域的数字转型力量

在这个数字技术高速发展的时代&#xff0c;AI、VR技术及大空间计算技术&#xff0c;已成为推动多个行业革新的强劲动力。近日&#xff0c;世优科技推出了最新研发的VR大空间产品《山海经》&#xff0c;这一全新的沉浸式体验项目不仅重新定义了观展方式&#xff0c;还为文化旅游…

AWTK 最新动态:支持鸿蒙系统(HarmonyOS Next)

HarmonyOS是全球第三大移动操作系统&#xff0c;有巨大的市场潜力&#xff0c;在国产替代的背景下&#xff0c;机会多多&#xff0c;AWTK支持HarmonyOS&#xff0c;让AWTK开发者也能享受HarmonyOS生态的红利。 AWTK全称为Toolkit AnyWhere&#xff0c;是ZLG倾心打造的一套基于C…

数据库、数据仓库、数据湖、数据中台、湖仓一体的概念和区别

数据库、数据仓库、数据湖、数据中台和湖仓一体是数据管理和分析领域的不同概念&#xff0c;各自有不同的特点和应用场景。以下是它们的主要区别&#xff1a; 1. 数据库&#xff08;Database&#xff09; 定义&#xff1a;结构化的数据存储系统&#xff0c;用于高效地存储、检…

Linux运维篇-iscsi存储搭建

目录 概念实验介绍环境准备存储端软件安装使用targetcli来管理iSCSI共享存储 客户端软件安装连接存储 概念 iSCSI是一种在Internet协议上&#xff0c;特别是以太网上进行数据块传输的标准&#xff0c;它是一种基于IP Storage理论的存储技术&#xff0c;该技术是将存储行业广泛…

Django一分钟:django中收集关联对象关联数据的方法

场景&#xff1a;我有一个模型&#xff0c;被其它多个模型关联&#xff0c;我配置了CASCADE级联删除&#xff0c;我想要告知用户删除该实例之后&#xff0c;哪些关联数据将会被一同删除。 假设我们当前有这样一组模型&#xff1a; class Warehouse(models.Model):""…

iPhone 17 Air看点汇总:薄至6mm 刷新苹果轻薄纪录

我们姑且将这款iPhone 17序列的超薄SKU称为“iPhone 17 Air”&#xff0c;Jeff Pu在报告中提到&#xff0c;我同意最近关于 iPhone 17超薄机型采用6 毫米厚度超薄设计的传言。 如果这一测量结果被证明是准确的&#xff0c;那么将有几个值得注意的方面。 首先&#xff0c;iPhone…

Tcp协议Socket编程

&#x1f30e; Tcp协议Socket编程 本次socket编程需要使用到 日志文件&#xff0c;此为具体日志编写过程。以及 线程池&#xff0c;线程池原理比较简单&#xff0c;看注释即可。 文章目录&#xff1a; Tcp协议Socket编程 TCP Socket API简介 构建Tcp_echo_server      …

嵌入式系统中QT实现网络通信方法

大家好,今天主要给大家分享一下,如何使用QT中的网络编程实现。 第一:QT网络编程基本简介 QT中网络模块为提供了可以使用TCP/IP客户端与服务器的类。它提供了较低级别的类,例如代表低级网络概念的 QTcpSocket, QTcpServer 和 QUdpSocket,以及诸如 QNetworkRequest, QNetw…

【卡尔曼滤波】数据预测Prediction观测器的理论推导及应用 C语言、Python实现(Kalman Filter)

【卡尔曼滤波】数据预测Prediction观测器的理论推导及应用 C语言、Python实现&#xff08;Kalman Filter&#xff09; 更新以gitee为准&#xff1a; 文章目录 数据预测概念和适用方式线性系统的适用性 数据预测算法和卡尔曼滤波公式推导状态空间方程和观测器先验估计后验估计…

大模型时代的具身智能系列专题(十三)

迪士尼研究中心 瑞士苏黎世迪斯尼研究中心致力于不同领域的业务活动&#xff0c;其中包括电影、电视、公园和度假村以及消费产品。我们针对所有这些领域进行科研工作。我们开发能使我们将后道生产元素整合到前级生产中的技术。由此可节省许多昂贵的效果&#xff0c;这些效果最…

IDEA2023设置控制台日志输出到本地文件

1、Run->Edit Configurations 2、选择要输出日志的日志&#xff0c;右侧&#xff0c;IDEA2023的Logs在 Modify option 里 选中就会展示Logs栏。注意一定要先把这个日志文件创建出来&#xff0c;不然不会自动创建日志文件的 IDEA以前版本的Logs会直接展示出来 3、但是…

o1的风又吹到多模态,直接吹翻了GPT-4o-mini

开源LLaVA-o1&#xff1a;一个设计用于进行自主多阶段推理的新型VLM。与思维链提示不同&#xff0c;LLaVA-o1独立地参与到总结、视觉解释、逻辑推理和结论生成的顺序阶段。 LLaVA-o1超过了一些更大甚至是闭源模型的性能&#xff0c;例如Gemini-1.5-pro、GPT-4o-mini和Llama-3.…

AJAX的基本使用

AJAX的基本使用 &#x1f389;&#x1f389;&#x1f389;欢迎来到我的博客,我是一名自学了2年半前端的大一学生,熟悉的技术是JavaScript与Vue.目前正在往全栈方向前进, 如果我的博客给您带来了帮助欢迎您关注我,我将会持续不断的更新文章!!!&#x1f64f;&#x1f64f;&#x…