【测试开发】Mq消息重复如何测试?

本篇文章主要讲述重复消费的原因,以及如何去测试这个场景,最后也会告诉大家,目前互联网项目关于如何避免重复消费的解决方案。

Mq为什么会有重复消费的问题?

Mq 常见的缺点之一就是消息重复消费问题,产生这种问题的原因是什么呢?有以下几点:

工作流程

Mq消息重复如何测试?

1、producer 生成数据,发送到broker集群,当遇到网络抖动超时,可能会重复发送。

为了保证数据的可靠性一般都会配置重试机制如下:

rocketmq:producer:group: sanyouProducer#发送消息超过5秒未接收到broker返回的成功消息send-message-timeout: 5000#重试最大次数retry-times-when-send-failed: 2max-message-size: 4194304name-server: 172.30.34.10:9876;172.30.35.37:9876;172.30.35.30:9876#发送消息超时时长,意思是超过5秒钟未收到broker返回的发送成功的消息,#producer会重复发送,但并不是一直发送,会根据retry-times-when-send-failed次数,#最多重试多少次

极端情况下,网络出现抖动,生产者超过设置的时间未收到broker返回的成功消息,会重新发送消息。

2、消费者宕机,未提交offset给broker

由上图可知,broker接收到producer 发送的消息后,会把消息发送给消费者,一般情况下,消费者消费完一条数据,会提交一个offset给到broker,告诉它,这条消息我消费了,但是,极端情况下,消费者消费一条消息成功,提交offset之前,宕机了或者网络抖动超时了,broker未收到offset,就认为这条消息没人消费,当消费者重启服务器或网络恢复,那么broker还会发送这条消息给消费者重新消费。

3、业务上的bug,可能会导致重复消费。

生产者producer的上游系统,突然出现了bug,导致重复调用生产者所在服务的接口,生产者收到请求后,继续发送消息给broker。

当然了,重复消费的原因有很多,以上只是常见的几种原因,那怎么去测试呢?

怎么测试重复消费场景?

假如有这么一个场景,采购员在采购系统的前端页面进行采购单下单操作,下单成功后,采购系统这边会保留一份采购单数据,然后发送一条mq给到wms 仓库系统,那么生产者就是采购系统,消费者就是wms仓库系统,wms消费到采购单的消息,落入数据库wms_purchase表中,为了简化,我只设计了三个字段。

建表ddl:

CREATE TABLE `wms_purchase` (`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '仓库采购单id',`purchase_id` bigint(20) NOT NULL COMMENT '采购单id',`purchase_name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=237 DEFAULT CHARSET=utf8;

怎么测试呢?很简单,我们只要编写生产者工具,在工具里加个循环,尽量循环多次,如下:

@RestController
@RequestMapping("/mq")
public class ProducerController {// 自动注入 RocketMQTemplate模板类,用于生产消息@Autowiredprivate RocketMQTemplate mqTemplate;// 模拟生产者重复消费问题,前提是数据库没有唯一索引,并且项目未做幂等性校验@RequestMapping("/send")public String testSend(@RequestBody WmsPurchaseDto params) {try {for (int i = 0; i <100 ; i++) {mqTemplate.convertAndSend("fourbrothertopic", params);}return "success";} catch (Exception e) {e.printStackTrace();return "fail";}}

解读:

requestmapping对外暴露一个web接口,地址是localhost:8080/demo/mq/send,
post请求,参数是json格式,类似
{
    "purchaseId": "256465",
    "purchaseName": "测试"
}

这种形式,然后起个for循环,循环调用convertAndSend方法,发送同样的消息,最终结果如下图:

Mq消息重复如何测试?

这里模拟producer重复发送的场景,前提是数据库没有对采购单id做唯一索引,并且项目未做幂等性校验。数据库里出现很多采购单id一样的数据,业务上这是不允许的。

假如说,项目出现了这么一种bug,开发那边是怎么修复的呢?

Mq如何保证幂等性?

分享几种解决方案的具体代码demo:

1、数据库unique key(表里不允许重复列出现)来保证幂等性。

很简单,我们只要在wms_purchase里,对purchaseId添加唯一索引即可,提示:在添加唯一索引之前,需清理完表里的数据。

也可以使用ddl语句:

ALTER TABLE `wms_purchase` ADD UNIQUE ( `purchaseId` ) 

代码不变,调用以下接口:

localhost:8080/demo/mq/send post请求
{"purchaseId": "256465","purchaseName": "测试"
}

得到以下结果:

Mq消息重复如何测试?

上图中,循环生产同一条采购单数据,但是右边表中只出现了一条采购单id是256465的数据,说明添加唯一索引确实保证了幂等性,但是代码里却出现大量类似Duplicate entry '256465' for key 'uniqe_key_purchaseid' 日志,是因为触发了数据设置的唯一索引,

由于触发了唯一索引,导致消费者未提交offset给broker,那么broker会认为这条消息未被消费,后续会持续不断地推送消息给消费者,也就意味着会持续不断地报错。

另外这种持续无效的请求数据库会占用数据库的连接资源,在高并发的场景下,会严重拖垮系统响应效率。

虽然保证了幂等性,但是日志里总是报错,太不讲究、也不雅观,那怎么解决呢?

2、数据库unique key+redis 来保证幂等性。

如截图:

Mq消息重复如何测试?

通俗的理解就是,消费者在进行数据库落库操作之前,会判断redis是有这条采购单数据,如果有就直接放过这条消息不做处理,没有这条数据,那就进行落库操作,但在落库之前还要进一步判断数据库是否有这条采购单数据,没有那就进行落库,落库成功,再把采购单的id当做key,采购单数据当做value set 进redis缓存里,设置一定的过期时间。

redis基于内存,操作数据特别快,在进行落库之前查询redis,可以避免很多无效的请求数据库,但是为啥要设置过期时间?因为redis的内存资源有限,并且很宝贵,所以我们希望设置的数据能在一段时间内定期失效,即使失效,也没关系,还有数据库的唯一索引兜底。

这样就很好的保证了幂等性,也避免了大量的日志报错。伪代码如下:

@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener {@Autowiredprivate WmsPurchaseMapper wmsPurchaseMapper;@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void onMessage(String message) {log.info("------- Consumer: {}", message);//将message消息映射成WmsPurchase实体WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);//首先判断redis里面是否有这条采购单数据,通过PurchaseId查询,有数据,则直接放过不做处理if (redisTemplate.opsForValue().get(wmsPurchase.getPurchaseId().toString())==null){//然后再使用PurchaseId查询数据库,有数据,则直接放过不做处理if (null == wmsPurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId())){//数据库没有数据,就进行插入操作,if (wmsPurchaseMapper.insert(wmsPurchase)>0){//插入成功就把purchaseid塞进redis里,过期时间是72小时redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId(),wmsPurchase.toString(),72, TimeUnit.HOURS);}}else {//能走到这个判断分支,说明缓存里的采购单数据已经失效,如果还有消息重复消费//那就再放入缓存一次,72h过期redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId(),wmsPurchase.toString(),72, TimeUnit.HOURS);log.info("数据库已保留该数据");// 触发重复消费告警机制}}else {log.info("缓存已保留该数据");// 触发重复消费告警机制}}
}

思路很简单,如代码中注释。当然这种方法也有缺点,就是过于依赖redis,有些系统没有使用redis组件,那么还得维护一套redis组件,并且还得保证redis集群高可用。那项目只有mysql,能不能依靠数据库去维护保证幂等性呢?当然可以!

3、还有一种方法叫去重表+唯一索引,顾名思义就是另外维护一张表,记录已经消费的采购单数据,其实和上述方法差不多,上述方法查询缓存,取重表查询数据库取重表。

伪代码 如下:

@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener {@Autowiredprivate WmsPurchaseMapper wmsPurchaseMapper;@Autowiredprivate UniquePurchaseMapper uniquePurchaseMapper;@Autowiredprivate RedisTemplate redisTemplate;@SneakyThrows@Overridepublic void onMessage(String message) {log.info("------- Consumer: {}", message);//将message消息映射成WmsPurchase实体WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);log.info("映射后实体消息"+ JSON.toJSONString(wmsPurchase));if (uniquePurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId().intValue())  == null){if (null == wmsPurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId())){//数据库没有数据,就进行插入操作,if (wmsPurchaseMapper.insert(wmsPurchase)>0){//插入成功就把purchaseid塞进unique_purchaseUniquePurchase  uniquePurchase =   new UniquePurchase();uniquePurchase.setPurchaseId(wmsPurchase.getPurchaseId().intValue());log.info("插入取重表消息:"+ JSON.toJSONString(uniquePurchase));uniquePurchaseMapper.insert(uniquePurchase);}}else {log.info("数据库已保留该数据");//自动触发告警机制}}else {log.info("取重表已有这条采购单数据");}}

代码已上传至gitee,感兴趣可以自行阅读。

上述方式在查询取重表时,并发不安全,极端情况下还是会触发唯一索引错误,比如说,消费者要消费大量消息(线程),执行上述代码,A线程执行完23行,挂起了,cpu把执行权给了B线程,B执行到25行并插入成功,那么这时A线程被唤起,也执行到了23行,结果触发了唯一索引错误。那怎么避免呢?

我们可以让所有线程别并发执行,串行执行,那就用到redis的分布式锁技术。

4、分布式锁+uniquekey

伪代码如下

@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener {@Autowiredprivate WmsPurchaseMapper wmsPurchaseMapper;@Autowiredprivate RedissonClient redisson;@Autowiredprivate UniquePurchaseMapper uniquePurchaseMapper;@Autowiredprivate RedisTemplate redisTemplate;@SneakyThrows@Overridepublic void onMessage(String message) {log.info("------- Consumer: {}", message);//将message消息映射成WmsPurchase实体WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);
// 注入redisson
// 获取锁对象RLock lock = redisson.getLock("lockName");try {// 1. 最常见的使用方法//lock.lock();// 2. 支持过期解锁功能,10秒钟以后自动解锁, 无需调用unlock方法手动解锁//lock.lock(10, TimeUnit.SECONDS);// 3. 尝试加锁,最多等待2秒,上锁以后8秒自动解锁boolean res = lock.tryLock();if (res) { //成功//然后再使用PurchaseId查询数据库,有数据,则直接放过不做处理if (null == wmsPurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId())){//数据库没有数据,就进行插入操作,if (wmsPurchaseMapper.insert(wmsPurchase)>0){//插入成功就把purchaseid塞进redis里,过期时间是72小时redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId().toString(),wmsPurchase.toString(),1, TimeUnit.HOURS);}}else {redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId().toString(),wmsPurchase.toString(),1, TimeUnit.HOURS);log.info("数据库已保留该数据");//自动触发告警机制}}} catch (Exception e) {e.printStackTrace();} finally {//释放锁RLock lockName = redisson.getLock("lockName");if (lockName.isLocked()) {if (lockName.isHeldByCurrentThread()) {lockName.unlock();}}}
}

这种也是比较常见的一种,缺点也很明显,在高并发,大请求量的场景下,所有线程串行执行,处理效率势必会降低。当然了,技术没有好坏,只有合不合适。如果你的项目并发量一般,可以尝试使用上述方法。

具体代码demo已上传至gitee平台,地址如下:

https://gitee.com/lv1792017548/rocketmq-demo.git

总结

本文主要分享了如何测试mq消息队列重复性消费,以及避免重复消费常见的解决方案。

最后感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

在这里插入图片描述

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!   

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/127625.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring+MyBatis使用collection标签的两种使用方法

目录 项目场景&#xff1a; 实战操作&#xff1a; 1.创建菜单表 2.创建实体 3.创建Mapper 4.创建xml 属性描述&#xff1a; 效率比较&#xff1a; 项目场景&#xff1a; 本文说明了Spring BootMyBatis使用collection标签的两种使用方法 1. 方法一: 关联查询 2. 方法…

计算机网络基础知识(非常详细)

1. 网络模型 1.1 OSI 七层参考模型 七层模型&#xff0c;亦称 OSI&#xff08;Open System Interconnection&#xff09;参考模型&#xff0c;即开放式系统互联&#xff0c;是网络通信的标准模型。一般称为 OSI 参考模型或七层模型。 它是一个七层的、抽象的模型体&#xff…

【刷题篇】贪心算法(一)

文章目录 分割平衡字符串买卖股票的最佳时机Ⅱ跳跃游戏钱币找零 分割平衡字符串 class Solution { public:int balancedStringSplit(string s) {int lens.size();int cnt0;int balance0;for(int i0;i<len;i){if(s[i]R){balance--;}else{balance;}if(balance0){cnt;}}return …

PDF转Word的方法分享与注意事项。

PDF和Word是两种常用的文档格式&#xff0c;它们各有优点&#xff0c;适用于不同的场景。然而&#xff0c;有时候我们需要将PDF转换为Word&#xff0c;以便更好地进行编辑和排版。本文将介绍几种常用的PDF转Word的方法&#xff0c;并分享一些注意事项。 一、PDF转Word的方法 使…

【系统设计系列】数据库

系统设计系列初衷 System Design Primer&#xff1a; 英文文档 GitHub - donnemartin/system-design-primer: Learn how to design large-scale systems. Prep for the system design interview. Includes Anki flashcards. 中文版&#xff1a; https://github.com/donnemarti…

飞行动力学 - 第18节-全机航向稳定性与隐身性 之 基础点摘要

飞行动力学 - 第18节-全机航向稳定性与隐身性 之 基础点摘要 1. 全机航向静稳定性2. 垂尾与隐身3. 参考资料 1. 全机航向静稳定性 机翼贡献 上反角 复杂、极小幅降低 后掠角 增加稳定性 机身贡献 降低稳定性 尾翼贡献 航向静稳定性的最大来源 平尾 类似机翼贡献 垂尾 最大来…

RESP无法连接linux上redis问题

1.本机无法ping通虚拟机IP 没有打开服务&#xff08;这只是无法ping通虚拟机的一种原因&#xff09; 其他原因可以参考 虚拟机ping不通的几种原因及解决办法_虚拟机ping不通主机_在键盘上弹钢琴的菜菜的博客-CSDN博客 2.未关闭linux系统的防火墙导致无法连接redis 查看防火墙…

【数据结构】绪论

绪论 1.1数据结构的基本概念1.1.1 基本概念和术语1.1.2数据结构的三要素 1.2 算法与算法评价 1.1数据结构的基本概念 1.1.1 基本概念和术语 数据 数据是信息的载体&#xff0c;是描述客观事物属性的数、字符及所有能输入到计算机中并被计算机程序识别 和处理的符号的集合。数…

笔记随笔:基于selvlet的Web应用程序流程

前言&#xff1a; 欢迎阅读本文&#xff0c;本文将介绍基于Servlet的Web应用程序的开发流程。Servlet是Java技术中用于处理Web请求和生成动态内容的核心组件之一。通过学习本文&#xff0c;您将了解从项目结构搭建到Servlet类编写、配置和部署的全流程&#xff0c;帮助您快速入…

切面(增强)的优先级

Component Aspect Order(value 10)//为增强类指定一个优先级的值,值越小,优先级越高,优先级越高的前置先执行,后置后执行,类似洋葱 为增强类指定一个优先级的值,值越小,优先级越高,优先级越高的前置先执行,后置后执行,类似洋葱 首先会执行前置通知,再执行目标方法,按照顺序和优…

huggingface 使用入门笔记

概念 Hugging Face Hub​​和 Github 类似&#xff0c;都是Hub(社区)。Hugging Face可以说的上是机器学习界的Github。Hugging Face为用户提供了以下主要功能&#xff1a; ​模型仓库&#xff08;Model Repository&#xff09;​​&#xff1a;Git仓库可以让你管理代码版本、…

【C++深入浅出】类和对象中篇(六种默认成员函数、运算符重载)

目录 一. 前言 二. 默认成员函数 三. 构造函数 3.1 概念 3.2 特性 四. 析构函数 4.1 概念 4.2 特性 五. 拷贝构造函数 5.1 概念 5.2 特性 六. 运算符重载 6.1 引入 6.2 概念 6.3 注意事项 6.4 重载示例 6.5 赋值运算符重载 6.6 前置和后置运算符重载 七. c…

Ardupilot — AP_OpticalFlow代码梳理

文章目录 前言 1 Copter.cpp 1.1 void Copter::setup() 2 system.cpp 2.1 void Copter::init_ardupilot() 3 sensors.cpp 3.1 void Copter::init_optflow() 3.2 对象optflow说明 4 OpticalFlow.cpp 4.1 void OpticalFlow::init(uint32_t log_bit) 5 AP_OpticalFlow_…

数据结构与算法-二叉搜索树红黑树

一&#xff1a;二叉搜索树 大家来看以下几个结构&#xff1a;下图中的 二叉搜索树又叫二叉查找树&#xff0c;二叉排序树&#xff1b; 它具有以下特点&#xff1a; 1.如果它的左子树不为空&#xff0c;则左子树上结点的值都小于根结点。 2.如果它的右子树不为空&#xff0c;则右…

Matlab图像处理-自适应阈值

自适应阈值 在许多的情况下&#xff0c;背景的灰度值并不是常数&#xff0c;物体和背景的对比度在图像中也有变化。这时&#xff0c;一个在图像中某一区域效果良好的阈值在其它区域却可能效果很差。在这种情况下&#xff0c;把灰度阈值取成一个随图像中位置缓慢变化的函数值是…

SNMP的监控

SNMP的监控 一、SNMP 介绍1.1 什么是SNMP1.2 SNMP的组件1.2.1 网络管理系统 NMS&#xff08;Network Management System&#xff09;1.2.2 代理进程&#xff08;Agent&#xff09;1.2.3 被管对象&#xff08;Managed Object&#xff09;1.2.4 管理信息库MIB&#xff08;Managem…

linux c++ 开发 - 05- 使用CMake创建一个动态库

外层CMakeList.txt中的内容&#xff1a; cmake_minimum_required(VERSION 3.16) PROJECT(HELLO) ADD_SUBDIRECTORY(lib bin)lib中CMakeLists.txt中的内容&#xff1a; SET(LIBHELLO_SRC hello.cpp) ADD_LIBRARY(hello SHARED ${LIBHELLO_SRC})hello.h: hello.cpp: ADD_LIBR…

多元共进|科技促进艺术发展,助力文化传承

科技发展助力文化和艺术的传播 融合传统与创新&#xff0c;碰撞独特魅力 一起来了解 2023 Google 开发者大会上 谷歌如何依托科技创新 推动艺术与文化连接 传承和弘扬传统文化 自 2011 年成立以来&#xff0c;谷歌艺术与文化致力于提供体验艺术和文化的新方式&#xff0c;从生成…

mysql基于AES_ENCRYPTAES_DECRYPT实现密码的加密与解密

1.直接使用AES_ENCRYPT&&AES_DECRYPT函数导致的问题。 执行语句 select AES_ENCRYPT(cd123,key) 结果 加密过后的字符串是一串很奇怪的字符。 尝试使用上面加密过后的字符解密。 select AES_DECRYPT(u5£d|#,key) 结果 并未成功的解密 2.解决办法 使用 hex(…

【漏洞复现】网互联路由器存在密码泄露

漏洞描述 蜂网互联-让链接无限可能&#xff0c;灵活的多线分流&#xff0c;强大的策略分流&#xff0c;灵活调度各种软件应用&#xff0c;深度识别系统&#xff0c;各种应用一网打尽&#xff0c;灵活调整优先级&#xff0c;最简单的路由器&#xff0c;简洁易学的配置&#xff…