目录
一:介绍
二:应用场景
三:工作原理
组成部分说明
消息发布接收流程
四:下载安装
五:环境搭建
创建Maven工程
生产者
消费者
六:工作模式
Work queues
Publish/subscribe
生产者
消费者
Routing
生产者
消费者
思考
Topics
生产者
匹配规则
Header模式
生产者
RPC
说明
七:SpringBoot整合RabbitMQ
一:介绍
MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发.
RabbitMQ官方地址:http://www.rabbitmq.com/
二:应用场景
1)任务异步处理
2)应用程序解耦合
三:工作原理
组成部分说明
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
消息发布接收流程
发送消息:
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
接收消息:
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息
四:下载安装
RabbitMQ的下载地址:http://www.rabbitmq.com/download.html
本项目使用Erlang/OTP 20.3版本和RabbitMQ3.7.3版本。
下载erlang: http://erlang.org/download/otp_win64_20.3.exe
erlang安装完成需要配置erlang环境变量: ERLANG_HOME=D:\Program Files\erl9.3 在path中 添加%ERLANG_HOME%\bin;
安装erlang直接下一步就好
安装RabbitMQ: Release RabbitMQ 3.7.3 · rabbitmq/rabbitmq-server · GitHub
安装完开始菜单会有显示:
RabbitMQ Service-install :安装服务
RabbitMQ Service-remove 删除服务
RabbitMQ Service-start 启动
RabbitMQ Service-stop 启动
如果没有开始菜单则进入安装目录下sbin目录手动启动:
1)安装并运行服务
rabbitmq-service.bat install 安装服务 rabbitmq-service.bat stop 停止服务 rabbitmq-service.bat start 启动服务
2)安装管理插件
安装rabbitMQ的管理插件,方便在浏览器端管理RabbitMQ
管理员身份运行 rabbitmq-plugins.bat enable rabbitmq_management
3、启动成功 登录RabbitMQ
进入浏览器,输入:http://localhost:15672
初始账号和密码:guest/guest
注:每个虚拟机就相当于一个独立的MQ,默认虚拟机的名字为/
4、如果启动失败找到.erlang.cookie,位于C:\windows\system32\config\systemprofile下,将此处的.erlang.cookie覆盖C:\user\admin.erlang.cookie后重启RabbitMQ即可解决问题
注意事项
1、安装erlang和rabbitMQ以管理员身份运行。
2、当卸载重新安装时会出现RabbitMQ服务注册失败,此时需要进入注册表清理erlang搜索RabbitMQ、ErlSrv,将对应的项全部删除。
五:环境搭建
创建Maven工程
创建生产者工程和消费者工程,分别加入RabbitMQ java client的依赖。
test-rabbitmq-producer:生产者工程
test-rabbitmq-consumer:消费者工程
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp‐client</artifactId>
<version>4.0.3</version><!‐‐此版本与spring boot 1.5.9版本匹配‐‐>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency>
生产者
public class Producer01 {//队列名称private static final String QUEUE = "helloworld";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = null;Channel channel = null;try {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建与RabbitMQ服务的TCP连接connection = factory.newConnection();//创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务channel = connection.createChannel();/*** 声明队列,如果Rabbit中没有此队列将自动创建* param1:队列名称* param2:durable是否持久化,如果持久化,mq重启后队列还在* param3:exclusive队列是否独占此连接,队列只允许在该连接中访问,如果连接关闭队列自动删除,如果将此参数设置为true可用于临时队列的创建* param4:autoDelete队列不再使用时是否自动删除此队列和exclusive搭配使用* param5:队列参数(可以设置队列的扩展参数)*/channel.queueDeclare(QUEUE, true, false, false, null);String message = "helloworld小明" + System.currentTimeMillis();/*** 消息发布方法* param1:Exchange的名称,如果没有指定,则使用Default Exchange* param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列,如果使用默认交换机,routingKey设置为队列的名称* param3:消息包含的属性* param4:消息体*//*** 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定* 默认的交换机,routingKey等于队列名称*/channel.basicPublish("", QUEUE, null, message.getBytes());System.out.println("Send Message is:'" + message + "'");} catch (Exception ex) {ex.printStackTrace();} finally {if (channel != null) {channel.close();}if (connection != null) {connection.close();}}}
}
此时显示创建了一个队列,有1条消息待消费, 消息总数为1
注:队列中是可以拿到发送的消息的
注:Purge Message可以清空该队列的消息
注:Publish message可以在指定的队列中发消息,因此生产者发送消息时要打印发送的body,消费者端做幂等,一旦出现问题可通过控制台重新发送该消息
消费者
注:两边都要声明队列,防止消费者启动在生产者之前报错
public class Consumer01 {private static final String QUEUE = "helloworld";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();//设置MabbitMQ所在服务器的ip和端口factory.setHost("127.0.0.1");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE, true, false, false, null);//定义消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {/*** 消费者接收消息调用此方法* @param consumerTag 消费者的标签,在channel.basicConsume()去指定* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)* @param properties* @param body* @throws IOException}*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//路由keyString routingKey = envelope.getRoutingKey();//消息idlong deliveryTag = envelope.getDeliveryTag();//消息内容String msg = new String(body);System.out.println("receive message.." + msg);}};/*** 监听队列String queue, boolean autoAck,Consumer callback* 参数明细* 1、队列名称* 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复* 3、消费消息的方法,消费者接收到消息后调用此方法*/channel.basicConsume(QUEUE, true, consumer);}
}
六:工作模式
RabbitMQ有以下几种工作模式 :
1、Work queues
2、Publish/Subscribe
3、Routing
4、Topics
5、Header
6、RPC
Work queues
工作队列模式
work queues与入门程序相比:多了一个消费端,两个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
测试:
1、使用入门程序,启动多个消费者。
2、生产者发送多个消息。
结果:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。
设置idea同时启用多个客户端
Publish/subscribe
发布订阅模式
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
案例:
用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法 。
生产者
1、声明exchange_fanout_inform交换机。
2、声明两个队列并且绑定到此交换机,绑定时不需要指定routingkey
3、发送消息时不需要指定routingkey
public class Producer02_publish {private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {//创建一个与MQ的连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建一个连接connection = factory.newConnection();//创建与交换机的通道,每个通道代表一个会话channel = connection.createChannel();//声明交换机 String exchange, BuiltinExchangeType type/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, "fanout");//声明队列// (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)/*** 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "");//发送消息for (int i = 0; i < 10; i++) {String message = "inform to user" + i;//向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细* 1、交换机名称,不指令使用默认交换机名称 Default Exchange* 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消息将发到此队列* 3、消息属性* 4、消息内容*/channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes());System.out.println("Send Message is:'" + message + "'");}} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} finally {if (channel != null) {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}if (connection != null) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}
可以看到新声明的交换机exchange_fanout_inform
新生成两个队列,每个队列接收到10条消息
消费者
邮件发送消费者
public class Consumer02_subscribe_email {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_FANOUT_INFORM = "inform_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {//创建一个与MQ的连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建一个连接Connection connection = factory.newConnection();//创建与交换机的通道,每个通道代表一个会话Channel channel = connection.createChannel();//声明交换机 String exchange, BuiltinExchangeType type/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, "fanout");//声明队列// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)/*** 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");//定义消费方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {long deliveryTag = envelope.getDeliveryTag();String exchange = envelope.getExchange();//消息内容String message = new String(body);System.out.println(message);}};/*** 监听队列String queue, boolean autoAck,Consumer callback* 参数明细* 1、队列名称* 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复* 3、消费消息的方法,消费者接收到消息后调用此方法*/channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);}}
可以看到邮件的信息被消费了,短信的还在
可以看到该交换机绑定了两个队列
Routing
路由模式:
1、每个消费者监听自己的队列,并且设置routingkey。
2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
生产者
1、声明exchange_routing_inform交换机。
2、声明两个队列并且绑定到此交换机,绑定时需要指定routingkey
3、发送消息时需要指定routingkey
public class Producer03_routing {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String QUEUE_INFORM_SMS = "queue_inform_sms";private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {//创建一个与MQ的连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建一个连接connection = factory.newConnection();//创建与交换机的通道,每个通道代表一个会话channel = connection.createChannel();//声明交换机 String exchange, BuiltinExchangeType type/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, "direct");//声明队列// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)/*** 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL);channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS);//发送邮件消息for (int i = 0; i < 10; i++) {String message = "email inform to user" + i;//向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细* 1、交换机名称,不指令使用默认交换机名称 Default Exchange* 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消息将发到此队列* 3、消息属性* 4、消息内容*/channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL, null, message.getBytes());System.out.println("Send Message is:'" + message + "'");}//发送短信消息for (int i = 0; i < 10; i++) {String message = "sms inform to user" + i;//向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] bodychannel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes());System.out.println("Send Message is:'" + message + "'");}} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} finally {if (channel != null) {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}if (connection != null) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}
}
可以看到创建了exchange_routing_inform交换机,并绑定了两个队列
可以看到对列绑定了Routing_key
消费者
邮件发送消费者
public class Consumer03_routing_email {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";public static void main(String[] args) throws IOException, TimeoutException {//创建一个与MQ的连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建一个连接Connection connection = factory.newConnection();//创建与交换机的通道,每个通道代表一个会话Channel channel = connection.createChannel();//声明交换机 String exchange, BuiltinExchangeType type/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, "direct");//声明队列// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)/*** 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);//交换机和队列绑定String queue, String exchange, String routingKey/*** 参数明细* 1、队列名称* 2、交换机名称* 3、路由key*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL);//定义消费方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {long deliveryTag = envelope.getDeliveryTag();String exchange = envelope.getExchange();//消息内容String message = new String(body);System.out.println(message);}};/*** 监听队列String queue, boolean autoAck,Consumer callback* 参数明细* 1、队列名称* 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复* 3、消费消息的方法,消费者接收到消息后调用此方法*/channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);}}
思考
1、Routing模式和Publish/subscibe有啥区别?
Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列。
Topics
路由模式:
1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
案例:
根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。
生产者
public class Producer04_topics {//队列名称private static final String QUEUE_INFORM_EMAIL = "queue_inform_email_topics";private static final String QUEUE_INFORM_SMS = "queue_inform_sms_topics";private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";public static void main(String[] args) {Connection connection = null;Channel channel = null;try {//创建一个与MQ的连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器//创建一个连接connection = factory.newConnection();//创建与交换机的通道,每个通道代表一个会话channel = connection.createChannel();//声明交换机 String exchange, BuiltinExchangeType type/*** 参数明细* 1、交换机名称* 2、交换机类型,fanout、topic、direct、headers*/channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, "topic");//声明队列/*** 参数明细:* 1、队列名称* 2、是否持久化* 3、是否独占此队列* 4、队列不用是否自动删除* 5、参数*/channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);//绑定email通知队列channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,"inform.#.email.#");//绑定sms通知队列channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,"inform.#.sms.#");//发送邮件消息for (int i = 0; i < 10; i++) {String message = "email inform to user" + i;//向交换机发送消息 String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细* 1、交换机名称,不指令使用默认交换机名称 Default Exchange* 2、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消息将发到此队列* 3、消息属性* 4、消息内容*/channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null, message.getBytes());System.out.println("Send Message is:'" + message + "'");}//发送短信消息for (int i = 0; i < 10; i++) {String message = "sms inform to user" + i;channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null, message.getBytes());System.out.println("Send Message is:'" + message + "'");}//发送短信和邮件消息for (int i = 0; i < 10; i++) {String message = "sms and email inform to user" + i;channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null, message.getBytes());System.out.println("Send Message is:'" + message + "'");}} catch (IOException | TimeoutException e) {e.printStackTrace();} finally {if (channel != null) {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}if (connection != null) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}}
可以看到exchange_topics_inform绑定了两个Routing_key,进行通配符匹配
匹配规则
统配符规则:
中间以“.”分隔。
符号#可以匹配多个词,符号*可以匹配一个词语。
对列绑定的Routing_key为:
inform.#.email.#
inform.#.sms.#
发送消息指定的Routing_key为:
inform.email 匹配 inform.#.email.#
inform.sms 匹配 inform.#.sms.#
inform.sms.email 匹配 inform.#.email.# 和 inform.#.sms.# 两个队列都能发送
(即使什么都没有,#也可以匹配个空)
Header模式
header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。
案例:
根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效
生产者
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_type", "email");
Map<String, Object> headers_sms = new Hashtable<String, Object>();
headers_sms.put("inform_type", "sms");
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
通知
String message = "email inform to user"+i;
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("inform_type", "email");//匹配email通知消费者绑定的header
//headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
properties.headers(headers);
//Email通知
channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());
发送邮件消费者
channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_email", "email");
//交换机和队列绑定
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
//指定消费队列
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
RPC
说明
RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
3、服务端将RPC方法 的结果发送到RPC响应队列
4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。
七:SpringBoot整合RabbitMQ
待更新...