RabbitMQ工作模式-路由模式

官方文档参考:https://www.rabbitmq.com/tutorials/tutorial-four-python.html
在这里插入图片描述

使用direct类型的Exchange,发N条消息并使用不同的routingKey,消费者定义队列并将队列routingKey、Exchange绑定。此时使用direct模式Exchange必须要routingKey完成匹配的情况下消息才会转发到对应的队列中被消费。

样例使用日志分发为样例。即按日志不同的级别,分发到不同的队列。每个队列只处理自己的对应的级别日志。

创建生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;public class Product {private static final String[] LOG_LEVEL = {"ERROR", "WARN", "INFO"};public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明交换机,交换器和消息队列的绑定不需要在这里处理。channel.exchangeDeclare("ex.routing",BuiltinExchangeType.DIRECT,// 持久的标识false,// 自动删除的标识false,// 属性null);for (int i = 0; i < 30; i++) {String level = LOG_LEVEL[ThreadLocalRandom.current().nextInt(0, LOG_LEVEL.length)];String dataMsg = "[" + level + "] 消息发送 :" + i;// 发送消息channel.basicPublish("ex.routing", level, null, dataMsg.getBytes(StandardCharsets.UTF_8));}}
}

创建ERROR的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class ErrorConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列并绑定channel.exchangeDeclare("ex.routing",BuiltinExchangeType.DIRECT,// 持久的标识false,// 自动删除的标识false,// 属性null);// 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。channel.queueDeclare("log.error",// 永久false,// 排他false,// 自动删除true,// 属性null);//消费者享有绑定到交换器的权力。channel.queueBind("log.error", "ex.routing", "ERROR");// 通过chanel消费消息channel.basicConsume("log.error",(consumerTag, message) -> {System.out.println("ERROR收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));},consumerTag -> {});}
}

创建INFO级的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;public class InfoConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列并绑定channel.exchangeDeclare("ex.routing",BuiltinExchangeType.DIRECT,// 持久的标识false,// 自动删除的标识true,// 属性null);// 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。channel.queueDeclare("log.info",// 永久false,// 排他false,// 自动删除false,// 属性null);//消费者享有绑定到交换器的权力。channel.queueBind("log.info", "ex.routing", "INFO");// 通过chanel消费消息channel.basicConsume("log.info",(consumerTag, message) -> {System.out.println("INFO收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));},consumerTag -> {});}
}

创建WARN级别的消息者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;public class WarnConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列并绑定channel.exchangeDeclare("ex.routing",BuiltinExchangeType.DIRECT,// 持久的标识false,// 自动删除的标识false,// 属性null);// 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。channel.queueDeclare("log.warn",// 永久false,// 排他false,// 自动删除true,// 属性null);//消费者享有绑定到交换器的权力。channel.queueBind("log.warn", "ex.routing", "WARN");// 通过chanel消费消息channel.basicConsume("log.warn",(consumerTag, message) -> {System.out.println("warn收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));},consumerTag -> {});}
}

首先启动三个消费者:

查看队列及交换机情况

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ ex.routing         │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌─────────────┬─────────────┬──────────────────┬──────────────────┬─────────────┬───────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│             │ exchange    │ log.info         │ queue            │ log.info    │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│             │ exchange    │ log.warn         │ queue            │ log.warn    │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│             │ exchange    │ log.error        │ queue            │ log.error   │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing  │ exchange    │ log.error        │ queue            │ ERROR       │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing  │ exchange    │ log.info         │ queue            │ INFO        │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing  │ exchange    │ log.warn         │ queue            │ WARN        │           │
└─────────────┴─────────────┴──────────────────┴──────────────────┴─────────────┴───────────┘
[root@nullnull-os ~]# 

可以发现,交换器ex.routing 绑定了三个队列log.errorlog.info log.warn并指定了路由键。

启动消费者,查看消息通否被正常消费。

ERROR的消费者控制台输出

ERROR收到的消息:[ERROR] 消息发送 :1
ERROR收到的消息:[ERROR] 消息发送 :2
ERROR收到的消息:[ERROR] 消息发送 :6
ERROR收到的消息:[ERROR] 消息发送 :8
ERROR收到的消息:[ERROR] 消息发送 :9
ERROR收到的消息:[ERROR] 消息发送 :11
ERROR收到的消息:[ERROR] 消息发送 :15
ERROR收到的消息:[ERROR] 消息发送 :16
ERROR收到的消息:[ERROR] 消息发送 :19
ERROR收到的消息:[ERROR] 消息发送 :20
ERROR收到的消息:[ERROR] 消息发送 :21
ERROR收到的消息:[ERROR] 消息发送 :23
ERROR收到的消息:[ERROR] 消息发送 :24
ERROR收到的消息:[ERROR] 消息发送 :27
ERROR收到的消息:[ERROR] 消息发送 :28

INFO的消费者控制台输出:

INFO收到的消息:[INFO] 消息发送 :0
INFO收到的消息:[INFO] 消息发送 :3
INFO收到的消息:[INFO] 消息发送 :4
INFO收到的消息:[INFO] 消息发送 :13
INFO收到的消息:[INFO] 消息发送 :14
INFO收到的消息:[INFO] 消息发送 :22
INFO收到的消息:[INFO] 消息发送 :25

WARN的消费都控制台输出:

warn收到的消息:[WARN] 消息发送 :5
warn收到的消息:[WARN] 消息发送 :7
warn收到的消息:[WARN] 消息发送 :10
warn收到的消息:[WARN] 消息发送 :12
warn收到的消息:[WARN] 消息发送 :17
warn收到的消息:[WARN] 消息发送 :18
warn收到的消息:[WARN] 消息发送 :26
warn收到的消息:[WARN] 消息发送 :29

至此,验证已经完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/113768.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

正中优配:什么叫融资融券

融资融券是股市中常见的一种买卖方法。融资是指投资者通过某些途径借到资金&#xff0c;用以购买股票。融券是指投资者借股票卖出&#xff0c;并承诺在未来某一时点将股票偿还。 融资融券的实质是一种杠杆买卖&#xff1a;投资者通过融资或融券&#xff0c;增加了自己的资金量…

【微服务部署】02-配置管理

文章目录 1.ConfigMap1.1 创建ConfigMap方式1.2 使用ConfigMap的方式1.3 ConfigMap使用要点建议 2 分布式配置中心解决方案2.1 什么时候选择配置中心2.2 Apollo配置中心系统的能力2.2.1 Apollo创建配置项目2.2.2 项目使用2.2.3 K8s中使用Apollo 1.ConfigMap ConfigMap是K8s提供…

深入理解Reactor模型的原理与应用

1、什么是Reactor模型 Reactor意思是“反应堆”&#xff0c;是一种事件驱动机制。 和普通函数调用的不同之处在于&#xff1a;应用程序不是主动的调用某个 API 完成处理&#xff0c;而是恰恰相反&#xff0c;Reactor逆置了事件处理流程&#xff0c;应用程序需要提供相应的接口并…

非科班菜鸡算法学习记录 | 代码随想录算法训练营第51天||309.最佳买卖股票时机含冷冻期 714.买卖股票的最佳时机含手续费 股票总结

309.最佳买卖股票时机含冷冻期 309. Best Time to Buy and Sell Stock with Cooldown(英文力扣连接) 知识点&#xff1a;动规 状态&#xff1a;看思路ok 思路&#xff1a; 四个状态需要想&#xff0c;持有/不持有且过了冷却期/当天卖/正处于冷却期&#xff1b; 具体看注释…

如何用Python爬虫持续监控商品价格

目录 持续监控商品价格步骤 1. 选择合适的爬虫库&#xff1a; 2. 选择目标网站&#xff1a; 3. 编写爬虫代码&#xff1a; 4. 设定监控频率&#xff1a; 5. 存储和展示数据&#xff1a; 6. 设置报警机制&#xff1a; 7. 异常处理和稳定性考虑&#xff1a; 可能会遇到的…

李跳跳下载-《告别广告困扰,让李跳跳助力打造清爽浏览体验》

大家好&#xff0c;&#x1f44b;今天我想向大家介绍一款非常好用的应用程序——李跳跳 App &#x1f680;。 随着智能手机的普及&#xff0c;应用程序已经成为了我们日常生活中必不可少的一部分。但是&#xff0c;随之而来的是各种各样的广告&#xff0c;这些广告不仅浪费我们…

交换机端口安全实验

文章目录 一、实验的背景与目的二、实验拓扑三、实验需求四、实验解法1. PC配置IP地址部分2. 在SW1上开启802.1X身份验证3. 创建一个用户身份验证的用户。用户名为wangdaye&#xff0c;密码为1234564.创建一个端口隔离组&#xff0c;实现三台PC无法互相访问 摘要&#xff1a; 本…

vue中使用window.open打开assets文件夹下的pdf文件

需求&#xff1a;系统有个操作手册&#xff0c;点击会在浏览器新开个窗口并打开pdf文件。这个pdf文件存储在本地assets文件夹中。 文件结构&#xff1a; 注&#xff1a;直接使用window.open(文件路径)不能打开&#xff0c;需要在vue.config.js中配置所需文件 引入图中红框中的…

Ruoyi微服务启动流程

1、执行sql 执行sql ry-quarty.sql ry_2023706.sql 到ry-cloud 数据库 2、下载nacos 修改配置文件 修改连接地址 启动nacos 看到下面的配置文件即为成功 修改配置文件里面的数据库连接信息 3、修改nacos 为单机启动 4、启动项目即可 nacos自取 链接: https://pan.baidu…

云上办公系统项目

云上办公系统项目 1、云上办公系统1.1、介绍1.2、核心技术1.3、开发环境说明1.4、产品展示后台前台 1.5、 个人总结 2、后端环境搭建2.1、建库建表2.2、创建Maven项目pom文件guigu-oa-parentcommoncommon-utilservice-utilmodelservice-oa 配置数据源、服务器端口号application…

Python钢筋混凝土结构计算.pdf-已知弯矩确定混凝土梁截面尺寸

计算原理 确定混凝土梁截面的合理尺寸通常需要考虑弯矩、受力要求和约束条件等多个因素。以下是一种常见的计算公式&#xff0c;用于基于已知弯矩确定混凝土梁截面的合理尺寸&#xff1a; 请注意&#xff0c;以上公式仅提供了一种常见的计算方法&#xff0c;并且具体的规范和设…

脚本掌控,Linux上实现Spring Boot(JAR包)开机自启

&#x1f60a; 作者&#xff1a; 一恍过去 &#x1f496; 主页&#xff1a; https://blog.csdn.net/zhuocailing3390 &#x1f38a; 社区&#xff1a; Java技术栈交流 &#x1f389; 主题&#xff1a; 脚本掌控&#xff0c;Linux上实现Spring Boot&#xff08;JAR包&#x…

城市内涝积水监测预警系统 yolov8

城市内涝积水监测预警系统通过yolov8网络深度学习框架&#xff0c;算法一旦识别到道路出现积水&#xff0c;城市内涝积水监测预警系统会立即发出预警信号。并及时通知相关人员。YOLO检测速度非常快。标准版本的YOLO可以每秒处理 45 张图像&#xff1b;YOLO的极速版本每秒可以处…

聚观早报|OpenAI宣布推出企业版ChatGPT;苹果公司开设8家新店

【聚观365】8月30日消息 OpenAI宣布推出企业版ChatGPT 比亚迪上半年净利润109.5亿元 歌尔股份上半年净利润4.22亿元 一起教育科技Q2营收6925万元 苹果公司今年开设8家新店 OpenAI宣布推出企业版ChatGPT 据外媒报道&#xff0c;当地时间周一&#xff0c;美国人工智能研究…

k8s的交付与部署案例操作

一 k8s的概念 1.1 k8s k8s是一个轻量级的&#xff0c;用于管理容器化应用和服务的平台。通过k8s能够进行应用的自动化部署和扩容缩容。 1.2 k8s核心部分 1.prod: 最小的部署单元&#xff1b;一组容器的集合&#xff1b;共享网络&#xff1b;生命周期是短暂的&#xff1b; …

QT概括-Rainy

Qt 虽然经常被当做一个 GUI 库&#xff0c;用来开发图形界面应用程序&#xff0c;但这并不是 Qt 的全部&#xff1b;Qt 除了可以绘制漂亮的界面&#xff08;包括控件、布局、交互&#xff09;&#xff0c;还包含很多其它功能&#xff0c;比如多线程、访问数据库、图像处理、音频…

【JasperReports笔记06】JasperReport报表开发之常见的组件元素(Table、Subreport、Barcode等)

这篇文章&#xff0c;主要介绍JasperReport报表开发之常见的组件元素&#xff08;Table、Subreport、Barcode等&#xff09;。 目录 一、基础组件元素 1.1、StaticText 1.2、TextField 1.3、Image 1.4、Break分页 1.5、Rectangle矩形区域 1.6、Ellipse椭圆区域 1.7、Li…

Mybatis中 list.size() = 1 但显示 All elements are null

一、Bug展示 二、原因分析 2.1.情形一&#xff1a;Mybatis的XML中返回类型映射错误 <select id"selectByDesc" parameterType"com.task.bean.OrderInfo"resultType"com.task.bean.OrderInfo">select MER_ID,SETTLE_DATE,ICE_NAME,ORDER_S…

探索未来世界,解密区块链奥秘!

你是否曾好奇&#xff0c;区块链是如何影响着我们的生活与未来&#xff1f;想要轻松了解这个引领着技术革命的概念吗&#xff1f;那么这本令人着迷的新书《区块链导论》绝对值得你拥有&#xff01; 内容丰富多彩&#xff0c;让你轻松掌握&#xff1a; **1章&#xff1a;区块链…

文件传输协议

文章目录 一、FTP1. 定义2. 端口3. 数据传输方式主动方式被动方式 二、TFTP三、常用命令 首先可以看下思维导图&#xff0c;以便更好的理解接下来的内容。 一、FTP 1. 定义 文件传输协议&#xff08;FTP&#xff09;是一种用于在客户端和服务器之间进行文件传输的标准网络协…