为什么使用分布式锁
单机环境并发时,使用synchronized或lock接口可以保证线程安全,但它们是jvm层面的锁,分布式环境并发时,100个并发的线程可能来自10个服务节点,那就是跨jvm了。
简单分布式锁实现
SETNX
格式:setnx key value
当且仅当key不存在时,key能设置成功并返回1,否则返回0。
SETNX即SET if Not eXists 如果不存在。
SETNX存在的问题
高并发场景下,有可能存在key自动过期了,加锁的线程还未执行完成的情况。此时线程2加锁进来了,且刚好线程1执行完了,线程1会把线程2刚刚创建的锁删掉,导致后面进来了更多的线程。【自己加的锁被别人删除了】
解决方案:给每个加锁的key生成一个唯一的UUID作为value,删除之前,从redis拿出来的UUID与当前线程一致才能删除key。
然而finally依然存在问题,因为释放锁的代码不是原子操作。【超时零界点】,在判断uuid相同后,且还未删除key前,此时key超时了,其他线程加锁成功,当前线程执行delete命令时,删除的依然是其他线程加的锁。
以上问题均是由于锁的时间不够长导致,接下来的解决方案是【锁续命】
锁续命
原理:线程1加锁成功后,再创建一个线程,使用定时任务来监控锁剩余时间。定时任务执行的周期必须小于锁的超时时间。比如锁超时时间默认设置为30秒,那么该线程每10秒执行一次,给锁重新设置超时时间为30秒, 保证主线程删除锁时,是自己加的锁。如果主线程已释放锁,子线程执行定时任务时会先判断主线程加的key是否还存在。 目前市面上已有成熟的解决方案,如redisson,它适用于分布式各种场景。参考redisson帮助文档
Redisson续命锁流程
引入redisson的依赖包
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.27.2</version>
</dependency>
创建一个Redisson的bean注册到Spring容器中
//获取锁对象
RLock redissonLock = redisson.getLock(lockKey);
//加分布式锁
redissonLock.lock();
........
//解锁
redissonLock.unlock();
Redisson加锁源码
底层是基于lua脚本实现,它能保证原子性。因为Redis服务端执行命令是单线程的,读到这一块lua代码会将它当成一个整体执行完,再去执行下一条命令。
第一步:通过lua脚本加锁成后,返回null,加锁失败返回锁剩余时间。
hset key field value
通过hash结构设置field为UUID+线程ID
,value是1,表示重入次数。 key是构建锁时传入的redisson.getLock(lockKey);
锁超时时间默认是30秒。
第二步:看门狗机制给锁续命
Future模式异步执行lua脚本加锁,加锁成功后回调Future的监听器获取加锁结果。 加锁的lua脚本执行成功后返回null,则进入scheduleExpireRenewal()
方法定时的给锁重置超时时间。
创建一个TimeTask延时任务,10秒后才执行run()方法,lua脚本判断锁存在则重设锁的超时时间为30秒。它同样是使用的Future模式,下面添加了一个监听,重置结束后,会回调监听器,监听器拿到结果再回调scheduleExpireRenewal()
方法本身。
第三步:加锁失败的线程自旋等待
- 加锁失败后,先订阅该key的channel通道消息,类似于一个队列或topic,当锁在过期之前释放了,需要唤醒等待的线程竞争锁。
- 进入while(true)自旋加锁。先尝试一次加锁,成功则退出循环,失败则获取Semaphore调用tryAcquire()等待ttl秒后,重新进入while(true)尝试获取锁。如果1000个线程同时结束等待,就会一起抢锁,所以该锁是非公平锁。
第四步:释放锁时,发布该key的channel消息,通知等待的线程竞争锁。
一旦channel中有消息后,会执行LockPubSub.onMessage()
方法,获取一个Semaphora信号量释放等待的线程,让他们竞争锁。
总结:redisson的架构设计涉及到Future,自旋锁,看门狗机制,发布订阅,lua脚本,semaphore等技术。
它的lock()接口实现的是非公平锁。tryAcquire()加锁成功后,继续执行业务代码。其他线程拿锁时返回锁剩余的时间ttl,判断ttl大于0,则使用Semaphore.getLatch().tryAcquire()
,等到超时时间结束后再去抢锁。如果ttl还没有变为0,此时锁已释放,主线程会向指定的channel发送一条消息,等待的线程会订阅这个消息LockPubSub.onMessage()
,拿到释放锁的消息后调用Semaphore.getLatch().release()
,唤醒阻塞的线程立刻抢锁。
主线程给锁续命使用的是Future机制,异步开启了一个线程,每隔10秒嵌套调用重设锁时间的方法。
lua脚本