SpringBoot集成MQ,四种交换机的实例

RabbitMQ交换机(Exchange)的核心作用

在RabbitMQ中,​交换机 是消息路由的核心组件,负责接收生产者发送的消息,并根据规则(如路由键、头信息等)将消息分发到对应的队列中。
不同交换机类型决定了消息的路由逻辑,使用不同的交换机在不同的场景下可以提高消息系统的高可用性。

1. 直连交换机(Direct Exchange)​

路由机制
  • 精确匹配路由键(Routing Key)​:消息会被发送到与 Routing Key ​完全匹配 的队列。
  • 典型场景:一对一或一对多的精确消息分发。
应用场景
  • 任务分发:如订单处理系统,根据订单类型(如 order.paymentorder.shipping)分发到不同队列。
  • 日志分类:将不同级别的日志(log.errorlog.info)路由到对应的处理服务。

 使用直连交换机实现消息发送和接收

1.创建一个SpringBoot项目,在yml文件配置如下:

server:port: 8021
spring: application:name: rabbitmq-provider#配置rabbitMq 服务器rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest

 2.初始化队列和交换机,并进行绑定

package com.atguigu.demomq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 功能:* 作者:程序员ZXY* 日期:2025/3/8 下午1:55*/
@Configuration
public class DirectRabbitConfig {@Beanpublic  Queue TestDirectQueue(){return new Queue("TestDirectQueue",true);}@BeanDirectExchange TestDirectExchange(){return new DirectExchange("TestDirectExchange",true,false);}@BeanBinding bindingDirect(){return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");}
} 

 3.实现sendDirectMessage发送消息请求,由生产者发送到MQ,TestDirectRouting作为Key,用于精确转发。

package com.atguigu.demomq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** 功能:* 作者:程序员ZXY* 日期:2025/3/8 下午2:12*/
@RestController
public class SendMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendDirectMessage")public String sendDirectMessage() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "Hello MQ!";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String,Object> map=new HashMap<>();map.put("messageId",messageId);map.put("messageData",messageData);map.put("createTime",createTime);//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchangerabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);return "OK";}
}

4.此时就可以启动项目发送消息了,使用PostMan发送消息,返回OK说明发送成功

5.进入http://localhost:15672/,可以看到消息发送成功,我这里是请求了两次(也就是发了两条消息)。

6.接下来写消费者的消费过程,新创建一个SpringBoot项目,在yml文件配置如下

server:port: 8022
spring:application:name: rabbitmq-provider#配置rabbitMq 服务器rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest

7.消费者配置类,同样TestDirectRouting用于唯一识别Key

package com.atguigu.demomq2;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 功能:* 作者:程序员ZXY* 日期:2025/3/8 下午 */
@Configuration
public class DirectRabbitConfig {@Beanpublic Queue TestDirectQueue() {return new Queue("TestDirectQueue",true);}@BeanDirectExchange TestDirectExchange() {return new DirectExchange("TestDirectExchange");}@BeanBinding bindingDirect() {return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");}
}

8.消费者 接收消息@RabbitListener(queues = "TestDirectQueue")用于监听指定队列发送的消息

package com.atguigu.demomq2;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@RabbitListener(queues = "TestDirectQueue")
public class DirectReceiver {@RabbitHandlerpublic void process(Map testMessage) {System.out.println("DirectReceiver消费者收到消息  : " + testMessage.toString());}}

 9.启动消费者,成功接收消息

10.查看MQ控制台,消息成功被消费 

2. 扇出交换机(Fanout Exchange)​

路由机制(一个交换机转发到多个队列)
  • 广播模式:忽略 Routing Key,将消息发送到所有绑定的队列
  • 典型场景:消息的全局通知或并行处理。
应用场景
  • 实时通知系统:如用户注册成功后,同时发送邮件、短信、更新缓存。
  • 日志广播:多个服务订阅同一日志源,各自独立处理。

 使用扇出交换机实现消息发送和接收

1.扇出交换机配置

package com.atguigu.demomq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutExchangeConfig {// 定义扇出交换机@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout.user.register", true, false);}// 定义邮件队列@Beanpublic Queue emailQueue() {return new Queue("fanout.user.email", true);}// 定义短信队列@Beanpublic Queue smsQueue() {return new Queue("fanout.user.sms", true);}// 绑定所有队列到扇出交换机(无需路由键)@Beanpublic Binding emailBinding() {return BindingBuilder.bind(emailQueue()).to(fanoutExchange());}@Beanpublic Binding smsBinding() {return BindingBuilder.bind(smsQueue()).to(fanoutExchange());}
}

2.生产者

package com.atguigu.demomq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class FanoutUserService {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendFanoutMessage")public String sendRegisterBroadcast() {rabbitTemplate.convertAndSend("fanout.user.register", "", // 扇出交换机忽略路由键"message MQ");return "OK Fan";}
}

3.消费者

package com.atguigu.demomq2;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class FanoutNotificationConsumer {@RabbitListener(queues = "fanout.user.email")public void handleEmail(String message) {System.out.println("[Email] Received: " + message);}@RabbitListener(queues = "fanout.user.sms")public void handleSms(String message) {System.out.println("[SMS] Received: " + message);}
}

4.请求并查看消费结果 

可以看到一个交换机完成消费两条消息 

3. 主题交换机(Topic Exchange)​

路由机制
  • 模式匹配路由键:使用 *(匹配一个单词)和 #(匹配多个单词)通配符。
  • 典型场景:灵活的多条件消息路由。
应用场景
  • 新闻订阅系统:用户订阅特定主题(如 news.sports.*news.tech.#)。
  • 设备状态监控:根据设备类型和区域路由消息(如 sensor.temperature.room1)。

1.配置主题交换机

package com.atguigu.demomq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class TopicExchangeConfig {// 定义主题交换机@Beanpublic TopicExchange topicExchange() {return new TopicExchange("topic.news", true, false);}// 定义体育新闻队列@Beanpublic Queue sportsQueue() {return new Queue("topic.news.sports", true);}// 定义科技新闻队列@Beanpublic Queue techQueue() {return new Queue("topic.news.tech", true);}// 绑定体育队列:匹配 news.sports.*@Beanpublic Binding sportsBinding() {return BindingBuilder.bind(sportsQueue()).to(topicExchange()).with("news.sports.*");}// 绑定科技队列:匹配 news.tech.#@Beanpublic Binding techBinding() {return BindingBuilder.bind(techQueue()).to(topicExchange()).with("news.tech.#");}
}

2.生产者

package com.atguigu.demomq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class TopicNewsService {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendTopicMessage1")public String  sendSportsNews() {rabbitTemplate.convertAndSend("topic.news", "news.sports.football","* message:news.sports.football");return "*OK";}@GetMapping("/sendTopicMessage2")public String sendTechNews() {rabbitTemplate.convertAndSend("topic.news", "news.tech.ai.abc.123456","# message:news.tech.ai.abc.123456");return "#OK";}
}

3. 消费者

package com.atguigu.demomq2;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class TopicNewsConsumer {@RabbitListener(queues = "topic.news.sports")public void handleSports(String message) {System.out.println("[Sports] Received: " + message);}@RabbitListener(queues = "topic.news.tech")public void handleTech(String message) {System.out.println("[Tech] Received: " + message);}
}

4.发送请求

 可以看到消息成功消费,第一个为*通配符,第二个为#通配符

4. 头交换机(Headers Exchange)​

路由机制( 我的理解是一种基于 ​多条件组合 的消息路由机制
  • 基于消息头(Headers)匹配:忽略 Routing Key,通过键值对(Headers)匹配队列绑定的条件。
  • 匹配规则x-match 参数设为 all(需全部匹配)或 any(匹配任意一个)。
应用场景
  • 复杂路由逻辑:如根据消息的版本号、语言等元数据路由。
  • 多维度过滤:如同时匹配用户类型(user_type: vip)和地理位置(region: asia)。

1.头交换机配置

package com.atguigu.demomq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class HeadersExchangeConfig {// 定义头交换机@Beanpublic HeadersExchange headersExchange() {return new HeadersExchange("headers.user", true, false);}// 定义VIP用户队列@Beanpublic Queue vipQueue() {return new Queue("headers.user.vip", true);}// 绑定VIP队列,要求同时匹配 userType=vip 和 region=asia@Beanpublic Binding vipBinding() {Map<String, Object> headers = new HashMap<>();headers.put("userType", "vip");headers.put("region", "asia");return BindingBuilder.bind(vipQueue()).to(headersExchange()).whereAll(headers).match(); // whereAll 表示需全部匹配}
}

2.生产者

package com.atguigu.demomq;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class HeaderUserVipService {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendHeaderMessage")public String  sendVipMessage() {MessageProperties props = new MessageProperties();props.setHeader("userType", "vip");props.setHeader("region", "asia");Message msg = new Message("HeaderMessage".getBytes(), props);rabbitTemplate.send("headers.user", "", msg);return "OK";}
}

3.消费者

package com.atguigu.demomq2;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class HeaderUserVipConsumer {@RabbitListener(queues = "headers.user.vip")public void handleVip(Message message) {String body = new String(message.getBody());System.out.println("[VIP] Received: " + body);}
}

4.PostMan测试 

 

这里仅消费交换机初始化时满足所有设定条件的消息,我们可以测试一下不满足条件时发送消息

消费者不消费消息 

总结 

需要代码自己进行测试的 可以Git自取

git clone https://gitee.com/myselfzxy/mq-producer.git

git clone https://gitee.com/myselfzxy/mq-customer.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/31980.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker 配置镜像源

》》Daemon {"registry-mirrors": ["https://docker.1ms.run","https://docker.xuanyuan.me"] }》》》然后在重新 docker systemctl restart docker

llamafactory 微调教程

文章目录 llamlafactory微调deepseekr1-0.5b1.1 说明1.2 搭建环境创建GPU实例连接实例部署llama_factory创建隧道&#xff0c;配置端口转发访问llama_factory 1.3 微调大模型从huggingface上下载基座模型查看模型是否下载成功准备数据集微调评估微调效果导出合并后的模型 释放实…

[项目]基于FreeRTOS的STM32四轴飞行器: 七.遥控器按键

基于FreeRTOS的STM32四轴飞行器: 七.遥控器 一.遥控器按键摇杆功能说明二.摇杆和按键的配置三.按键扫描 一.遥控器按键摇杆功能说明 两个手柄四个ADC。 左侧手柄&#xff1a; 前后推为飞控油门&#xff0c;左右推为控制飞机偏航角。 右侧手柄&#xff1a; 控制飞机飞行方向&a…

2025-03-08 学习记录--C/C++-PTA 习题10-1 判断满足条件的三位数

合抱之木&#xff0c;生于毫末&#xff1b;九层之台&#xff0c;起于累土&#xff1b;千里之行&#xff0c;始于足下。&#x1f4aa;&#x1f3fb; 一、题目描述 ⭐️ 裁判测试程序样例&#xff1a; #include <stdio.h> #include <math.h>int search( int n );int…

光谱相机检测肉类新鲜度的原理

光谱相机通过分析肉类样本在特定波长范围内的光谱反射特性&#xff0c;结合化学与生物指标的变化规律&#xff0c;实现对其新鲜度的无损检测。其核心原理可概括为以下方面&#xff1a; 一、光谱特征与物质成分的关联性 ‌物质特异性吸收/反射‌ 不同化学成分&#xff08;如水分…

【一起学Rust | Tauri2.0框架】基于 Rust 与 Tauri 2.0 框架实现软件开机自启

文章目录 前言 一、准备工作1.1 环境搭建1.2 创建 Tauri 项目1.3 添加依赖 二、实现开机自启的基本原理2.1 开机自启的基本概念2.2 Tauri 应用的生命周期 三、Windows 平台实现3.1 Windows 注册表机制3.2 实现步骤3.3 注意事项 四、Linux 平台实现4.1 Linux systemd 服务4.2 实…

Windows10下docker desktop命令行操作指南(大部分也适用于Linux)

Windows系统最大的特点就是可视化操作&#xff0c;点点鼠标就能操作软件。但是在特殊的情况下&#xff0c;比如docker desktop图标点了之后没反应&#xff0c;但是看后台程序&#xff0c;它又已经运行了&#xff0c;这时候就要使用命令行来操作了。 针对这次情况&#xff0c;所…

静态时序分析:无法满足的生成时钟(TIM-255警告、UITE-461或PTE-075错误)

相关阅读 静态时序分析https://blog.csdn.net/weixin_45791458/category_12567571.html?spm1001.2014.3001.5482 在阅读本文前&#xff0c;强烈建议首先阅读介绍生成时钟的文章&#xff0c;尤其是其中关于时钟极性和反相的相关内容。 静态时序分析&#xff1a;SDC约束命令cr…

计算机网络--访问一个网页的全过程

文章目录 访问一个网页的全过程应用层在浏览器输入URL网址http://www.aspxfans.com:8080/news/index.aspboardID5&ID24618&page1#r_70732423通过DNS获取IP地址生成HTTP请求报文应用层最后 传输层传输层处理应用层报文建立TCP连接传输层最后 网络层网络层对TCP报文进行处…

从零开发Chrome广告拦截插件:开发、打包到发布全攻略

从零开发Chrome广告拦截插件&#xff1a;开发、打包到发布全攻略 想打造一个属于自己的Chrome插件&#xff0c;既能拦截烦人的广告&#xff0c;又能优雅地发布到Chrome Web Store&#xff1f;别担心&#xff0c;这篇教程将带你从零开始&#xff0c;动手开发一个功能强大且美观…

AI智能眼镜主控芯片:技术演进与产业生态的深度解析

一、AI智能眼镜的技术挑战与主控芯片核心诉求 AI智能眼镜作为XR&#xff08;扩展现实&#xff09;技术的代表产品&#xff0c;其核心矛盾在于性能、功耗与体积的三角平衡。主控芯片作为设备的“大脑”&#xff0c;需在有限空间内实现复杂计算、多模态交互与全天候续航&#xf…

elasticsearch 8.17.3部署文档

elasticsearch 8.17.3部署文档 一、架构拓扑 ip主机名角色192.168.241.151slave1master192.168.241.152slave2node1192.168.241.153slave3node2 二、安装包下载——分别下载上传至所有的节点 下载地址https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-…

PySide(PyQT),QGraphicsItem的pos()和scenePos()区别

在QGraphicsItem中&#xff0c;pos()和scenePos()是两个重要的方法&#xff0c;用于描述图形项的位置&#xff0c;但它们的含义和用途有所不同。理解它们的区别对于正确操作和管理QGraphicsItem的位置至关重要。 1. pos()方法 • 定义&#xff1a;pos()返回的是QGraphicsItem在…

Linux 进程控制:创建、终止、等待与程序替换全解析

亲爱的读者朋友们&#x1f603;&#xff0c;此文开启知识盛宴与思想碰撞&#x1f389;。 快来参与讨论&#x1f4ac;&#xff0c;点赞&#x1f44d;、收藏⭐、分享&#x1f4e4;&#xff0c;共创活力社区。 目录 1.进程创建 1-1 fork函数初识​ 1-2 fork函数返回值​ 1-3…

GStreamer —— 2.18、Windows下Qt加载GStreamer库后运行 - “播放教程 6:音频可视化“(附:完整源码)

运行效果 介绍 GStreamer 带有一组将音频转换为视频的元素。他们 可用于科学可视化或为您的音乐增添趣味 player 的本教程展示了&#xff1a; • 如何启用音频可视化 • 如何选择可视化元素 启用音频可视化实际上非常简单。设置相应的标志&#xff0c;当纯音频流为 found&#…

Excel多级联动下拉菜单设置

1.问题描述 现有数据表如下图所示&#xff1a; 该表中包括省、市、县三级目录。 现要将其整理成数据表模板&#xff0c;如下图所示&#xff1a; 要求制作成下拉菜单的形式&#xff0c;且每一级目录的下拉菜单列表要根据上一级目录的内容来确定。 如上图所示&#xff0c;只有…

Web基础:HTML快速入门

HTML基础语法 HTML&#xff08;超文本标记语言&#xff09; 是用于创建网页内容的 标记语言&#xff0c;通过定义页面的 结构和内容 来告诉浏览器如何呈现网页。 超文本&#xff08;Hypertext&#xff09; 是一种通过 链接&#xff08;Hyperlinks&#xff09; 将不同文本、图像…

VSTO(C#)Excel开发2:Excel对象模型和基本操作

初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github&#xff1a;codetoys&#xff0c;所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C的&#xff0c;可以在任何平台上使用。 源码指引&#xff1a;github源…

PostgreSQL学习笔记:PostgreSQL vs MySQL

PostgreSQL 和 MySQL 都是广泛使用的关系型数据库管理系统&#xff0c;它们有以下一些对比&#xff1a; 一、功能特性 1. 数据类型支持 PostgreSQL&#xff1a;支持丰富的数据类型&#xff0c;包括数组、JSON、JSONB、范围类型、几何类型等。对于复杂数据结构的存储和处理非…

Matlab:矩阵运算篇——矩阵

目录 1.定义 实例——创建矩阵 实例——创建复数矩阵 2.矩阵的生成 实例——M文件矩阵 2.利用文本创建 实例——创建生活用品矩阵 3.创建特殊矩阵 实例——生成特殊矩阵 4.矩阵元素的运算 1.矩阵元素的修改 实例——新矩阵的生成 2.矩阵的变维 实例——矩阵维度修…