消息队列
- 一、起源
- 二、原理
- 预取值
- 死信队列
- 死信
- 延迟队列
- 应用场景
- 三、用法
一、起源
消息队列简称MQ(Message Queue)。
假设有一个简单的订单处理系统,涉及三个业务:订单提交、库存更新和支付处理。
如果没有消息队列,订单处理系统可能会按照以下方式进行处理:
-
订单提交:用户在网站上提交订单后,网站后台服务直接接收到订单信息,并开始处理订单。订单处理的过程是同步的,网站后台服务会直接调用库存更新服务和支付处理服务来执行后续的业务逻辑。
-
库存更新:订单提交流程调用库存更新服务,进行商品库存的更新操作。这个操作是同步的,订单提交流程会阻塞直到库存更新完成。如果库存不足,库存更新服务可能会直接返回错误信息给订单提交流程。
-
支付处理:如果库存更新成功,订单提交流程会直接调用支付服务进行支付处理。支付操作同样是同步的,订单提交流程会阻塞直到支付处理完成。如果支付失败,支付服务可能会直接返回错误信息给订单提交流程。
在没有消息队列的情况下,整个订单处理过程是同步的,每个步骤都必须等待上一个步骤完成后才能进行。这种方式会导致系统的响应速度受限于处理速度最慢的步骤,并且难以处理高并发情况。此外,系统的可靠性也会受到影响,因为任何一个步骤的失败都可能导致整个订单处理过程失败。
在上述简化的订单处理系统中,如果引入消息队列,可以更好地体现解耦、异步和削峰的特性。
-
解耦:引入消息队列后,订单提交服务与库存更新服务和支付处理服务之间的耦合度降低了。订单提交服务只需将订单信息发送到消息队列,而不需要直接调用库存更新服务和支付处理服务。这样,订单提交服务与后续处理服务之间的依赖关系更松散,每个服务可以独立开发、测试和维护。
-
异步:引入消息队列后,订单处理过程变成了异步的。订单提交服务将订单信息发送到消息队列后,就可以立即响应用户请求,不需要等待后续业务处理完成。后续的库存更新和支付处理等业务逻辑则可以在自己的线程中异步处理,不会阻塞订单提交服务。这样可以提高系统的响应速度和并发能力。
-
削峰:消息队列可以用来平滑处理系统的高峰流量。在订单提交服务面对突发的订单请求时,如果后续的库存更新和支付处理服务处理不过来,消息队列可以暂时存储订单消息,避免系统因为负载过高而崩溃。后续的库存更新和支付处理服务可以按照自己的处理能力逐步消费消息队列中的订单消息,以平滑处理系统的高峰流量,保证系统的稳定性和可靠性。
综上所述,引入消息队列可以使订单处理系统实现解耦、异步和削峰的特性,提高系统的灵活性、可扩展性和稳定性。
二、原理
RabbitMQ 是一个流行的开源消息代理软件,用于实现高效的消息队列。它基于 AMQP(高级消息队列协议)标准,提供了可靠的消息传递机制,被广泛应用于构建分布式系统、微服务架构以及异步通信。
下面是 RabbitMQ 的基本原理:
-
消息生产者:
生产者是消息的发送方,它负责将消息发送到 RabbitMQ 交换机。消息可以是任何数据,通常以字节流的形式发送。生产者连接到 RabbitMQ,选择一个交换机,并将消息发送到该交换机。生产者可以选择指定消息的路由键,以确定消息被发送到哪个队列。 -
交换机(Exchange):
交换机是消息的中转站,它接收来自生产者的消息,并将它们路由到一个或多个队列中。交换机根据特定的路由规则将消息路由到队列中。RabbitMQ 提供了几种类型的交换机,包括直接交换机、扇出交换机、主题交换机和头交换机,每种类型都有不同的路由规则。 -
队列(Queue):
队列是消息的存储容器,它保存着被发送到交换机并满足特定路由规则的消息。生产者发送的消息最终都会被存储在一个或多个队列中。消费者连接到队列并从中接收消息。队列可以配置为持久化,以确保即使 RabbitMQ 服务器重启,也不会丢失消息。 -
消息消费者:
消费者是消息的接收方,它连接到 RabbitMQ 并订阅一个或多个队列,以接收消息。一旦有消息被发送到队列中,消费者就会收到通知,并从队列中接收消息。消费者处理消息的方式可以是同步或异步的,通常使用消费者来处理和执行从队列中接收到的消息。 -
虚拟主机(Virtual Host):
虚拟主机提供了逻辑隔离机制,允许在单个 RabbitMQ 服务器上创建多个独立的消息代理环境。每个虚拟主机都有自己的一组交换机、队列和权限规则,使得不同的应用或团队可以在同一台 RabbitMQ 服务器上独立地使用消息队列服务而不会相互干扰。 -
持久化:
RabbitMQ 支持消息的持久化,包括交换机、队列和消息本身。通过持久化,可以确保即使在 RabbitMQ 服务器重启后,消息仍然不会丢失。
这些是 RabbitMQ 的基本原理,它们提供了一个强大而灵活的消息传递机制,适用于各种分布式系统和应用场景。
预取值
预取值可以理解为消费者一次从队列中获取的消息的个数。
默认情况下,RabbitMQ 的 prefetchCount 参数为 0,表示没有限制,即消费者会尽可能快地从队列中获取消息。
调用 channel.basicQos(prefetchCount)
来设置 prefetchCount 参数为 1,即每次只能获取一条消息。这样做可以确保消费者在处理完当前的消息之前不会收到新的消息,从而实现了不公平的消息分发。
死信队列
死信
无法被消费者消费的消息;
来源:
- 消息过期(超过TTL还没有消费者接收消息,则成为死信)
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或
basic.nack)并且 requeue=false.
死信消息会由正常队列发送给死信交换机,再由死信交换机发送给死信队列,由别的消费者取到。
因此正常队列要设置queueDeclare()
的最后一个参数,绑定到某个死信交换机。还要设置routing-key
指定死信消息去往哪个死信队列。
Map<String, Object> params = new HashMap<>();
// 正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue, false, false, false, params);
延迟队列
基于死信队列的实现(队列中延迟):本质就是一个死信队列。利用了消息过期会成为死信消息,进入死信队列。(过期时间可以在队列上设置,也可以在生产者上设置)
基于插件的实现(交换机中延迟)
因为队列中的消息是排队处理的,如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
因此就有了基于插件的实现,将消息的延迟放在了交换机中。
应用场景
适用于需要延迟处理的场景。如: 用户发起退款,如果三天内没有得到处理则通知相关运营人员。