一、RabbitMQ 介绍
在介绍 RabbitMQ 之前,我们先来看下面一个电商项目的场景:
- 商品的原始数据保存在数据库中,增删改查都在数据库中完成。
- 搜索服务数据来源是索引库(Elasticsearch),如果数据库商品发生变化,索引库数据不能及时更新。
- 商品详情做了页面静态化处理,静态页面数据也不会随着数据库商品更新而变化。
如果我们在后台修改了商品的价格,搜索页面和商品详情页显示的依然是旧的价格,这样显然不对。该如何解决?
我们可能会想到这么做:
- 方案 1:每当后台对商品做增删改操作,同时修改索引库数据及更新静态页面。
- 方案 2:搜索服务和商品页面静态化服务对外提供操作接口,后台在商品增删改后,调用接口。
这两种方案都有个严重的问题:就是代码耦合,后台服务中需要嵌入搜索和商品页面服务,违背了微服务的独立原则。
这时,我们就会采用另外一种解决办法,那就是消息队列!
商品服务对商品增删改以后,无需去操作索引库和静态页面,只需向 MQ 发送一条消息(比如包含商品 id 的消息),也不关心消息被谁接收。 搜索服务和静态页面服务监听 MQ,接收消息,然后分别去处理索引库和静态页面(根据商品 id 去更新索引库和商品详情静态页面)。
什么是消息队列
MQ 全称为 Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
开发中消息队列通常有如下应用场景:
1. 任务异步处理:
高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的 insert,update 之类的请求同时到达 MySQL,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发 too many connections 错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。减少了应用程序的响应时间。
2. 应用程序解耦合:
MQ 相当于一个中介,生产方通过 MQ 与消费方交互,它将应用程序进行解耦合。
AMQP 和 JMS
MQ 是消息通信的模型,并发具体实现。现在实现 MQ 的有两种主流方式:AMQP、JMS。
两者间的区别和联系:
- JMS 是定义了统一的接口,来对消息操作进行统一;AMQP 是通过规定协议来统一数据交互的格式。
- JMS 限定了必须使用 Java 语言;AMQP 只是协议,不规定实现方式,因此是跨语言的。
- JMS 规定了两种消息模型;而 AMQP 的消息模型更加丰富。
常见 MQ 产品
- ActiveMQ:基于 JMS。
- RabbitMQ:基于 AMQP 协议,erlang 语言开发,稳定性好。
- RocketMQ:基于 JMS,阿里巴巴产品,目前交由 Apache 基金会。
- Kafka:分布式消息系统,高吞吐量。
1.1 AMQP 简介
AMQP (Advanced Message Queuing Protocol ,高级消息队列协议)是 个线路层的协议规范,而不是 API 规范(例如 JMS )。由于 AMQP 是一个线路层协议规范,因此它天然就是跨平台的,就像 SMTP HTTP 等协议 样,只要开发者按照规范的格式发送数据,任何平台都可以通过 AMQP 进行消息交互。像目前流行的 StormMQ RabbitMQ 等都实现了 AMQP。
1.2 RabbitMQ 简介
RabbitMQ 一个实现了 AMQP 的开源消息中间件,使用高性能的 Erlang 编写。RabbitMQ 有可靠性、支持多种协议、高可用、支持消息集群以及多语言客户端等特点,在分布式系统中存储转发消息,具有不错的性能表现。
1.3 RabbitMQ 的工作原理
组成部分说明:
- Broker:消息队列服务进程,此进程包括两个部分:Exchange 和 Queue。
- Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
- Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的。
- Producer:消息生产者,即生产方客户端,生产方客户端将消息发送。
- Consumer:消息消费者,即消费方客户端,接收 MQ 转发的消息。
生产者发送消息流程:
1. 生产者和 Broker 建立 TCP 连接。
2. 生产者和 Broker 建立通道。
3. 生产者通过通道消息发送给 Broker,由 Exchange 将消息进行转发。
4. Exchange 将消息转发到指定的 Queue(队列)。
消费者接收消息流程:
1. 消费者和 Broker 建立 TCP 连接。
2. 消费者和 Broker 建立通道。
3. 消费者监听指定的 Queue(队列)。
4. 当有消息到达 Queue 时 Broker 默认将消息推送给消费者。
5. 消费者接收到消息。
6. ack 回复。
二、RabbitMQ 安装
2.1 配置要求
系统:[Centos7](https://so.csdn.net/so/search?q=Centos7&spm=1001.2101.3001.7020)
Linux 内核:官方建议 3.10 以上
Linux 内核:官方建议 3.10 以上
<span style="background:#ffff00">注意:本文的命令使用的是 root 用户登录执行,不是 root 的话所有命令前面要加 sudo</span>
1. 查看当前的内核版本
[root@localhost ~]# uname -r
3.10.0-1160.el7.x86_64
2. 更新 yum 包(使用 root 权限,生产环境中此步操作需慎重)
[root@localhost ~]# yum -y update
> yum -y update 升级所有包同时也升级软件和系统内核;
> yum -y upgrade 只升级所有包,不升级软件和系统内核。
3. 卸载旧版本(如果之前安装过的话)
[root@localhost ~]# yum remove docker docker-common docker-selinux docker-engine
2.2 安装 Docker
1. 安装软件包
安装需要的软件包,yum-util 提供 yum-config-manager 功能,另两个是 devicemapper 驱动依赖。
[root@localhost ~]# yum install -y yum-utils device-mapper-persistent-data lvm2
2. 设置 yum 源
[root@localhost ~]# yum-config-manager --add-repo http://download.docker.com/linux/centos/docker-ce.repo(中央仓库)
[root@localhost ~]# yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo(阿里仓库)
3. 选择 docker 版本
查看可用版本有哪些
[root@localhost ~]# yum list docker-ce --showduplicates | sort -r
4. 安装 docker
选择一个版本并安装:yum install docker-ce-版本号
[root@localhost ~]# yum -y install docker-ce-18.03.1.ce
5. 启动 Docker
[root@localhost ~]# systemctl start docker
[root@localhost ~]# systemctl enable docker 设置开机自启
6. 查看 docker 版本
[root@localhost ~]# docker version
2.3 安装 RabbitMQ
1. 查找镜像
[root@localhost ~]# docker search rabbitmq:management
2. 拉取镜像
[root@localhost ~]# docker pull macintoshplus/rabbitmq-management
3. 查看镜像
[root@localhost ~]# docker images
4. 创建容器
[root@localhost ~]# systemctl start docker
[root@localhost ~]# systemctl enable docker 设置开机自启
5. 查看容器
[root@localhost ~]# docker ps -a
6. 访问测试
三、RabbitMQ 六种消息模型
3.1 基本消息模型
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序。
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
生产者
新建一个 maven 工程,添加 amqp-client 依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.1</version>
</dependency>
连接工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConnectionUtil {public static Connection getConnection() throws IOException, TimeoutException {// 定义连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置服务器地址factory.setHost("192.168.111.133");// 设置端口factory.setPort(5672);/*** 设置账号信息,用户名、密码、vhost* 设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq*//*factory.setVirtualHost("/ly");factory.setUsername("ly");factory.setPassword("123456")*/;// 通过工厂获取连接Connection connection = factory.newConnection();return connection;}}
生产者发送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] args) throws IOException, TimeoutException {// 1. 获取到连接Connection connection = ConnectionUtil.getConnection();// 2. 从连接中创建通道,使用通道才能完成消息相关的操作Channel channel = connection.createChannel();// 3. 声明(创建)队列// 参数:String queue,boolean durable,boolean exclusive,boolean autoDelete, Map<String, Object> arguments/*** 参数明细:* 1.queue,队列名称* 2.durable,是否持久化,如果持久化,mq重启后队列还在* 3.exclusive,是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,* 如果将此参数设置为true可用于临时队列的创建* 4.autoDelete,自动删除,队列不再使用时是否自动删除此队列,* 如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5.arguments,参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message = "Hello World!";// 向指定的队列中发送消息//参数:String exchange, String routingKey, BasicProperties props, byte[] body/*** 参数明细:* 1.exchange,交换机,如果不指定将使用mq的欧仁交换机(设置为"")* 2.routingKey,路由Key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称* 3.props,消息的属性* 4.body,消息的内容*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("[x] Send '" + message + "'");// 关闭通道和连接(资源关闭最好用try-catch-finally语句处理)channel.close();connection.close();}}
输出结果:
Connected to the target VM, address: '127.0.0.1:49314', transport: 'socket'
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[x] Send 'Hello World!'
Disconnected from the target VM, address: '127.0.0.1:49314', transport: 'socket'
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[x] Send 'Hello World!'
Disconnected from the target VM, address: '127.0.0.1:49314', transport: 'socket'
Process finished with exit code 0
web 管理页面:服务器地址/端口号 (本地:192.168.202.103:15672,默认用户及密码:guest guest)
消费者接收消息
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] args) throws IOException, TimeoutException {// 1. 获取到连接Connection connection = ConnectionUtil.getConnection();// 2. 从连接中创建通道,使用通道才能完成消息相关的操作Channel channel = connection.createChannel();// 3. 声明(创建)队列// 参数:String queue,boolean durable,boolean exclusive,boolean autoDelete, Map<String, Object> arguments/*** 参数明细:* 1.queue,队列名称* 2.durable,是否持久化,如果持久化,mq重启后队列还在* 3.exclusive,是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,* 如果将此参数设置为true可用于临时队列的创建* 4.autoDelete,自动删除,队列不再使用时是否自动删除此队列,* 如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5.arguments,参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并处理,这个方法类似事件监听,如果有消息的时候,会被自动调用/*** 当接收到消息后此方法将被调用* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsum* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 交换机String exchange = envelope.getExchange();// 消息id,mq再channel中用来标识消息的id,可用于确认消息已接受long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body, "utf-8");System.out.println(" [x] received : " + msg + "!");}};// 监听队列,第二个参数:是否自动进行消息确认。// 参数: String queue,boolean autoAck,Consumer callback/*** 参数明细:* 1.queue 队列名称* 2.autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为true表示会自动回复mq,* 如果设置为false要通过编程实现回复。* 3.callback 消费方法,当消费者接收到消息要执行的方法。*/channel.basicConsume(QUEUE_NAME, true, consumer);}}
输出结果:
再看看队列的消息,已经被消费了。
我们发现,消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消息。一旦有新的消息进入队列,就会立即打印。
3.1.1 消息确认机制(ACK)
通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。
那么问题来了:RabbitMQ 怎么知道消息被接收了呢?
如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是 RabbitMQ 无从得知,这样消息就丢失了!
因此,RabbitMQ 有一个 ACK 机制。当消费者获取消息后,会向 RabbitMQ 发送回执 ACK,告知消息已经被接收。不过这种回执 ACK 分两种情况:
自动 ACK:消息一旦被接收,消费者自动发送 ACK。
手动 ACK:消息接收后,不会发送 ACK,需要手动调用。
大家觉得哪种更好呢?
这需要看消息的重要性:
如果消息不太重要,丢失也没有影响,那么自动 ACK 会比较方便。
如果消息非常重要,不容丢失。那么最好在消费完成后手动 ACK,否则接收消息后就自动 ACK,RabbitMQ 就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
我们之前的测试都是自动 ACK 的,如果要手动 ACK,需要改动我们的代码(新建Recv2.java):
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {private final static String QUEUE_NAME = "simple_queue";public static void main(String[] args) throws IOException, TimeoutException {// 1. 获取到连接Connection connection = ConnectionUtil.getConnection();// 2. 从连接中创建通道,使用通道才能完成消息相关的操作final Channel channel = connection.createChannel();// 3. 声明(创建)队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// body 即消息体String msg = new String(body, "utf-8");System.out.println(" [x] received : " + msg + "!");// 手动进行ACK/*** void basicAck(long var1, boolean var3) throws IOException;* var1 用来标识消息的id* var3 是否批量 true:将一次性ack所有小于var1的消息*/channel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列,第二个参数false,手动进行ACKchannel.basicConsume(QUEUE_NAME, false, consumer);}}
最后一行代码设置第二个参数为 false
channel.basicConsume(QUEUE_NAME, false, consumer);
3.1.2 自动 ACK 存在的问题
修改消费者(Recv.java),添加异常,如下:
生产者不做任何修改,直接运行,消息发送成功:
运行消费者,程序抛出异常:
管理界面
消费者抛出异常,但是消息依然被消费,实际上我们还没获取到消息。
重新运行生产者发送消息:
同样,在手动进行 ack 前抛出异常,(修改 Recv2.java)运行 Recv2
再看管理界面
消息没有被消费掉!
还有另外一种情况:修改消费者 Recv2,把监听队列第二个参数自动改成手动。(去掉之前制造的异常) ,并且消费方法中没手动进行 ACK。
生产者代码不变,再次运行:
运行消费者 :
但是,查看管理界面,发现:
停掉消费者的程序,发现:
这是因为虽然我们设置了手动 ACK,但是代码中并没有进行消息确认!所以消息并未被真正消费掉。当我们关掉这个消费者,消息的状态再次变为 Ready。
正确的做法是:我们要在监听队列时设置第二个参数为 false,代码中手动进行 ACK。
再次运行消费者,查看 web 管理页面:
消费者消费成功!
3.2 work 消息模型
工作队列或者竞争消费者模式
work queues 与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。
这个消息模型在 Web 应用程序中特别有用,可以处理短的 HTTP 请求窗口中无法处理复杂的任务。
接下来我们来模拟这个流程:
- P:生产者:任务的发布者。
- C1:消费者 1:领取任务并且完成任务,假设完成速度较慢(模拟耗时)。
- C2:消费者 2:领取任务并且完成任务,假设完成速度较快。
生产者
import com.ly.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Send {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循环发布任务for (int i = 0; i < 50; i++) {// 消息内容String message = "task .. " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(i * 2);}// 关闭通道和连接channel.close();connection.close();}
}
消费者 1
import com.ly.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;public class Recv {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel){// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// body 即消息体String msg = new String(body,"utf-8");System.out.println(" [消费者1] received : " + msg + "!");//模拟任务耗时1stry { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); }}};// 监听队列,第二个参数:是否自动进行消息确认。channel.basicConsume(QUEUE_NAME, true, consumer);}
}
消费者 2
import com.ly.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;public class Recv2 {private final static String QUEUE_NAME = "test_work_queue";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();//创建会话通道,生产者和mq服务所有通信都在channel通道中完成Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(channel){// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// body 即消息体String msg = new String(body,"utf-8");System.out.println(" [消费者2] received : " + msg + "!");}};// 监听队列,第二个参数:是否自动进行消息确认。channel.basicConsume(QUEUE_NAME, true, consumer);}
}
生产者循环发送 50 条消息。
可以发现,两个消费者各自消费了不同 25 条消息,这就实现了任务的分发。
能者多劳
刚才的实现有问题吗?
- 消费者 1 比消费者 2 的效率要低,一次任务的耗时较长。
- 然而两人最终消费的消息数量是一样的。
- 消费者 2 大量时间处于空闲状态,消费者 1 一直忙碌。
现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
怎么实现呢?
通过 BasicQos 方法设置 prefetchCount = 1。这样 RabbitMQ 就会使得每个 Consumer 在同一个时间点最多处理 1 个 Message。换句话说,在接收到该 Consumer 的 ack 前,他它不会将新的 Message 分发给它。相反,它会将其分派给不是仍然忙碌的下一个 Consumer。
值得注意的是:prefetchCount 在手动 ack 的情况下才生效,自动 ack 不生效(Recv.java)。
再次测试:
3.2.1 订阅模型分类
说明下:
1. 一个生产者多个消费者。
2. 每个消费者都有一个自己的队列。
3. 生产者没有将消息直接发送给队列,而是发送给 exchange(交换机、转发器)。
4. 每个队列都需要绑定到交换机上。
5. 生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者消费。
例子:注册->发邮件、发短信
X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于 Exchange 的类型。
Exchange 类型有以下几种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。
- Direct:定向,把消息交给符合指定 routing key 的队列。
- Topic:通配符,把消息交给符合 routing pattern(路由模式) 的队列。
- Header:header 模式与 routing 不同的地方在于,header 模式取消 routingkey,使用 header 中的 key/value(键值对)匹配队列。
- Header 模式不展开了,感兴趣可以参考这篇文章https://blog.csdn.net/zhu_tianwei/article/details/40923131
- Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
3.3 Publish/subscribe(交换机类型:Fanout,也称为广播 )
Publish/subscribe 模型示意图 :
生产者
和前面两种模式不同:
- 1. 声明 Exchange,不再声明 Queue。
- 2. 发送消息到 Exchange,不再发送到 Queue。
import com.ly.rabbitmq.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {private final static String EXCHANGE_NAME = "test_fanout_exchange";public static void main(String[] argv) throws IOException, TimeoutException {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息内容String message = "注册成功!!";// 发布消息到Exchangechannel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [生产者] Sent '" + message + "'");channel.close();connection.close();}
}
消费者 1 (注册成功发给短信服务)
import com.ly.rabbitmq.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv {private final static String QUEUE_NAME = "fanout_exchange_queue_sms";//短信队列private final static String EXCHANGE_NAME = "test_fanout_exchange";public static void main(String[] argv) throws IOException, TimeoutException {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [短信服务] received : " + msg + "!");}};// 监听队列,自动返回完成channel.basicConsume(QUEUE_NAME, true, consumer);}
}
消费者 2(注册成功发给邮件服务)
import com.ly.rabbitmq.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {private final static String QUEUE_NAME = "fanout_exchange_queue_email";//邮件队列private final static String EXCHANGE_NAME = "test_fanout_exchange";public static void main(String[] argv) throws IOException, TimeoutException {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [邮件服务] received : " + msg + "!");}};// 监听队列,自动返回完成channel.basicConsume(QUEUE_NAME, true, consumer);}
}
我们运行两个消费者,然后发送 1 条消息:
思考:
1. publish/subscribe 与 work queues 有什么区别。
区别:
- work queues 不用定义交换机,而 publish/subscribe 需要定义交换机。
- publish/subscribe 的生产方是面向交换机发送消息,work queues 的生产方是面向队列发送消息(底层使用默认交换机)。
- publish/subscribe 需要设置队列和交换机的绑定,work queues 不需要设置,实际上 work queues 会将队列绑定到默认的交换机。
相同点:
所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
2. 实际工作用 publish/subscribe 还是 work queues。建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大(也可以做到同一队列竞争),并且发布订阅模式可以指定自己专用的交换机。
3.4 Routing 路由模型(交换机类型:direct)
Routing 模型示意图:
- P:生产者,向 Exchange 发送消息,发送消息时,会指定一个 routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与 routing key 完全匹配的队列。
- C1:消费者,其所在队列指定了需要 routing key 为 error 的消息。
- C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息。
生产者
import com.ly.rabbitmq.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {private final static String EXCHANGE_NAME = "test_direct_exchange";public static void main(String[] argv) throws IOException, TimeoutException {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为directchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 消息内容,String message = "注册成功!请短信回复[T]退订";// 发送消息,并且指定routing key 为:sms,只有短信服务能接收到消息channel.basicPublish(EXCHANGE_NAME, "sms", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}
}
消费者 1
import com.ly.rabbitmq.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv {private final static String QUEUE_NAME = "direct_exchange_queue_sms";//短信队列private final static String EXCHANGE_NAME = "test_direct_exchange";public static void main(String[] argv) throws IOException, TimeoutException {/* 获取到连接 */Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");//指定接收发送方指定routing key为sms的消息//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [短信服务] received : " + msg + "!");}};// 监听队列,自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}
消费者 2
import com.ly.rabbitmq.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {private final static String QUEUE_NAME = "direct_exchange_queue_email";//邮件队列private final static String EXCHANGE_NAME = "test_direct_exchange";public static void main(String[] argv) throws IOException, TimeoutException {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");//指定接收发送方指定routing key为email的消息// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [邮件服务] received : " + msg + "!");}};// 监听队列,自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}
我们发送 sms 的 RoutingKey,发现结果:只有指定短信的消费者 1 收到消息了。
3.5 Topics 通配符模式(交换机类型:topics)
Topics 模型示意图:
每个消费者监听自己的队列,并且设置带通配符的 routingkey,生产者将消息发给 broker,由交换机根据 routingkey 来转发消息到指定的队列。
Routingkey 一般都是有一个或者多个单词组成,多个单词之间以“.”分割,例如:inform.sms
通配符规则:
#:匹配一个或多个词。
*:匹配不多不少恰好 1 个词。
举例:
audit.#:能够匹配 audit.irs.corporate 或者 audit.irs
audit.*:只能匹配 audit.irs
从示意图可知,我们将发送所有描述动物的消息。消息将使用由三个字(两个点)组成的 Routing key 发送。路由关键字中的第一个单词将描述速度,第二个颜色和第三个种类:“<speed>.<color>.<species>”。
我们创建了三个绑定:Q1 绑定了“*.orange.*”,Q2 绑定了“.*.*.rabbit”和 “lazy.#”。
Q1 匹配所有的橙色动物。
Q2 匹配关于兔子以及懒惰动物的消息。
下面做个小练习,假如生产者发送如下消息,会进入哪个队列:
quick.orange.rabbit Q1 Q2 routingKey="quick.orange.rabbit"的消息会同时路由到Q1与Q2
lazy.orange.elephant Q1 Q2
quick.orange.fox Q1
lazy.pink.rabbit Q2 (值得注意的是,虽然这个routingKey与Q2的两个bindingKey都匹配,但是只会投递Q2一次)
quick.brown.fox 不匹配任意队列,被丢弃
quick.orange.male.rabbit 不匹配任意队列,被丢弃
orange 不匹配任意队列,被丢弃
lazy.orange.elephant Q1 Q2
quick.orange.fox Q1
lazy.pink.rabbit Q2 (值得注意的是,虽然这个routingKey与Q2的两个bindingKey都匹配,但是只会投递Q2一次)
quick.brown.fox 不匹配任意队列,被丢弃
quick.orange.male.rabbit 不匹配任意队列,被丢弃
orange 不匹配任意队列,被丢弃
下面我们以指定 Routing key="quick.orange.rabbit"为例,验证上面的答案
生产者
import com.ly.rabbitmq.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {private final static String EXCHANGE_NAME = "test_topic_exchange";public static void main(String[] argv) throws IOException, TimeoutException {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为topicchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 消息内容String message = "这是一只行动迅速的橙色的兔子";// 发送消息,并且指定routing key为:quick.orange.rabbitchannel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes());System.out.println(" [动物描述:] Sent '" + message + "'");channel.close();connection.close();}
}
消费者 1
import com.ly.rabbitmq.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv {private final static String QUEUE_NAME = "topic_exchange_queue_Q1";private final static String EXCHANGE_NAME = "test_topic_exchange";public static void main(String[] argv) throws IOException, TimeoutException {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,同时指定需要订阅的routing key。订阅所有的橙色动物channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者1] received : " + msg + "!");}};// 监听队列,自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}
消费者 2
import com.ly.rabbitmq.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;public class Recv2 {private final static String QUEUE_NAME = "topic_exchange_queue_Q2";private final static String EXCHANGE_NAME = "test_topic_exchange";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机,同时指定需要订阅的routing key。订阅关于兔子以及懒惰动物的消息channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者2] received : " + msg + "!");}};// 监听队列,自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}
结果 C1、C2 是都接收到消息了:
3.6 RPC
RPC 模型示意图:
基本概念:
Callback queue 回调队列,客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址 reply_to。
Correlation id 关联标识,客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有 correlation_id 属性,这样客户端在回调队列中根据 correlation_id 字段的值就可以分辨此响应属于哪个请求。
流程说明:
- 当客户端启动的时候,它创建一个匿名独享的回调队列。
- 在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
- 将请求发送到一个 rpc_queue 队列中。
- 服务器等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。
- 客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用。
四、死信队列
死信队列,又可以称之为“延迟队列”、“延时队列”,也是 RabbitMQ 队列中的一种;顾名思义,指的是进入该队列中的消息会被延迟消费的队列,这种队列跟普通的队列相比,最大的差异在于消息一旦进入普通队列将会立即被消费处理,而延迟队列则是会过一定的时间再进行消费。
在传统企业级应用系统中实现消息、业务数据的延迟处理一般是通过开启定时器的方式,轮询扫描并获取数据库表中满足条件的业务数据记录,通过比较数据记录的业务时间和当前时间,如果当前时间大于记录中的业务时间,则说明该数据记录已经超过了指定的时间而未被处理,此时需要执行相应的业务逻辑,比如失效该数据记录、发送通知信息给指定的用户等等;对于这种处理方式,定时器是每隔一定的时间频率不间断地去扫描数据库表,并不断地获取满足业务条件的数据,直到手动关闭该定时器(如果不关闭的话,毫无疑问定时器开启的线程将一直运行下去)。
传统企业级应用系统处理流程
春运抢票完全可以看作是一个大数据量、高并发请求的场景(全国几乎上千万、上亿的人都在抢),在某一时刻车票开抢之后,正常情况下将陆续会有用户抢到车票,但是距离车票付款成功是有一定的时间间隔的,在这段时间内,如果定时器频繁的从数据库中获取“未付款”状态的订单,其数据量之大将难以想象,而且如果大批量的用户在 30 分钟内迟迟不付款,那从数据库中获取的数据量将一直在增长,当达到一定程度时,将给数据库服务器和应用服务器带来巨大的压力,更有甚者将直接压垮服务器,导致抢票等业务全线崩溃,带来的直接后果将不堪设想!
早期的很多抢票软件每当赶上春运高峰期时,经常会出现“网站崩溃”、“单击购买车票却一直没响应”等状况,某种程度上可能是因为在某一时刻产生的高并发,或者定时频繁拉取数据库得到的数据量过大等状况导致内存、CPU、网络和数据库服务等负载过高所引起的。
而消息中间件 RabbitMQ 的引入,不管是从业务层面还是应用的性能层面,都大大的得到了改善,下图则为引入 RabbitMQ 消息中间件后“抢票成功后 30 分钟内未付款的处理流程”的优化。
从优化后的处理流程中可以看出 RabbitMQ 的引入主要是替代了传统处理流程的“定时器”处理逻辑,取而代之的是采用 RabbitMQ 的死信队列进行处理。死信队列/延迟队列,顾名思义,指的是可以延迟一定的时间再处理相应的业务逻辑,而这也可以看作是死信队列的作用,即死信队列/延迟队列可以实现特定的消息、业务数据等待一定的时间 TTL 后再被消费者监听消费处理。
五、面试题
1. 如何避免消息堆积?
1. 采用 work queue,多个消费者监听同一队列。
2. 接收到消息后,而是通过线程池,异步消费。
2. 如何避免消息丢失?
1. 消费者的 ACK 机制,可以防止消息丢失。(如果在消费者消费之前,MQ 宕机了,消息就没了?)
2. 可以将消息进行持久化,要将消息持久化,前提是:队列、Exchange 都持久化。
3. RabbitMQ 为什么需要信道?为什么不是 TCP 直接通信?
1. TCP 创建和销毁,开销大,创建需要三次握手,销毁需要四次分手。
2. 如果不使用信道,TCP 的方式连接到 RabbitMQ,高峰时每秒成千上万条连接会造成资源的巨大浪费,而操作系统处理 TCP 连接数时有限的,必定会造成性能瓶颈。
3. 信道的原理是一条线程一条信道,多条线程多条信道共同使用一条 TCP 连接。一条 TCP 连接可以容纳无限的信道,即使每秒造成成千上万的请求也不会造成性能瓶颈。
交换机持久化
队列持久化
消息持久化