RabbitMQ工作模式-主题模式

主题模式

官方文档参考:https://www.rabbitmq.com/tutorials/tutorial-five-python.html

使用topic类型的交换器,队列绑定到交换器、bingingKey时使用通配符,交换器将消息路由转发到具体队列时,会根据消息routingKey模糊匹配,比较灵活。

在Direct类型的交换器做到了根据日志级别的不同,将消息发送给了不同队列的。

这里再加入一个需求,不仅想根据日志级别进行划分,还想根据日志的来源分日志,如何来做呢?

使用topic类型的交换器, routingKey就不能随便写了,它必须是点分单词,单词可以随便写,一般按消息的特征,该点分单词字符串最长255字节。

bindingKey也必须是这种形式。top类型的交换器背后原理跟direct类型类似只要队列的bingingkey的值与消息的routingKey的匹配,队列就可以收到该消息。有两个不同

  1. * (star)匹配一个单词。
  2. # 匹配0到多个单词。

在这里插入图片描述

上报的数据的RoutingKey,格式如下

地区.业务.日志级别 如shanghai.busi.INFO 、 hangzhou.line.ERROR

生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;public class Product {private static final String[] ADDRESS_ARRAYS = {"shanghai", "suzhou", "hangzhou"};private static final String[] BUSI_NAMES = {"product", "user", "schedule"};private static final String[] LOG_LEVEL = {"ERROR", "WARN", "INFO"};public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换机channel.exchangeDeclare("ex.busi.topic",BuiltinExchangeType.TOPIC,// 持久化标识false,// 是否自动删除false,// 属性信息null);for (int i = 0; i < 50; i++) {String level = LOG_LEVEL[ThreadLocalRandom.current().nextInt(0, LOG_LEVEL.length)];String busiName = BUSI_NAMES[ThreadLocalRandom.current().nextInt(0, BUSI_NAMES.length)];String address =ADDRESS_ARRAYS[ThreadLocalRandom.current().nextInt(0, ADDRESS_ARRAYS.length)];String routingKey = address + "." + busiName + "." + level;String pushMsg = "地址[" + address + "],业务[" + busiName + "],级别[" + level + "],消息:" + i;channel.basicPublish("ex.busi.topic", routingKey, null, pushMsg.getBytes(StandardCharsets.UTF_8));}}
}

上海的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;/*** 上海地区的消费都,获取所有的上海信息*/
public class ShangHaiConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换机channel.exchangeDeclare("ex.busi.topic",BuiltinExchangeType.TOPIC,// 持久化标识false,// 是否自动删除false,// 属性信息null);// 定义队列channel.queueDeclare("shanghai.all.log",// 持久化存储true,// 排他false,// 自动删除true,// 属性null);// 将队列与交换机进行绑定channel.queueBind("shanghai.all.log", "ex.busi.topic", "shanghai.#", null);channel.basicConsume("shanghai.all.log",(consumerTag, message) -> {String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("shanghai consumer 收到数据:" + dataMsg);},consumerTag -> {});}
}

所有错误日志的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;public class ErrorLogConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换机channel.exchangeDeclare("ex.busi.topic",BuiltinExchangeType.TOPIC,// 持久化标识false,// 是否自动删除false,// 属性信息null);// 定义队列channel.queueDeclare("log.all.error",// 持久化存储true,// 排他false,// 自动删除true,// 属性null);// 将队列与交换机进行绑定channel.queueBind("log.all.error", "ex.busi.topic", "#.ERROR", null);channel.basicConsume("log.all.error",(consumerTag, message) -> {String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("错误日志 consumer 收到数据:" + dataMsg);},consumerTag -> {});}
}

苏州用户的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;public class SuZhouUserConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义交换机channel.exchangeDeclare("ex.busi.topic",BuiltinExchangeType.TOPIC,// 持久化标识false,// 是否自动删除false,// 属性信息null);// 定义队列channel.queueDeclare("suzhou.user.consumer",// 持久化存储true,// 排他false,// 自动删除true,// 属性null);// 将队列与交换机进行绑定channel.queueBind("suzhou.user.consumer", "ex.busi.topic", "suzhou.user.*", null);channel.basicConsume("suzhou.user.consumer",(consumerTag, message) -> {String dataMsg = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("suzhou consumer 收到数据:" + dataMsg);},consumerTag -> {});}
}

首先启动三个消费者,查看队列和交换器的信息

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ ex.busi.topic      │ topic   │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ ex.routing         │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌───────────────┬─────────────┬──────────────────────┬──────────────────┬──────────────────────┬───────────┐
│ source_name   │ source_kind │ destination_name     │ destination_kind │ routing_key          │ arguments │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│               │ exchange    │ suzhou.user.consumer │ queue            │ suzhou.user.consumer │           │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│               │ exchange    │ shanghai.all.log     │ queue            │ shanghai.all.log     │           │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│               │ exchange    │ log.all.error        │ queue            │ log.all.error        │           │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ ex.busi.topic │ exchange    │ log.all.error        │ queue            │ #.ERROR              │           │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ ex.busi.topic │ exchange    │ shanghai.all.log     │ queue            │ shanghai.#           │           │
├───────────────┼─────────────┼──────────────────────┼──────────────────┼──────────────────────┼───────────┤
│ ex.busi.topic │ exchange    │ suzhou.user.consumer │ queue            │ suzhou.user.*        │           │
└───────────────┴─────────────┴──────────────────────┴──────────────────┴──────────────────────┴───────────┘
[root@nullnull-os ~]# 

观察可以发现,此队列与消息的绑定已经成功。接下使用生产者发送消息。观察控制台输出:

错误日志消费者

错误日志 consumer 收到数据:地址[suzhou],业务[schedule],级别[ERROR],消息:6
错误日志 consumer 收到数据:地址[suzhou],业务[product],级别[ERROR],消息:8
错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:10
错误日志 consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:12
错误日志 consumer 收到数据:地址[suzhou],业务[schedule],级别[ERROR],消息:15
错误日志 consumer 收到数据:地址[hangzhou],业务[user],级别[ERROR],消息:16
错误日志 consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:17
错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:18
错误日志 consumer 收到数据:地址[hangzhou],业务[user],级别[ERROR],消息:21
错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:22
错误日志 consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:24
错误日志 consumer 收到数据:地址[hangzhou],业务[product],级别[ERROR],消息:28
错误日志 consumer 收到数据:地址[suzhou],业务[schedule],级别[ERROR],消息:33
错误日志 consumer 收到数据:地址[hangzhou],业务[schedule],级别[ERROR],消息:39
错误日志 consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:40
错误日志 consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:43
错误日志 consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:46

上海地区的消费者

shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[WARN],消息:0
shanghai consumer 收到数据:地址[shanghai],业务[user],级别[INFO],消息:1
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[INFO],消息:2
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[WARN],消息:5
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:10
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:12
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:17
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:18
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:22
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[ERROR],消息:24
shanghai consumer 收到数据:地址[shanghai],业务[user],级别[INFO],消息:32
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[INFO],消息:35
shanghai consumer 收到数据:地址[shanghai],业务[product],级别[INFO],消息:38
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[WARN],消息:41
shanghai consumer 收到数据:地址[shanghai],业务[schedule],级别[ERROR],消息:46
shanghai consumer 收到数据:地址[shanghai],业务[user],级别[INFO],消息:48

苏州用户的消费者

suzhou consumer 收到数据:地址[suzhou],业务[user],级别[WARN],消息:37
suzhou consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:40
suzhou consumer 收到数据:地址[suzhou],业务[user],级别[ERROR],消息:43
suzhou consumer 收到数据:地址[suzhou],业务[user],级别[WARN],消息:45

至此topic模式操作成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/117486.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

NLP(六十七)BERT模型训练后动态量化(PTDQ)

本文将会介绍BERT模型训练后动态量化&#xff08;Post Training Dynamic Quantization&#xff0c;PTDQ&#xff09;。 量化 在深度学习中&#xff0c;量化&#xff08;Quantization&#xff09;指的是使用更少的bit来存储原本以浮点数存储的tensor&#xff0c;以及使用更少的…

Java泛型机制

✅作者简介&#xff1a;大家好&#xff0c;我是Leo&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Leo的博客 &#x1f49e;当前专栏&#xff1a;每天一个知识点 ✨特色专栏&#xff1a…

【半监督医学图像分割】2022-MedIA-UWI

【半监督医学图像分割】2022-MedIA-UWI 论文题目&#xff1a;Semi-supervise d me dical image segmentation via a triple d-uncertainty guided mean teacher model with contrastive learning 中文题目&#xff1a;基于对比学习的三维不确定性指导平均教师模型的半监督图像分…

“新KG”视点 | 陈华钧——大模型时代的知识处理:新机遇与新挑战

OpenKG 大模型专辑 导读 知识图谱和大型语言模型都是用来表示和处理知识的手段。大模型补足了理解语言的能力&#xff0c;知识图谱则丰富了表示知识的方式&#xff0c;两者的深度结合必将为人工智能提供更为全面、可靠、可控的知识处理方法。在这一背景下&#xff0c;OpenKG组织…

微机原理 || 第2次测试:汇编指令(加减乘除运算,XOR,PUSH,POP,寻址方式,物理地址公式,状态标志位)(测试题+手写解析)

&#xff08;一&#xff09;测试题目&#xff1a; 1.数[X]补1111,1110B&#xff0c;则其真值为 2.在I/O指令中,可用于表示端口地址的寄存器 3. MOV AX,[BXSl]的指令中&#xff0c;源操作数的物理地址应该如何计算 4.执行以下两条指令后&#xff0c;标志寄存器FLAGS的六个状态…

Cmake qt ,vtkDataArray.cxx.obj: File too big

解决方法&#xff1a; Qt4 在pro 加入“QMAKE_CXXFLAGS -BigObj” 可以解决 Qt5 在网上用“-Wa,-mbig-obj” 不能解决&#xff0c;最后通过“QMAKE_CXXFLAGS -Ofast -flto”解决问题。 Qt4 在pro 加入“QMAKE_CXXFLAGS -BigObj” 可以解决Qt5 在网上用“-Wa,-mbig-obj” …

wxWidgets从空项目开始Hello World

前文回顾 接上篇&#xff0c;已经是在CodeBlocks20.03配置了wxWidgets3.0.5&#xff0c;并且能够通过项目创建导航创建一个新的工程&#xff0c;并且成功运行。 那么上一个是通过CodeBlocks的模板创建的&#xff0c;一进去就已经是2个头文件2个cpp文件&#xff0c;总是感觉缺…

OAuth2.0二 JWT以及Oauth2实现SSO

一 JWT 1.1 什么是JWT JSON Web Token&#xff08;JWT&#xff09;是一个开放的行业标准&#xff08;RFC 7519&#xff09;&#xff0c;它定义了一种简介的、自包含的协议格式&#xff0c;用于在通信双方传递json对象&#xff0c;传递的信息经过数字签名可以被验证和信任。JW…

python web 开发与 Node.js + Express 创建web服务器入门

目录 1. Node.js Express 框架简介 2 Node.js Express 和 Python 创建web服务器的对比 3 使用 Node.js Express 创建web服务器示例 3.1 Node.js Express 下载安装 3.2 使用Node.js Express 创建 web服务器流程 1. Node.js Express 框架简介 Node.js Express 是一种…

机器学习---决策树的划分依据(熵、信息增益、信息增益率、基尼值和基尼指数)

1. 熵 物理学上&#xff0c;熵 Entropy 是“混乱”程度的量度。 系统越有序&#xff0c;熵值越低&#xff1b;系统越混乱或者分散&#xff0c;熵值越⾼。 1948年⾹农提出了信息熵&#xff08;Entropy&#xff09;的概念。 从信息的完整性上进⾏的描述&#xff1a;当系统的有序…

myspl使用指南

mysql数据库 使用命令行工具连接数据库 mysql -h -u 用户名 -p -u表示后面是用户名-p表示后面是密码-h表示后面是主机名&#xff0c;登录当前设备可省略。 如我们要登录本机用户名为root&#xff0c;密码为123456的账户&#xff1a; mysql -u root -p按回车&#xff0c;然后…

大数据组件-Flume集群环境的启动与验证

&#x1f947;&#x1f947;【大数据学习记录篇】-持续更新中~&#x1f947;&#x1f947; 个人主页&#xff1a;beixi 本文章收录于专栏&#xff08;点击传送&#xff09;&#xff1a;【大数据学习】 &#x1f493;&#x1f493;持续更新中&#xff0c;感谢各位前辈朋友们支持…

gitlab-rake gitlab:backup:create 执行报错 Errno::ENOSPC: No space left on device

gitlab仓库备份执行 gitlab-rake gitlab:backup:create报错如下&#xff1a; 问题分析&#xff1a;存储备份的空间满 解决方法&#xff1a; 方法1&#xff1a;清理存放路径&#xff0c;删除不需要文件&#xff0c;释放空间。 方法2&#xff1a;创建一个根目录的挂载点&#x…

八一参考文献:[八一新书]许少辉.乡村振兴战略下传统村落文化旅游设计[M]北京:中国建筑出版传媒,2022.

八一参考文献&#xff1a;&#xff3b;八一新书&#xff3d;许少辉&#xff0e;乡村振兴战略下传统村落文化旅游设计&#xff3b;&#xff2d;&#xff3d;北京&#xff1a;中国建筑出版传媒&#xff0c;&#xff12;&#xff10;&#xff12;&#xff12;&#xff0e;

机器视觉工程师,有哪几种类型

1.光学实验室&#xff08;打光机器视觉工程师&#xff0c;一般此职位&#xff0c;要求有光学学历的背景最佳&#xff09; 2.机器视觉算法开发工程师&#xff08;此职位国内稀缺&#xff09;3.机器视觉工程师/机器视觉开发工程师&#xff08;MV工程师/MV工程师&#xff09;&…

常见项目管理中npm包操作总结

前言 我们在日常工作中&#xff0c;可能需要下载包、创建包、发布包等等。本篇推文将记录日常项目中关于npm包的操作。 引用包 npm仓库公开的包我们都可以通过npm install的命令进行引用下载。 而我们开发的业务公共组件需要在公司内部项目公共引用&#xff0c;而不希望公开为外…

Android——基本控件(下)(二十)

1. 树型组件&#xff1a;ExpandableListView 1.1 知识点 &#xff08;1&#xff09;掌握树型组件的定义&#xff1b; &#xff08;2&#xff09;可以使用事件对树操作进行监听。 2. 具体内容 既然这个组件可以完成列表的功能&#xff0c;肯定就需要一个可以操作的数据&…

el-select 选择一条数据后,把其余数据带过来

1. 案例&#xff1a; ps: 票号是下拉框选择&#xff0c;风险分类、场站名称以及开始时间是选择【票号】后带过来的。 2. 思路: 使用官网上给的方法&#xff0c;选择之后&#xff0c;触发change方法从而给其余字段赋值 3. 代码 <el-form-itemlabel"票号&#xff1a;&…

(leetcode1761一个图中连通三元组的最小度数,暴力+剪枝)-------------------Java实现

&#xff08;leetcode1761一个图中连通三元组的最小度数&#xff0c;暴力剪枝&#xff09;-------------------Java实现 题目表述 给你一个无向图&#xff0c;整数 n 表示图中节点的数目&#xff0c;edges 数组表示图中的边&#xff0c;其中 edges[i] [ui, vi] &#xff0c;…

2023_Spark_实验四:SCALA基础

一、在IDEA中执行以下语句 或者用windows徽标R 输入cmd 进入命令提示符 输入scala直接进入编写界面 1、Scala的常用数据类型 注意&#xff1a;在Scala中&#xff0c;任何数据都是对象。例如&#xff1a; scala> 1 res0: Int 1scala> 1.toString res1: String 1scala…