RabbitMQ 路由(Routing)通讯方式详解

在现代分布式系统中,消息队列(Message Queue)是实现异步通信、解耦系统组件的重要工具。RabbitMQ 作为一个广泛使用的消息代理(Message Broker),提供了多种消息传递模式,其中路由(Routing)模式是一种非常强大且灵活的通讯方式。本文将深入探讨 RabbitMQ 中的路由模式,帮助读者理解其工作原理、应用场景以及如何通过 Java 代码实现。


1. 什么是路由模式?

在 RabbitMQ 中,路由模式是基于 Direct Exchange 的一种消息传递模式。与简单的 Fanout Exchange 不同,Direct Exchange 允许消息发送者根据特定的路由键(Routing Key)将消息发送到特定的队列。这种模式提供了更细粒度的消息分发控制,使得消息可以根据业务需求被精确地路由到目标队列。

在这里插入图片描述

1.1 关键概念

  1. Direct Exchange: 直接交换机,根据消息的路由键将消息路由到绑定到该交换机的队列。
  2. Routing Key: 路由键是消息的一个属性,用于指定消息的目标队列。交换机会根据路由键将消息路由到匹配的队列。
  3. Binding: 绑定是交换机和队列之间的关联关系。在绑定过程中,可以指定一个路由键,交换机会根据这个路由键将消息路由到相应的队列。

2. 路由模式的工作原理

在路由模式中,消息的发送者和接收者通过交换机进行通信。以下是路由模式的工作流程:

  1. 生产者(Producer) 发送消息到 Direct Exchange,并指定一个路由键。
  2. Direct Exchange 根据消息的路由键,将消息路由到与之绑定的队列。
  3. 消费者(Consumer) 从队列中接收消息并进行处理。

2.1 示例场景

假设我们有一个日志系统,需要将不同级别的日志(如 infoerrorwarning)发送到不同的队列,以便不同的消费者处理。我们可以使用路由模式来实现这一需求。

  1. 定义交换机: 创建一个 Direct Exchange,命名为 logs_exchange
  2. 定义队列: 创建三个队列,分别命名为 info_queueerror_queuewarning_queue
  3. 绑定队列: 将 info_queue 绑定到 logs_exchange,并指定路由键为 info;将 error_queue 绑定到 logs_exchange,并指定路由键为 error;将 warning_queue 绑定到 logs_exchange,并指定路由键为 warning
  4. 发送消息: 生产者发送消息到 logs_exchange,并指定路由键为 infoerrorwarning
  5. 接收消息: 消费者从相应的队列中接收消息并进行处理。

3. 路由模式的 Java 实现

以下是一个使用 Java 和 RabbitMQ Java Client 库实现路由模式的示例代码。

3.1 添加依赖

首先,在 pom.xml 中添加 RabbitMQ 的依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

3.2 生产者代码

生产者负责发送消息到 Direct Exchange,并指定路由键。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class DirectProducer {private static final String EXCHANGE_NAME = "routing_exchange";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明 Direct Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 定义路由键和消息String severity01 = "info"; // 可以是 "info", "error", "warning"String message01 = "This is an info message";// 发送消息到交换机,并指定路由键channel.basicPublish(EXCHANGE_NAME, severity01, null, message01.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] Sent '" + severity01 + "':'" + message01 + "'");// 定义路由键和消息String severity02 = "warning"; // 可以是 "info", "error", "warning"String message02 = "This is an info message";// 发送消息到交换机,并指定路由键channel.basicPublish(EXCHANGE_NAME, severity02, null, message02.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] Sent '" + severity02 + "':'" + message02 + "'");}}
}

3.3 消费者代码

消费者负责从队列中接收消息并处理。

3.3.1 消费者DirectConsumer01

  • 接受消息中包含info, error, warning的数据。
import com.rabbitmq.client.*;import java.nio.charset.StandardCharsets;public class DirectConsumer01 {private static final String EXCHANGE_NAME = "routing_exchange";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明 Direct Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列到交换机,并指定路由键String[] severities = {"info", "error", "warning"}; // 可以只绑定部分路由键for (String severity : severities) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 定义消息处理回调函数DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};// 开始消费消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

3.3.2 消费者DirectConsumer02

  • 接受消息中包含error, warning的数据,但不接受消息中有info的数据。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.nio.charset.StandardCharsets;public class DirectConsumer02 {private static final String EXCHANGE_NAME = "routing_exchange";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.200.138");factory.setPort(5672);factory.setVirtualHost("/test");factory.setUsername("test");factory.setPassword("test");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明 Direct Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列到交换机,并指定路由键String[] severities = {"error", "warning"}; // 可以只绑定部分路由键for (String severity : severities) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 定义消息处理回调函数DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};// 开始消费消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

4. 运行示例

  1. 启动 RabbitMQ 服务: 确保 RabbitMQ 服务已启动并运行在 192.168.200.138
  2. 运行消费者: 启动消费者程序,绑定队列到交换机,并等待消息。
  3. 运行生产者: 启动生产者程序,发送带有不同路由键的消息。

4.1 输出示例

4.1.1 生产者输出
 [x] Sent 'info':'This is an info message'[x] Sent 'warning':'This is an info message'
4.1.2 消费者DirectConsumer01输出
 [*] Waiting for messages. To exit press CTRL+C[x] Received 'info':'This is an info message'[x] Received 'warning':'This is an info message'
4.1.3 消费者DirectConsumer02输出
 [*] Waiting for messages. To exit press CTRL+C[x] Received 'warning':'This is an info message'

在这里插入图片描述


5. 路由模式的应用场景

  1. 日志系统: 将不同级别的日志(如 infoerrorwarning)发送到不同的队列,以便不同的消费者处理。
  2. 通知系统: 根据用户的订阅类型(如 emailsmspush)将通知发送到不同的队列。
  3. 任务分发: 根据任务的类型(如 high_prioritylow_priority)将任务分发到不同的队列。

总结

RabbitMQ 的路由模式(Routing)通过 Direct Exchange 提供了灵活的消息分发机制,使得消息可以根据路由键被精确地路由到目标队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/493176.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uniapp自定义树型结构数据弹窗,给默认选中的节点,禁用所有子节点

兼容H5、安卓App、微信小程序 实现逻辑&#xff1a;给默认选中节点的所有子节点添加一个disabled属性&#xff0c;以此禁用子节点。 /components/sonTreeNode/sonTreeNode.vue 封装成组件 <template><view><view :class"[item,item.is_level1?pL1:item…

疾风大模型气象系统:精准到分钟,预见天气未来

精准到分钟,预见天气未来 在现代社会中,气象预报的精准度直接关系到人们的生活质量和生产效率。传统的天气预报虽然能为我们提供趋势性参考,但在短时突发天气变化的应对上仍有一定局限。而疾风大模型气象系统凭借其领先的技术和精细化的预测能力,为气象预报树立了新的标杆…

2024年合肥师范学院信息安全小组内部选拔赛(c211)WP

目录 前言MISC签到题_熟悉吗又来一道签到题文件包含 CRYPTO古典1古典2RSA webbaby_sql 前言 [HFNU 校级选拔] 已经结束&#xff0c;接下来一起了解下题目是怎么做的。 通过网盘分享的文件&#xff1a;ARCHPR_4.66.266.0_汉化绿色版.7z 链接: https://pan.baidu.com/s/1N_c0PJX…

15.初识接口1 C#

这是一个用于实验接口的代码 适合初认识接口的人 【CSDN开头介绍】&#xff08;文心一言AI生成&#xff09; 在C#编程世界中&#xff0c;接口&#xff08;Interface&#xff09;扮演着至关重要的角色&#xff0c;它定义了一组方法&#xff0c;但不提供这些方法的实现。它要求所…

3.使用SD卡挂载petalinux根文件系统

前言 说明为什么使用SD卡挂载petalinux根文件系统如何使用SD卡挂载根文件系统 配置根文件写入类型制作SD分区格式化SD卡将工程目录下的rootfs.tar.gz解压到SD EXT4分区 为什么使用SD卡挂载petalinux根文件系统 Petalinux 默认的根文件系统类型是 INITRAMFS&#xff0c;不能…

【Vulkan入门】16-IndexBuffer

TOC 先叨叨 上篇介绍了如何使用VertexBuffer传入顶点信息。两个多星期了我们一直在玩三个点&#xff0c;本篇介绍如何渲染更多的点。 在渲染前考虑一个问题&#xff0c;渲染一个三角形需要三个点&#xff0c;渲染两个相接的三角形需要几个点&#xff1f; 答案是6个点&#xf…

计算机工作流程

分析下面的计算机工作流程&#xff1a; 1.取数a至ACC&#xff1a;PC程序寄存器自增1&#xff0c;变成0&#xff08;可以理解为PC初始从-1开始自增&#xff09;&#xff1b;接着PC把当前指令的地址给到MAR&#xff08;地址寄存器&#xff09;&#xff1b;MAR拿到当前地址后&…

Restaurants WebAPI(二)——DTO/CQRS

文章目录 项目地址一、DTO1.1 创建Restaurant的Dto1.2 修改之前未使用Dto的接口1.2.1 修改GetRestaurantByIdUseCase1.2.2 修改IGetRestaurantByIdUseCase接口1.2.3 再次请求接口1.3 显示Dish List1.3.1创建DishDto1.3.2 在RestaurantDto里添加DishDto1.3.3 使用Include添加Dis…

202412月最新植物大战僵尸杂交版【V3.0.1】更新内容与下载

以下是对UI优化和新内容添加的摘要&#xff1a; UI优化摘要&#xff1a; 主界面重做&#xff1a;对游戏的主界面进行全面的设计更新&#xff0c;提升用户体验。商店重做&#xff1a;对游戏内的商店界面进行重新设计&#xff0c;以改善玩家的购物体验。选卡界面增加图鉴功能&a…

MCU驱动使用

一、时钟的配置&#xff1a; AG32 通常使用 HSE 外部晶体&#xff08;范围&#xff1a;4M~16M&#xff09;。 AG32 中不需要手动设置 PLL 时钟&#xff08;时钟树由系统自动配置&#xff0c;无须用户关注&#xff09;。用户只需在配置文件中给出外部晶振频率和系统主频即可。 …

服务器防火墙设置某个端口号只允许固定 ip地址访问

服务器防火墙设置某个端口号只允许固定 ip地址访问是运维常见的功能&#xff0c;今天我们分享一下&#xff1a; 一、Linux环境 1、firewall 方式 1&#xff09;允许特定 IP 地址访问 23 端口 sudo firewall-cmd --zonepublic --add-rich-rulerule family"ipv4" s…

Hexo Next主题集成百度统计

个人博客地址&#xff1a;Hexo Next主题集成百度统计 | 一张假钞的真实世界。 首先&#xff0c;需要在百度统计控制台新增自己的站点。 点击“新增网站”按钮&#xff1a; 按照要求输入相关信息并保存&#xff0c;页面跳转至代码获取页面。从代码页面中拷贝网站的ID&#xff1…

8K+Red+Raw+ProRes422分享5个影视级视频素材网站

Hello&#xff0c;大家好&#xff0c;我是后期圈&#xff01; 在视频创作中&#xff0c;电影级的视频素材能够为作品增添专业质感&#xff0c;让画面更具冲击力。无论是广告、电影短片&#xff0c;还是品牌宣传&#xff0c;高质量的视频素材都是不可或缺的资源。然而&#xff…

JumpServer开源堡垒机搭建及使用

目录 一,产品介绍 二,功能介绍 三,系统架构 3.1 应用架构 3.2 组件说明 3.3 逻辑架构 3.3 逻辑架构 四,linux单机部署及方式选择 4.1 操作系统要求(JumpServer-v3系列版本) 4.1.1 数据库 4.1.3创建数据库参考 4.2 在线安装 4.2.1 环境访问 4.3 基于docker容…

华为云计算HCIE笔记01

第一章 华为云Stack解决方案 2018年云栖大会马云提出的数据科学时代&#xff08;Data technology&#xff09;&#xff0c;相较于传统信息时代&#xff0c;技术的变更主要集中在过去我们更加看重的是传输&#xff0c;也就是传统的网络建设&#xff0c;随着目前国家网络建设的完…

Redis的主从集群以及哨兵机制学习总结

Redis的主从集群以及哨兵机制 为什么要使用主从集群&#xff1f;部署主从集群主从集群怎么同步数据&#xff1f;数据同步的方式和时机实例查看主从数据同步原理增量同步潜在的问题主从集群的优化 主节点宕机怎么办&#xff1f;哨兵机制 为什么要使用主从集群&#xff1f; 我们…

【机器学习】机器学习的基本分类-强化学习(Reinforcement Learning, RL)

强化学习&#xff08;Reinforcement Learning, RL&#xff09;是一种基于试错的方法&#xff0c;旨在通过智能体与环境的交互&#xff0c;学习能够最大化累积奖励的策略。以下是强化学习的详细介绍。 强化学习的核心概念 智能体&#xff08;Agent&#xff09; 执行动作并与环境…

行政管理痛点解决方案:OA系统助力企业提效减负

作为企业行政管理的中枢&#xff0c;行政部门承担着企业运转的核心职责。从办公物资采购到会议室安排&#xff0c;从流程审批到企业文化建设&#xff0c;行政工作繁杂且细致。然而&#xff0c;在传统管理模式下&#xff0c;行政工作往往面临以下痛点&#xff1a; 流程繁琐&…

Flask内存马学习

文章目录 参考文章环境搭建before_request方法构造内存马after_request方法构造内存马errorhandler方式构造内存马add_url_rule方式构造内存马 参考文章 https://www.mewo.cc/archives/10/ https://www.cnblogs.com/gxngxngxn/p/18181936 前人栽树, 后人乘凉 大佬们太nb了, …

小红书关键词搜索采集 | AI改写 | 无水印下载 | 多维表格 | 采集同步飞书

小红书关键词搜索采集 | AI改写 | 无水印下载 | 多维表格 | 采集同步飞书 一、下载影刀&#xff1a; https://www.winrobot360.com/share/activity?inviteUserUuid595634970300317698 二、加入应用市场 https://www.yingdao.com/share/accede/?inviteKeyb2d3f22a-fd6c-4a…