RabbitMQ快速入门
1、Part1前言
Rabbitmq 是使用 Erlang 语言开发的开源消息队列系统,基于 AMQP 实现,是一种应用程序对应用程序的通信方
法,应用程序通过读写出入队列的消息来通信,而无需专用连接来链接它们。消息传递指的是应用程序之间通过在
消息中发送数据进行通信,而不是通过直接调用彼此通信,直接调用通常是指远程过程调用的技术。
2、Part2核心组成
-
Server:又称 Broker,接收客户端的连接,实现 AMQP 实体服务,安装 rabbitmq-server。
-
Connection:连接,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手。
-
Channel:网络信道,几乎所有操作都在 Channel 中进行,Channel 是进行消息读写的通道,客户端可以建
立多个 Channel,每个 Channel 代表一个会话任务。
-
Message:消息,服务与应用程序之间传送的数据,由 Properties 和 Body 组成,Properties 可以对消息进
行修饰,比如消息的优先级,延迟等高级特性,Body 则是消息体的内容。
-
Virtual Host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机可以有若干个 exchange 和
queue,同一个虚拟主机里面不能有相同名称的 exchange。
-
Exchange:交换机,接收消息,根据路由键发送消息到绑定的队列(不具备消息存储能力)。
-
Bindings: exchange 和 queue 之间的虚拟连接,binding 中可以保存多个 routing key。
-
Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
-
Queue:队列,也称为 Message Queue,消息队列,保存消息并将它们转发给消费者。
3、Part3Rabbitmq 消息模式
3.1 Simple 模式
Simple 模式是最简单的一个模式,由一个生产者,一个队列,一个消费者组成,生产者将消息通过交换机(此时,
图中并没有交换机的概念,如不定义交换机,会使用默认的交换机)把消息存储到队列,消费者从队列中取出消息
进行处理。
3.2 Fanout 模式
Fanout——发布订阅模式,是一种广播机制。
此模式包括:一个生产者、一个交换机 (exchange)、多个队列、多个消费者。生产者将消息发送到交换机,交换
机不存储消息,将消息存储到队列,消费者从队列中取消息。如果生产者将消息发送到没有绑定队列的交换机上,
消息将丢失。
3.3 Direct 模式
Direct 模式是在 Fanout 模式基础上添加了 routing key,Fanout(发布/订阅)模式是交换机将消息存储到所有绑
定的队列中,而 Direct 模式是在此基础上,添加了过滤条件,交换机只会将消息存储到满足 routing key 的队列
中。
3.4 Topic 模式
Topic 模式是生产者通过交换机将消息存储到队列后,交换机根据绑定队列的 routing key 的值进行通配符匹配,
如果匹配通过,消息将被存储到该队列,如果 routing key 的值匹配到了多个队列,消息将会被发送到多个队列;
如果一个队列也没匹配上,该消息将丢失。
routing_key 必须是单词列表,用点分隔,其中 *
和 #
的含义为:
*
:1个单词
#
:0个或多个单词
3.5 Work 模式
当有多个消费者时,如何均衡消息者消费消息的多少,主要有两种模式:
- 轮询模式分发:按顺序轮询分发,每个消费者获得相同数量的消息
- 公平分发:根据消费者消费能力公平分发,处理快的处理的多,处理慢的处理的少,按劳分配
3.5.1 轮询分发
在这种模式下,rabbitmq 采用轮询的方式将任务分配给多个消费者,但可能出现一种情况,当分配给某一个消费
者的任务很复杂时,而有些消费者接收的任务较轻量,会出现有的消费者很忙,而有的消费者处于空闲的状态,而
rabbitmq 不会感知到这种情况的发生,rabbitmq 不考虑消费者未确认消息的数量,只是盲目的分配任务。
3.5.2 公平分发
为了解决 Work 轮询分发模式 这个问题,rabbitmq 使用带有 perfetchCount = 1 设置的 basicQos 方法。当消费
者接受处理并确认前一条消息前,不向此消费者发送新消息,会分配给其他空闲的消费者。
相较于轮询分发模式,添加了 Qos 机制,设置值为1,代表消费者每次从队列中获取几条消息。
当使用 Work 公平分发模式时,要设置消费者为手动应答,并且开启 Qos 机制。
4、Part4防止消息丢失机制
4.1 消息确认
消费者完成一项任务可能需要几秒钟,如果其中一个消费者开始了一项长期任务并且只完成了部分任务而死亡,如
果将 autoAck 设置为 true ,一旦 RabbitMQ 将消息传递给消费者,它会立即将其标记为删除,在这种情况下,我
们将丢失所有已分派给该特定消费者但尚未处理的消息。
如果其中一个消费者宕了,rabbitmq 可以将其消息分配给其他消费者。为了确保消息不会丢失,rabbitmq 采用
消息确认,消费者发回确认消息,告诉 rabbitmq 消息已经被接收并处理,此时,rabbitmq 可以放心的删除这条
消息。
如果消费者在没有发送 ack 的情况下宕了,rabbitmq 将理解为该条消息未被消费者处理完,如果有其他消费者在
线,将迅速重新交付给其他消费者,这样就可以确保不会丢失消息了。
默认情况下rabbitmq 会启用手动消息确认,也就是 autoAck 默认为 false,一旦我们完成了一项任务,需要手动
的进行消息确认,所以 autoAck 需要保持为默认值 false,并使用如下方法进行手动应答。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
4.2 持久化
rabbitmq 的消息确认机制可以保证消息不会丢失,但是如果 rabbitmq 服务器停止,我们的任务仍然会丢失。
当 rabbitmq 退出或崩溃时,如果不进行持久化,队列和消息都会消失。需要做两件事来确保消息不会丢失,将队
列和消息都标记为持久的。
1、设置队列持久
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
2、设置消息持久
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
将消息标记为持久性并不能完全保证消息不会丢失,当 rabbitmq 接收到消息并且还没保存时,仍然有很短的时间
窗口会使消息丢失,如果需要更强的保证,可以使用发布者确认机制。
5、Part5使用场景
- 解耦、削峰、异步
5.1 解耦
在微服务架构体系中,微服务A需要与微服务B进行通信,传统的做法是A调用B的接口。但这样做如果系统B无法
访问或连接超时,系统A需要等待,直到系统B做出响应,并且A与B存在严重的耦合现象。如果引入消息队列进行
系统AB的通信,流程是这样的:
-
系统A将消息存储到消息队列中,返回成功信息
-
系统B从队列中获取消息,进行处理操作
系统A将消息放到队列中,就不用关心系统B是否可以获取等其他事情了,实现了两个系统间的解耦。
使用场景:
- 短信、邮件通知
5.2 削峰
系统A每秒请求100个,系统可以稳定运行,但如果在秒杀活动中,每秒并发达到1w个,但系统最大处理能力只能
每秒处理 1000 个,所以,在秒杀活动中,系统服务器会出现宕机的现象。如果引入 MQ ,可以解决这个问题。
每秒 1w个请求会导致系统崩溃,那我们让用户发送的请求都存储到队列中,由于系统最大处理能力是每秒1000个
请求,让系统A每秒只从队列中拉取1000个请求,保证系统能稳定运行,在秒杀期间,请求大量进入到队列,积压
到MQ中,而系统每秒只从队列中取1000个请求处理。这种短暂的高峰期积压是没问题的,因为高峰期一旦过去,
每秒请求数迅速递减,而系统每秒还是从队列中取1000个请求进行处理,系统会快速将积压的消息消费掉。
使用场景:
-
秒杀活动
-
团抢活动
5.3 异步
用户注册,需要发送注册邮件和注册短信,传统的做法有两种:串行、并行。
-
串行方式:将注册信息写库后(50ms),发送邮件(50ms),再发送短信(50ms),任务完成后,返回客
户端,共耗时(150ms)
-
并行方式:将注册信息写库后(50ms),开启子线程让发送邮件和发送短信同时进行(50ms),返回客户
端,共耗时(100ms)
-
引入MQ,将注册信息写库(50ms),将发送邮件和短信的操作写入队列(5s),返回客户端,而消费者什
么时候从队列中取消息进行处理,不用关心,共耗时(55ms)
使用场景:
- 将不是必须等待响应结果的业务逻辑进行异步处理