削峰解耦
1.通过异步处理提高系统性能(削峰、减少响应所需时间);2.降低系统耦合性。
ActiveMQ | RabbitMQ | RocketMQ | Kafka | |
吞吐 | 万 | 万 | 十万百万 | 十万百万 |
可用 | 主从 | 主从 | 分布式 | 分布式 |
时效 | ms | 性能好,微秒 | ms | ms |
功能支持 | 完备 | 完备 | 完备 | 简单 |
消息丢失 | 非常低 | 非常低 | 理论不会 | 理论不会 |
在不使用消息队列服务器的时候,用户的请求数据直接写入数据库,在高并发的情况下数据库压力剧 增,使得响应速度变慢。但是在使用消息队列之后,用户的请求数据发送给消息队列之后立即 返回,再由消息队列 的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列服务器处理速度快于数据库(消息队列也比数 据库有更好的伸缩性),因此响应速度得到大幅改善。
用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中 可能失败。因此使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单 数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚 至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票。
防止重复消费
-
确保消息幂等性:设计消费者时,应确保其具有幂等性,即多次处理同一条消息所产生的结果与只处理一次时的结果相同。可以通过以下方式实现:
-
使用唯一标识符:在消息的属性中添加一个唯一的标识符,如消息ID或其他业务相关的唯一键。消费者在处理消息时,先检查该标识符是否已经处理过该消息,如果已经处理,则忽略该消息。
-
利用数据库特性:利用数据库的主键约束、唯一索引等机制来确保重复消息不会对数据产生影响。
-
使用缓存系统:在缓存系统(如Redis)中记录已消费的消息ID。使用Redis的
SETNX
命令(即仅当键不存在时插入)可以确保每条消息只会被消费一次。
-
-
手动提交偏移量:消费者可以选择手动提交消息的消费位置(偏移量),如果某个消息已经处理过,就明确告知消息队列该消息已经消费,不再分配给其他消费者。
-
使用事务机制:如果消费的过程需要保证原子性(即消费和偏移量更新的原子操作),可以使用消息队列的事务机制。
消息积压
优化消费者性能
-
增加消费者数量:水平扩展消费者,增加消费者实例的数量,以提高并行处理能力。
-
优化消费逻辑:排查并优化消费者代码中的耗时逻辑,减少不必要的计算和 I/O 操作,例如通过缓存常用数据、异步化处理等方式。
-
多线程消费:如果消费者支持多线程处理,并且消息是非顺序性的,可以通过增加线程数来提升消费速率。
2. 调整消息队列配置
-
增加分区数量:对于 Kafka 等消息队列,增加 Topic 的分区数量,从而支持更多的消费者并行处理。
-
修改消费者配置:调整消费者的配置参数,如 Kafka 的
fetch.min.bytes
、max.poll.records
等,以提高消费效率。
3. 临时扩容与分流
-
临时扩容消费者:在业务紧急时,临时增加消费者数量,快速处理积压消息。
-
新建临时 Topic:新建一个临时 Topic,将积压消息转发到临时 Topic,并增加分区数量,以提高消费速度。
4. 控制消息生产速率
-
限流措施:对生产者实施限流措施,确保其生产速率不会超过消费者的处理能力。
-
批量发送:调整生产者的发送策略,使用批量发送减少网络请求次数,提高系统吞吐量。
5. 其他优化
-
死信队列管理:对于无法处理的消息,移动到死信队列,并定期分析和处理这些消息。
-
消费者降级:在消息积压时,采用降级策略,例如只执行快路径逻辑,减少不必要的处理。
-
监控与告警:实施有效的监控与告警机制,提前发现消息积压问题并及时处理。
6. 紧急处理措施
如果消息积压情况紧急且无法快速解决,可以考虑以下措施:
-
临时丢弃部分消息:如果消息的时效性要求不高,可以临时丢弃部分积压消息,优先处理关键消息。
-
手动干预:在某些情况下,可能需要手动干预,例如调整消息队列的配置或清理部分积压消息。