RabbitMQ学习总结

目录

一:介绍        

二:应用场景

三:工作原理

组成部分说明

消息发布接收流程

四:下载安装

五:环境搭建

创建Maven工程

生产者

消费者 

 六:工作模式

Work queues

Publish/subscribe

生产者

消费者 

Routing 

生产者

​消费者

思考

 Topics

生产者

匹配规则 

Header模式

生产者

RPC

说明

七:SpringBoot整合RabbitMQ



一:介绍        

MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发.

RabbitMQ官方地址:http://www.rabbitmq.com/

二:应用场景

1)任务异步处理

2)应用程序解耦合

三:工作原理

组成部分说明

 Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。

 Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。

 Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。

 Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。

 Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

消息发布接收流程

 发送消息:

 1、生产者和Broker建立TCP连接。
 2、生产者和Broker建立通道。
 3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
 4、Exchange将消息转发到指定的Queue(队列)

 接收消息:

 1、消费者和Broker建立TCP连接
 2、消费者和Broker建立通道
 3、消费者监听指定的Queue(队列)
 4、当有消息到达Queue时Broker默认将消息推送给消费者。
 5、消费者接收到消息

四:下载安装

 RabbitMQ的下载地址:http://www.rabbitmq.com/download.html

 本项目使用Erlang/OTP 20.3版本和RabbitMQ3.7.3版本。

 下载erlang: http://erlang.org/download/otp_win64_20.3.exe

  erlang安装完成需要配置erlang环境变量: ERLANG_HOME=D:\Program Files\erl9.3 在path中 添加%ERLANG_HOME%\bin;

安装erlang直接下一步就好

 安装RabbitMQ: Release RabbitMQ 3.7.3 · rabbitmq/rabbitmq-server · GitHub

安装完开始菜单会有显示:

RabbitMQ Service-install :安装服务
RabbitMQ Service-remove 删除服务
RabbitMQ Service-start 启动
RabbitMQ Service-stop 启动

如果没有开始菜单则进入安装目录下sbin目录手动启动:

1)安装并运行服务
rabbitmq-service.bat install 安装服务 rabbitmq-service.bat stop 停止服务 rabbitmq-service.bat start 启动服务
2)安装管理插件
安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ
管理员身份运行 rabbitmq-plugins.bat enable rabbitmq_management

3、启动成功 登录RabbitMQ
进入浏览器,输入:http://localhost:15672

初始账号和密码:guest/guest

注:每个虚拟机就相当于一个独立的MQ,默认虚拟机的名字为/

4、如果启动失败找到.erlang.cookie,位于C:\windows\system32\config\systemprofile下,将此处的.erlang.cookie覆盖C:\user\admin.erlang.cookie后重启RabbitMQ即可解决问题

注意事项

1、安装erlang和rabbitMQ以管理员身份运行。
2、当卸载重新安装时会出现RabbitMQ服务注册失败,此时需要进入注册表清理erlang搜索RabbitMQ、ErlSrv,将对应的项全部删除。

五:环境搭建

创建Maven工程

创建生产者工程和消费者工程,分别加入RabbitMQ java client的依赖。
test-rabbitmq-producer:生产者工程
test-rabbitmq-consumer:消费者工程

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp‐client</artifactId>
<version>4.0.3</version><!‐‐此版本与spring boot 1.5.9版本匹配‐‐>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency>

生产者

public class Producer01 {//队列名称private static final String QUEUE = "helloworld";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = null;Channel channel = null;try {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建与RabbitMQ服务的TCP连接connection = factory.newConnection();//创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务channel = connection.createChannel();/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:durable是否持久化,如果持久化,mq重启后队列还在* param3:exclusive队列是否独占此连接,队列只允许在该连接中访问,如果连接关闭队列自动删除,如果将此参数设置为true可用于临时队列的创建* param4:autoDelete队列不再使用时是否自动删除此队列和exclusive搭配使用* param5:队列参数(可以设置队列的扩展参数)*/channel.queueDeclare(QUEUE, true, false, false, null);String message = "helloworld小明" + System.currentTimeMillis();/*** 消息发布方法* param1:Exchange的名称,如果没有指定,则使用Default Exchange* param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列,如果使用默认交换机,routingKey设置为队列的名称* param3:消息包含的属性* param4:消息体*//*** 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定* 默认的交换机,routingKey等于队列名称*/channel.basicPublish("", QUEUE, null, message.getBytes());System.out.println("Send Message is:'" + message + "'");} catch (Exception ex) {ex.printStackTrace();} finally {if (channel != null) {channel.close();}if (connection != null) {connection.close();}}}
}

此时显示创建了一个队列,有1条消息待消费, 消息总数为1

 

 注:队列中是可以拿到发送的消息的

 注:Purge Message可以清空该队列的消息

 注:Publish message可以在指定的队列中发消息,因此生产者发送消息时要打印发送的body,消费者端做幂等,一旦出现问题可通过控制台重新发送该消息

消费者 

注:两边都要声明队列,防止消费者启动在生产者之前报错

public class Consumer01 {private static final String QUEUE = "helloworld";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();//设置MabbitMQ所在服务器的ip和端口factory.setHost("127.0.0.1");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE, true, false, false, null);//定义消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume()去指定* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException}*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();//消息idlong deliveryTag = envelope.getDeliveryTag();//消息内容String msg = new String(body);System.out.println("receive message.." + msg);}};/*** 监听队列String queue, boolean autoAck,Consumer callback* 参数明细* 1、队列名称* 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复* 3、消费消息的方法,消费者接收到消息后调用此方法*/channel.basicConsume(QUEUE, true, consumer);}
}

 六:工作模式

RabbitMQ有以下几种工作模式 :
1、Work queues
2、Publish/Subscribe
3、Routing
4、Topics
5、Header
6、RPC

Work queues

工作队列模式

work queues与入门程序相比:多了一个消费端,两个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

测试:
1、使用入门程序,启动多个消费者。
2、生产者发送多个消息。

结果:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。

 设置idea同时启用多个客户端

Publish/subscribe

发布订阅模式

发布订阅模式
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息 

案例:
用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法 。

生产者

1、声明exchange_fanout_inform交换机。
2、声明两个队列并且绑定到此交换机,绑定时不需要指定routingkey
3、发送消息时不需要指定routingkey

public class Producer02_publish {private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {//创建一个与MQ的连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建一个连接connection = factory.newConnection();//创建与交换机的通道,每个通道代表一个会话channel = connection.createChannel();//声明交换机 String exchange, BuiltinExchangeType type/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, "fanout");//声明队列// (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)/*** 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "");//发送消息for (int i = 0; i < 10; i++) {String message = "inform to user" + i;//向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细* 1、交换机名称,不指令使用默认交换机名称 Default Exchange* 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消息将发到此队列* 3、消息属性* 4、消息内容*/channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes());System.out.println("Send Message is:'" + message + "'");}} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} finally {if (channel != null) {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}if (connection != null) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}

 可以看到新声明的交换机exchange_fanout_inform

 新生成两个队列,每个队列接收到10条消息

消费者 

邮件发送消费者

public class Consumer02_subscribe_email {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_FANOUT_INFORM = "inform_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {//创建一个与MQ的连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建一个连接Connection connection = factory.newConnection();//创建与交换机的通道,每个通道代表一个会话Channel channel = connection.createChannel();//声明交换机 String exchange, BuiltinExchangeType type/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, "fanout");//声明队列// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)/*** 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");//定义消费方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {long deliveryTag = envelope.getDeliveryTag();String exchange = envelope.getExchange();//消息内容String message = new String(body);System.out.println(message);}};/*** 监听队列String queue, boolean autoAck,Consumer callback* 参数明细* 1、队列名称* 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复* 3、消费消息的方法,消费者接收到消息后调用此方法*/channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);}}

可以看到邮件的信息被消费了,短信的还在

可以看到该交换机绑定了两个队列 

Routing 

路由模式
1、每个消费者监听自己的队列,并且设置routingkey。
2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
 

生产者

1、声明exchange_routing_inform交换机。
2、声明两个队列并且绑定到此交换机,绑定时需要指定routingkey
3、发送消息时需要指定routingkey

public class Producer03_routing {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {//创建一个与MQ的连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建一个连接connection = factory.newConnection();//创建与交换机的通道,每个通道代表一个会话channel = connection.createChannel();//声明交换机 String exchange, BuiltinExchangeType type/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, "direct");//声明队列// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)/*** 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL);channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS);//发送邮件消息for (int i = 0; i < 10; i++) {String message = "email inform to user" + i;//向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细* 1、交换机名称,不指令使用默认交换机名称 Default Exchange* 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消息将发到此队列* 3、消息属性* 4、消息内容*/channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL, null, message.getBytes());System.out.println("Send Message is:'" + message + "'");}//发送短信消息for (int i = 0; i < 10; i++) {String message = "sms inform to user" + i;//向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] bodychannel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes());System.out.println("Send Message is:'" + message + "'");}} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} finally {if (channel != null) {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}if (connection != null) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}

可以看到创建了exchange_routing_inform交换机,并绑定了两个队列

可以看到对列绑定了Routing_key

消费者

邮件发送消费者

public class Consumer03_routing_email {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";public static void main(String[] args) throws IOException, TimeoutException {//创建一个与MQ的连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建一个连接Connection connection = factory.newConnection();//创建与交换机的通道,每个通道代表一个会话Channel channel = connection.createChannel();//声明交换机 String exchange, BuiltinExchangeType type/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, "direct");//声明队列// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)/*** 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL);//定义消费方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {long deliveryTag = envelope.getDeliveryTag();String exchange = envelope.getExchange();//消息内容String message = new String(body);System.out.println(message);}};/*** 监听队列String queue, boolean autoAck,Consumer callback* 参数明细* 1、队列名称* 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复* 3、消费消息的方法,消费者接收到消息后调用此方法*/channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);}}

 思考

1、Routing模式和Publish/subscibe有啥区别?
Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列。

 Topics

路由模式:
1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。 

案例:

根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。

生产者

public class Producer04_topics {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email_topics";private static final String QUEUE_INFORM_SMS = "queue_inform_sms_topics";private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {//创建一个与MQ的连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建一个连接connection = factory.newConnection();//创建与交换机的通道,每个通道代表一个会话channel = connection.createChannel();//声明交换机 String exchange, BuiltinExchangeType type/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, "topic");//声明队列/*** 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);//绑定email通知队列channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,"inform.#.email.#");//绑定sms通知队列channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,"inform.#.sms.#");//发送邮件消息for (int i = 0; i < 10; i++) {String message = "email inform to user" + i;//向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细* 1、交换机名称,不指令使用默认交换机名称 Default Exchange* 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消息将发到此队列* 3、消息属性* 4、消息内容*/channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null, message.getBytes());System.out.println("Send Message is:'" + message + "'");}//发送短信消息for (int i = 0; i < 10; i++) {String message = "sms inform to user" + i;channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null, message.getBytes());System.out.println("Send Message is:'" + message + "'");}//发送短信和邮件消息for (int i = 0; i < 10; i++) {String message = "sms and email inform to user" + i;channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null, message.getBytes());System.out.println("Send Message is:'" + message + "'");}} catch (IOException | TimeoutException e) {e.printStackTrace();} finally {if (channel != null) {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}if (connection != null) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}}

可以看到exchange_topics_inform绑定了两个Routing_key,进行通配符匹配

匹配规则 

统配符规则:
中间以“.”分隔。
符号#可以匹配多个词,符号*可以匹配一个词语。

对列绑定的Routing_key为:

inform.#.email.#

inform.#.sms.#

发送消息指定的Routing_key为:

inform.email 匹配 inform.#.email.#

inform.sms 匹配 inform.#.sms.#

inform.sms.email 匹配 inform.#.email.# 和 inform.#.sms.# 两个队列都能发送

(即使什么都没有,#也可以匹配个空)

Header模式

header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。

案例:
根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效

生产者

Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_type", "email");
Map<String, Object> headers_sms = new Hashtable<String, Object>();
headers_sms.put("inform_type", "sms");
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);

通知

String message = "email inform to user"+i;
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("inform_type", "email");//匹配email通知消费者绑定的header
//headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
properties.headers(headers);
//Email通知
channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());

发送邮件消费者 

channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_email", "email");
//交换机和队列绑定
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
//指定消费队列
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);

RPC

说明

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
3、服务端将RPC方法 的结果发送到RPC响应队列
4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

七:SpringBoot整合RabbitMQ

待更新...

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/351957.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[FFmpeg学习]windows环境sdl播放音频试验

参考资料&#xff1a; FFmpeg和SDL2播放mp4_sdl 播放mp4 声音-CSDN博客 SimplePlayer/SimplePlayer.c at master David1840/SimplePlayer GitHub 在前面的学习中&#xff0c;通过获得的AVFrame进行了播放画面&#xff0c; [FFmpeg学习]初级的SDL播放mp4测试-CSDN博客 播放…

数据中心布线管理:预标记线缆与移动扫描技术的融合

随着信息技术的飞速发展&#xff0c;数据中心布线管理面临着前所未有的挑战。传统的布线管理方式已无法满足现代数据中心高效、准确和可靠的需求。在这样一个背景下&#xff0c;预标记线缆与移动扫描技术的结合&#xff0c;为数据中心布线管理带来了革命性的解决方案。 布线管理…

opencv安装笔记 各种平台

目录 python安装opencv-python c 麒麟arm系统安装和用法 python安装opencv-python pypi上搜索 Search results PyPI 现在安装是一个版本&#xff0c;大于3.6都可以安装 c 麒麟arm系统安装和用法 参考&#xff1a; ffmpeg rknn麒麟系统 安装 opencv_ffmpeg4 解码示例-CSDN…

C#下WinForm多语种切换

这是应一个网友要求写的&#xff0c;希望对你有所帮助。本文将介绍如何在一个WinForm应用程序中实现多语种切换。通过一个简单的示例&#xff0c;你将了解到如何使用资源文件管理不同语言的文本&#xff0c;并通过用户界面实现语言切换。 创建WinForm项目 打开Visual Studio&a…

【一步一步了解Java系列】:认识String类

看到这句话的时候证明&#xff1a;此刻你我都在努力 加油陌生人 个人主页&#xff1a;Gu Gu Study专栏&#xff1a;一步一步了解Java 喜欢的一句话&#xff1a; 常常会回顾努力的自己&#xff0c;所以要为自己的努力留下足迹 喜欢的话可以点个赞谢谢了。 作者&#xff1a;小闭…

【Netty】nio阻塞非阻塞Selector

阻塞VS非阻塞 阻塞 阻塞模式下&#xff0c;相关方法都会导致线程暂停。 ServerSocketChannel.accept() 会在没有建立连接的时候让线程暂停 SocketChannel.read()会在没有数据的时候让线程暂停。 阻塞的表现就是线程暂停了&#xff0c;暂停期间不会占用CPU&#xff0c;但线程…

ANSYS EMC解决方案与经典案例

EMC问题非常复杂&#xff0c;各行各业都会涉及&#xff0c;例如航空、航天、船舶、汽车、火车、高科技、物联网、消费电子。要考虑EMC的对象很多&#xff0c;包含整个系统、设备、PCB、线缆、电源、芯片封装。而且技术领域覆盖广&#xff0c;涉及高频问题、低频问题&#xff1b…

Databricks超10亿美元收购Tabular;Zilliz 推出 Milvus Lite ; 腾讯云支持Redis 7.0

重要更新 1. Databricks超10亿美元收购Tabular&#xff0c;Databricks将增强 Delta Lake 和 Iceberg 社区合作&#xff0c;以实现 Lakehouse 底层格式的开放与兼容([1] [2])。 2. Zilliz 推出 Milvus Lite 轻量级向量数据库&#xff0c;支持本地运行&#xff1b;Milvus Lite 复…

统计信号处理基础 习题解答10-16

题目&#xff1a; 对于例10.1&#xff0c;证明由观察数据得到的信息是&#xff1a; 解答&#xff1a; 基于习题10-15的结论&#xff0c;&#xff0c;那么&#xff1a; 而根据习题10-15的结论&#xff1a; 此条件概率也是高斯分布&#xff0c;即&#xff1a; 根据相同的计算&a…

win10能用微信、QQ,不能打开网页

今天上班&#xff0c;打开电脑&#xff0c;突然遇到一个问题&#xff0c;发现QQ、微信可以登录&#xff0c;但是任何网页都打不开&#xff0c;尝试了重启电脑和路由器都不行&#xff0c;最终解决了电脑可以访问网页的问题&#xff0c;步骤如下&#xff1a; 1、打开电脑的网络设…

Parallels Desktop 19 for mac破解版安装激活使用指南

Parallels Desktop 19 for Mac 乃是一款适配于 Mac 的虚拟化软件。它能让您在 Mac 计算机上同时运行多个操作系统。您可借此创建虚拟机&#xff0c;并于其中装设不同的操作系统&#xff0c;如 Windows、Linux 或 macOS。使用 Parallels Desktop 19 mac 版时&#xff0c;您可在 …

QT实现QGraphicsView绘图 重写QGraphicsSvgItem类实现边框动画

目录导读 简述使用 QTimer 实现 QGraphicsSvgItem 边框动画效果 简述 在了解学习WPS的流程图的时候&#xff0c;发现它这个选择图元有个动态边框效果&#xff0c;而且连接线还会根据线生成点从头移动到尾的动画。像这种&#xff1a; 在QML中实现这种动画属性很简单&#xff0…

11.1 Go 标准库的组成

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

Vue54-浏览器的本地存储webStorage

一、本地存储localStorage的作用 二、本地存储的代码实现 2-1、存储数据 注意&#xff1a; localStorage是window上的函数&#xff0c;所以&#xff0c;可以把window.localStorage直接写成localStorage&#xff08;直接调用&#xff01;&#xff09; 默认调了p.toString()方…

Linux中文件查找相关命令比较

Linux中与文件定位的命令有find、locate、whereis、which&#xff0c;type。 一、find find命令最强&#xff0c;能搜索各种场景下的文件&#xff0c;需要配合相关参数&#xff0c;搜索速度慢。在文件系统中递归查找文件。 find /path/to/search -name "filename"…

UniVue更新日志:使用ObservableList优化LoopList/LoopGrid组件的使用

github仓库 稳定版本仓库&#xff1a;https://github.com/Avalon712/UniVue 开发版本仓库&#xff1a;https://github.com/Avalon712/UniVue-Develop UniVue扩展框架-UniVue源生成器仓库&#xff1a;https://github.com/Avalon712/UniVue-SourceGenerator 更新说明 如果大家…

【电机控制】FOC算法验证步骤——PWM、ADC

【电机控制】FOC算法验证步骤 文章目录 前言一、PWM——不接电机1、PWMA-H-50%2、PWMB-H-25%3、PWMC-H-0%4、PWMA-L-50%5、PWMB-L-75%6、PWMC-L-100% 二、ADC——不接电机1.电流零点稳定性、ADC读取的OFFSET2.电流钳准备3.运放电路分析1.电路OFFSET2.AOP3.采样电路的采样值范围…

小白Linux提权

1.脏牛提权 原因&#xff1a; 内存子系统处理写入复制时&#xff0c;发生内存条件竞争&#xff0c;任务执行顺序异常&#xff0c;可导致应用崩溃&#xff0c;进一步执行其他代码。get_user_page内核函数在处理Copy-on-Write(以下使用COW表示)的过程中&#xff0c;可能产出竞态…

基于文本和图片输入的3D数字人化身生成技术解析

随着虚拟现实、增强现实和元宇宙等技术的飞速发展,对高度逼真且具有表现力的3D数字人化身的需求日益增长。传统的3D数字人生成方法往往需要依赖大量的3D数据集,这不仅增加了数据收集和处理的成本,还限制了生成的多样性和灵活性。为了克服这些挑战,我们提出了一种基于文本提…

Cocos Creator,Youtube 小游戏!

YouTube 官方前段时间发布了一则重磅通知&#xff0c;宣布平台旗下小游戏功能 Youtube Playables 正式登录全平台&#xff08;安卓、iOS、网页&#xff09;&#xff0c;并内置了数十款精选小游戏。 Youtube Playables 入口&#xff1a; https://www.youtube.com/playables Coco…