rabbitmq五种模式的总结——附java-se实现(详细)

rabbitmq五种模式的总结

完整项目地址:https://github.com/9lucifer/rabbitmq4j-learning

在这里插入图片描述

一、简单模式

(一)简单模式概述

RabbitMQ 的简单模式是最基础的消息队列模式,包含以下两个角色:

  1. 生产者:负责发送消息到队列。
  2. 消费者:负责从队列中接收并处理消息。

在简单模式中,消息的传递是单向的,生产者将消息发送到队列,消费者从队列中接收消息。

image-20250216063036914


(二)生产者代码解析

代码

生产者负责创建消息并将其发送到指定的队列中。

package top.miqiu._01_hello;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("ip(要换成真实的ip哦)");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列/*** 参数说明:* 1. 队列名称:01-hello2* 2. 是否持久化:true(重启后队列仍然存在)* 3. 是否独占队列:false(允许多个消费者连接)* 4. 是否自动删除:false(队列不会自动删除)* 5. 额外参数:null*/channel.queueDeclare("01-hello2", true, false, false, null);// 6. 发送消息/*** 参数说明:* 1. 交换机名称:空字符串(使用默认交换机)* 2. 路由键:队列名称(01-hello2)* 3. 额外属性:null* 4. 消息内容:字节数组*/channel.basicPublish("", "01-hello2", null, "hello rabbitmq2".getBytes());System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
结果

image-20250216063335314


(三)消费者代码解析

代码

消费者负责从队列中接收并处理消息。

package top.miqiu._01_hello_c;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("ip(要换成真实的ip哦");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列(需与生产者保持一致)channel.queueDeclare("01-hello2", false, false, false, null);// 6. 接收消息/*** 参数说明:* 1. 队列名称:01-hello2* 2. 是否自动确认:true(消息被消费后自动确认)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume("01-hello2", true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {System.out.println("接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
结果

image-20250216063418597

在mq中查看

image-20250216063443454


(四)总结

  1. 简单模式:适用于一对一的简单消息传递场景。
  2. 生产者:负责创建队列并发送消息。
  3. 消费者:负责从队列中接收并处理消息。
  4. 注意事项
    • 队列名称需保持一致,不然一定会报错!
    • 消息确认机制需根据业务需求选择自动或手动确认。
    • 使用完资源后需显式关闭 ChannelConnection

二、工作模式

(一)工作模式概述

工作模式是 RabbitMQ 的一种常见模式,用于将任务分发给多个消费者。它的特点是:

  1. 一个生产者:负责发送消息到队列。
  2. 多个消费者:共同消费同一个队列中的消息。
  3. 消息分发机制:默认情况下,RabbitMQ 会以轮询(Round-Robin)的方式将消息分发给消费者。

工作模式适用于任务分发场景,例如将耗时的任务分发给多个 Worker 处理。

image-20250216065036476


(二)生产者代码解析

生产者负责创建消息并将其发送到指定的队列中。

package top.miqiu._02_work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!别忘了改");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列/*** 参数说明:* 1. 队列名称:02-work1* 2. 是否持久化:true(重启后队列仍然存在)* 3. 是否独占队列:false(允许多个消费者连接)* 4. 是否自动删除:false(队列不会自动删除)* 5. 额外参数:null*/channel.queueDeclare("02-work1", true, false, false, null);// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;channel.basicPublish("", "02-work1", null, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
  1. 队列声明(queueDeclare):创建队列并设置队列属性。
  2. 消息发送(basicPublish):通过循环发送多条消息到队列。
  3. 持久化队列:设置为 true,确保队列在 RabbitMQ 重启后仍然存在。

(三)消费者代码解析

代码

消费者负责从队列中接收并处理消息。

package top.miqiu._02_work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!别忘了改");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列(需与生产者保持一致)channel.queueDeclare("02-work1", true, false, false, null);// 6. 设置每次只接收一条消息channel.basicQos(1);// 7. 接收消息/*** 参数说明:* 1. 队列名称:02-work1* 2. 是否自动确认:false(手动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume("02-work1", false, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1 接收到消息:" + new String(delivery.getBody()));// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
  1. 队列声明(queueDeclare):确保队列存在,需与生产者保持一致。
  2. 消息预取(basicQos):设置每次只接收一条消息,避免某个消费者处理过多消息。
  3. 手动确认(basicAck):消息处理完成后手动确认,确保消息不会丢失。
  4. 消息处理耗时:通过 Thread.sleep(1000) 模拟消息处理耗时。
效果

image-20250216065155794

image-20250216065214992


(四)工作模式的特点

  1. 消息分发机制
    • 默认情况下,RabbitMQ 会以轮询的方式将消息分发给多个消费者。
    • 可以通过 basicQos 设置每次只接收一条消息,避免某个消费者处理过多消息。
  2. 消息确认机制
    • 设置为手动确认(autoAck=false),确保消息处理完成后才确认。(防止业务处理失败的情况下丢失消息)
    • 如果消费者在处理消息时崩溃,未确认的消息会重新分发给其他消费者。
  3. 适用场景
    • 任务分发场景,例如将耗时的任务分发给多个 Worker 处理。

(五)总结

  1. 工作模式:适用于任务分发场景,多个消费者共同消费同一个队列中的消息。
  2. 生产者:负责发送消息到队列。
  3. 消费者:负责接收并处理消息,支持手动确认和消息预取。
  4. 注意事项
    • 队列名称需保持一致。
    • 消息确认机制需根据业务需求选择自动或手动确认。
    • 使用 basicQos 控制消息分发,避免某个消费者处理过多消息。

三、发布订阅模式

(一)发布订阅模式概述

发布订阅模式(Publish/Subscribe Mode)是 RabbitMQ 的一种模式,用于将消息广播给多个消费者。它的特点是:

  1. 一个生产者:将消息发送到交换机(Exchange)。
  2. 多个消费者:每个消费者都有自己的队列,并与交换机绑定。
  3. 消息广播:交换机将消息广播给所有绑定的队列。

发布订阅模式适用于消息广播场景,例如日志系统、通知系统等。

image-20250216071658856


(二)生产者代码解析

生产者负责创建消息并将其发送到指定的交换机中。

package top.miqiu._03_pubsub;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机/*** 参数说明:* 1. 交换机名称:03-pubsub* 2. 交换机类型:fanout(广播模式)*/channel.exchangeDeclare("03-pubsub", "fanout");// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;/*** 参数说明:* 1. 交换机名称:03-pubsub* 2. 路由键:空字符串(fanout 模式忽略路由键)* 3. 消息属性:MessageProperties.TEXT_PLAIN* 4. 消息内容:字节数组*/channel.basicPublish("03-pubsub", "", MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
  1. 交换机声明(exchangeDeclare):创建交换机并设置类型为 fanout(广播模式)。
  2. 消息发送(basicPublish):将消息发送到交换机,路由键为空字符串(fanout 模式忽略路由键)。
  3. 消息广播:消息会被广播到所有绑定到该交换机的队列。

(三)消费者代码解析

代码

消费者负责从队列中接收并处理消息。

package top.miqiu._03_pubsub;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare("03-pubsub", "fanout");// 6. 创建临时队列String queue = channel.queueDeclare().getQueue();// 7. 绑定队列到交换机/*** 参数说明:* 1. 队列名称:queue* 2. 交换机名称:03-pubsub* 3. 路由键:空字符串(fanout 模式忽略路由键)*/channel.queueBind(queue, "03-pubsub", "");// 8. 接收消息/*** 参数说明:* 1. 队列名称:queue* 2. 是否自动确认:true(自动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者2 接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
  1. 交换机声明(exchangeDeclare):确保交换机存在,需与生产者保持一致。
  2. 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
  3. 队列绑定(queueBind):将队列绑定到交换机,路由键为空字符串(fanout 模式忽略路由键)。
  4. 消息接收(basicConsume):从队列中接收消息并处理。
结果

image-20250216071734904

image-20250216071749761

可以看到两个consumer都消费了相同的消息


(四)发布订阅模式的特点

  1. 消息广播:交换机将消息广播给所有绑定的队列。
  2. 临时队列:消费者可以创建临时队列,队列名称由 RabbitMQ 自动生成。
  3. 适用场景
    • 日志系统:将日志消息广播给多个消费者。
    • 通知系统:将通知消息广播给多个用户。

(五)总结

  1. 发布订阅模式:适用于消息广播场景,多个消费者各自接收相同的消息。
  2. 生产者:负责将消息发送到交换机。
  3. 消费者:负责创建队列并绑定到交换机,接收并处理消息。
  4. 注意事项
    • 交换机类型需设置为 fanout
    • 队列绑定到交换机时,路由键为空字符串。
    • 临时队列的名称由 RabbitMQ 自动生成。

(六)RabbitMQ 交换机类型总结

交换机类型描述路由行为适用场景
Fanout广播模式,将消息发送到所有绑定到该交换机的队列。忽略路由键(Routing Key),消息会被广播到所有绑定的队列。日志系统、通知系统等需要广播消息的场景。
Direct直接模式,根据路由键将消息发送到匹配的队列。消息的路由键必须与队列绑定的路由键完全匹配。任务分发、点对点通信等需要精确路由的场景。
Topic主题模式,根据路由键的模式匹配将消息发送到符合条件的队列。支持通配符匹配:* 匹配一个单词,# 匹配零个或多个单词。消息分类、多条件路由等需要灵活匹配的场景。
Headers头部模式,根据消息的头部属性(Headers)进行匹配。不依赖路由键,而是通过消息的头部属性匹配队列绑定的条件。复杂的路由逻辑,例如根据消息的元数据进行路由。

详细说明

1. Fanout 交换机(广播,常用)
  • 特点
    • 消息会被广播到所有绑定到该交换机的队列。
    • 忽略路由键(Routing Key)。
  • 适用场景
    • 日志系统:将日志消息广播给多个消费者。
    • 通知系统:将通知消息广播给多个用户。
2. Direct 交换机
  • 特点
    • 消息的路由键必须与队列绑定的路由键完全匹配。
    • 支持一对一或一对多的精确路由。
  • 适用场景
    • 任务分发:将特定任务路由到特定的 Worker。
    • 点对点通信:将消息发送到特定的接收者。
3. Topic 交换机
  • 特点
    • 支持通配符匹配:
      • * 匹配一个单词。
      • # 匹配零个或多个单词。
    • 路由键的格式通常是点分字符串(如 user.create)。
  • 适用场景
    • 消息分类:根据消息的主题进行路由。
    • 多条件路由:支持灵活的路由规则。
4. Headers 交换机
  • 特点
    • 不依赖路由键,而是通过消息的头部属性(Headers)进行匹配。
    • 支持复杂的匹配规则(如 x-match 参数)。
  • 适用场景
    • 复杂的路由逻辑:根据消息的元数据进行路由。
    • 需要高度灵活性的场景。

对比
场景FanoutDirectTopicHeaders
日志广播所有消费者接收相同的日志消息。不适用。不适用。不适用。
任务分发不适用。将任务路由到特定的 Worker。将任务分类路由到不同的 Worker。根据任务的元数据进行路由。
通知系统所有用户接收相同的通知。特定用户接收特定通知。根据通知类型路由到不同用户。根据通知的元数据进行路由。
消息分类不适用。不适用。根据消息主题进行路由。根据消息的头部属性进行路由。

总结
  • Fanout:适用于广播场景。
  • Direct:适用于精确路由场景。
  • Topic:适用于灵活的路由场景。
  • Headers:适用于复杂的路由逻辑。

四、路由模式

(一)路由模式概述

路由模式是 RabbitMQ 的一种模式,使用 Direct 交换机 根据消息的 路由键(Routing Key) 将消息发送到匹配的队列。它的特点是:

  1. 一个生产者:将消息发送到 Direct 交换机,并指定路由键。
  2. 多个消费者:每个消费者可以绑定一个或多个路由键,只有匹配的路由键的消息才会被接收。
  3. 精确路由:消息的路由键必须与队列绑定的路由键完全匹配。

路由模式适用于需要根据特定条件精确路由消息的场景,例如日志级别分类、任务分发等。

image-20250216073521308


(二)生产者代码解析

生产者负责创建消息并将其发送到 Direct 交换机,同时指定路由键。

package top.miqiu._04_routing;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Direct 交换机/*** 参数说明:* 1. 交换机名称:04-routing* 2. 交换机类型:direct*/channel.exchangeDeclare("04-routing", "direct");// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;/*** 参数说明:* 1. 交换机名称:04-routing* 2. 路由键:err(消息将发送到绑定 err 路由键的队列)* 3. 消息属性:MessageProperties.TEXT_PLAIN* 4. 消息内容:字节数组*/channel.basicPublish("04-routing", "err", MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
  1. 交换机声明(exchangeDeclare):创建 Direct 交换机,类型为 direct
  2. 消息发送(basicPublish):指定路由键(如 err),消息会被发送到绑定该路由键的队列。
  3. 路由键匹配:只有队列绑定的路由键与消息的路由键完全匹配时,消息才会被路由到该队列。

(三)消费者代码解析

代码

消费者负责创建队列并绑定到 Direct 交换机,同时指定路由键。

package top.miqiu._04_routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Direct 交换机channel.exchangeDeclare("04-routing", "direct");// 6. 创建临时队列String queue = channel.queueDeclare().getQueue();// 7. 绑定队列到交换机,并指定路由键/*** 参数说明:* 1. 队列名称:queue* 2. 交换机名称:04-routing* 3. 路由键:info、err、waring*/channel.queueBind(queue, "04-routing", "info");channel.queueBind(queue, "04-routing", "err");channel.queueBind(queue, "04-routing", "waring");// 8. 接收消息/*** 参数说明:* 1. 队列名称:queue* 2. 是否自动确认:true(自动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1 接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
  1. 交换机声明(exchangeDeclare):确保 Direct 交换机存在,需与生产者保持一致。
  2. 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
  3. 队列绑定(queueBind):将队列绑定到交换机,并指定路由键(如 infoerrwaring)。
  4. 消息接收(basicConsume):从队列中接收消息并处理。
效果

consumer1绑定了[info,err,waring],所以在producer绑定了info时发送消息的情况下,consumer1可以接收到信息

image-20250216073403669

由于consumer2绑定的是trace,所以consumer2是接收不到消息的

image-20250216073447112


(四)路由模式的特点

  1. 精确路由:消息的路由键必须与队列绑定的路由键完全匹配。
  2. 多路由键支持:一个队列可以绑定多个路由键,接收多种类型的消息。
  3. 适用场景
    • 日志级别分类:将不同级别的日志(如 infoerr)路由到不同的队列。
    • 任务分发:将特定任务路由到特定的 Worker。

(五)总结

  1. 路由模式:适用于需要根据路由键精确路由消息的场景。
  2. 生产者:负责将消息发送到 Direct 交换机,并指定路由键。
  3. 消费者:负责创建队列并绑定到 Direct 交换机,同时指定路由键。
  4. 注意事项
    • 路由键必须完全匹配。
    • 一个队列可以绑定多个路由键,接收多种类型的消息。

五、Topic 模式

(一)Topic 模式概述

Topic 模式是 RabbitMQ 的一种模式,使用 Topic 交换机 根据消息的 路由键(Routing Key) 进行模式匹配,将消息发送到符合条件的队列。它的特点是:

  1. 一个生产者:将消息发送到 Topic 交换机,并指定路由键。
  2. 多个消费者:每个消费者可以绑定一个或多个路由键模式,只有匹配的路由键的消息才会被接收。
  3. 灵活的路由:支持通配符匹配:
    • * 匹配一个单词。
    • # 匹配零个或多个单词。

Topic 模式适用于需要根据复杂条件灵活路由消息的场景,例如消息分类、多条件路由等。

image-20250216075115591


(二)生产者代码解析

代码

生产者负责创建消息并将其发送到 Topic 交换机,同时指定路由键。

package top.miqiu._05_topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Topic 交换机/*** 参数说明:* 1. 交换机名称:05-topic* 2. 交换机类型:topic*/channel.exchangeDeclare("05-topic", "topic");// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;/*** 参数说明:* 1. 交换机名称:05-topic* 2. 路由键:user.hi(消息将发送到匹配 user.* 或 user.# 的队列)* 3. 消息属性:MessageProperties.TEXT_PLAIN* 4. 消息内容:字节数组*/channel.basicPublish("05-topic", "user.hi", MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
  1. 交换机声明(exchangeDeclare):创建 Topic 交换机,类型为 topic
  2. 消息发送(basicPublish):指定路由键(如 user.hi),消息会被发送到匹配的队列。
  3. 通配符匹配
    • * 匹配一个单词(如 user.* 匹配 user.hi,但不匹配 user.hi.there)。
    • # 匹配零个或多个单词(如 user.# 匹配 user.hiuser.hi.there)。

(三)消费者代码解析

代码

消费者负责创建队列并绑定到 Topic 交换机,同时指定路由键模式。

package top.miqiu._05_topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Topic 交换机channel.exchangeDeclare("05-topic", "topic");// 6. 创建临时队列String queue = channel.queueDeclare().getQueue();// 7. 绑定队列到交换机,并指定路由键模式/*** 参数说明:* 1. 队列名称:queue* 2. 交换机名称:05-topic* 3. 路由键模式:user.*(匹配 user.hi、user.hello 等)*/channel.queueBind(queue, "05-topic", "user.*");// 8. 接收消息/*** 参数说明:* 1. 队列名称:queue* 2. 是否自动确认:true(自动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者2 user.* 接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
  1. 交换机声明(exchangeDeclare):确保 Topic 交换机存在,需与生产者保持一致。
  2. 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
  3. 队列绑定(queueBind):将队列绑定到交换机,并指定路由键模式(如 user.*)。
  4. 消息接收(basicConsume):从队列中接收消息并处理。
效果

当我在producer使用“employee.hi”作为路由key的时候,绑定了“employee.*”的consumer1可以消费这个消息

image-20250216075306371


(四)Topic 模式的特点

  1. 灵活的路由:支持通配符匹配,可以根据复杂的条件路由消息。
  2. 多路由键支持:一个队列可以绑定多个路由键模式,接收多种类型的消息。
  3. 适用场景
    • 消息分类:根据消息的主题进行路由。
    • 多条件路由:支持灵活的路由规则。

(五)总结

  1. Topic 模式:适用于需要根据复杂条件灵活路由消息的场景。
  2. 生产者:负责将消息发送到 Topic 交换机,并指定路由键。
  3. 消费者:负责创建队列并绑定到 Topic 交换机,同时指定路由键模式。
  4. 注意事项
    • 路由键模式支持通配符 *#
    • 一个队列可以绑定多个路由键模式,接收多种类型的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/19161.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LangChain大模型应用开发:提示词工程应用与实践

介绍 大家好&#xff0c;博主又来给大家分享知识了。今天给大家分享的内容是LangChain提示词工程应用与实践。 在如今火热的大语言模型应用领域里&#xff0c;LangChain可是一个相当强大且实用的工具。而其中的提示词(Prompt)&#xff0c;更是我们与语言模型进行有效沟通的关…

4.buuctf [SWPU2019]Web1及知识点

进入题目页面如下 猜测是二次注入 先注册一个账号 再登录&#xff0c;页面如下 点击申请发布广告 页面如上&#xff0c;存在注入点&#xff0c;尝试 判读是整数型注入还是字符型注入 猜解字段数&#xff0c;尝试发现or,#,空格等被过滤了&#xff0c;只能一个一个试 使用联合…

Lua笔记

Lua语法 --注释 #字符串长度、table从1开始连续元素的长度 ..字符串拼接 逻辑运算符 and or not 条件语句 if xxx then elseif yyy then else end 循环语句 for i1,xxx do end xLua AppDomain does not contain a definition for DefineDynamicAssembly&#xff…

开业盛典活动策划方案拆解

道叔来给大家详细剖析咱们方案库里刚收录的这份《蜀大侠火锅店武侠风开业盛典活动策划方案》了&#xff0c;保证让你看完直呼过瘾&#xff0c;收获满满&#xff01; 一、主题创意&#xff1a;武侠风&#xff0c;直击人心 首先&#xff0c;咱们得夸一下这活动的主题——“XXX‘…

三、Unity基础(主要框架)

一、Unity场景概念 如果把游戏运行过程理解成表演&#xff0c;那么场景就是舞台&#xff1b; 场景本质上是一个配置文件&#xff0c;这个配置文件决定了场景中有哪些东西&#xff1b; 二、Scene和Game窗口 1、Scene 滚轮缩放、拖动 单独选中也可以 最下面这个是全能工具…

微软官方出品GPT大模型编排工具:7个开源项目

今天一起盘点下&#xff0c;12月份推荐的7个.Net开源项目&#xff08;点击标题查看详情&#xff09;。 1、一个浏览器自动化操作的.Net开源库 这是一个基于 Google 开源的 Node.js 库 Puppeteer 的 .NET 开源库&#xff0c;方便开发人员使用无头 Web 浏览器抓取 Web、检索 Ja…

C++笔记之类型大小、变量大小,vector与string在栈上内存、堆上内存和总内存的关系

C++笔记之类型大小、变量大小,vector与string在栈上内存、堆上内存和总内存的关系 code review! 文章目录 C++笔记之类型大小、变量大小,vector与string在栈上内存、堆上内存和总内存的关系1.`std::vector<float>` 的内存占用2.`std::vector<float>` 的 `capaci…

华为昇腾920b服务器部署DeepSeek翻车现场

最近到祸一台HUAWEI Kunpeng 920 5250&#xff0c;先看看配置。之前是部署的讯飞大模型&#xff0c;发现资源利用率太低了。把5台减少到3台&#xff0c;就出了他 硬件配置信息 基本硬件信息 按照惯例先来看看配置。一共3块盘&#xff0c;500G的系统盘&#xff0c; 2块3T固态…

【工具变量】ZF引导基金合集(1900-2024年)

政府引导基金是以股权或债权等方式投资于创业风险投资机构或新设的创业风险投资基金&#xff0c;主要用于支持创业企业的发展。根据不同类型的基金&#xff0c;基金出资结构有所不同&#xff0c;可能由政府全额或部分出资&#xff0c;并吸引社会资本和金融机构的参与。 一、政府…

【Java 面试 八股文】常见集合篇

常见集合篇 1. 常见集合有哪些2. ArrayList底层实现的原理是什么&#xff1f;3. ArrayList listnew ArrayList(10)中的list扩容几次4. 如何实现数组和List之间的转换5. ArrayList和LinkedList的区别是什么&#xff1f;6. 说一下HashMap的实现原理&#xff1f;7. HashMap的jdk1.…

使用 DeepSeek 生成商城流程图

步骤 1.下载 mermaid 2.使用 DeepSeek 生成 mermaid 格式 3.复制内容到 4.保存备用。 结束。

STM32 Flash详解教程文章

目录 Flash基本概念理解 Flash编程接口FPEC Flash擦除/写入流程图 Flash选项字节基本概念理解 Flash电子签名 函数读取地址下存放的数据 Flash的数据处理限制部分 编写不易&#xff0c;请勿搬运&#xff0c;感谢理解&#xff01;&#xff01;&#xff01; Flash基本概念…

高精度 A+B Problem

题目描述 高精度加法&#xff0c;相当于 ab problem&#xff0c;不用考虑负数。 输入格式 分两行输入。a,b ≤ 。 输出格式 输出只有一行&#xff0c;代表 ab 的值。 输入输出样例 输入 #1 1 1 输出 #1 2 输入 #2 1001 9099 输出 #2 10100 #include<iostream…

spring boot单元测试

在pom文件中添加测试依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency> 复制粘贴自动创建的单元测试类 文件名改为HelloCo…

A Unified Model for Multi-class Anomaly Detection

多类别异常检测的统一模型 文章链接&#xff1a;点这里 源码链接&#xff1a;点这里 研究目的 1.解决多类别异常检测的挑战 现有的异常检测方法通常需要为每个类别单独训练模型&#xff0c;如Figure1图(c)所示&#xff0c;这种方法在类别数量增加时会消耗大量资源&#xff…

封装neo4j的持久层和服务层

目录 持久层 mp 模仿&#xff1a; 1.抽取出通用的接口类 2.创建自定义的repository接口 服务层 mp 模仿&#xff1a; 1.抽取出一个IService通用服务类 2.创建ServiceImpl类实现IService接口 3.自定义的服务接口 4.创建自定义的服务类 工厂模式 为什么可以使用工厂…

2024各地低空经济政策汇编资料

互联网各领域资料分享专区(不定期更新)&#xff1a; Sheet 前言 由于内容较多&#xff0c;且不便于排版&#xff0c;为避免资源失效&#xff0c;请用手机点击链接进行保存&#xff0c;若链接生效请及时反馈&#xff0c;谢谢~ 正文 链接如下&#xff08;为避免资源失效&#x…

基于JAVA的幼儿园管理系统的设计与实现源码(springboot+vue+mysql)

项目简介 幼儿园管理系统实现了以下功能&#xff1a; 基于JAVA的幼儿园管理系统的设计与实现的主要使用者管理员可以管理系统基本信息&#xff1b;管理轮播图、系统简介、教师管理、课程管理、幼儿活动管理、餐饮管理、留言管理等功能&#xff1b;前台用户注册登录&#xff0…

智能车摄像头开源—8 元素处理

目录 一、前言 二、无元素状态 三、直线与弯道 四、十字与环岛 1、十字识别处理 2、环岛识别处理 五、坡道 六、障碍物 七、斑马线 八、入库 九、出界停车 一、前言 在写这篇文章之前&#xff0c;考虑了很久到底该写到什么程度&#xff0c;但思来想去&#xff0c;不同…

LC-随机链表的复制、排序链表、合并K个升序链表、LRU缓存

随机链表的复制 为了在 O(n) 时间复杂度内解决这个问题&#xff0c;并且使用 O(1) 的额外空间&#xff0c;可以利用以下技巧&#xff1a; 将新节点插入到原节点后面&#xff1a;我们可以将复制节点插入到原节点后面。例如&#xff0c;如果链表是 A -> B -> C&#xff0c…