Redisson分布式锁原理解析

前言

首先Redis执行命令是单线程的,所以可以利用Redis实现分布式锁,而对于Redis单线程的问题,是其线程模型的问题,本篇重点是对目前流行的工具Redisson怎么去实现的分布式锁进行深入理解;开始之前,我们可以下你思考一个问题,Redisson的实现方式有何不同?为什么?

使用

引入依赖

 <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.25.0</version>
</dependency>

添加配置

    @Autowiredprivate RedisProperties redisProperties;@Beanpublic RedissonClient redisClient() {Config config = new Config();config.setTransportMode(TransportMode.NIO);SingleServerConfig singleServerConfig = config.useSingleServer();singleServerConfig.setAddress(String.format("redis://%s:%s", redisProperties.getHost(), redisProperties.getPort()));singleServerConfig.setPassword(redisProperties.getPassword());return Redisson.create(config);}

加锁

private final RedissonClient redissonClient;public void lock() throws Exception {RLock lock = redissonClient.getLock(PRODUCT_LOCK_KEY + id);if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {// 业务} else {// 业务}
}

如上使用是最简单的方式,Redission它底层封装了很多逻辑,但如果说要redis客户端实现,你要怎么实现?

Redis中本就有支持分布式锁的命令:setnx,对应RedisTemplate中,使用如下:

redisTemplate.opsForValue().setIfAbsent("lockkey", "", 1, TimeUnit.SECONDS);

使用Redis实现分布式锁需要注意的是不要产生死锁,所以使用Redis实现分布式锁有两种方式:

  1. setnx命令
  2. lua脚本

两种方式都是一个操作完成key,value的设置以及过期时间的设置,你是否有相关,他们的实现是否都一样,或者说,这个实现可以有其他实现方式?

源码

原理

  1. lua脚本保证多个命令的原子性;
  2. 采用hash数据结构,key为锁的名称,field是线程对应的名称,也因为这个数据结构,也支持可重入锁;
  3. 定时延时的操作避免死锁(看门狗);

lock()

我们先看lock()方法,tryLock()lock()底层是一样的,所以我们只看lock方法;

   @Overridepublic void lock() {try {lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();}}

位置:org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)

传参是:(-1, null, false)

image-20240402175531643

/**
leaseTime: 过期时间
unit:过期时间单位
interruptibly: 信号量,是否打断线程
*/
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {// 取线程ID,后面作为锁的一个标志long threadId = Thread.currentThread().getId();// 加锁// 两个步骤:// 1. 利用lua脚本设置锁与过期时间(原子操作),过期时间lockWatchdogTimeout = 30 * 1000 = 30秒// 2. 执行过期时间刷新(这里的刷新时利用回调的方式 + 延迟执行实现的定时任务),lockWatchdogTimeout/3 = 10秒Long ttl = tryAcquire(-1, leaseTime, unit, threadId);// lua脚本中,加锁成功返回nil,对应redis中是null,加锁失败,则返回的是已存在的锁的过期时间// 所以这里返回null,就是加锁成功了,就不再往下走了if (ttl == null) {return;}// 加锁成功的在上面就已经结束了,所以下面的都是加锁失败时走的// 这里它订阅了一个channel,参数threadId无用,别被误导了,它订阅的名称时固定的// 为什么这里要订阅?可以思考一下CompletableFuture<RedissonLockEntry> future = subscribe(threadId);// 检查是否超时pubSub.timeout(future);RedissonLockEntry entry;// 这里的interruptibly应该时程序一次时,是否结束,而不是一直在执行中if (interruptibly) {entry = commandExecutor.getInterrupted(future);} else {entry = commandExecutor.get(future);}try {// 这里就是一个自旋 + 加锁while (true) {// 每次循环都进行加锁操作ttl = tryAcquire(-1, leaseTime, unit, threadId);// 同样,如果这里时null,那么就是加锁成功了if (ttl == null) {break;}// 加锁失败:返回了锁的过期时间if (ttl >= 0) {try {// 信号量 + unsafe.park // 信号量:本地更改线程共享变量状态达到加锁的目的// unsafe.park:利用park方法将加锁失败(信号量更改失败)的线程进行挂起// 为什么要挂起,这个问题应该不用多说entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {// 这个分支时ttl < 0,也就是过期时间为负数,也就是锁失效了// 对本地加锁,之后在下一次循环redis加锁if (interruptibly) {entry.getLatch().acquire();} else {entry.getLatch().acquireUninterruptibly();}}}} finally {unsubscribe(future, threadId);}
//        get(lockAsync(leaseTime, unit));}

上面的步骤就是Redisson大致逻辑:

  1. lua脚本加锁,记录线程id,防止非本线程解锁
  2. 成功则退出
  3. 添加订阅对应的channel(这里的订阅是异步的)
    1. 唤醒:当收到channel的通知后,也就是上一个锁解锁了
    2. 从第6步醒来
  4. 自旋
  5. lua脚本加锁(与第一步一样),存在订阅后,上一个锁解锁了,就不用再挂起线程
  6. 失败挂起:unsafe.park
  7. 加锁成功后,取消订阅

lua脚本加锁

image-20240402175746999image-20240402175814387

image-20240403103101607

简单说一下这个脚本做了什么(lua脚本类似js):

redis.call是调用redis命令,第一个参数是redis的命令,第二个参数是参数;

它先是exists判断了key,以及hash数据结构了的field是否存在,

如果存在,对field递增,为什么要递增?思考一下;并设置过期时间,返回返回nil

如果不存在,调用redis命令pttl,获取key的过期时间,并返回;

那如果我们自己写lua脚本呢?

redis 2.6之后支持lua脚本,一个脚本,执行这个脚本是原子性的,所以脚本里的多个命令是原子性的。

如果说要执行批量的命令,可以使用piple,但是管道的话,它并不是原子性的,他只是一次性把批量的命令发给了redis。

LUA脚本格式:

eval "脚本 KEYS[1...N] ARGV[1...N]" count key[1...N] argv[1...N]

KEYS[1…N]:key的展位符,多个时,序号递增,从1开始

ARGV[1…N]:value的展位符,多个时,序号递增,从1开始

count:是对应key输入的个数

key[1…N]:对应KEYS[0…N]的key

argv[1…N]:对应ARGV[0…N]的value

这里就大概简单的说明了一下,使得看本篇的朋友能够理解,详细的还请百度;

看门狗

使用过Redisson的朋友应该都听过“看门狗”,为什么Redisson加锁要看门狗呢?

如果我们使用lock()方法,不设置过期时间,那么应该是永不过期;

好,如果说,加锁成功,在解锁时出了意外,如服务异常退出,或宕机,导致没有解锁,那么这个锁就需要人工干预了,这是有问题的。

所以,在Redisson中,它并不是永不过期,当我们使用lock()方法时,它的参数时-1,但在最终执行lua脚本时传入了默认参数:

位置:org.redisson.RedissonLock#tryAcquireAsync

image-20240402205409341

过期时间时:internalLockLeaseTime;

image-20240402205832964image-20240402205846003

image-20240402205902393

可以看到在初始化时,它便赋予了该值一个默认的30秒;

它并没有将锁设置为无限时间,而是30秒,又怎么保证锁的有效?

所以它还有一个续约的操作,对未使用unlock的锁进行时间延迟,这一操作是为了保证未来某一时刻如果出现服务或其他问题导致解锁失败,产生死锁这样的一个情况。

来看代码:

 private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;// leaseTime=-1表示永不过期if (leaseTime > 0) {// 对应方法:tryLock(过期时间, 时间单位)ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 对应方法:lock()// 注意它达到参数是:internalLockLeaseTime,上面说了他是30秒// internalLockLeaseTime = lockWatchdogTimeout = 30 * 1000;ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}// 这里是上面ttlRemainingFuture回调结果处理,// 如果出现异常,就unlock解锁CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);ttlRemainingFuture = new CompletableFutureWrapper<>(s);
// 这里是定时刷新任务CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// 执行成功返回的时null,所以这里以null为加锁成功的标志if (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}

image-20240403105722608

image-20240403105635897

handleNoSync它只是针对异常做了处理,正常情况下只是进行了封装,而unlocakInnerAysnc也是在异常时的一个回调;

return是结果封装;

image-20240403105756315

再回到刷新的部分:

它是在加锁完后的一个回调方法,ttlRemaining它就是上面执行的结果,null或者是过期时间

image-20240403110155068

 private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;// leaseTime=-1表示永不过期if (leaseTime > 0) {// 对应方法:tryLock(过期时间, 时间单位)ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 对应方法:lock()// 注意它达到参数是:internalLockLeaseTime,上面说了他是30秒// internalLockLeaseTime = lockWatchdogTimeout = 30 * 1000;ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}// 这里是上面ttlRemainingFuture回调结果处理,// 如果出现异常,就unlock解锁CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);ttlRemainingFuture = new CompletableFutureWrapper<>(s);
// 这里是定时刷新任务CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// 执行成功返回的时null,所以这里以null为加锁成功的标志if (ttlRemaining == null) {if (leaseTime > 0) {// leaseTime > 0 表示加锁时设置了过期时间// 而ternalLockLeaseTime存在默认值,这里就是取消了默认值internalLockLeaseTime = unit.toMillis(leaseTime);} else {// 那,这里执行 过期时间的定时刷新scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}

image-20240403144946555

过期时间刷新的步骤:

  1. 创建ExpirationEntry,可以看作时一个上下文对象,他是延时的标识
  2. 原子操作,添加上下文ExpirationEntry
  3. 设置当前线程id到上下文
  4. 创建Timeout延时任务,延时10秒
  5. 延时任务执行(异步):
    1. 根据客户端id和锁获取上下文
    2. 通过上下文获取到线程id
    3. 异步执行延时lua脚本
    4. 执行回调:成功,回调本身,回调第四步,失败,移除上下文,取消延时
 protected void scheduleExpirationRenewal(long threadId) {// 1. 创建ExpirationEntry,可以看作时一个上下文对象,他是延时的标识ExpirationEntry entry = new ExpirationEntry();// 2. 原子操作,添加上下文ExpirationEntry// 要进行过期时间刷新,就要先添加这个对象,可以理解为一个刷新的标志ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);// 3. 设置当前线程id到上下文if (oldEntry != null) {// 不为空,说明这个对象已经存在,已经有其他线程执行了过期刷新oldEntry.addThreadId(threadId);} else {// 为空,则是不存在该映射对象(锁不存在)entry.addThreadId(threadId);try {// 执行过期时间刷新// 4. 创建Timeout延时任务,延时10秒// 5. 延时任务执行(异步)://    5.1 根据客户端id和锁获取上下文//    5.2 通过上下文获取到线程id//    5.3 异步执行延时lua脚本//    5.4 执行回调:成功,回调本身,回调第四步,失败,移除上下文,取消延时renewExpiration();} finally {if (Thread.currentThread().isInterrupted()) {cancelExpirationRenewal(threadId);}}}}

下面我们进入renewExpiration方法:

 private void renewExpiration() {// EXPIRATION_RENEWAL_MAP 在上一层方法中添加了刷新的上下文标志ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {// 如果这里为null,就说明已经有其他线程移除了,那么就是不需要刷新,锁不存在了return;}// 4. 创建Timeout延时任务,延时10秒// 注意这里的时间: internalLockLeaseTime / 3// 之前说过:internalLockLeaseTime = lockWatchdogTimeout = 30 * 1000;// 所以这里是延迟10秒Timeout task = getServiceManager().newTimeout(new TimerTask() {// 5. 延时任务执行(异步)@Overridepublic void run(Timeout timeout) throws Exception {// 5.1 根据客户端id和锁获取上下文// entryName是客户端id+线程id// 每次执行都要检查,刷新的标志存在,就说明锁还在,需要执行ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}// 5.2 通过上下文获取到线程idLong threadId = ent.getFirstThreadId();if (threadId == null) {return;}// 5.3 异步执行延时lua脚本CompletionStage<Boolean> future = renewExpirationAsync(threadId);// 当延时完成后,执行这个方法,也是个回调future.whenComplete((res, e) -> {if (e != null) {log.error("Can't update lock {} expiration", getRawName(), e);// 异常时,移除刷新的标志// 也是避免死锁的一个方式EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}// 5.4 成功:回调本身,回调第四步,失败:移除上下文,取消延时if (res) {// 延时成功,回调当前这个方法,以实现定时任务renewExpiration();} else {// 没有成功,也是移除刷新的标志// 这里没有成功的情况,是锁已经不存在了,对应的定时任务就应该停止;// 两个步骤:// 1. task.cancel():通过上下文获取到任务,然后取消// 2. EXPIRATION_RENEWAL_MAP.remove(getEntryName());cancelExpirationRenewal(null);}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);// 将定时任务放到entry中,也就是放到了定时任务的上下文中// 在下一次时,通过上下文获取到这个定时任务ee.setTimeout(task);}

其实他延时的逻辑也是一个lua脚本:

image-20240403154502286

漏了一点:

由Redisson实现JDK的Timeout类,加回调完成的一个定时任务;当当前任务执行完后,又执行本身方法,创建一个延迟任务,也就实现了一个定时任务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/346175.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vmess协议是什么意思? VLESS与VMess有什么区别?

VMess 是一个基于 TCP 的加密传输协议&#xff0c;所有数据使用 TCP 传输&#xff0c;是由 V2Ray 原创并使用于 V2Ray 的加密传输协议&#xff0c;它分为入站和出站两部分&#xff0c;其作用是帮助客户端跟服务器之间建立通信。在 V2Ray 上客户端与服务器的通信主要是通过 VMes…

ThinkPHP发邮件配置教程?群发功能安全吗?

ThinkPHP发邮件的注意事项&#xff1f;如何优化邮件发送的性能&#xff1f; 无论是用户注册、密码重置还是消息提醒&#xff0c;发送邮件都是一个常见的需求。AokSend将详细介绍如何在ThinkPHP框架中配置和发送邮件&#xff0c;帮助开发者轻松实现邮件功能。 ThinkPHP发邮件&…

43【PS 作图】颜色速途

1 通过PS让画面细节模糊&#xff0c;避免被过多的颜色干扰 2 分析画面的颜色 3 作图 参考网站&#xff1a; 色感不好要怎么提升呢&#xff1f;分享一下我是怎么练习色感的&#xff01;_哔哩哔哩_bilibili https://www.bilibili.com/video/BV1h1421Z76p/?spm_id_from333.1007.…

【Python教程】3-控制流、循环结构与简单字符串操作

在整理自己的笔记的时候发现了当年学习python时候整理的笔记&#xff0c;稍微整理一下&#xff0c;分享出来&#xff0c;方便记录和查看吧。个人觉得如果想简单了解一名语言或者技术&#xff0c;最简单的方式就是通过菜鸟教程去学习一下。今后会从python开始重新更新&#xff0…

MySQL之查询性能优化(七)

查询性能优化 排序优化 无论如何排序都是一个成本很高的操作&#xff0c;所以从性能角度考虑&#xff0c;应尽可能避免排序或者尽可能避免对大量数据进行排序。前面已经提到了&#xff0c;当不能使用索引生成排序结果的时候&#xff0c;MySQL需要自己进行排序&#xff0c;如果…

人脸考勤项目实训

第一章 Python-----Anaconda安装 文章目录 第一章 Python-----Anaconda安装前言一、Anaconda是什么&#xff1f;二、Anaconda的前世今生二、Windows安装步骤1.官网下载2.安装步骤安装虚拟环境 总结 前言 工欲善其事必先利其器&#xff0c;项目第一步&#xff0c;安装我们的环境…

【Unity UGUI】Screen.safeArea获取异形屏数据失败

Screen.safeArea获取不到异形屏的尺寸位置等数据 检查AndroidManifest.xml文件是否有设置&#xff1a;android:theme"style/UnityThemeSelector"&#xff0c;没有加上即可 android:theme"style/UnityThemeSelector"

第1章Hello world 4/5:对比Rust/Java/C++创建和运行Hello world全过程:运行第一个程序

讲动人的故事,写懂人的代码 1.7 对比Rust/Java/C++创建和运行Hello world全过程 有了会听懂人类的讲话,还能做记录的编程助理艾极思,他们三人的讨论内容,都可以变成一份详细的会议纪要啦。 接下来,我们一起看看艾极思是如何记录下赵可菲创建和运行Java程序Hello world,…

简记:为Docker配置服务代理

简记 为Docker配置服务代理 - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:https://blog.csdn.net/qq_28550263/art…

Leetcode3040. 相同分数的最大操作数目 II

Every day a Leetcode 题目来源&#xff1a;3040. 相同分数的最大操作数目 II 解法1&#xff1a;记忆化搜索 第一步可以做什么&#xff1f;做完后&#xff0c;剩下要解决的问题是什么&#xff1f; 删除前两个数&#xff0c;剩下 nums[2] 到 nums[n−1]&#xff0c;这是一个…

分享一个 .NET Core Console 项目中应用 NLog 写日志的详细例子

前言 日志在软件开发中扮演着非常重要的角色&#xff0c;通常我们用它来记录应用程序运行时发生的事件、错误信息、警告以及其他相关信息&#xff0c;帮助在调试和排查问题时更快速地定位和解决 Bug。 通过日志&#xff0c;我们可以做到&#xff1a; 故障排除和调试&#xff…

4.大模型微调技术LoRA

大模型低秩适配(LoRA)技术 现有PEFT 方法的局限与挑战 Adapter方法,通过增加模型深度而额外增加了模型推理延时。Prompt Tuning、Prefix Tuning、P-Tuning等方法中的提示较难训练,同时缩短了模型可用的序列长度。往往难以同时实现高效率和高质量,效果通常不及完全微调(f…

已解决Error || RuntimeError: size mismatch, m1: [32 x 100], m2: [500 x 10]

已解决Error || RuntimeError: size mismatch, m1: [32 x 100], m2: [500 x 10] 原创作者&#xff1a; 猫头虎 作者微信号&#xff1a; Libin9iOak 作者公众号&#xff1a; 猫头虎技术团队 更新日期&#xff1a; 2024年6月6日 博主猫头虎的技术世界 &#x1f31f; 欢迎来…

基于Java-SpringBoot-VUE-MySQL的高校数字化迎新管理系统

基于Java-SpringBoot-VUE-MySQL的高校数字化迎新管理系统 登陆界面 联系作者 如需本项目源代码&#xff0c;可扫码或者VX:bob1638联系作者。 首页图表 系统功能持续更新中。。。 介绍 这是一款主要用于高校迎新的系统&#xff0c;主要是采用了SpringBoot2.X VUE2.6 ElementUI2.…

mysql 数据库datetime 类型,转换为DO里面的long类型后,只剩下年了,没有了月和日

解决方法也简单&#xff1a; 自定义个一个 Date2LongTypeHandler <resultMap id"BeanResult" type"XXXX.XXXXDO"><result column"gmt_create" property"gmtCreate" jdbcType"DATE" javaType"java.lang.Long&…

软件游戏steam_api.dll丢失的解决方法,总结5种有效的方法

在玩电脑游戏时&#xff0c;我们经常会遇到一些错误提示&#xff0c;其中之一就是“游戏缺少steam_api.dll”。这个问题可能让很多玩家感到困惑和烦恼。那么&#xff0c;究竟是什么原因导致游戏缺少steam_api.dll呢&#xff1f;又该如何解决这个问题呢&#xff1f;本文将为大家…

Jmeter压测 —— 1秒发送1次请求

场景&#xff1a;有时候测试场景需要设置请求频率为一秒一次&#xff08;或几秒一次&#xff09;实现方法一&#xff1a;1、首先需要在线程组下设置循环次数&#xff08;可以理解为请求的次数&#xff09; 次数设置为请求300次&#xff0c;其中线程数跟时间自行设置 2、在设置…

JavaScript前端技术入门教程

引言 在前端开发的广阔天地中&#xff0c;JavaScript无疑是最耀眼的一颗明星。它赋予了网页动态交互的能力&#xff0c;让网页从静态的文本和图片展示&#xff0c;进化为可以与用户进行实时交互的丰富应用。本文将带您走进JavaScript的世界&#xff0c;为您提供一个入门级的教…

按键精灵在Win11中弹窗出现乱码并且自带的部分系统插件不能使用的解决方法

按键精灵中出现以下问题&#xff1a; 提示信息的弹窗出现乱码&#xff1a; 系统自带的部分像 plugin. 开头的插件不能使用&#xff0c;如下&#xff1a;s Plugin.Sys.GetDateTime() screenX Plugin.GetSysInfo.GetScreenResolutionX screenY Plugin.GetSysInfo.GetScreenRe…

Mysql使用中的性能优化——批量插入的规模对比

在《Mysql使用中的性能优化——单次插入和批量插入的性能差异》中&#xff0c;我们观察到单次批量插入的数量和耗时呈指数型关系。 这个说明&#xff0c;不是单次批量插入的数量越多越好。本文我们将通过实验测试出本测试案例中最佳的单次批量插入数量。 结论 本案例中约每次…