Spring RabbitMQ那些事(1-交换机配置消息发送订阅实操)

这里写目录标题

  • 一、序言
  • 二、配置文件application.yml
  • 三、RabbitMQ交换机和队列配置
    • 1、定义4个队列
    • 2、定义Fanout交换机和队列绑定关系
    • 2、定义Direct交换机和队列绑定关系
    • 3、定义Topic交换机和队列绑定关系
    • 4、定义Header交换机和队列绑定关系
  • 四、RabbitMQ消费者配置
  • 五、RabbitMQ生产者
  • 六、测试用例
    • 1、发送到FanoutExchage
    • 2、发送到DirectExchage
    • 3、发送到TopicExchange
    • 4、发动到HeadersExchage
  • 七、结语

一、序言

在上一节 RabbitMQ中的核心概念和交换机类型 中我们介绍了RabbitMQ中的一些核心概念,尤其是各种交换机的类型,接下来我们将具体讲解各种交换机的配置和消息订阅实操。


二、配置文件application.yml

我们先上应用启动配置文件application.yml,如下:

server:port: 8080
spring:rabbitmq:addresses: localhost:5672username: adminpassword: adminvirtual-host: /listener:type: simplesimple:acknowledge-mode: autoconcurrency: 5max-concurrency: 20prefetch: 5

备注:这里我们指定了RabbitListenerContainerFactory的类型为SimpleRabbitListenerContainerFactory,并且指定消息确认模式为自动确认

三、RabbitMQ交换机和队列配置

Spring官方提供了一套 流式API 来定义队列交换机绑定关系,非常的方便,接下来我们定义4种类型的交换机和相应队列的绑定关系。

1、定义4个队列

/*** 定义4个队列*/
@Configuration
protected static class QueueConfig {@Beanpublic Queue queue1() {return QueueBuilder.durable("queue-1").build();}@Beanpublic Queue queue2() {return QueueBuilder.durable("queue-2").build();}@Beanpublic Queue queue3() {return QueueBuilder.durable("queue-3").build();}@Beanpublic Queue queue4() {return QueueBuilder.durable("queue-4").build();}
}

2、定义Fanout交换机和队列绑定关系

/*** 定义Fanout交换机和对应的绑定关系*/
@Configuration
protected static class FanoutExchangeBindingConfig {@Beanpublic FanoutExchange fanoutExchange() {return ExchangeBuilder.fanoutExchange("fanout-exchange").build();}/*** 定义多个Fanout交换机和队列的绑定关系* @param fanoutExchange* @param queue1* @param queue2* @param queue3* @param queue4* @return*/@Beanpublic Declarables bindQueueToFanoutExchange(FanoutExchange fanoutExchange, Queue queue1, Queue queue2, Queue queue3, Queue queue4) {Binding queue1Binding = BindingBuilder.bind(queue1).to(fanoutExchange);Binding queue2Binding = BindingBuilder.bind(queue2).to(fanoutExchange);Binding queue3Binding = BindingBuilder.bind(queue3).to(fanoutExchange);Binding queue4Binding = BindingBuilder.bind(queue4).to(fanoutExchange);return new Declarables(queue1Binding, queue2Binding, queue3Binding, queue4Binding);}}

备注:这里我们将4个队列绑定到了名为fanout-exchange的交换机上。

2、定义Direct交换机和队列绑定关系

@Configuration
protected static class DirectExchangeBindingConfig {@Beanpublic DirectExchange directExchange() {return ExchangeBuilder.directExchange("direct-exchange").build();}@Beanpublic Binding bindingQueue3ToDirectExchange(DirectExchange directExchange, Queue queue3) {return BindingBuilder.bind(queue3).to(directExchange).with("queue3-route-key");}
}

备注:这里我们定义了名为direct-exchange的交换机并通过路由keyqueue3-route-keyqueue-3绑定到了该交换机上。


3、定义Topic交换机和队列绑定关系

@Configuration
protected static class TopicExchangeBindingConfig {@Beanpublic TopicExchange topicExchange() {return ExchangeBuilder.topicExchange("topic-exchange").build();}@Beanpublic Declarables bindQueueToTopicExchange(TopicExchange topicExchange, Queue queue1, Queue queue2) {Binding queue1Binding = BindingBuilder.bind(queue1).to(topicExchange).with("com.order.*");Binding queue2Binding = BindingBuilder.bind(queue2).to(topicExchange).with("com.#");return new Declarables(queue1Binding, queue2Binding);}
}

这里我们定义了名为topic-exchange类型的交换机,该类型交换机支持路由key通配符匹配,*代表一个任意字符,#代表一个或多个任意字符。

备注:

  1. 通过路由keycom.order.*queue-1绑定到了该交换机上。
  2. 通过路由key com.#queue-2也绑定到了该交换机上。

4、定义Header交换机和队列绑定关系

@Configuration
protected static class HeaderExchangeBinding {@Beanpublic HeadersExchange headersExchange() {return ExchangeBuilder.headersExchange("headers-exchange").build();}@Beanpublic Binding bindQueueToHeadersExchange(HeadersExchange headersExchange, Queue queue4) {return BindingBuilder.bind(queue4).to(headersExchange).where("function").matches("logging");}
}

备注:这里我们定义了名为headers-exchange类型的交换机,并通过参数function=loggingqueue-4绑定到了该交换机上。


四、RabbitMQ消费者配置

Spring RabbitMQ中支持注解式监听端点配置,用于异步接收消息,如下:

@Slf4j
@Component
public class RabbitMqConsumer {@RabbitListener(queues = "queue-1")public void handleMsgFromQueue1(String msg) {log.info("Message received from queue-1, message body: {}", msg);}@RabbitListener(queues = "queue-2")public void handleMsgFromQueue2(String msg) {log.info("Message received from queue-2, message body: {}", msg);}@RabbitListener(queues = "queue-3")public void handleMsgFromQueue3(String msg) {log.info("Message received from queue-3, message body: {}", msg);}@RabbitListener(queues = "queue-4")public void handleMsgFromQueue4(String msg) {log.info("Message received from queue-4, message body: {}", msg);}
}

备注:这里我们分别定义了4个消费者,分别用来接受4个队列的消息。

五、RabbitMQ生产者

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {private final RabbitTemplate rabbitTemplate;public void sendMsgToFanoutExchange(String body) {log.info("开始发送消息到fanout-exchange, 消息体:{}", body);Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();rabbitTemplate.send("fanout-exchange", StringUtils.EMPTY, message);}public void sendMsgToDirectExchange(String body) {log.info("开始发送消息到direct-exchange, 消息体:{}", body);Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();rabbitTemplate.send("direct-exchange", "queue3-route-key", message);}public void sendMsgToTopicExchange(String routingKey, String body) {log.info("开始发送消息到topic-exchange, 消息体:{}", body);Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();rabbitTemplate.send("topic-exchange", routingKey, message);}public void sendMsgToHeadersExchange(String body) {log.info("开始发送消息到headers-exchange, 消息体:{}", body);MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setHeader("function", "logging").build();Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();rabbitTemplate.send("headers-exchange", StringUtils.EMPTY, message);}}

六、测试用例

这里写了个简单的Controller用来测试,如下:

@RestController
@RequiredArgsConstructor
public class RabbitMsgController {private final RabbitMqProducer rabbitMqProducer;@RequestMapping("/exchange/fanout")public ResponseEntity<String> sendMsgToFanoutExchange(String body) {rabbitMqProducer.sendMsgToFanoutExchange(body);return ResponseEntity.ok("广播消息发送成功");}@RequestMapping("/exchange/direct")public ResponseEntity<String> sendMsgToDirectExchange(String body) {rabbitMqProducer.sendMsgToDirectExchange(body);return ResponseEntity.ok("消息发送到Direct交换成功");}@RequestMapping("/exchange/topic")public ResponseEntity<String> sendMsgToTopicExchange(String routingKey, String body) {rabbitMqProducer.sendMsgToTopicExchange(routingKey, body);return ResponseEntity.ok("消息发送到Topic交换机成功");}@RequestMapping("/exchange/headers")public ResponseEntity<String> sendMsgToHeadersExchange(String body) {rabbitMqProducer.sendMsgToHeadersExchange(body);return ResponseEntity.ok("消息发送到Headers交换机成功");}}

1、发送到FanoutExchage

直接访问http://localhost:8080/exchange/fanout?body=hello,可以看到该消息广播到了4个队列上。

2023-11-07 17:41:12.959  INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到fanout-exchange, 消息体:hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#1-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-1, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#0-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-4, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#3-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-3, message body: hello
2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#2-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-2, message body: hello

2、发送到DirectExchage

访问http://localhost:8080/exchange/direct?body=hello,可以看到消息通过路由keyqueue3-route-key发送到了queue-3上。

2023-11-07 17:43:26.804  INFO 39460 --- [nio-8080-exec-1] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到direct-exchange, 消息体:hello
2023-11-07 17:43:26.822  INFO 39460 --- [ntContainer#3-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-3, message body: hello

3、发送到TopicExchange

访问http://localhost:8080/exchange/topic?body=hello&routingKey=com.order.create,路由key为 com.order.create的消息分别发送到了queue-1queue-2上。

2023-11-07 17:44:45.301  INFO 39460 --- [nio-8080-exec-4] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到topic-exchange, 消息体:hello
2023-11-07 17:44:45.312  INFO 39460 --- [ntContainer#1-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-1, message body: hello
2023-11-07 17:44:45.312  INFO 39460 --- [ntContainer#2-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-2, message body: hello

4、发动到HeadersExchage

访问http://localhost:8080/exchange/headers?body=hello,消息通过头部信息function=logging发送到了headers-exchange上。

2023-11-07 17:47:21.736  INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到headers-exchange, 消息体:hello
2023-11-07 17:47:21.749  INFO 39460 --- [ntContainer#0-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-4, message body: hello

七、结语

下一节我们将会介绍通过两种方式借由RabbitMQ实现延迟消息发送和订阅,敬请期待。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/186353.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

各大电商平台关于预制菜品种酸菜鱼销售量

# 导入需要的包 library(rvest) # 用于网页抓取 library(tidyverse) # 用于数据处理 library(stringr) # 用于字符串处理# 设置代理信息 proxy_host <- "www.duoip.cn" proxy_port <- 8000# 设置要爬取的网页 url <- "https://jshk.com.cn/products/sa…

【正点原子STM32连载】 第四十九章 SD卡实验 摘自【正点原子】APM32F407最小系统板使用指南

1&#xff09;实验平台&#xff1a;正点原子stm32f103战舰开发板V4 2&#xff09;平台购买地址&#xff1a;https://detail.tmall.com/item.htm?id609294757420 3&#xff09;全套实验源码手册视频下载地址&#xff1a; http://www.openedv.com/thread-340252-1-1.html## 第四…

Spring的循环依赖问题

文章目录 1.什么是循环依赖2.代码演示3.分析问题4.问题解决5.Spring循环依赖6. 疑问点6.1 为什么需要三级缓存6.2 没有三级缓存能解决吗&#xff1f;6.3 三级缓存分别什么作用 1.什么是循环依赖 上图是循环依赖的三种情况&#xff0c;虽然方式有点不一样&#xff0c;但是循环依…

Yolov8模型训练报错:torch.cuda.OutOfMemoryError

最近在使用自己的数据训练Yolov8模型的时候遇到了很多错误&#xff0c;下面将逐一解答。 问题报错 在训练过程中红字报错&#xff1a;torch.cuda.OutOfMemoryError: CUDA out of memory. 后面还会跟着一大段报错&#xff1a; Tried to allocate XXX MiB (GPU 0; XXX GiB to…

【云原生】使用nginx反向代理后台多服务器

背景 随着业务发展&#xff0c; 用户访问量激增&#xff0c;单台服务器已经无法满足现有的访问压力&#xff0c;研究后需要将后台服务从原来的单台升级为多台服务器&#xff0c;那么原来的访问方式无法满足&#xff0c;所以引入nginx来代理多台服务器&#xff0c;统一请求入口…

OLED透明屏的应用场景有哪些

OLED透明屏在其他领域的应用包括&#xff1a; 商业展示&#xff1a;在商业展示中&#xff0c;OLED透明屏可以作为展示窗口&#xff0c;展示产品信息、广告宣传和品牌形象。通过将透明屏幕安装在展柜、货架或商业窗口中&#xff0c;可以吸引顾客的注意力并提供引人注目的展示效…

不用开会员就能在线编辑、管理及分享各类地理空间数据!

「四维轻云」作为一款地理空间数据云管理平台&#xff0c;具有三维模型、正射影像、激光点云、数字高程模型、人工模型和矢量数据等地理空间数据的在线管理、浏览及分享等功能&#xff0c;致力于为用户提供更加方便、快捷的地理空间数据解决方案。 一、发布、管理超大空间数据…

人大金仓三大兼容:SQL Server迁移无忧

SQL Server在数据库领域一直占据着重要地位。作为一款成熟稳定的关系型数据库管理系统&#xff0c;SQL Server在国内有着广泛的用户群体&#xff0c;医疗、海关、政务等行业的核心业务系统多采用SQL Server数据库。随着政策与市场的双重驱动&#xff0c;信息技术应用创新产业的…

Node.js中的文件系统(file system)模块

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

强大好用的shell:shell的工作原理是什么

Shell的工作原理可以简要概括为以下几个步骤&#xff1a; 1.命令行输入&#xff1a;用户在命令行界面输入命令。 2.命令解析&#xff1a;Shell接收用户的输入&#xff0c;并对命令进行解析。这个过程包括解析命令名、参数、选项等&#xff0c;将其转换成计算机可以理解的形式。…

jsonlite库编写代码示例

r # 导入jsonlite库 library(jsonlite) # 设置主机和端口 proxy_host <- proxy_port <- # 使用httr库创建一个对象 proxy <- create_proxy(proxy_host, proxy_port) # 使用httr库的GET方法下载网页内容 url <- "" response <- GET(url, proxy pr…

将 Figma 轻松转换为 Sketch 的免费方法

最近浏览网站的时候&#xff0c;发现很多人不知道Figma是怎么转Sketch的。众所周知&#xff0c;Figma支持Sketch文件的导入&#xff0c;但不支持Sketch的导出&#xff0c;那么Figma是如何转Sketch的呢&#xff1f;不用担心&#xff0c;建议使用神器即时设计。它是一个可以实现在…

《嵌入式虚拟化技术与应用》:深入浅出阐述嵌入式虚拟机原理,实现“小而能”嵌入式虚拟机!

目录 关于博主前言专家推荐本书适合谁&#xff1f;内容简介书本目录权威作者团队其他 关于博主 &#x1f680;Python爬虫项目实战系列文章&#xff01;&#xff01; ⭐⭐欢迎订阅⭐⭐ 【Python爬虫项目实战一】获取Chatgpt3.5免费接口文末付代码&#xff08;过Authorization认…

高能数造电池3D打印智能制造小试线,开启全固态电池数字化新时代

在科技创新的浪潮中&#xff0c;电池制造领域又迎来了一次突破性的进展。近日&#xff0c;高能数造(西安)技术有限公司重磅推出了其最新电池数字制造装备——全固态电池3D打印智能制造小试线 &#xff0c;这一创新性的技术开启了全固态电池的数字化智造新时代&#xff0c;为全固…

如何存储队列位置信息

实际运行中的系统&#xff0c;难免会遇到重新消费某条消息、跳过一段时间内的消息等情况。这些异常情况的处理&#xff0c;都和Offset有关。本节主要分析Offset的存储位置&#xff0c;以及如何根据需要调整Offset的值。 首先来明确一下Offset的含义&#xff0c;RocketMQ中&…

Linux每日智囊

每日分享三个Linux命令&#xff0c;悄悄培养读者的Linux技能。 info 作用 查看程序、库和系统文档的详细信息。 info命令和man命令都用于查看命令和程序的帮助信息&#xff0c;区别如下&#xff1a; man命令&#xff1a;是最常用的命令之一&#xff0c;用于查看Linux系统上…

Apipost-Helper:IDEA中的类postman工具

今天给大家推荐一款IDEA插件&#xff1a;Apipost-Helper-2.0&#xff0c;写完代码IDEA内一键生成API文档&#xff0c;无需安装、打开任何其他软件&#xff1b;写完代码IDEA内一键调试&#xff0c;无需安装、打开任何其他软件&#xff1b;生成API目录树&#xff0c;双击即可快速…

如何以管理员的身份运行Powershell

大全&#xff01;珍藏 方式一&#xff1a;在Cortana搜索栏中打开带管理员权限的PowerShell Windows 10的任务栏自带了搜索。或者开始菜单选搜索只需在搜索框中输入powershell。 在出来的搜索结果中右击Windows PowerShell&#xff0c;然后选择以管理员方式运行。 随后会弹出UA…

React进阶之路(二)-- 组件通信、组件进阶

文章目录 组件通信组件通信的意义父传子实现props说明子传父实现兄弟组件通信跨组件通信Context通信案例 React组件进阶children属性props校验组件生命周期 组件通信 组件通信的意义 组件是独立且封闭的单元&#xff0c;默认情况下组件只能使用自己的数据&#xff08;state&a…

Go cobra简介

当你需要为你的 Go 项目创建一个强大的命令行工具时&#xff0c;你可能会遇到许多挑战&#xff0c;比如如何定义命令、标志和参数&#xff0c;如何生成详细的帮助文档&#xff0c;如何支持子命令等等。为了解决这些问题&#xff0c;github.com/spf13/cobra 就可以派上用场。 g…