一、消息队列基本概念
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构
1、消息队列的特点
可靠性
消息持久化:消息可以被持久化存储,即使消费者暂时不可用,消息也不会丢失。
确认机制:消费者成功处理消息后可以发送确认,确保消息被正确处理。如果处理失败,消息可以被重新发送或回滚。
负载均衡
多消费者:多个消费者可以竞争消费同一个队列中的消息,实现负载均衡。
高并发:通过增加消费者数量,可以提高系统的处理能力和吞吐量。
扩展性
水平扩展:可以通过增加更多的生产者和消费者来扩展系统的处理能力。
垂直扩展:可以通过增加服务器资源来提升单个节点的处理能力。
消息排序
顺序消息:某些消息队列支持消息的有序性,确保消息按照发送的顺序被消费。
优先级:可以设置消息的优先级,确保高优先级的消息优先被处理。
2、消息队列的通讯模式
点对点通讯模式
在这种模式下,消息生产者发送的消息会被存储在一个特定的消息队列中,然后由一个或多个消息消费者从该队列中接收消息
发布订阅模式
Pub/Sub 模式允许消息的发送者(发布者)和接收者(订阅者)之间松散耦合,使得多个订阅者可以同时接收相同的消息,消息生产者发布消息到指定Topic,由订阅这个Topic的消费者进行消费
两种模式的比较
请求/回复(Request/Reply)模式
客户端发送请求消息,并等待服务端的回复。服务端将回复消息发送到一个临时队列中,客户端从该队列中获取回复。
延迟消息模式
消息被存储在延时队列中,直到达到指定的时间点才被发送。
广播模式
消息可以被发送到多个队列或主题,每个队列或主题的消费者都会收到消息。
死信队列模式
无法被正常处理的消息会被转移到一个专门的死信队列中。可以对死信队列中的消息进行单独处理,如重试、记录日志等。
3、为什么要使用消息队列
这里列举几种常见应用场景
1)异步处理
举个例子,当用户下单后,将会面临一系列服务调用的流程,比如库存检查、支付处理等,但是这些后续的服务调用都是后台默默做,并不直接影响用户下单这个行为,所以我们可以将这些服务调用变成异步处理,处理完了有结果通知反馈一下就行。
这个异步处理我们可以借助MQ,具体做法:订单系统将订单信息发送到消息队列,后台系统监听到MQ有任务通知,则执行异步处理订单,如库存检查、支付处理、物流安排等。
2)流量削峰
就像用户投递快递,高峰到40W每秒,但是我们的后续处理业务每秒只能20W,还剩下20W在MQ进行堆积,这就是MQ很重要的流量削峰的能力,将用户的洪峰流量,让后台慢慢来处理,MQ承担一个缓冲的作用
比如可以这样做,用户的请求,服务器收到之后,首先写入消息队列进行排队等候真正的服务被执行,但是当加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.(这里可以做熔断降级)
3)系统解耦
我们可以看到,如果没加MQ,则一系列调用都是一条完整的同步调用链,某一个服务执行的失败会影响前后的服务。
但是我们通过MQ,可以将后续多个服务单独执行,互不影响,如果有依赖关系,某一个服务执行失败了,我们可以进行一些补偿、回滚机制等
4)实时通知
对于需要实时更新的数据,生产者将更新后的数据发布到Topic上,只要订阅了这个Topic的服务都能够收到这个实时更新的推送,进而做一些实时性的后续处理。
4、消息队列有什么缺点
最直观的就是增加了系统的复杂性,引入了第三方中间件
还有就是增加了网络开销,因为消息的传递是会消耗网络带宽资源的,并且也会带来一定的网络延迟
还有系统可用性降低,因为当MQ宕机了,那么我们有些依赖于MQ进行通信的两两服务之间就失联了
还有一个数据一致性问题,如下图所示,系统需要保证快递投递,扣减系统费,通知等之间的数据一致性,如果系统短信通知,快递通知执行成功,扣减系统费执行失败时,就会出现数据不一致问题
5、各类消息队列的比较
特性 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
---|---|---|---|---|
生产者-消费者模式 | 支持 | 支持 | 支持 | 支持 |
发布-订阅模式 | 支持 | 支持 | 支持 | 支持 |
REQUEST-REPLY模式 | 支持 | 支持 | - | 支持 |
API完备性 | 高 | 高 | 高 | 低(静态配置) |
多语言支持 | 支持,JAVA优先 | 语言无关 | 支持,JAVA优先 | 支持 |
单机呑吐量 | 万级 | 万级 | 十万级 | 单机万级 |
消息延迟 | 毫秒级 | 微秒级 | 毫秒级 | 毫秒级 |
可用性 | 高(主从) | 高(主从) | 很高(分布式) | 非常高(分布式) |
消息丢失 | - | 低 | 理论上不会丢失 | 理论上不会丢失 |
消息重复 | - | 可控制 | 理论上会有重复 | 允许重复 |
文档的完备性 | 高 | 高 | 高 | 中 |
提供快速入门 | 有 | 有 | 有 | 无 |
首次部署难度 | - | 低 | 中 | 高 |
注: - 表示尚未查找到准确数据
二、RabbitMQ的基本使用
1、RabbitMQ的一些基本概念、名词解释
Publisher
消息生产者,就是投递消息的程序
发布者 (或称为生产者) 负责生产消息并将其投递到指定的交换器上。
Message
消息由消息头和消息体组成,消息头用于存储与消息相关的元数据:如目标交换器的名字 (exchange_name 指明消息发给哪一个交换器)路由键 (RountingKey)和其他可选配置 (properties) 信息。消息体为实际需要传递的数据。
Exchange
交换器负责接收来自生产者的消息,并将消息路由到一个或者多个队列中,如果路由不到,则返回给生产者或者直接丢弃,这取决于交换器的 mandatory 属性:
当 mandatory 为 true 时:如果交换器无法根据自身类型和路由键找到一个符合条件的队列,则会将该消息返回给生产者;
当 mandatory 为 false 时:如果交换器无法根据自身类型和路由键找到一个符合条件的队列,则会直接丢弃该消息。
BindingKey
交换器与队列通过 BindingKey 建立绑定关系。 (理解为@RequesMapping,定义了路由跳转)
Routingkey
基于交换器类型的规则相匹配时,消息被路由到对应的队列中
生产者将消息发给交换器的时候,一般会指定一个 RountingKey,用来指定这个消息的路由规则,当 RountingKey 与 BindingKey相匹配时,消息被路由到对应的队列中。
(理解为写的/order/findById 定义了url)
Queue
消息队列载体,每个消息都会被投入到一个或多个队列。
用于存储路由过来的消息,多个消费者可以订阅同一个消息队列,此时队列会将收到的消息将以轮询的方式分发给所有消费者,即每条消息只会发送给一个消费者,不会出现一条消息被多个消费者重复消费的情况。
Consumer
消息消费者,就是接受消息的程序
消费者订阅感兴趣的队列,并负责消费存储在队列中的消息。为了保证消息能够从队列可靠地到达消费者,RabbitMQ 提供了消息确认机制 (messageacknowledgement),并通过 autoAck 参数来进行控制
当 autoAck 为 true 时:此时消息发送出去 (写入TCP套接字) 后就认为消费成功,而不管消费者是否真正消费到这些消息。当 TCP 连接或 channel 因意外而关闭,或者消费者在消费过程之中意外宕机时,对应的消息就丢失。因此这种模式可以提高吞吐量,但会存在数据丢失的风险。
当 autoAck 为 false 时:需要用户在数据处理完成后进行手动确认,只有用户手动确认完成后,RabbitMQ 才认为这条消息已经被成功处理,这可以保证数据的可靠性投递,但会降低系统的吞吐量。
Connection
用于传递消息的 TCP 连接。
Channel
消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
RabbitMQ 采用类似 NIO (非阻塞式 IO ) 的设计,通过 Channel 来复用 TCP 连接,并确保每个 Channel的隔离性,就像是拥有独立的 Connection 连接。当数据流量不是很大时,采用连接复用技术可以避免创建过多的 TCP 连接而导致昂贵的性能开销。
Virtual Host
虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离
RabbitMQ 通过虚拟主机来实现逻辑分组和资源隔离,一个虚拟主机就是一个小型的 RabbitMQ服务器,拥有独立的队列、交换器和绑定关系。用户可以按照不同业务场景建立不同的虚拟主机,虚拟主机之间是完全独立的,你无法将 vhost1 上的交换器与vhost2 上的队列进行绑定,这可以极大的保证业务之间的隔离性和数据安全,默认的虚拟主机名为 / 。
Broker
简单来说就是消息队列服务器实体。就是指部署MQ的服务器实体
2、docker安装RabbitMQ
拉取RabbitMQ的镜像
docker pull rabbitmq:management
运行RabbitMQ
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
访问管理界面
输入 账号:guest 密码:guest
如果访问不了,可以尝试把linux的防火墙关闭试试
3、交换机类型
Direct exchange(直连交换器)
直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定(通过binding_key,一个队列与交换机可以有多条binding_key绑定),当消息被发送的时候,需要指定一个routing_key,这个消息被送达交换机的时候,交换机就会拿着routing_key与binding_key相匹配,只要匹配上的都进行发送到相应的queue
当生产者(P)发送消息时 Rotuing key=booking 时,这时候将消息传送给 Exchange,Exchange 获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的 Queue,这时发现 Queue1 和 Queue2 都符合,就会将消息传送给这两个队列。
如果我们以 Rotuing key=create 和 Rotuing key=confirm 发送消息时,这时消息只会被推送到 Queue2 队列中,其他 Routing Key 的消息将会被丢弃。
Fanout exchange(扇形交换器)
扇型交换机(fanout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果 N 个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的 N 个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。
Topic exchange(主题交换器)
直接交换机只能做到routingkey的精确匹配,扇形交换机只能广播到所有queue,那有没有比较折中一点的,可以进行过滤匹配的交换机?主题交换机就是做这个事情的。
对于bindingkey,写匹配的条件:
可以存在两种特殊字符 “*” 与“#”,用于做模糊匹配,其中 “*” 用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
对于routingkey,写实际路由:
routing key 为一个句点号 “.” 分隔的字符串(我们将被句点号 “. ” 分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
当生产者发送消息 Routing Key=F.C.E 的时候,这时候只满足 Queue1,所以会被路由到 Queue 中,如果 Routing Key=A.C.E 这时候会被同是路由到 Queue1 和 Queue2 中,如果 Routing Key=A.F.B 时,这里只会发送一条消息到 Queue2 中。
主题交换机拥有非常广泛的用户案例。无论何时,当一个问题涉及到那些想要有针对性的选择需要接收消息的 多消费者 / 多应用(multiple consumers/applications) 的时候,主题交换机都可以被列入考虑范围。
三、SpringBoot整合RabbitMQ
参考文章
SpringBoot快速整合RabbitMQ实现MQ最基本操作_mq基本操作-CSDN博客