RabbitMQ中的Topic模式

在现代分布式系统中,消息队列(Message Queue)是实现异步通信、解耦系统组件的重要工具。RabbitMQ 是一个广泛使用的开源消息代理,支持多种消息传递模式,其中 Topic 模式 是一种灵活且强大的模式,允许生产者和消费者通过通配符匹配的方式进行消息传递。本文将深入探讨 RabbitMQ 中 Topic 模式的工作原理,并通过 Java 代码示例展示其实现方式。


1. Topic 模式的工作原理

1.1 Topic 模式概述

在 RabbitMQ 中,Topic 模式是基于 交换机类型为 topic 的一种消息传递模式。与 Direct 模式(精确匹配)和 Fanout 模式(广播)不同,Topic 模式允许生产者发送消息到特定的交换机,并根据消息的 路由键(Routing Key)绑定键(Binding Key) 的匹配规则,将消息分发到相应的队列。

在这里插入图片描述

1.2 关键概念

1.2.1 交换机(Exchange)

在 Topic 模式中,消息不会直接发送到队列,而是发送到一个 topic 类型的交换机。交换机根据消息的路由键和队列的绑定键进行匹配,决定将消息分发到哪些队列。

1.2.2 路由键(Routing Key)

路由键是生产者在发送消息时指定的字符串,用于描述消息的主题或类别。路由键通常由多个单词组成,单词之间用点号(.)分隔,例如:user.logs.info

1.2.3 绑定键(Binding Key)

绑定键是消费者在绑定队列到交换机时指定的字符串,用于描述队列感兴趣的主题或类别。绑定键的格式与路由键相同,但支持通配符匹配。

1.2.4 通配符

Topic 模式支持两种通配符:

  • *(星号):匹配一个单词。
  • #(井号):匹配零个或多个单词。

例如:

  • *.logs.*:匹配所有包含 logs 的消息,如 user.logs.infosystem.logs.error
  • #.error:匹配所有以 error 结尾的消息,如 system.logs.erroruser.error

1.3 消息分发流程

  1. 生产者发送消息到 topic 类型的交换机,并指定路由键。
  2. 交换机根据路由键和队列的绑定键进行匹配。
  3. 如果匹配成功,消息会被分发到相应的队列。
  4. 消费者从队列中消费消息。

2. Topic 模式的 Java 代码实现

下面通过一个简单的 Java 代码示例,展示如何在 RabbitMQ 中实现 Topic 模式。

2.1 环境准备

在开始之前,请确保已经安装并运行了 RabbitMQ 服务,并且安装了 RabbitMQ 的 Java 客户端库。可以通过 Maven 引入依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

2.2 生产者代码

生产者负责发送消息到 topic 交换机,并指定路由键。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class TopicProducer {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接和通道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明一个 topic 类型的交换机channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 定义路由键和消息内容String routingKey = "user.logs.info"; // 可以修改为其他路由键String message = "This is a log message from user.";// 发送消息到交换机channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + routingKey + "': '" + message + "'");}}
}

2.3 消费者代码

消费者负责从队列中接收消息,并根据绑定键过滤感兴趣的消息。

import com.rabbitmq.client.*;public class TopicConsumer {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明一个 topic 类型的交换机channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 创建一个临时队列,并绑定到交换机String queueName = channel.queueDeclare().getQueue();String bindingKey = "user.#"; // 可以修改为其他绑定键channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 创建消费者并开始消费消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "': '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

3. 运行示例

3.1 启动 RabbitMQ 服务

确保 RabbitMQ 服务已经启动并运行。如果使用 Docker,可以通过以下命令启动 RabbitMQ:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

3.2 运行生产者和消费者

  1. 运行 TopicProducer 类,发送消息到交换机。
  2. 运行 TopicConsumer 类,接收并处理消息。

3.3 测试不同的路由键和绑定键

  • 修改生产者的 routingKey,例如:system.logs.error
  • 修改消费者的 bindingKey,例如:#.error*.logs.*

观察消息的分发情况,验证 Topic 模式的通配符匹配功能。

在这里插入图片描述


4. 总结

RabbitMQ 的 Topic 模式通过通配符匹配的方式,提供了灵活的消息分发机制,适用于复杂的场景。通过本文的介绍和代码示例,读者可以深入理解 Topic 模式的工作原理,并掌握如何在 Java 中实现 Topic 模式。

在实际应用中,Topic 模式可以用于日志收集、事件驱动架构等场景,帮助开发者构建高效、可扩展的分布式系统。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/495257.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Scala_【1】概述

第一章 语言特点环境搭建(Windows)idea编写scalaHelloWorld注意事项 Scala是一门以Java虚拟机&#xff08;JVM&#xff09;为运行环境并将面向对象和函数式编程的最佳特性结合在一起的静态类型编程语言 语言特点 Scala是一门多范式的编程语言&#xff0c;Scala支持面向对象和函…

【时间之外】IT人求职和创业应知【74】-运维机器人

目录 OpenAI最强推理模型o3发布&#xff0c;AGI测试能力暴涨 英伟达宣布收购以色列AI初创企业Runai 汤姆猫首款AI机器人产品明日发售 心勿贪&#xff0c;贵知足。 感谢所有打开这个页面的朋友。人生不如意&#xff0c;开越野车去撒野&#xff0c;会害了自己&#xff0c;不如…

移动端网页兼容适配方案小结

文章目录 前言一、使用viewport配置&#xff0c;确保完美视口二、使用rem实现弹性布局三、CSS媒体查询处理不同尺寸四、1px边框问题解决方案五、安全区域适配六、图片适配方案七、横屏适配处理八、软键盘弹出处理总结 前言 移动端适配一直是前端开发中的重点难题&#xff0c;分…

教培行业数字化未来:一站​式开发在线教育系统源码与网校APP详解

时下&#xff0c;如何有效地搭建一个全面、稳定、易于管理的在线教育系统&#xff0c;已成为越来越多教育机构关注的焦点。本篇文章&#xff0c;小编将深入探讨如何一站式开发在线教育系统源码与网校APP&#xff0c;为教培行业的数字化未来提供技术支持。 一、开发在线教育系统…

【C++基础】09、结构体

一、结构体(struct) C/C 数组允许定义可存储相同类型数据项的变量&#xff0c;但是结构体是 C 中另一种用户自定义的可用的数据类型&#xff0c;它允许存储不同类型的数据项。 结构体用于表示一条记录&#xff0c;假设现在想要跟踪图书馆中书本的动态&#xff0c;可能需要跟踪每…

android sqlite 数据库简单封装示例(java)

sqlite 数据库简单封装示例&#xff0c;使用记事本数据库表进行示例。 首先继承SQLiteOpenHelper 使用sql语句进行创建一张表。 public class noteDBHelper extends SQLiteOpenHelper {public noteDBHelper(Context context, String name, SQLiteDatabase.CursorFactory fact…

在vscode的ESP-IDF中使用自定义组件

以hello-world为例&#xff0c;演示步骤和注意事项 1、新建ESP-IDF项目 选择模板 从hello-world模板创建 2、打开项目 3、编译结果没错 正在执行任务: /home/azhu/.espressif/python_env/idf5.1_py3.10_env/bin/python /home/azhu/esp/v5.1/esp-idf/tools/idf_size.py /home…

golangci-lint安装与Goland集成

golangci-lint安装与Goland集成 1.golangci-lint概述2.golangci-lint安装3.Goland 中集成 golangci-lint4.golangci-lint 的使用5.排除代码检查 1.golangci-lint概述 golangci-lint是用于go语言的代码静态检查工具集 官网地址&#xff1a;golangci-lint 特性&#xff1a; 快…

一次成功流水账-RBDL库的安装与验证

1.安装 2.编写CMakeLists.txt文件并验证例子 1.安装 从git源码下载&#xff0c;安装依赖&#xff0c;cmake编译并安装 安装依赖库 sudo apt update sudo apt upgrade ​ sudo apt install cmake ​ sudo apt install libeigen3-dev ​ sudo apt-get install build-essentia…

【JavaEE】Spring Boot 项目创建

目录 一、idea创建Spring Boot项目1.1 创建过程1.2 依赖下载问题 二、网页创建Spring Boot项目三、目录介绍四、运⾏项⽬&#xff0c;看是否创建成功4.1 请求响应流程分析 五、常见报错5.1 Whitelabel Error Page4.1.1 注解写错&#xff1a;5.1.2 500 ⽆法访问此⽹站 六、状态码…

瑞吉外卖项目学习笔记(七)新增菜品、(批量)删除菜品

瑞吉外卖项目学习笔记(一)准备工作、员工登录功能实现 瑞吉外卖项目学习笔记(二)Swagger、logback、表单校验和参数打印功能的实现 瑞吉外卖项目学习笔记(三)过滤器实现登录校验、添加员工、分页查询员工信息 瑞吉外卖项目学习笔记(四)TableField(fill FieldFill.INSERT)公共字…

TCP/IP 模型中,网络层对 IP 地址的分配与路由选择

TCP/IP 模型中&#xff0c;网络层对 IP 地址的分配与路由选择 一. IP 地址的分配1.1 IP 地址的结构与分类1.2 IP 地址的分配方式 二. 路由选择2.3 路由协议2.4 路由表的结构2.5 路由选择的算法2.6 默认路由与静态路由 三. 网络层的 IP 地址分配与路由选择总结 前言 这是我在这个…

WebRTC搭建与应用(五)-Coturn踩坑记

WebRTC搭建与应用(五)-Coturn踩坑记 近期由于项目需要在研究前端WebGL渲染转为云渲染&#xff0c;借此机会对WebRTC等有了初步了解&#xff0c;在此记录一下&#xff0c;以防遗忘。 第五章 WebRTC搭建与应用(五)-Coturn踩坑记 文章目录 WebRTC搭建与应用(五)-Coturn踩坑记前…

亚信安全举办“判大势 悟思想 强实践”主题党日活动

为深入学习和贯彻党的二十届三中全会精神&#xff0c;近日&#xff0c;亚信安全举办了 “学习贯彻党的二十届三中全会精神——‘判大势 悟思想 强实践’党日活动”&#xff0c;并取得圆满成功。 本次活动特邀南京市委宣讲团成员、南京市委党校市情研究中心主任王辉龙教授出席。…

EsChatPro 接入国内 DeepSeek 大模型

EsChatPro 接入国内 DeepSeek 大模型 前言 上一篇文章 我们讲了 EsChatPro 如何在本地安装运行&#xff0c;接下来给大家带来接入 deepseek 大模型的教程&#xff0c;实现 AI对话 功能 详见&#xff1a;EsChatPro本地开发运行指南 前置准备 首先我们打开 deepseek 的官网&…

Linux挖矿程序排查

一、背景 我们收到一个阿里云安全告警&#xff0c;内容是服务器可能存在挖矿程序。 二、杀死挖矿程序 2.1 找到可疑服务器进程 #1.输入top命令&#xff0c;输入shift P会按照cpu的使用率大小从大到小进行排序&#xff0c;cpu使用率高的就是可疑进程。 top #2.查看运行该进程…

flask基础

from flask import Flask, requestapp Flask(__name__)# app.route(/) # def hello_world(): # put applications code here # return Hello World!app.route(/) # 路由 当用户访问特定 URL 时&#xff0c;Flask 会调用对应的视图函数来处理请求 def index():return …

OpenCV学习——图像融合

import cv2 as cv import cv2 as cvbg cv.imread("test_images/background.jpg", cv.IMREAD_COLOR) fg cv.imread("test_images/forground.png", cv.IMREAD_COLOR)# 打印图片尺寸 print(bg.shape) print(fg.shape)resize_size (1200, 800)bg cv.resize…

Spring Boot 项目创建

创建一个新项目&#xff1a; 打开 Spring Initializr 网址&#xff1a;https://start.spring.io/ &#xff0c;然后创建一个新项目&#xff1a; springboot3.3.5_jdk17&#xff1a; Project&#xff08;Maven&#xff09;编程语言&#xff08;Java 17&#xff09;Spring Boo…

GTID下复制问题和解决

环境介绍 数据库1主2从&#xff0c;mysql版本是v5.19 表结构 一、主库新增记录&#xff0c;从库提示主键冲突 模拟故障 1&#xff0c; master上关闭 sql_log_bin,删除id 103 后打开 2&#xff0c; 确认此时从库有id103,主库没有 3&#xff0c; master insert id103 主从异常…