RabbitMQ之交换机

文章目录

  • 一、Exchanges
    • 1、Exchanges 概念
    • 2、Exchanges 的类型
    • 3、无名 exchange
  • 二、临时队列
  • 三、绑定(bindings)
  • 四、Fanout(扇出)
    • 1、Fanout 介绍
    • 2、Fanout 实战
  • 五、Direct exchange(直连交换机)
    • 1、Direct exchange 介绍
    • 2、多重绑定
    • 3、实战代码
  • 六、Topics
    • 1、Topic 的要求
    • 2、Topic 匹配案例
    • 3、实战
  • 总结


一、Exchanges

1、Exchanges 概念

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
在这里插入图片描述

2、Exchanges 的类型

总共有以下类型:

  • 直接(direct)
  • 主题(topic)
  • 标题(headers)
  • 扇出(fanout)

3、无名 exchange

之前能实现将消息发送到队列的原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。
在这里插入图片描述第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话。

二、临时队列

队列的名称对我们来说至关重要-我们需要指定我们的消费者去消费哪个队列的消息。

每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除

创建临时队列的方式如下:

String queueName = channel.queueDeclare().getQueue();

创建出来之后长成这样:
在这里插入图片描述

三、绑定(bindings)

什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。也可以这么理解:队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key,创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”);绑定之后的意义由其交换类型决定。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定

在这里插入图片描述

四、Fanout(扇出)

1、Fanout 介绍

Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。系统中默认有些 exchange 类型
在这里插入图片描述

2、Fanout 实战

在这里插入图片描述

Logs 和临时队列的绑定关系如下图

在这里插入图片描述

ReceiveLogs01 将接收到的消息打印在控制台

public class ReceiveLogs01 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");/*** 生成一个临时的队列 队列的名称是随机的* 当消费者断开和该队列的连接时 队列自动删除*/String queueName = channel.queueDeclare().getQueue();//把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息打印在屏幕.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("控制台打印接收到的消息"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

ReceiveLogs02 将接收到的消息存储在磁盘

public class ReceiveLogs02 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");/*** 生成一个临时的队列 队列的名称是随机的* 当消费者断开和该队列的连接时 队列自动删除*/String queueName = channel.queueDeclare().getQueue();//把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("等待接收消息,把接收到的消息写到文件.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");File file = new File("C:\\work\\rabbitmq_info.txt");FileUtils.writeStringToFile(file,message,"UTF-8");System.out.println("数据写入文件成功");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

EmitLog 发送消息给两个消费者接收

public class EmitLog {private static final String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitUtils.getChannel()) {/*** 声明一个 exchange* 1.exchange 的名称* 2.exchange 的类型*/channel.exchangeDeclare(EXCHANGE_NAME, "fanout");Scanner sc = new Scanner(System.in);System.out.println("请输入信息");while (sc.hasNext()) {String message = sc.nextLine();channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println("生产者发出消息" + message);}}}
}

五、Direct exchange(直连交换机)

1、Direct exchange 介绍

我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志消息避免浪费磁盘空间。Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey 队列中去。
在这里插入图片描述

在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange,队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.

在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

2、多重绑定

在这里插入图片描述
当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。

3、实战代码

在这里插入图片描述
在这里插入图片描述
消费者1:

public class ReceiveLogsDirect01 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = "disk";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "error");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");message="接收绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message;File file = new File("C:\\work\\rabbitmq_info.txt");FileUtils.writeStringToFile(file,message,"UTF-8");System.out.println("错误日志已经接收");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

消费者2:

public class ReceiveLogsDirect02 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String queueName = "console";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "warning");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收绑定键 :"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

发布者:

public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitUtils.getChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//创建多个 bindingKeyMap<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("info","普通 info 信息");bindingKeyMap.put("warning","警告 warning 信息");bindingKeyMap.put("error","错误 error 信息");//debug 没有消费这接收这个消息 所有就丢失了bindingKeyMap.put("debug","调试 debug 信息");for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));System.out.println("生产者发出消息:" + message);}}}
}

六、Topics

在上面,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是使用了 direct 交换机,从而有能实现有选择性地接收日志。

尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候就只能使用 topic 类型。

1、Topic 的要求

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。

在这个规则列表中,其中有两个替换符是大家需要注意的

  • *(星号)可以代替一个单词
  • #(井号)可以替代零个或多个单词

2、Topic 匹配案例

下图绑定关系如下

  • Q1–>绑定的是
    • 中间带 orange 带 3 个单词的字符串(.orange.)
  • Q2–>绑定的是
    • 最后一个单词是 rabbit 的 3 个单词(..rabbit)
    • 第一个单词是 lazy 的多个单词(lazy.#)

在这里插入图片描述
上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的

quick.orange.rabbit被队列 Q1Q2 接收到
lazy.orange.elephant被队列 Q1Q2 接收到
quick.orange.fox被队列 Q1 接收到
lazy.brown.fox被队列 Q2 接收到
lazy.pink.rabbit虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit是四个单词但匹配 Q2

当队列绑定关系是下列这种情况时需要引起注意

  • 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
  • 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

3、实战

在这里插入图片描述
发布者:

public class EmitLogTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {try (Channel channel = RabbitUtils.getChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "topic");/*** Q1-->绑定的是* 中间带 orange 带 3 个单词的字符串(*.orange.*)* Q2-->绑定的是* 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)* 第一个单词是 lazy 的多个单词(lazy.#)**/Map<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));System.out.println("生产者发出消息" + message);}}}
}

消费者1:

public class ReceiveLogsTopic01 {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");//声明 Q1 队列与绑定关系String queueName="Q1";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收队列 :"+queueName+" 绑 定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

消费者2:

public class ReceiveLogsTopic02 {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {Channel channel = RabbitUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");//声明 Q2 队列与绑定关系String queueName="Q2";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收队列 :"+queueName+" 绑 定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

总结

以上就是RabbitMQ之交换机的一些相关知识点,希望对你有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/193301.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

相对强弱指标 RSI

SMA&#xff08;A,B,1)MA AA ,一天前的收盘价&#xff1b; BB&#xff0c;如果时涨的&#xff0c;把涨幅返回&#xff1b; CC,12天的涨幅占12天全部涨跌幅的多少&#xff1b; 画一条50 的线条。

一道 python 数据分析的题目

python 数据分析的题目。 做题方法&#xff1a;使用 pandas 读取数据&#xff0c;然后分析。 知识点&#xff1a;pandas&#xff0c;正则表达式&#xff0c;py知识。 过程&#xff1a;不断使用 GPT&#xff0c;遇到有问题的地方自己分析&#xff0c;把分析的结果告诉 GPT&am…

vagrant+virtualbox的踩坑记录

vagrant virtualbox 文章目录 vagrant virtualbox一、导入虚拟机ova文件失败二、修改虚拟机的保存位置三、无法使用xshell等软件用密码进行连接四、vagrant up失败 一、导入虚拟机ova文件失败 背景&#xff1a;手动删除了虚拟机文件导致无法重新导入相同名称虚拟机的ova文件…

HTML5响应式网页设计(考试题:旅游项目)

效果图 .html代码 <!DOCTYPE html> <html><head><meta name"viewport"content"widthdevice-width,initial-scale1,minimum-scale1,maximum-scale1,user-scalableno" /><meta charset"utf-8" /><title></…

【Phoenix】请求的生命周期

本文的目的是讨论Phoenix请求的生命周期。我们实战添加两个新的页面&#xff0c;并讨论整个过程是如何串起来的。 让我们从添加第一个新页面开始。 添加一个新页面 web应用通常通过将HTTP方法和路径映射到应用的某个函数来处理请求。Phoenix通过路由器来实现这个匹配。例如将…

Debian/Ubuntu 安装 NodeJS【详细步骤】

文章目录 NodeSource 简介Debian/Ubuntu 安装 NodeJS第 1 步:进入 jenkins 容器第 2 步:下载和导入 NodeSource第 3 步:创建 deb 仓库第 4 步:安装 NodeJS第 5 步:卸载NodeJS参考👉 背景:在 Docker 中安装了 Jenkins,Jenkins 镜像为 Debian 11 bullseye。 👉 目标:…

5. HTML中常用标签

5. html常用标签 5.1 标签语义 学习标签是有技巧的&#xff0c;重点是记住每个标签的语义。简单理解就是指标签的含义。即这个标签是用来干嘛的。 根据标签的语义&#xff0c;在合适的地方给一个最为合理的标签。可以让页面结构给清晰。 5.2 标题标签 <h1>-<h6>…

openssl+sha256开发实例(C++)

文章目录 一、 sha256介绍二、sha256原理三、openssl sha256实现 一、 sha256介绍 SHA-256&#xff08;Secure Hash Algorithm 256-bit&#xff09;是一种哈希算法&#xff0c;属于 SHA-2&#xff08;Secure Hash Algorithm 2&#xff09;家族的一员。SHA-256 产生的哈希值是一…

HTTP1.1协议详解

目录 协议介绍协议的特点存在的问题协议优化方案与HTTP 1.0协议的区别 协议介绍 HTTP 1.1是一种基于文本的互联网实体信息交互协议&#xff0c;是Web上任何数据交换和客户端-服务器交互的基础。它允许获取各种类型的资源&#xff0c;如HTML文档&#xff0c;并支持在互联网上交…

嵌入式养成计划-53----ARM--串口通信

一百三十四、串口通信 134.1 串口的概念 串口&#xff08;UART&#xff09;&#xff1a;Universal asynchronous receiver transmitter (USART/UART)&#xff0c;通用异步接收发送器通过串口可以实现两个不同机器之间的信息交互串口通信属于总线通信的一种 134.2 总线的概念…

2023.11.16-hive sql高阶函数lateral view,与行转列,列转行

目录 0.lateral view简介 1.行转列 需求1: 需求2: 2.列转行 解题思路: 0.lateral view简介 hive函数 lateral view 主要功能是将原本汇总在一条&#xff08;行&#xff09;的数据拆分成多条&#xff08;行&#xff09;成虚拟表&#xff0c;再与原表进行笛卡尔积&#xff0c…

基于 Redis 实现的分布式锁

获取锁 互斥&#xff1a;确保只有一个线程获得锁 # 添加锁 利用setnx的互斥性 127.0.0.1:6379> setnx lock thread1释放锁 手动释放锁 超时释放&#xff1a;获取锁时设置一个超时时间 #释放锁 删除即可 127.0.0.1:6379> del lock两步合成一步 help setSET key value …

人工智能基础_机器学习037_多项式回归升维实战4_使用随机梯度下降模型_对天猫双十一销量数据进行预测_拟合---人工智能工作笔记0077

上一节我们使用线性回归模型最终拟合了双十一天猫销量数据,升维后的数据. 我们使用SGDRegressor的时候,随机梯度下降的时候,发现有问题, 对吧,怎么都不能拟合我们看看怎么回事现在 可以看到上面是之前的代码 上面是对数据的准备 这里我们还是修改,使用 poly=PolynomialFeatur…

Python数据容器之(元组)

我们前面所了解的列表是可以修改的&#xff0c;但如果想要传递的信息&#xff0c;不被篡改&#xff0c;列表就不合适了。 元组同列表一样&#xff0c;都是可以封装多个、不同类型的元素在内。 但最大的不同点在于&#xff1a; 元组一旦定义完成&#xff0c;就不可修改 所以…

通过Python设置及读取PDF属性,轻松管理PDF文档

PDF文档属性是嵌入在PDF文档中的一些与文档有关的信息&#xff0c;如作者、制作软件、标题、主题等。PDF属性分为默认属性和自定义属性两种&#xff0c;其中默认属性是一些固定的文档信息&#xff0c;部分信息自动生成&#xff08;如文件大小、页数、页面大小等信息&#xff09…

【ASP.NET】Hello World

文章目录 1. 几个概念2. 搭建开发环境2.1 .NET SDK2.2 IDE & Editor 3 First Project3.1 步骤3.2 模板3.3 项目结构3.4 请求的处理流程 Reference Link 1. 几个概念 .NET 是一个平台&#xff0c;包括 .NET Framework、.NET Core、ASP.NET、C#等&#xff0c;可以构建桌面、W…

requests库验证错误解决方法

用户在使用requests库进行http请求时&#xff0c;遇到了一个AuthenticationRequired&#xff08;身份验证必须&#xff09;的错误。但是&#xff0c;当使用urllib.request.urlopen进行相同的操作时&#xff0c;却能够成功。同时&#xff0c;用户提供了自己的系统信息&#xff0…

javaweb---maventomcat使用教程

文章目录 今日内容0 复习昨日1 Maven1.0 引言1.1 介绍1.2 下载1.3 安装1.3.1 解压1.3.2 配置环境变量1.3.3 测试 1.4 仓库1.5 Maven配置1.5.1 修改仓库位置1.5.2 设置镜像 2 IDEA - MAVEN2.1 idea关联maven2.2 创建java项目2.3 java项目结构2.4 pom2.5 导入依赖2.5.1 查找依赖2…

nodejs+vue黄河风景线旅游网站的设计与实现-微信小程序-安卓-python-PHP-计算机毕业设计

本文首先对该系统进行了详细地描述&#xff0c;然后对该系统进行了详细的描述。管理人员增加了系统首页、个人中心、用户管理、景点分类管理、景点简介管理、旅游路线管理、文章分类管理、公告文章管理、系统管理理等功能。这套黄河风景线旅游网站是根据当前的现实需要&#xf…

微机原理_9

一、单项选择题(本大题共15小题,每小题3分,共45分。在每小题给出的四个备选项中,选出一个正确的答案。 1.当运算结果的最高位为1时&#xff0c;标志位(&#xff09; A. CF1 B. OF1 C. SF1 D. ZF1 2、汇编语言源程序中,每个语句由四项组成,如语句要完成一定功能,那么该语句中不可…