目录
底层原理
加锁机制
锁互斥机制
可重入锁机制
总结
Redisson 加锁非常简单,还支持 redis 单实例、redis 哨兵、redis cluster、redis master-slave 等各种部署架构
RLock lock = redisson.getLock("cyk-test");
lock.lock();
lock.unlock();
底层原理
加锁机制
废话不多说,直接看源码,下面的代码先不看,先看 tryAcquire 是如何获取锁的
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return;}CompletableFuture<RedissonLockEntry> future = subscribe(threadId);pubSub.timeout(future);RedissonLockEntry entry;if (interruptibly) {entry = commandExecutor.getInterrupted(future);} else {entry = commandExecutor.get(future);}...
}
查看 tryAcquire 方法,点进去看发现调用了 tryAcquireAsync0 方法,这里 RFuture 继承自 java.util.concurrent.Future,表示这是一个异步的任务,get 方法会同步获取结果
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync0(waitTime, leaseTime, unit, threadId));
}private RFuture<Long> tryAcquireAsync0(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
查看 tryAcquireAsync 方法
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) {ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}...}
查看 tryLockInnerAsync 方法
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,"if ((redis.call('exists', KEYS[1]) == 0) " +"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));}
这是一段加锁的 lua 脚本,用于保证原子性,参数解释如下:
-
KEYS[1] 表示加锁的 key
-
ARGV[1] 表示锁 key 的默认超时时间
-
ARGV[2] 表示加锁的客户端 ID,由 UUID:线程 ID 组成
客户端在第一次加锁完成,会设置一个 key 为客户端 ID,value 为加锁次数的 hash 数据结构:
现在知道了内部方法的逻辑,往回倒一步,重点看我加在代码中的注释
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) {ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}// 这里是定义了加锁Lua脚本的异步任务,通过CompletableFuture编排CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);ttlRemainingFuture = new CompletableFutureWrapper<>(s);// 这个f依赖ttlRemainingFuture的结果,如果入参的leaseTime<=0会触发看门狗机制// 否则按照设置的过期时间来过期CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// lock acquiredif (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {scheduleExpirationRenewal(threadId);}}return ttlRemaining;});// 返回编排好的CompletableFuturereturn new CompletableFutureWrapper<>(f);}
把 RFuture 返回以后,就到了 get 方法阻塞获取方法这里
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return get(tryAcquireAsync0(waitTime, leaseTime, unit, threadId));
}private RFuture<Long> tryAcquireAsync0(long waitTime, long leaseTime, TimeUnit unit, long threadId) {return getServiceManager().execute(() -> tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
最后回到了这里
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(-1, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return;}// 这里的 subscribe 就是 Redis 订阅解锁的 lua 脚本中的 publishCompletableFuture<RedissonLockEntry> future = subscribe(threadId);pubSub.timeout(future);RedissonLockEntry entry;if (interruptibly) {entry = commandExecutor.getInterrupted(future);} else {entry = commandExecutor.get(future);}...}
接着看下面循环获取锁的逻辑
try {while (true) {// 自旋尝试获取锁ttl = tryAcquire(-1, leaseTime, unit, threadId);// 看Lua脚本,ttl为null说明锁获取到了if (ttl == null) {break;}
// waiting for messageif (ttl >= 0) {try {// 注意这里的tryAcquire和之前的tryAcquire不是同一个东西,这里是信号量的tryAcquire// entry.getLatch()这里返回的是信号量,释放锁的代码会释放一个许可// 如果没有可用的许可,会一直休眠直到超时时间 ttl msentry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {// 当 key 存在但没有设置剩余生存时间时,pttl返回 -1,会走到这个逻辑// 我感觉正常流程走不到这个逻辑,因为当前线程无非是看到锁存在或者不存在// 看到锁不存在等于加锁成功了,因为Lua脚本是原子性的// 看到锁存在,默认也给了超时时间// 这里就没有设置超时时间,一直等释放锁的许可if (interruptibly) {entry.getLatch().acquire();} else {entry.getLatch().acquireUninterruptibly();}}}} finally {unsubscribe(entry, threadId);}
这里设计的巧妙之处就在于利用了消息订阅、信号量的机制,它不是无休止的盲等机制,也避免了不断的重试,而是检测到锁被释放才去尝试重新获取,这对 CPU 十分的友好
锁互斥机制
此时如果客户端 2 来尝试加锁,同样走进 RedissonLock#lock 方法,会咋样呢?第一个 if 判断会执行 exists myLock,发现 myLock 这个锁 key 已经存在了。接着第二个 if 判断,判断一下,myLock 锁 key 的 hash 数据结构中,对应客户端 2 的 ID 的 key 的 value 为 1,也没有。最终会获取到 pttl myLock 返回的锁 key 的剩余生存时间,进入 while 循环,不停的尝试加锁
可重入锁机制
那如果客户端1都已经持有了这把锁了,结果可重入的加锁会怎么样呢?
此时会执行可重入加锁的逻辑,走第二个 if 逻辑,对客户端 1 的加锁次数累加 1,此时 myLock 数据结构变为下面这样: