使用分布式锁解决IM聊天数据重复插入的问题

导航

  • 业务背景
  • 问题分析与定位
  • 探索可行的解决方案
    • 数据库层面处理——唯一索引
    • 应用程序层面处理——分布式锁
  • 分布式锁概述
    • 分布式锁需要具备哪些特性?
    • 分布式锁有哪些实现方式?
      • 基于数据库的实现方式
      • 基于Redisson实现方式
  • Redission介绍
    • 概述
    • 可重入锁
  • 基于Redisson解决方案
    • 方案梳理
    • Springboot集成Redisson
  • 结语
  • 参考

本文首发《使用分布式锁解决IM聊天数据重复插入的问题》

业务背景和问题

在IM聊天业务中,除了自建聊天服务器,构架闭环的咨询聊天,往往还需要接入三方的平台的IM流量。

这个就不得不去适配各种平台的推流方式。

在我们自建的IM聊天服务解决方案中,IM会话创建和消息的接收是两个独立模块(接口)。
这种设计方式从客户端层面就将两个流程分开且保证了顺序性,有效避免了一些不可预知的问题。

但是,三方流量平台的是通过消息推流的方式将流量投递给我们,我们必须在接收流量的过程中完成客户、会话、消息的创建。



如果所有消息是排队,一个一个执行,那么这个流程是没有问题的。

但是,我们发现三方推送消息的时候偶尔会发生推送同一客户的多条消息的情况,这种并发写入,导致数据重复写入。

这种情况下,就可能会导致新客户创建多次,对应的会话也会创建多个。



而且还会带来数据查询中偶尔出现selectOne的异常。

desc":"org.mybatis.spring.MyBatisSystemException: nested exception is org.apache.ibatis.exceptions.TooManyResultsException: Expected one result (or null) to be returned by selectOne(), but found: 2

在没有查明具体问题之前,我们在特定查询的时候增加了limit 1限制,原则上取最新的那一条。

问题分析与定位

对于聊天场景来说,这种脏数据的产生是不能容忍的。

为了找到问题的根本解决办法,我们开始专项排查。

我把代码走读了一遍,发现代码层面没有明显的bug。但是,从数据上来看大概率是消息并发投递导致。

为了证明这种猜想,我编写了一个测试用例来验证。

具体做法,就是写一个python脚本程序,模拟10个线程,每个线程都会调用消息接收的业务接口,并且每个消息的fromUsertoUser都是一样的。

核心思想就是同时推送给一个人多条消息。

经过验证,数据重复写入的问题复现了。并发请求原因已经实锤。

这里给个简单示意图,解释一下并发请求的流程。



可行性方案探索

我们自己也思考了一下,大致的解决方案有两种:

  • 数据层面解决
  • 应用程序层面解决
数据层面解决

这个很好理解,利用Mysql字段唯一索引阻止重复插入,这是数据库自己的机制。
但是,因为user表中tenantUserId字段最初就为设计唯一索引。

ALTER TABLE user ADD UNIQUE uk_tenant_user_id( tenantUserId );

一旦为tenantUserId列加上唯一索引后,当上述并发情况发生时,请求1和请求2中必然有一者会优先完成数据的插入操作,而另一者则会得到类似错误。因此,最终保证user表中只有一条tenantUserId=xxx的记录存在。

 Cause: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 'xxx' for key 'tenantUserId'\n##

经过评估,目前单表已经仅仅2000w数据。短时间内升级不太现实。
而且历史数据的修复也不是一个小工程。

应用程序层面解决

另一种解决的思路是我们不依赖底层的数据库来为我们提供唯一性的保障,而是靠应用程序自身的代码逻辑来避免并发冲突。

之所以我们会遇到重复插入数据的问题,是因为“检测数据是否已经存在”和“插入数据”两个动作被分割开来。由于这两个步骤不具备原子性,才导致两个不同的请求可以同时通过第一步的检测。如果我们能够把这两个动作合并为一个原子操作,就可以避免数据冲突了。这时候我们就需要通过加锁,来实现这个代码块的原子性。


考虑到我们的应用程序API是多机部署的,我们决定采用业界比较成熟的分布式锁方案。

分布式锁概述

分布式锁需要具备哪些特性?
  • 在分布式系统环境下,同一时间只有一台机器的一个线程可以获取到锁
  • 高可用的获取锁与释放锁
  • 高性能的获取锁与释放锁
  • 具备可重入特性
  • 具备锁失效机制,防止死锁
  • 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
分布式锁实现主要有如下三种:
  • 基于数据库实现分布式锁
  • 基于Zookeeper实现分布式锁
  • 基于Redis实现分布式锁

每种的具体实现可以参考《什么是分布式锁?实现分布式锁的三种方式》。

除了以上三种分布式锁实现以外,还有一种是基于Redission实现方式。
因为我们业务接口是基于Springboot框架,所以查阅了相关资料我们选择一种Redission实现。

Redission介绍

概述

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

以下是Redisson的结构:

Redisson作为独立节点 可以用于独立执行其他节点发布到分布式执行服务 和 分布式调度任务服务 里的远程任务。



可重入锁(Reentrant Lock)

基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。

RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();

大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {try {...} finally {lock.unlock();}
}

Redisson同时还为分布式锁提供了异步执行的相关方法:

RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);

RLock对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException错误。但是如果遇到需要其他进程也能解锁的情况,请使用分布式信号量Semaphore 对象.

关于Redisson的更多介绍请移步Redisson 中文文档

基于Redisson解决方案

在本案例中,我们采用了基于Redisson实现分布式锁的方式。

方案梳理

技术方案确定了,但是还是需要结合实际场景合理应用。
那么,我们在哪些环节加锁呢?



我们再次对消息接收处理流程进行梳理,在原来的基础上增加了分布式锁。

Springboot集成Redisson

pom.xml中引入redisson

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.34.1</version>
</dependency>  

yml文件中redis配置

  redis:enabled: truehost: xxxxport: 6371password: xxxdatabase: 2timeout: 10000connectionPoolSize: 15connectionMinimumIdleSize: 5

redissonConfig.java

@Configuration
@ConditionalOnExpression("${spring.redis.enabled}")
public class RedissonConfig {@Value("${spring.redis.host}")private String host;@Value("${spring.redis.port}")private String port;@Value("${spring.redis.timeout}")private String timeout;@Value("${spring.redis.password}")private String password;@Value("${spring.redis.database}")private int database;@Value("${spring.redis.connectionPoolSize}")private int connectionPoolSize;@Value("${spring.redis.connectionMinimumIdleSize}")private int connectionMinimumIdleSize;@Bean(name = "redissonClient")public RedissonClient redissonClient() {Config config = new Config();config.setCodec(new StringCodec());SingleServerConfig singleServerConfig =config.useSingleServer().setAddress("redis://" + host + ":" + port).setDatabase(database).setConnectionPoolSize(connectionPoolSize).setConnectionMinimumIdleSize(connectionMinimumIdleSize).setTimeout(Integer.parseInt(timeout));if (StringUtils.isNotBlank(password)) {singleServerConfig.setPassword(password);}return Redisson.create(config);}
}

上面准备好之后,就可以在使用了。

核心代码实现
        //新创建增加分布式锁String mutex = StrUtil.format("im:lock:user:{}", createUserDto.getTenantUserId());RLock lock = redissonClient.getLock(mutex);boolean successLock = lock.tryLock();if (!successLock) {// 获取分布式锁失败log.info(String.format("{\"Method\":\"%s\",\"content\":\"%s\"}", "【getOrCreateUser】", JsonUtil.toJson(createUserDto)));throw new BizException("该顾客已经在创建中了", ResponseCodeEnum.GET_R_LOCK_FAIL.getCode());}//创建用户User visitor = new User();visitor.setUserName(createUserDto.getTenantUserId());//...
        //消息创建过程中,首次创建顾客、会话,//在获取锁失败的情况下,增加重试机制try {receiveMessage(inputDto);}catch (BizException ex){log.error(String.format("{\"Method\":\"%s\",\"content\":\"%s\"}", "【receiveMessage】", ex));if(ex.getCode().equals(ResponseCodeEnum.GET_R_LOCK_FAIL.getCode())) {//重试一次Thread.sleep(1000);log.info(String.format("{\"Method\":\"%s\",\"content\":\"%s\"}", "【receiveMessage.retry】", JsonUtil.toJson(inputDto)));receiveMessage(inputDto);}}

Notes: 关于Springboot中如何使用Redisson,更加具体实现代码请移步《Spring Boot 实战纪实》,项目源码中可以查阅。

测试用例

确保写的代码是可调式的。-《对几次通宵加班发版的复盘和思考》

在这么多年的职业生涯中,我逐渐摸索出一个确保代码质量的笨方法——单步调试。

这里我们也写一个测试用例。具体是思路前面也提过,这里不再赘述。

import json
import requests
import time
import uuid
import threadingdef receive_xhs_msg():try:#请求urlurl = """http://localhost:7071/api/message/receive"""# 增加请求头headers = {"Content-Type": "application/json; charset=UTF-8"}message_id=str(uuid.uuid4())print('message_id:'+message_id)userInfo={"header_image":"xxx.jpg","nickname":"- ","user_id":"63038d28000000001200d311"}payload={"content":"6ED5KduMqTDJZ1ztw+ZPgw==~split~OMo7DD2gqsJqBafx9WKsZlnNNkcEYD4hLLPczczIFmr+YMtTB9Wz4ZI0MYCM4cF28kG7rfqnXdR9cRmamEJzHmKLfTmVxv5jzGUFVQOU00iimtunMAEJ4x76oJDrdAVUc4bJfV5zFLotz/Bm0WM9TADvD2cLhpHsVmaZRXaiJ96wMQgqx+K727l5S15jmMa5PiLqZqBO2q/G+WEkJSbfLQ==","from_user_id":"63038d28000000001200d311","intentCommentId":"","message_id":message_id,"message_source":2,"message_type":"HINT","timestamp":"1723268668573","to_user_id":"575d2c135e87e733f0162b88","user_info":[userInfo]}#转换成jsongetJson=json.dumps(payload)#构造发送请求response=requests.post(url=url,data=getJson,headers=headers)#打印响应数据print(response.text)time.sleep(1)except Exception as e:print('Error:',e)finally:print('执行完成')if __name__ == '__main__':threads = []for _ in range(10):  # 循环创建10个线程t = threading.Thread(target=receive_xhs_msg)threads.append(t)for t in threads:  # 循环启动10个线程t.start()t.join()

结语

分布式锁在日常工作中应用广泛,比如接口防抖(防重复提交),并发处理等。

在近期的IM消息处理中,正好有了一次生动的实践。

一点浅浅的经验,分享给大家,希望能起到抛砖引玉的作用。

参考

  • Redisson 中文文档
  • 《什么是分布式锁?实现分布式锁的三种方式》
  • 《灵活运用分布式锁解决数据重复插入问题》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/406980.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

精彩!双疾病搭档孟德尔随机化,中国学者得出阴性结果照样拿下一区top!

孟德尔随机化分析领域&#xff0c;选题新才是王道&#xff01;在之前孟德尔随机化的文章中&#xff0c;大多是分析暴露与疾病的关系&#xff0c;今天分享的这篇文章与之前不同&#xff0c;中国学者使用双向孟德尔随机化分析两种疾病之间的关联&#xff0c;还是阴性结果&#xf…

MinerU pdf文档解析markdown格式、内容提取

参考&#xff1a; https://github.com/opendatalab/MinerU/blob/master/README_zh-CN.md demo在线网址&#xff1a; https://opendatalab.com/OpenSourceTools/Extractor/PDF/detail

C语言高手参考手册:网络编程高级话题与技术细节

在上一篇文章中&#xff0c;我们介绍了基本的网络编程概念和操作。本文将深入探讨网络编程的一些高级话题和技术细节&#xff0c;包括错误处理、非阻塞I/O、多路复用&#xff08;select/poll/epoll&#xff09;、套接字选项以及安全编程等。 1. 错误处理 1.1 错误码 在处理网…

[数据集][目标检测]红外场景下车辆和行人检测数据集VOC+YOLO格式19069张4类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;19069 标注数量(xml文件个数)&#xff1a;19069 标注数量(txt文件个数)&#xff1a;19069 标…

基于B站的热门视频数据分析与情感分析【关联性、主题、情感分析】

目录 2 研究内容 2.1 主要研究内容 2.2 拟解决的关键问题 2.2.1热门视频特征的识别和提取 2.2.2情感分析与用户反馈 2.3技术路线 2.3.1数据收集 2.3.2数据预处理 2.3.3数据挖掘 2.3.4 数据可视化 2.4可行性分析 2.4.1技术可行性 2.4.2数据可行性 2.4.3经济可行性 2.5数据库设计…

[000-01-011].第2节:持久层方案的对比

我的后端学习大纲 MyBatis学习大纲 1.持久层解决方案&#xff1a; 1.1.面试1&#xff1a;请说一说持久层解决方案有哪些&#xff1f;&#xff1f;&#xff1f; 1.jdbc JDBC为访问不同的数据库提供了一种统一的途径&#xff0c;为开发者屏蔽了一些细节问题。Java程序员使用JDB…

关于springboot的异常处理以及源码分析(一)

一、什么是异常处理 1、文档定义 首先我们先来看springboot官方对于异常处理的定义。springboot异常处理 在文档的描述中&#xff0c;我们首先可以看到的一个介绍如下&#xff1a; By default, Spring Boot provides an /error mapping that handles all errors in a sensib…

计算机网络-2-tcpip协议

1.说说 TCP/IP 四层模型&#xff1f; TCP/IP&#xff08;Transmission Control Protocol/Internet Protocol&#xff09;模型是一种用于描述互联网通信的协议层次结构。它分为四个主要层次&#xff0c;每个层次都定义了不同的协议来实现特定的功能。下面是TCP/IP模型各层的常用…

Android系统安全 — 1-OpenSSL支持的常用加解密算法介绍

常用加解密算法介绍 1. 哈希算法 常见的函数包含MD系列、SHA-1、SHA-2家族、SHA-3家族、SM3等。 1.1 MD5&#xff08;单向散列算法&#xff09; 全称是Message-Digest Algorithm 5&#xff08;信息-摘要算法&#xff09;&#xff0c;经MD2、MD3和MD4发展而来。MD5算法的使用…

0基础学习Python路径(21)Python NameSpaceScope

命名空间定义了在某个作用域内变量名和绑定值之间的对应关系&#xff0c;命名空间是键值对的集合&#xff0c;变量名与值是一一对应关系。作用域定义了命名空间中的变量能够在多大范围内起作用。 命名空间在 Python 解释器中是以字典的形式存在的&#xff0c;是以一种可以看得…

【快速入门 LVGL】-- 1、STM32 工程移植 LVGL

目录 一、LVGL 简述 二、复制一个STM32工程 三、下载 LVGL 四、裁剪 源文件 五、工程添加 LVGL 文件 六、注册 显示 七、注册 触摸屏 八、LVGL 心跳、任务刷新 九、开跑 LVGL 十、控件的事件添加、响应处理 十 一、几个好玩小事情 十 二、显示中文 ~~ 约定 ~~ 在…

从【人工智能】到【计算机视觉】,【深度学习】引领的未来科技创新与变革

前几天偶然发现了一个超棒的人工智能学习网站&#xff0c;内容通俗易懂&#xff0c;讲解风趣幽默&#xff0c;简直让人欲罢不能。忍不住分享给大家&#xff0c;点击这里立刻跳转&#xff0c;开启你的AI学习之旅吧&#xff01; 前言 – 人工智能教程https://www.captainbed.cn/l…

linux文件——用户缓冲区——概念深度探索、IO模拟实现

前言&#xff1a;本篇文章主要讲解文件缓冲区。 讲解的方式是通过抛出问题&#xff0c; 然后通过分析问题&#xff0c; 将缓冲区的概念与原理一步一步地讲解。同时&#xff0c; 本节内容在最后一部分还会带友友们模拟实现一下c语言的printf&#xff0c; fprintf接口&#xff0c…

跨境电商补单秘籍:Lazada、Shopeee、eBay、Wish等平台实战技巧

在跨境电商领域&#xff0c;Lazada、Shopee、eBay、Wish及速卖通等平台为商家提供了广阔的市场空间。为了有效扩大产品的曝光率和提升转化率&#xff0c;商家需充分利用平台活动及营销工具。平台活动不仅是获取流量的关键渠道&#xff0c;还能显著提升品牌知名度。此外&#xf…

Python画笔案例-006 绘制正多边形

1、绘制正多边形 通过 python 的turtle 库绘制一个正多边形的图案&#xff0c;如下图&#xff1a; 2、实现代码 绘制一个正多边形&#xff0c;关键两个因素&#xff0c;一个是边长&#xff0c;决定了图形的大小&#xff1b;另一个就是图形里每个角的角度&#xff0c;绘制多边形…

SSM健康生活博客小程序—计算机毕业设计源码23497

摘 要 本文设计了一种基于SSM框架的健康生活博客小程序&#xff0c;为人们提供了运动视频教学、博客信息分享&#xff0c;用户能够方便快捷地查看资讯、搜索健康方面的相关信息、还能发布个人生活博客等。健康生活博客小程序采取面对对象的开发模式进行软件的开发和硬体的架设&…

Python | Leetcode Python题解之第365题水壶问题

题目&#xff1a; 题解&#xff1a; class Solution:def canMeasureWater(self, x: int, y: int, z: int) -> bool:if x y < z:return Falseif x 0 or y 0:return z 0 or x y zreturn z % math.gcd(x, y) 0

opencv-python图像增强十一:图像强光逆光调整:

文章目录 一&#xff0c;简介&#xff1a;二&#xff0c;方案简述&#xff1a;三&#xff0c;算法实现步骤&#xff1a;3.1 获得图像的阴影区域&#xff1a;3.2 调整阴影区域的亮度和对比度 四&#xff1a;整体代码五&#xff0c;效果&#xff1a; 一&#xff0c;简介&#xff…

UE5 多个类选择界面生成。解决方案思路。

中控器CC 》用户界面控制器UI_CC 》用户界面UI_Inst 生成 CC使用接口&#xff0c;通知UI_CC开始生成UI_Inst。 蓝图函数库编写判断是否存在和创建UI的蓝图。&#xff08;此处略&#xff09; UI_CC生成时&#xff0c;userwidget使用接口&#xff0c;注册UI_CC的用户控件的控件…

系统编程-信号

6 信号与管道 1 目录 6 信号与管道 1 信号 信号的概念 信号的使用 信号的发送 通过函数来实现信号的发送 信号改造函数(重点) 给自己发送信号函数 定时闹钟函数 暂停进程的函数 例题&#xff1a; 代码一&#xff1a; 代码二&#xff1a; 代码分析 -- linux系统下…