rabbitMQ(3)

RabbitMq 交换机

文章目录

  • 1. 交换机的介绍
  • 2. 交换机的类型
  • 3. 临时队列
  • 4. 绑定 (bindings)
  • 5. 扇形交换机(Fanout ) 演示
  • 6. 直接交换机 Direct exchange
    • 6.1 多重绑定
    • 6.2 direct 代码案例
  • 7. 主题交换机
    • 7.1 Topic 匹配案例
    • 7.2 Topic 代码案例
  • 8. headers 头交换机


前言:

在之前的文章中,我们一直都没有提到交换机,只是在介绍 rabbitmq 的时候 说过一嘴交换机 , 另外在 之前文章的代码案例中,我们其实

是使用到过交换机的 ,

比如:

channel.basicPublish("",QUEUE_NAME,null,message.getBytes());


这里第一个参数就是用来知名交换机的,但是我们用 “” 来作为交换机的名,rabbitmq 就会默认使用 Direct Exchange(直连交换机) , >

既然 之前我们没有讲到过 交换机,下面我们就来学习一下交换机.

1. 交换机的介绍

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

在这里插入图片描述


简单一句话: 交换机是用来将 生产者生产的消息 转发到对应的队列中.


简单看完交换机的介绍,下面我们来看看 rabbitmq 中交换机的几种类型 .

2. 交换机的类型


引用:

  1. Direct Exchange(直连交换机):根据消息的 routing key 将消息路由到与之完全匹配的队列。适用于一对一的消息传递。 比如 一个队列绑定到该交换机上 要求 routing key (路由键) 为 abc ,那么只有被标记为 abc 的消息才能被转发,不会转发 abc.def , 也不会转发 aaa.bbb.ccc 只会转发 abc.

  1. Fanout Exchange(扇形交换机):将消息广播到所有与该交换机绑定的队列。适用于一对多的消息广播。扇形交换机不处理路由键,我们只需要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout 交换机转发消息是最快的。

  1. Topic Exchange(主题交换机):根据消息的 routing key 和交换机与队列绑定时的 routing pattern 进行匹配,将消息路由到满足条件的队列。支持通配符匹配,比如 符号“#” 匹配一个或多个词,符号 * 匹配不多不少一个词。因此 abc.# 能够匹配到 abc.def.ghi,但是 abc.* 只会匹配到 abc.def。

  1. Headers Exchange(头交换机):根据消息的头部属性进行匹配,并将消息路由到满足条件的队列。

    解释: 不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送
    到RabbitMQ 时会 取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 属性是一个键值对,可以是 Hashtable,键值对的值可以是任何类型。而 fanout,direct,topic 的路由键都需要要字符串形式的

    匹配规则 x-match 有下列两种类型:

    x-match = all :表示所有的键值对都匹配才能接受到消息

    x-match = any :表示只要有键值对匹配就能接受到消息
  2. Default Exchange(默认交换机):它是一个特殊的直连交换机(类型为 direct),当没有指定交换机时,默认的交换机会将消息根据消息的 routing key 发送到同名的队列上


看完了 交换机的类型,这里先来 学习一下 临时队列 之后在演示绑定交换机 会使用到.

3. 临时队列

在 之前的文章中 ,我们每次创建的队列都是具有特定的名字 比如 hello , ack_queue , 可以说 队列的名字对我们来说是非常重要的 ,因为我们要指定 消费者 去那个队列消费消息 . 但是我们 每次都要去想名字,有时候我们愿 想名字 (取名字啥的最烦了) .为了解决这个问题,我们就可以创建一个具有随机名称的队列 , 或者让服务器为我们选择一个随机对立的名称. 另外 如果 我们 想要 在使用完队列 后 断开连接 就删除 (一次性的队列) 就可以 通过下面这种方式来创建 临时队列.

String queueName = channel.queueDeclare().getQueue();


创建出来的队列:

在这里插入图片描述


知道了如何 创建临时队列,下面我们来看看 绑定 (bindings) ,这里是 使用 交换机 最重要的 一环节 , 如果 没有绑定 交换机 就无法绑定到 队列中,交换机就无法 把 消息 转发给 队列.

4. 绑定 (bindings)

什么是 绑定 呢,绑定 其实是 exchange 和 queue 之间的桥梁,它告诉我们 交换机 和 那个队列进行了绑定关系。

比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定

在这里插入图片描述

关于 临时队列 和 绑定 预备知识点看完后,我们就来 使用 交换机.

5. 扇形交换机(Fanout ) 演示

扇形交换机: 将消息广播到所有与该交换机绑定的队列。适用于一对多的消息广播。

演示: 这里我们创建一个简单的日志系统来完成代码样式 .

创建一个消费者 用来 生产消息 ,创建两个消费者 ,一个消费者 将接收到的消息 显示到 控制台,另外一个将消息 存储到 本地磁盘 .


大致:

在这里插入图片描述


代码案例:


消费者:

ReceiveLogs01

package org.example.five;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 消费者1 --> 将消息显示到控制台
public class ReceiveLogs01 {// 交换机名字private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明一个交换机 --> 扇形 fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 声明一个队列 , 临时队列 --> 名字随机,当消费者与临死队列断开连接后 队列自动删除String queueName = channel.queueDeclare().getQueue();// 绑定交换机与队列 --> 第三个参数为路由键 因为使用的是扇形交换机 , 所以路由键可以不写 用 ""channel.queueBind(queueName, EXCHANGE_NAME, "");// 接受消息 -->  消费者接受消息时的回调DeliverCallback deliverCallback = (tag, message) -> {System.out.println("控制台打印接收到的消息:" + new String(message.getBody(), "UTF-8"));};channel.basicConsume(queueName, true, deliverCallback, tag -> {});}
}


ReceiveLogs02

package org.example.five;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.*;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.TimeoutException;// 消费者2 --> 将消息 写入本地
public class ReceiveLogs02 {// 交换机名字private static final String EXCHANGE_NAME = "logs";private static final String path = "E:\\java\\java_lx\\practice\\javaTest\\blog_rabbitmq\\src\\main\\java\\org\\example\\five";public static void main(String[] args) throws IOException, TimeoutException, URISyntaxException {Channel channel = RabbitMQUtils.getChannel();// 声明一个交换机 --> 扇形 fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 声明一个队列 , 临时队列 --> 名字随机,当消费者与临死队列断开连接后 队列自动删除String queueName = channel.queueDeclare().getQueue();// 绑定交换机与队列 --> 第三个参数为路由键 因为使用的是扇形交换机 , 所以路由键可以不写 用 ""channel.queueBind(queueName, EXCHANGE_NAME, "");// 接受消息 -->  消费者接受消息时的回调DeliverCallback deliverCallback = (tag, message) -> {// 文件操作File file = new File(path + "/test.txt");// 文件不存在创建文件if (!file.exists()) {file.createNewFile();}// 使用 Files.newOutputStream 共创创建 outputStream 对象try (OutputStream outputStream = Files.newOutputStream(file.toPath());) {outputStream.write(message.getBody());} catch (IOException e) {throw new RuntimeException("写入文件时发生错误: " + e.getMessage());}};channel.basicConsume(queueName, true, deliverCallback, tag -> {});}
}


生产者: EmitLog

package org.example.five;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.*;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeoutException;// 消费者2 --> 将消息 写入本地
public class ReceiveLogs02 {// 交换机名字private static final String EXCHANGE_NAME = "logs";private static final String path = "E:\\java\\java_lx\\practice\\javaTest\\blog_rabbitmq\\src\\main\\java\\org\\example\\five";public static void main(String[] args) throws IOException, TimeoutException, URISyntaxException {Channel channel = RabbitMQUtils.getChannel();// 声明一个交换机 --> 扇形 fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 声明一个队列 , 临时队列 --> 名字随机,当消费者与临死队列断开连接后 队列自动删除String queueName = channel.queueDeclare().getQueue();// 绑定交换机与队列 --> 第三个参数为路由键 因为使用的是扇形交换机 , 所以路由键可以不写 用 ""channel.queueBind(queueName, EXCHANGE_NAME, "");// 接受消息 -->  消费者接受消息时的回调DeliverCallback deliverCallback = (tag, message) -> {// 文件操作File file = new File(path + "/test.txt");// 文件不存在创建文件if (!file.exists()) {file.createNewFile();}// 使用 Files.newOutputStream 共创创建 outputStream 对象 , ture 表示 写文件不覆盖之前的内容try (OutputStream outputStream = new FileOutputStream(file, true)) {outputStream.write(message.getBody());} catch (IOException e) {throw new RuntimeException("写入文件时发生错误: " + e.getMessage());}};channel.basicConsume(queueName, true, deliverCallback, tag -> {});}
}


注意:

先启动两个消费者再启动生产者。

生产者生产消息后,如果没有对应的消费者接收,则该消息是遗弃的消息


启动看看效果:

在这里插入图片描述


可以看到 一个 生产者生产者的消息被多个消费者消费 ,到此 fanout 交换机 就 演示完成了 .

6. 直接交换机 Direct exchange

直接交换机: 根据消息的 routing key 将消息路由到与之完全匹配的队列。适用于一对一的消息传递。


在使用 Fanout 交换机 ,完成了对日志系统的构造 ,但是还不够,因为 日志 有很多 ,我们并不好 一下找到 严重错误 , 我们可以让一个消费者去操作文件存入全部日志, 让 另外一个消费者 消费严重错误 (如果是严重的错误 才发送给消费者) 将 严重错误的消息存盘 . 此时就好找错误了.


我们要实现这个功能 扇形交换机就不好完成,因为扇形交换机会将消息传播给全部绑定的队列中 ,这里我们就可以使用直接交换机 (direct)


使用 queueBind 方法 绑定路由键

channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");


绑定玩路由键后 ,消息只会去 它绑定的 路由键队列中.

在这里插入图片描述


在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.


在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 black ,green 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

6.1 多重绑定

在这里插入图片描述


当然如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。

6.2 direct 代码案例


这里我们 实现 让 生产者发送多个消息 ,多个消费者 消费不同的消息.

图:

在这里插入图片描述


C1 消费者:绑定 console 队列,routingKey 为 info、warning

C2 消费者:绑定 disk 队列,routingKey 为 error


当生产者生产消息到 direct_logs 交换机里,该交换机会检测消息的 routingKey 条件,然后分配到满足条件的队列里,最后由消费者从队列消费消息。


代码: 这里就不写文件操作了,直接将消费者拿到的消息 放到 控制到上.


生产者

package org.example.five;import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;public class DirectLogs {// 交换机public static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();Scanner sc = new Scanner(System.in);while (sc.hasNext()) {String message = sc.next();// 路由键为 infochannel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));System.out.println("生产者发出消息: " + message);}}
}


启动后 先用 路由键为 info 发送多个消息 ,然后更改 为 error 和 warning 发送消息.


消费者 c1

package org.example.five;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 消费者 c1 -- 消费 路由键为 info 和 warning 的消息
public class ReceiveLogsDirect01 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 声明一个队列channel.queueDeclare("console", false, false, false, null);// 绑定 --> 队列 console , 交换机 direct_logs , 路由键 infochannel.queueBind("console", EXCHANGE_NAME, "info");// 绑定 --> 队列 console , 交换机 direct_logs , 路由键 warningchannel.queueBind("console", EXCHANGE_NAME, "warning");// 接受消息DeliverCallback deliverCallback = (tag, message) -> {System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:" + new String(message.getBody(), "UTF-8"));};channel.basicConsume("console", true, deliverCallback, (tag) -> {});}
}


消费者 c2

package org.example.five;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 消费者 c2 -- 消费 路由键为 error
public class ReceiveLogsDirect02 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 声明一个队列channel.queueDeclare("disk", false, false, false, null);// 绑定 --> 队列 console , 交换机 direct_logs , 路由键 infochannel.queueBind("disk", EXCHANGE_NAME, "error");// 接受消息DeliverCallback deliverCallback = (tag, message) -> {System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:" + new String(message.getBody(), "UTF-8"));};channel.basicConsume("disk", true, deliverCallback, (tag) -> {});}
}


效果:


图一:

在这里插入图片描述


图二:

在这里插入图片描述

7. 主题交换机


主题交换机: 根据消息的 routing key 和交换机与队列绑定时的 routing pattern 进行匹配,将消息路由到满足条件的队列。支持通配符匹配,

上面我们使用 直接交换机改进了 日志记录系统 ,。我们没有使用只能进行随意广播的 fanout 交换机,而是使用了 direct 交换机,从而有能实现有选择性地接收日志。


尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性——比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候就只能使用 topic 类型


另外 使用 topic 需要注意:

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表以点号分隔开。这些单词可以是任意单词

比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit” 这种类型的。

当然这个单词列表最多不能超过 255 个字节。

在这个规则列表中,其中有两个替换符是大家需要注意的:

  • *(星号)可以代替一个位置
  • #(井号)可以替代零个或多个位置

7.1 Topic 匹配案例


绑定关系图

在这里插入图片描述

  • Q1–>绑定的是
    • 中间带 orange 带 3 个单词的字符串 (*.orange.*)
  • Q2–>绑定的是
    • 最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)
    • 第一个单词是 lazy 的多个单词 (lazy.#)


对上面 q1 和 q2 绑定的路由键 , 举几个例子

例子说明
uick.orange.rabbit被队列 Q1Q2 接收到
azy.orange.elephant被队列 Q1Q2 接收到
quick.orange.fox被队列 Q1 接收到
lazy.brown.fox被队列 Q2 接收到
lazy.pink.rabbit虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit是四个单词但匹配 Q2


如果一个队列绑定的路由键为 # , 此时队列可以接收到所有的数据,此时就相当于绑定了 一个 fanout (扇形) 交换机了 ,如果 一个队列 绑定的路由键 没有 # 和 * 此时就可以认为 绑定了一个 direct(直接) 交换机了.


看完这些,下面就来看看代码案例

7.2 Topic 代码案例

创建一个生产者 ,生产多个消息到交换机,交换机按照通配符分配消息到不同的队列中,队列由消费者进行消费


生产者 EmitLogTopic:

package org.example.six;import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class EmitLogTopic {// 交换机的名称public static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();/*** Q1-->绑定的是*      中间带 orange 带 3 个单词的字符串(*.orange.*)* Q2-->绑定的是*      最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)*      第一个单词是 lazy 的多个单词(lazy.#)*/HashMap<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");// 遍历 map 发送消息for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {String routingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println("生产者发出消息: " + message);}}
}


消费者c1

package org.example.six;// 消费者 c1 --> 绑定的路由键为 *.orange.*import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ReceiveLogsTopic01 {// 交换机名称public static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 声明队列String queueName = "Q1";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");System.out.println("等待接受消息.....");// 接收到消息的回调DeliverCallback deliverCallback = (tag, message) -> {System.out.println(new String(message.getBody(), "UTF-8"));System.out.println("接收队列:" + queueName + "  绑定键:" + message.getEnvelope().getRoutingKey());};// 接受消息channel.basicConsume(queueName, true, deliverCallback, (message) -> {});}
}


消费者2

package org.example.six;// 消费者 c2 绑定的路由键为 *.*.rabbit  , lazy.#import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ReceiveLogsTopic02 {// 交换机名称public static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 声明队列String queueName = "Q2";channel.queueDeclare(queueName, false, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");System.out.println("c2 等待接受消息.....");// 接收到消息的回调DeliverCallback deliverCallback = (tag, message) -> {System.out.println(new String(message.getBody(), "UTF-8"));System.out.println("接收队列:" + queueName + "  绑定键:" + message.getEnvelope().getRoutingKey());};// 接受消息channel.basicConsume(queueName, true, deliverCallback, (message) -> {});}
}


效果展示:

在这里插入图片描述

8. headers 头交换机


headers 皮皮额 AMQP 消息的 headr 而不是 路由键 ,此外 headers 交换机和 direct 交换机完全一致 ,但是性能差很多 ,目前几乎 用不到 ,了解即可.


消费方指定的 headers 中必须包含一个 “x-match” 的键 。

键 “x-match” 的值有两个

  1. x-match = all :表示所有的键值对都匹配才能接收到消息
  2. x-mathc = any : 表示只有键值对匹配就能接受到消息

在这里插入图片描述


代码演示:

生产者:

package org.example.six;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class ProducerHeaders {public static String EXCHANGE = "header_exchange";public static String QUEUE = "header_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();// 声明一个交换机channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS, true, false, null);// 声明一个队列channel.queueDeclare(QUEUE, true, false, false, null);Map<String, Object> headerMap = new HashMap<>();headerMap.put("name", "abcdef");headerMap.put("sex", "男");AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder().headers(headerMap);// 发送消息String message = "hello_header";channel.basicPublish(EXCHANGE, "", properties.build(), message.getBytes());}}

消费者:

package org.example.six;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.example.utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;public class ConsumerHeader {public static String EXCHANGE = "header_exchange";public static String QUEUE = "header_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();DeliverCallback deliverCallback = (tag, message) -> {System.out.println("接收到消息: " + new String(message.getBody()));};CancelCallback callback = (tag) -> {System.out.println("消息被中断");};Map<String, Object> headerMap = new HashMap<>();headerMap.put("x-match", "all");// 除了 all 还有 any headerMap.put("name", "abcdef");headerMap.put("sex", "男");channel.queueBind(QUEUE, EXCHANGE, "", headerMap);channel.basicConsume(QUEUE, true, deliverCallback, callback);}
}

效果:

图一:

在这里插入图片描述

图二:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/166671.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ESXI6.5安装教程

设置从IPMI Virtual Disk 3000启动&#xff0c;出现如下界面&#xff1a; 默认选择第一项&#xff0c;回车安装 安装程序正在检测服务器硬件信息&#xff0c;如果不满足系统安装条件会跳出错误提示。 检测完成之后会出现下面界面 回车 按F11 这里列出了服务器硬盘信息&#…

安防视频监控系统EasyCVR视频汇聚存储平台定制化开发:新增kafka配置

安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。平台可拓展性强、…

Flyway Desktop updated

Flyway Desktop updated 为比较工件序列化和反序列化添加了额外的调试日志记录。 Flyway Desktop现在将记住以前用于创建项目和匹配克隆的位置。 新的脱机许可工作流现在已在Microsoft Windows上启用。 现在&#xff0c;在配置目标数据库列表时&#xff0c;环境ID是可见的。 现…

c++ pcl点云变换骨架枝干添加树叶源码实例

程序示例精选 c pcl点云变换骨架枝干添加树叶源码实例 如需安装运行环境或远程调试&#xff0c;见文章底部个人QQ名片&#xff0c;由专业技术人员远程协助&#xff01; 前言 这篇博客针对《c pcl点云变换骨架枝干添加树叶源码实例》编写代码&#xff0c;代码整洁&#xff0c;…

文具办公品经营小程序商城的作用是什么

在购买文具方面&#xff0c;线下文具品牌门店除了疫情冲击外&#xff0c;还有同行间的激烈竞争&#xff0c;消费者对品牌概念较为模糊&#xff0c;同质化产品严重&#xff0c;消费者选择度高&#xff0c;店铺流量稀少&#xff0c;线下经营成本变高。 如今很多消费者已经习惯于线…

【论文阅读】以及部署BEVFusion: A Simple and Robust LiDAR-Camera Fusion Framework

BEVFusion: A Simple and Robust LiDAR-Camera Fusion Framework BEVFusion&#xff1a;一个简单而强大的LiDAR-相机融合框架 NeurIPS 2022 多模态传感器融合意味着信息互补、稳定&#xff0c;是自动驾驶感知的重要一环&#xff0c;本文注重工业落地&#xff0c;实际应用 融…

【NPM】particles.vue3 + tsparticles 实现粒子效果

在 NPM 官网搜索这两个库并安装&#xff1a; npm install element-plus --save npm i tsparticles使用提供的 vue 案例和方法&#xff1a; <template><div><vue-particlesid"tsparticles":particlesInit"particlesInit":particlesLoaded&…

Unity3D 基础——使用 Mathf.SmoothDamp 函数制作相机的缓冲跟踪效果

使用 Mathf.SmoothDamp 函数制作相机的缓冲跟踪效果&#xff0c;让物体的移动不是那么僵硬&#xff0c;而是做减速的缓冲效果。将以下的脚本绑定在相机上&#xff0c;然后设定好 target 目标对象&#xff0c;即可看到相机的缓动效果。通过设定 smoothTime 的值&#xff0c;可以…

Windows安装virtualenv虚拟环境

需要先安装好python环境 1 创建虚拟环境目录 还是在D:\Program\ 的文件夹新建 .env 目录&#xff08;你也可以不叫这个名字&#xff0c;一般命名为 .env 或者 .virtualenv &#xff0c;你也可以在其他目录中创建&#xff09; 2 配置虚拟环境目录的环境变量 3 安装虚拟环境 进…

嵌入式硬件库的基本操作方式与分析

本次要介绍的开源软件是 c-periphery&#xff1a; https://github.com/vsergeev/c-periphery一个用 C 语言编写的硬件外设访问库。 我们可以用它来读写 Serial、SPI、I2C 等&#xff0c;非常适合在嵌入式产品上使用。 我们可以基于它优秀的代码框架&#xff0c;不断地扩展出更…

51单片机定时器和中断(03)

eg1&#xff1a;数码管如何显示出字符 51单片机40个引脚的功能需要记住** RXD&#xff1a;表示的是串行输入口INT0&#xff1a;外部中断0INT1&#xff1a;外部中断1TO : 外部中断0T1 &#xff1a;外部中断1WR: 外部输入存储器写RD: 外部输出存储器读XTK2/XTL1 单片机晶振的输…

1.3 矩阵

一、向量与矩阵 下面是三个向量 u \boldsymbol u u、 v \boldsymbol v v、 w \boldsymbol w w&#xff1a; u [ 1 − 1 0 ] v [ 0 1 − 1 ] w [ 0 0 1 ] \boldsymbol u\begin{bmatrix}\,\,\,\,1\\-1\\\,\,\,\,0\end{bmatrix}\kern 10pt\boldsymbol v\begin{bmatrix}\,\,\,…

你还不会DeBug?太low了吧

编程时调试是不可缺少的&#xff0c;Unity中用于调试的方法均在Debug类中。 浅试一下 新建一个物体和脚本&#xff0c;并把脚本挂载到物体上&#xff01; using System.Collections; using System.Collections.Generic; using UnityEngine;public class DeBugTest : MonoBeh…

【Qt控件之QTabWidget】介绍及使用

描述 QTabWidget类提供了一个带有选项卡的小部件堆栈。 选项卡小部件提供了一个选项卡栏&#xff08;参见QTabBar&#xff09;和一个“页面区域”&#xff0c;用于显示与每个选项卡相关联的页面。默认情况下&#xff0c;选项卡栏显示在页面区域的上方&#xff0c;但可以使用…

【斗破年番】再遭群嘲,美杜莎怀孕之事被魔改,三方联手除萧潇?

【侵权联系删除】【文/郑尔巴金】 斗破苍穹年番第67集已经更新了。和很多人一样&#xff0c;小郑也去看了&#xff0c;只是小郑万万没有想到&#xff0c;我满怀期待的去看这一集&#xff0c;这一集却能魔改成这样。魔改成什么样了呢&#xff1f;下面来分析下吧&#xff01; 一&…

CSS 效果 圆形里一个文字居中

效果实现源码&#xff1a; 宽度&#xff0c;高度必须确认&#xff0c;且相等 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title><style>.circlew {width: 45px;height: 45p…

2023年淘宝双十一预售红包入口介绍

2023年淘宝双十一预售红包入口介绍 近两年&#xff0c;淘宝双十一推出了预售玩法会场。在会场中&#xff0c;大家可以做预售任务&#xff0c;领取现金红包&#xff0c;让购物变得更省。那么&#xff0c;2023年淘宝双十一预售红包入口在哪里?下面小编就给大家介绍下&#xff0c…

ssm+vue的软考系统(有报告)。Javaee项目,ssm vue前后端分离项目。

演示视频&#xff1a; ssmvue的软考系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;ssm vue前后端分离项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff…

ModuleNotFoundError: No module named ‘torch‘

目录 情况1,真的没有安装pytorch情况2(安装了与CUDA不对应的pytorch版本导致无法识别出torch) 情况1,真的没有安装pytorch 虚拟环境里面真的是没有torch,这种情况就easy job了,点击此链接直接安装与CUDA对应的pytorch版本,CTRLF直接搜索对应CUDA版本即可查找到对应的命令.按图…

23 种设计模式详解(C#案例)

&#x1f680;设计模式简介 设计模式&#xff08;Design pattern&#xff09;代表了最佳的实践&#xff0c;通常被有经验的面向对象的软件开发人员所采用。设计模式是软件开发人员在软件开发过程中面临的一般问题的解决方案。这些解决方案是众多软件开发人员经过相当长的一段时…