消息队列
消息队列(Message Queue,简称 MQ)是一种应用程序之间的通信方法。
基本概念
- 消息队列是一种先进先出(FIFO)的数据结构,它允许一个或多个消费者从队列中读取消息,也允许一个或多个生产者向队列中发送消息。消息在被消费之前会一直保留在队列中,这样可以确保消息的可靠传递。
工作原理
- **生产者 - 消费者模式** :生产者将消息发送到消息队列,消费者从消息队列中获取消息进行处理。生产者和消费者之间不需要直接交互,它们通过消息队列进行解耦。
- **消息传递过程** :生产者创建消息并将其发送到消息队列。消息队列接收消息并将其存储起来。消费者从消息队列中获取消息并进行处理。消费者处理完消息后,会向消息队列发送一个确认消息,表示该消息已经被成功处理。
主要特点
- **异步通信** :消息队列允许生产者和消费者异步地进行消息的发送和接收,这样可以提高系统的并发性和响应速度。
- **解耦** :消息队列可以将生产者和消费者解耦,使得它们可以独立地进行开发、部署和扩展。
- **可靠性** :消息队列通常具有可靠的消息传递机制,可以确保消息不会丢失、重复或损坏。
常见的消息队列架构模式
- **点对点(Point - to - Point)模式** :在这种模式下,一个消息只能被一个消费者消费。生产者将消息发送到一个特定的队列,消费者从该队列中获取消息进行处理。
- **发布 - 订阅(Publish - Subscribe)模式** :在这种模式下,一个消息可以被多个消费者消费。生产者将消息发送到一个主题,消费者订阅该主题,当有新的消息发布到该主题时,所有订阅该主题的消费者都会收到该消息。
应用场景
- **异步处理** :当一个任务的处理过程比较耗时,且不需要立即得到结果时,可以使用消息队列将该任务异步处理。例如,在一个电商系统中,当用户下单后,可以将订单信息发送到消息队列,然后由后台系统异步处理订单,这样可以提高系统的响应速度。
- **系统解耦** :当两个系统之间需要进行交互,但又不想直接耦合在一起时,可以使用消息队列进行解耦。例如,在一个微服务架构中,不同的微服务之间可以通过消息队列进行通信,这样可以降低系统的耦合度。
- **流量削峰** :当系统的流量突然增大,超过了系统的处理能力时,可以使用消息队列进行流量削峰。例如,在一个秒杀系统中,当秒杀活动开始时,大量的用户请求会同时到达服务器,这时可以将请求发送到消息队列,然后由后台系统按照一定的速度从消息队列中取出请求进行处理,这样可以避免服务器被瞬间的高流量压垮。
- **日志处理** :可以使用消息队列将系统的日志信息发送到日志处理系统进行处理。例如,在一个分布式系统中,各个节点的日志信息可以通过消息队列发送到日志收集器,然后由日志收集器将日志信息存储到日志数据库中。
- **分布式事务** :在分布式系统中,可以使用消息队列来实现分布式事务。例如,在一个分布式数据库系统中,当需要在多个数据库节点上执行事务操作时,可以使用消息队列来协调各个节点的操作,确保事务的原子性和一致性。
常见的消息队列产品
- **RabbitMQ** :一种开源的消息队列软件,支持多种消息队列协议,如 AMQP、STOMP、MQTT 等。
- **Kafka** :一种高吞吐量、可扩展的消息队列系统,适用于处理大量的实时数据。
- **ActiveMQ** :一种开源的消息队列软件,支持多种消息队列协议,如 JMS、AMQP、STOMP 等。
- **RocketMQ** :一种开源的消息队列软件,具有高可用性、高吞吐量、低延迟等特点,适用于大规模分布式系统。
说说你们项目里是怎么用消息队列的?
我们有一个订单系统,订单系统会每次下一个新订单的时候,就会发送一条消息到 ActiveMQ 里面去,后台有一个库存系统,负责获取消息,然后更新库存。
为什么使用消息队列?
你的订单系统不发送消息到 MQ,而是直接调用库存系统的一个接口,然后直接调用成功了,库存也更新了,那就不需要使用消息队列了呀.
使用消息队列的主要作用是:异步、解耦、削峰
解耦
不使用MQ时:A系统与各种系统耦合起来,那么需要处理的事情会多出很多

使用MQ后:
系统A发送一条消息,到消息队列中,哪个系统需要获取到哪里,那么从MQ中消费数据,如果新系统E加入的话,那么只需要编写代码,然后也直接从 MQ中消费即可,当系统D不需要这个数据时,那么只需要不对该消息进行消费即可。系统A不需要考虑给谁发送数据,也不需要维护这个代码,不需要考虑人家是否调用成功、失败、超时等等情况

总结:
通过一个MQ,发布和订阅模型,系统A就和其他系统彻底解耦了。当一个系统与其他系统之间互相调用很复杂,维护起来很麻烦,这个调用不需要同步调用接口,如果用mq给他异步化解耦也是可以的,这个时候可以考虑在自己的项目中使用mq来解耦。
异步
不用MQ的同步高延时间请求场景:

使用MQ进行异步化:

削峰


消息队列的优点和缺点
优点上面已经说了: 解耦、异步、削峰;
缺点呢? 显而易见的:
- 系统可用性降低:系统引入的外部依赖越多,越容易挂掉,本来你就是 A 系统调用 BCD 三个系统接口就好了,人家 ABCD 四个系统好好的,没啥问题,这个时候却加入了 MQ 进来,万一 MQ 挂了怎么办? MQ 挂了整套系统也会崩渍了。
- 系统复杂性提高:硬生生加个 MQ 进来,你怎么保证消息没有重复消费? 怎么处理消息丢失的情况? 怎么保证消息传递的顺序性?
- 一致性问题: A 系统处理完了直接返回成功了,人都以为你的请求成功了,但是问题是,要在 BCD 三个系统中,BD 两个系统写库成功了,结果 C 系统写虚失败了,这样就会存在数据不一致的问题,
所以说消息队列实际上是一种复杂的架构,你引入它有好多好处,但是 也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,最后发现系统复杂性提升了一个数量级,也许是复杂 10 倍,但是关键时刻,用还是得用。
如何保证消息队列的高可用?
RabbitMQ 高可用性,RabbitMQ 是比较有代表性的,因为是基于主从做高可用性的。
RabbitMQ 三种模式:单机模式,普通集群模式,镜像集群模式
单机模式
就是 demo 级别的,一般就是本地启动后玩一玩,没有人生产环境中使用。
普通集群模式
• 意思就是在多台机器上启动多个 RabbitMQ 实例,每台机器启动一个,但是创建的 Queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue元数据,在消费的时候,实际上是连接到另外一个实例上,那么这个实例会从queue 所在实例上拉取数据过来,这种方式确实很麻烦,也不怎么好,没做到所谓的分布式 ,就是个普通集群。因为这导致你要么消费每次随机连接一个实例,然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。
• 而且如果那个放 queue 的实例宕机了,会导致接下来其它实例无法从那个实例拉取,如果 你开启了消息持久化,让 rabbitmq 落地存储消息的话,消息不一定会丢,得等到这个实例恢复了,然后才可以继续从这个 queue 拉取数据。这里没有什么所谓的高可用性可言,这个方案主要就是为了解决吐吞量,就是集群中的多个节点来服务于某个 queue 的读写操作。存在两个缺点
• 可能会在 RabbitMQ 中存在大量的数据传输
• 可用性没有什么保障,如果 queue 所在的节点宕机,就会导致 queue 的消息
集群镜像模式
这种模式,才是 RabbitMQ 的高可用模式,和普通的集群模式不一样的是,你创建的 queue 无论元数据还是 queue 里的消息都会存在与多个实例中,然后每次你写消息到 queue 的时候,都会自动把消息推送到多个实例的 queue 中进行消息同步。
这样的好处在于,你任何一个机器宕机了,别的机器都可以用。坏处在于,性能开销提升,消息同步所有的机器,导致网络带宽压力和消耗增加,第二就是没有什么扩展性科研,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个queue 的所有数据,并没有办法线性扩展你的 queue那么如何开启集群镜像策略呢?就是在 RabbitMQ 的管理控制台,新增一个策略,这个策略就是镜像集群模式下的策略,指定的时候,可以要求数据同步到所有的节点,也可以要求就 同步到指定数量的节点,然后再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其它节点上去了。
集群镜像模式下,任何一个节点宕机了都是没问题的,因为其他节点还包含了这个queue 的完整的数据,别的 consumer 可以到其它活着的节点上消费数据。
如何保证消息不被重复消费?如何保证消息消费时的幂等性?
做幂等。保证不管消费多少次,结果都是一样的,不会产生错误的数据。
幂等性就是一个数据,或者一个请求,给你执行多次,得保证对应的数据不会改变,并且不能出错,这就是幂等性。
怎么保证 幂等性?
- 比如数据要写库,首先根据主键查一下,如果这个数据已经有了,那就别插入了,执行 update 即可。
- 如果用的是 redis,那就没问题了,因为每次都是 set 操作,天然的幂等性。
- 如果不是上面的两个场景,那就做的稍微复杂一点,需要让生产者发送每条消息的时候,需要加一个全局唯一的 id,类似于订单 id 之后的东西,然后你这里消费到了之后,先根据这个 id 去 redis 中查找,之前消费过了么,如果没有消费过,那就进行处理,然后把这个 id 写入到 redis 中,如果消费过了,那就别处理了,保证别重复消费相同的消息即可。
- 还有比如基于数据库唯一键来保证重复数据不会重复插入多条,我们之前线上系统就有这个问题,就是拿到数据的时候,每次重启可能会重复,因为 Kafka消费者还没来得及提交 offset,重复数据拿到了以后,我们进行插入的时候,因为有了唯一键约束了,所以重复数据只会插入报错,不会导致数据库中出现脏数据。
如何保证消息传输不丢失?
消息队列有三个重要原则:消息不能多,不能少。不能多,指的就是刚刚提到的重复消费和幂等性问题,不能少,指的是数据在传输过程中,不会丢失。如果说使用 MQ 用来传递非常核心的消息,比如说计费,扣费的一些消息,比如设计和研发一套核心的广告平台,计费系统是一个很重的业务,操作是很耗时的,所以说广告系统整体的架构里面,实际是将计费做成异步化的,然后中间就是加了一个 MQ。例如在广告主投放了一个广告,约定的是每次用户点击一次就扣费一次,结果是用户动不动就点击了一次,扣费的时候搞的消息丢了,公司就会不断的少几块钱。这样积少成多,这就是造成了公司的巨大损失。
为什么会丢数据?
丢数据,一般分为两种,要么是 MQ 自己弄丢了,要么是我们消费的时候弄丢了。我们可以从 RabbitMQ 和 Kafka 分别来进行分析。RabbitMQ 一般来说都是承载公司的核心业务的,数据是绝对不能弄丢的。
生产者弄丢了数据
生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络啥的问题,都有可能。此时选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前,开启RabbitMQ 事务(channel.txSelect),然后发送消息,此时就可以回滚事务(channel.txRollback),然后重试发送消息,如果收到了消息,那么可以提交事务,但是问题是,RabbitMQ 事务机制一搞,基本上吞吐量会下来,因为太损耗性能。所以一般来说,如果你要确保写 RabbitMQ 消息别丢,可以开启 confirm 模式,在生产者那里设置了开启 confirm 模式之后, RabbitMQ 会给你回传一个 ack 消息,告诉你这个消息 OK 了,如果 RabbitMQ 没能处理这个消息,会给你回调一个接口,告诉你这个消息接收失败,你可以重试
// 开启事务 try { // 发送消息 } catch(Exception e) { // 重试发送消息 } // 提交
但是,因为事务机制,是同步的。针对于上述事务造成性能下降的问题,下面的方法是开启 confirm 模式:
• 首先把 channel 设置成 confirm 模式
• 然后发送一个消息
• 发送完消息之后,就不用管了
• RabbitMQ 如果接收到这个消息的话,就会回调你生产者本地的一个接口,通知你说这条消息我们已经收到了
• RabbitMQ 如果在接收消息的时候出错了,就会回调这个接口
一般生产者如果要保证消息不丢失,一般是用 confirm 机制,因为是异步的模式,
在发送消息之后,不会阻塞,直接可以发送下一条消息,这样吞吐量会更高一些。
RabbitMQ 丢失数据
这个就是 RabbitMQ 自己丢失数据, 这个时候就必须开启 RabbitMQ 的持久化,就是消息写入之后,同时需要持久化到磁盘中,哪怕是 RabbitMQ 自己宕机了,也能够从磁盘中读取之前存储的消息,这样数据一般就不会丢失了,但是存在一个极端的情况,就是 RabbitMQ 还没持久化的时候,就已经宕机了,那么可能会造成少量的数据丢失,但是这个概率是比较小的。
设置持久化的两个步骤:
- 第一个是创建 queue 的时候,将其持久化的,这样就保证了 RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 中的数据
- 第二个就是发送消息的时候,将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 将会将消息持久化到磁盘上,必须同时设置两个持久化才行,哪怕是 Rabbit 挂了,也会从磁盘中恢复 queue 和 queue 中的数据。而且持久化可以跟生产者那边的 confirm 机制配置起来,只有消息被持久化到磁盘后,才会通知生产者 ACK 了,所以哪怕是在持久化磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ACK,你也是可以自己重发的。
消费者丢失数据
消费者丢失数据,主要是因为打开了 AutoAck 的机制,消费者会自动通知RabbitMQ,表明自己已经消费完这条数据了,但是如果你消费到了一条消息,还在处理中,还没处理完,此时消费者就会自动 AutoAck 了,通知 RabbitMQ 说这条消息已经被消费了,此时不巧的是,消费者系统宕机了,这条消息就会丢失,因为 RabbitMQ 以为这条消息已经处理掉。 在消费者层面上,我们需要将 AutoAck 给关闭,然后每次自己确定已经处理完了一条消息后,你再发送 ack 给 RabbitMQ,如果你还没处理完就宕机了,此时RabbitMQ 没收到你发的 Ack 消息,然后 RabbitMQ 就会将这条消息分配给其它的消费者去处理。
如何保障消息的顺序性?
场景
以前做过一个 MySQL binlog 同步系统,压力还是非常大的,日同步数据要达到上亿。常见一点的在于 大数据项目中,就需要同步一个 mysql 库过来,然后对公司业务的系统做各种的复杂操作。在 mysql 里增删改一条数据,对应出来的增删改 3 条 binlog,接着这三条 binlog发送到 MQ 里面,到消费出来依次执行,这个时候起码得保证能够顺序执行,不然本来是:增加、修改、删除,然后被换成了:删除、修改、增加,不全错了呢。本来这个数据同步过来,应该是最后删除的,结果因为顺序搞错了,最后这个数据被保留了下来,数据同步就出错
• RabbitMQ:一个 queue,多个 consumer,这不明显乱了
• Kafka:一个 topic,一个 partition,一个 consumer,内部多线程,就会乱套
在消息队列中,一个 queue 中的数据,一次只会被一个消费者消费掉,但因为不同消费者的执行速度不一致,在存入数据库后,造成顺序不一致的问题。
RabbitMQ 保证消息顺序性
RabbitMQ:拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦,或者就是一个 queue,但是对应一个 consumer,然后这个consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。下图为:一个 consumer 对应 一个 queue,这样就保证了消息消费的顺序性。

Kafka 保证消息消息顺序性
一个 topic,一个 partition,一个 consumer,内部单线程消费,写 N 个内存,然后 N 个线程分别消费一个内存 queu 即可。注意,kafka 中,写入一个 partition 中的数据,一定是有顺序的,但是在一个消费者的内部,假设有多个线程并发的进行数据的消费,那么这个消息又会乱掉。这样时候,我们需要引入内存队列,然后我们通过消息的 key,然后我们通过hash 算法,进行 hash 分发,将相同订单 key 的散列到我们的同一个内存队列中,然后每一个线程从这个 Queue 中拉数据,同一个内存 Queue 也是有顺序的。

但是在一个消费者的内部,假设有多个线程并发的进行数据的消费,那么这个消息又会乱掉

这样时候,我们需要引入内存队列,然后我们通过消息的 key,然后我们通过hash 算法,进行 hash 分发,将相同订单 key 的散列到我们的同一个内存队列中,然后每一个线程从这个 Queue 中拉数据,同一个内存 Queue 也是有顺序的。

死信
死信队列 是什么?
死信队列(Dead Letter Queue,简称DLQ) 是一种特殊类型的消息队列,用于存储那些由于各种原因无法被正常消费的消息 。这些消息被称为“死信”,可能是因为消费者拒绝处理、消息过期、队列已满等情况。 死信队列的主要作用是防止这些无法处理的消息在系统中堆积,影响其他正常消息的消费, 同时提供一个机制来对这些消息进行后续处理。
死信队列的应用场景
- **错误消息的重试与恢复**:当消息因消费者代码错误或外部系统不可用而无法处理时,可将它们发送到死信队列,然后通过专门的消费者进行重试或人工干预。
- **异常消息的分析与监控**:通过监控死信队列中的消息,可以及时发现系统中的问题,如消费者代码中的逻辑错误或外部系统的不可用状态。
- **消息过期处理**:对于设置了TTL(Time To Live)的消息,如果它们在指定的时间内未被消费,将被发送到死信队列,以便进行过期处理。
- **消息审计与合规性检查**:将无法通过审计或合规性检查的消息发送到死信队列,可以确保只有符合要求的消息被进一步处理。
- **消息路由与分流**:在复杂的消息系统中,使用死信队列作为中间环节,可以实现更灵活的消息路由和分流策略。
- **消息备份与恢复**:将重要消息发送到死信队列(作为备份队列),可以在系统发生故障或数据丢失时进行恢复。