提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- Redis与分布式锁
- Jedis实现
- 1.RedisConfig
- 2.RedisDistLock
- 3.应用
- 4.加上看门狗逻辑 RedisDistLockWithDog
- redisson实现
- 1.依赖
- 2.代码
Redis与分布式锁
Jedis实现
1.RedisConfig
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.6.3</version></dependency>
# Redis服务器地址
redis.host=127.0.0.1
# Redis服务器连接端口
redis.port=6379
# Redis服务器连接密码(默认为空)
redis.password=null
redis.timeout=30000
# 连接池最大连接数(使用负值表示没有限制)
redis.maxTotal=30
# 连接池中的最大空闲连接
redis.maxIdle=10
redis.numTestsPerEvictionRun=1024
redis.timeBetweenEvictionRunsMillis=30000
redis.minEvictableIdleTimeMillis=1800000
redis.softMinEvictableIdleTimeMillis=10000
# 连接池最大阻塞等待时间(使用负值表示没有限制)
redis.maxWaitMillis=1500
redis.testOnBorrow=true
redis.testWhileIdle=true
redis.blockWhenExhausted=false
redis.JmxEnabled=true
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;@Configuration
@PropertySource("classpath:application.properties")
public class RedisConfig {@Value("${redis.host}")private String host;@Value("${redis.port}")private int port;@Value("${redis.timeout}")private int timeout;@Value("${redis.maxIdle}")private int maxIdle;@Value("${redis.maxWaitMillis}")private int maxWaitMillis;@Value("${redis.blockWhenExhausted}")private Boolean blockWhenExhausted;@Value("${redis.JmxEnabled}")private Boolean JmxEnabled;@Beanpublic JedisPool jedisPoolFactory() {JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();jedisPoolConfig.setMaxIdle(maxIdle);jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);// 连接耗尽时是否阻塞, false报异常,true阻塞直到超时, 默认truejedisPoolConfig.setBlockWhenExhausted(blockWhenExhausted);// 是否启用pool的jmx管理功能, 默认truejedisPoolConfig.setJmxEnabled(JmxEnabled);JedisPool jedisPool = new JedisPool(jedisPoolConfig, host, port, timeout);return jedisPool;}
}
2.RedisDistLock
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Distributed lock backed by a single Redis key ({@code SET key value NX PX}).
 *
 * <p>Within one JVM the lock is additionally guarded by {@link #ownerThread}:
 * only one local thread at a time talks to Redis to acquire the key. A repeated
 * {@link #tryLock()} by the owning thread returns {@code true}, but no hold count
 * is kept, so the first {@link #unlock()} releases the lock.
 *
 * <p>The Redis key expires after {@link #LOCK_TIME} milliseconds and is not renewed
 * by this class.
 */
@Component
public class RedisDistLock implements Lock {

    /** Expiry of the Redis key, in milliseconds. */
    private final static int LOCK_TIME = 5 * 1000;

    /** Prefix of every lock key. */
    private final static String RS_DISTLOCK_NS = "tdln:";

    /**
     * Lua script that deletes the key only when it still holds the caller's id,
     * so that compare-and-delete is atomic and a thread cannot release a lock
     * it does not own.
     */
    private final static String RELEASE_LOCK_LUA =
            "if redis.call('get',KEYS[1])==ARGV[1] then\n"
            + " return redis.call('del', KEYS[1])\n"
            + " else return 0 end";

    /** Per-thread id that was written as the value of the Redis key. */
    private ThreadLocal<String> lockerId = new ThreadLocal<>();

    /**
     * Local thread currently holding the lock, or {@code null}. Volatile because
     * it is read outside the synchronized section in {@link #tryLock()} and
     * {@link #unlock()}.
     */
    private volatile Thread ownerThread;

    private String lockName = "lock";

    @Autowired
    private JedisPool jedisPool;

    public String getLockName() {
        return lockName;
    }

    public void setLockName(String lockName) {
        this.lockName = lockName;
    }

    public Thread getOwnerThread() {
        return ownerThread;
    }

    /** Records (or clears, with {@code null}) the local thread that owns the lock. */
    public void setOwnerThread(Thread ownerThread) {
        this.ownerThread = ownerThread;
    }

    /**
     * Blocks until the lock is acquired, retrying every 100 ms.
     *
     * <p>{@link Lock#lock()} is not interruptible, so an interrupt received while
     * waiting does not abort the acquisition; instead the interrupt status is
     * restored before returning so the caller can still observe it.
     */
    @Override
    public void lock() {
        boolean interrupted = false;
        try {
            while (!tryLock()) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new UnsupportedOperationException("不支持可中断获取锁!");
    }

    /**
     * Makes a single attempt to acquire the lock.
     *
     * @return {@code true} if the calling thread holds the lock after the call
     *         (including when it already held it), {@code false} otherwise
     * @throws RuntimeException if the Redis command fails
     */
    @Override
    public boolean tryLock() {
        Thread current = Thread.currentThread();
        if (ownerThread == current) {
            // The calling thread already holds the lock.
            return true;
        } else if (ownerThread != null) {
            // Another thread of this process holds the distributed lock.
            return false;
        }
        Jedis jedis = jedisPool.getResource();
        try {
            String id = UUID.randomUUID().toString();
            SetParams params = new SetParams();
            params.px(LOCK_TIME);
            params.nx();
            synchronized (this) {
                // Local threads compete here first; only the winner issues the SET.
                if ((ownerThread == null)
                        && "OK".equals(jedis.set(RS_DISTLOCK_NS + lockName, id, params))) {
                    lockerId.set(id);
                    setOwnerThread(current);
                    return true;
                } else {
                    return false;
                }
            }
        } catch (Exception e) {
            // Keep the cause so the underlying Redis failure stays diagnosable.
            throw new RuntimeException("分布式锁尝试加锁失败!", e);
        } finally {
            jedis.close();
        }
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        throw new UnsupportedOperationException("不支持等待尝试获取锁!");
    }

    /**
     * Releases the lock held by the calling thread.
     *
     * <p>The local ownership is always cleared, even when the Redis call fails.
     *
     * @throws RuntimeException if the caller is not the owning thread, or if the
     *         Redis command fails
     */
    @Override
    public void unlock() {
        if (ownerThread != Thread.currentThread()) {
            throw new RuntimeException("试图释放无所有权的锁!");
        }
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            Long result = (Long) jedis.eval(RELEASE_LOCK_LUA,
                    Arrays.asList(RS_DISTLOCK_NS + lockName),
                    Arrays.asList(lockerId.get()));
            if (result.longValue() != 0L) {
                System.out.println("Redis上的锁已释放!");
            } else {
                System.out.println("Redis上的锁释放失败!");
            }
        } catch (Exception e) {
            throw new RuntimeException("释放锁失败!", e);
        } finally {
            if (jedis != null) {
                jedis.close();
            }
            lockerId.remove();
            setOwnerThread(null);
            System.out.println("本地锁所有权已释放!");
        }
    }

    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException("不支持等待通知操作!");
    }
}
3.应用
4.加上看门狗逻辑 RedisDistLockWithDog
- 看门狗线程启动
- 通过 delayDog(阻塞延迟队列)避免无谓的轮询,减少看门狗线程的轮询次数
- 往延迟阻塞队列中加入元素(让看门狗可以在锁过期之前一点点的时间去做锁的续期)
- 续锁逻辑:只有仍然持有锁的线程才能续锁
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;import javax.annotation.PreDestroy;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;/*** 分布式锁,附带看门狗线程的实现:加锁,保持锁1秒*/
@Component
public class RedisDistLockWithDog implements Lock {private final static int LOCK_TIME = 1*1000;private final static String LOCK_TIME_STR = String.valueOf(LOCK_TIME);private final static String RS_DISTLOCK_NS = "tdln2:";/*if redis.call('get',KEYS[1])==ARGV[1] thenreturn redis.call('del', KEYS[1])else return 0 end*/private final static String RELEASE_LOCK_LUA ="if redis.call('get',KEYS[1])==ARGV[1] then\n" +" return redis.call('del', KEYS[1])\n" +" else return 0 end";/*还有并发问题,考虑ThreadLocal*/private ThreadLocal<String> lockerId = new ThreadLocal<>();private Thread ownerThread;private String lockName = "lock";@Autowiredprivate JedisPool jedisPool;public String getLockName() {return lockName;}public void setLockName(String lockName) {this.lockName = lockName;}public Thread getOwnerThread() {return ownerThread;}public void setOwnerThread(Thread ownerThread) {this.ownerThread = ownerThread;}@Overridepublic void lock() {while(!tryLock()){try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}@Overridepublic void lockInterruptibly() throws InterruptedException {throw new UnsupportedOperationException("不支持可中断获取锁!");}@Overridepublic boolean tryLock() {Thread t=Thread.currentThread();/*说明本线程正在持有锁*/if(ownerThread==t) {return true;}else if(ownerThread!=null){/*说明本进程中有别的线程正在持有分布式锁*/return false;}Jedis jedis = null;try {jedis = jedisPool.getResource();/*每一个锁的持有人都分配一个唯一的id,也可采用snowflake算法*/String id = UUID.randomUUID().toString();SetParams params = new SetParams();params.px(LOCK_TIME); //加锁时间1sparams.nx();synchronized (this){if ((ownerThread==null)&&"OK".equals(jedis.set(RS_DISTLOCK_NS+lockName,id,params))) {lockerId.set(id);setOwnerThread(t);if(expireThread == null){//看门狗线程启动expireThread = new Thread(new ExpireTask(),"expireThread");expireThread.setDaemon(true);expireThread.start();}//往延迟阻塞队列中加入元素(让看门口可以在过期之前一点点的时间去做锁的续期)delayDog.add(new ItemVo<>((int)LOCK_TIME,new 
LockItem(lockName,id)));System.out.println(Thread.currentThread().getName()+"已获得锁----");return true;}else{System.out.println(Thread.currentThread().getName()+"无法获得锁----");return false;}}} catch (Exception e) {throw new RuntimeException("分布式锁尝试加锁失败!",e);} finally {jedis.close();}}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {throw new UnsupportedOperationException("不支持等待尝试获取锁!");}@Overridepublic void unlock() {if(ownerThread!=Thread.currentThread()) {throw new RuntimeException("试图释放无所有权的锁!");}Jedis jedis = null;try {jedis = jedisPool.getResource();Long result = (Long)jedis.eval(RELEASE_LOCK_LUA,Arrays.asList(RS_DISTLOCK_NS+lockName),Arrays.asList(lockerId.get()));System.out.println(result);if(result.longValue()!=0L){System.out.println("Redis上的锁已释放!");}else{System.out.println("Redis上的锁释放失败!");}} catch (Exception e) {throw new RuntimeException("释放锁失败!",e);} finally {if(jedis!=null) jedis.close();lockerId.remove();setOwnerThread(null);}}@Overridepublic Condition newCondition() {throw new UnsupportedOperationException("不支持等待通知操作!");}/*看门狗线程*/private Thread expireThread;//通过delayDog 避免无谓的轮询,减少看门狗线程的轮序次数 阻塞延迟队列 刷1 没有刷2private static DelayQueue<ItemVo<LockItem>> delayDog = new DelayQueue<>();//续锁逻辑:判断是持有锁的线程才能续锁private final static String DELAY_LOCK_LUA ="if redis.call('get',KEYS[1])==ARGV[1] then\n" +" return redis.call('pexpire', KEYS[1],ARGV[2])\n" +" else return 0 end";private class ExpireTask implements Runnable{@Overridepublic void run() {System.out.println("看门狗线程已启动......");while(!Thread.currentThread().isInterrupted()) {try {LockItem lockItem = delayDog.take().getData();//只有元素快到期了才能take到 0.9sJedis jedis = null;try {jedis = jedisPool.getResource();Long result = (Long)jedis.eval(DELAY_LOCK_LUA,Arrays.asList(RS_DISTLOCK_NS+lockItem.getKey ()),Arrays.asList(lockItem.getValue(),LOCK_TIME_STR));if(result.longValue()==0L){System.out.println("Redis上的锁已释放,无需续期!");}else{delayDog.add(new ItemVo<>((int)LOCK_TIME,new 
LockItem(lockItem.getKey(),lockItem.getValue())));System.out.println("Redis上的锁已续期:"+LOCK_TIME);}} catch (Exception e) {throw new RuntimeException("锁续期失败!",e);} finally {if(jedis!=null) jedis.close();}} catch (InterruptedException e) {System.out.println("看门狗线程被中断");break;}}System.out.println("看门狗线程准备关闭......");}}// @PostConstruct
// public void initExpireThread(){
//
// }@PreDestroypublic void closeExpireThread(){if(null!=expireThread){expireThread.interrupt();}}
}
redisson实现
- github项目 redisson 实现分布式锁和同步器,可以直接调用
https://github.com/redisson/redisson/
1.依赖
<!--引入Redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-redis</artifactId><version>1.4.2.RELEASE</version></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.12.3</version></dependency>
2.代码
import java.util.concurrent.TimeUnit;

/**
 * Key-based facade over a Redisson distributed lock.
 *
 * <p>Redisson's {@code RLock} has several implementations: reentrant non-fair lock
 * ({@code RedissonLock}), reentrant fair lock ({@code RedissonFairLock}), multi-lock
 * ({@code RedissonMultiLock}), red lock ({@code RedissonRedLock}), read lock
 * ({@code RedissonReadLock}) and write lock ({@code RedissonWriteLock}).
 */
public interface DistributedLock {

    /**
     * Blocks until the lock for {@code lockKey} is acquired.
     *
     * @param lockKey name of the lock
     */
    void lock(String lockKey);

    /**
     * Blocks until the lock is acquired, holding it for at most {@code timeout}.
     *
     * @param lockKey name of the lock
     * @param timeout lease time; the unit is chosen by the implementation
     */
    void lock(String lockKey, long timeout);

    /**
     * Blocks until the lock is acquired, holding it for at most {@code timeout}.
     *
     * @param lockKey name of the lock
     * @param unit    unit of {@code timeout}
     * @param timeout lease time
     */
    void lock(String lockKey, TimeUnit unit, long timeout);

    /**
     * Tries to acquire the lock without blocking indefinitely.
     *
     * <p>Delegates to {@link #tryLock(String, TimeUnit, long, long)} with a wait
     * time of 0 and a lease time of -1. NOTE(review): with Redisson a lease time
     * of -1 is expected to enable automatic lease renewal — confirm for the
     * implementation in use.
     *
     * @param lockKey name of the lock
     * @return {@code true} if the lock was acquired
     */
    default boolean tryLock(String lockKey) {
        return tryLock(lockKey, TimeUnit.SECONDS, 0L, -1L);
    }

    /**
     * Tries to acquire the lock; how long to wait is chosen by the implementation.
     *
     * @param lockKey   name of the lock
     * @param unit      unit of {@code leaseTime}
     * @param leaseTime how long the lock is held once acquired
     * @return {@code true} if the lock was acquired
     */
    boolean tryLock(String lockKey, TimeUnit unit, long leaseTime);

    /**
     * Tries to acquire the lock, waiting up to {@code waitTime}.
     *
     * @param lockKey   name of the lock
     * @param unit      unit of {@code waitTime} and {@code leaseTime}
     * @param waitTime  maximum time to wait for the lock
     * @param leaseTime how long the lock is held once acquired
     * @return {@code true} if the lock was acquired
     */
    boolean tryLock(String lockKey, TimeUnit unit, long waitTime, long leaseTime);

    /**
     * Releases the lock for {@code lockKey}.
     *
     * @param lockKey name of the lock
     * @return {@code false} if the release failed, {@code true} otherwise
     */
    boolean unlock(String lockKey);
}
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.annotations.Result;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;/*** Redisson分布式锁实现-使用可重入非公平锁(RedissonLock)*/
@Slf4j
@Component
public class RedisDistributedLock implements DistributedLock {@Resourceprivate RedissonClient redissonClient;public void lock(String lockKey) {RLock lock = this.redissonClient.getLock(lockKey);lock.lock();}public void lock(String lockKey, long timeout) {RLock lock = this.redissonClient.getLock(lockKey);lock.lock(timeout, TimeUnit.SECONDS);}public void lock(String lockKey, TimeUnit unit, long timeout) {RLock lock = this.redissonClient.getLock(lockKey);lock.lock(timeout, unit);}public boolean tryLock(String lockKey, TimeUnit unit, long leaseTime) {RLock lock = this.redissonClient.getLock(lockKey);try {return lock.tryLock(5L, leaseTime, unit);} catch (InterruptedException var7) {return false;}}public boolean tryLock(String lockKey, TimeUnit unit, long waitTime, long leaseTime){RLock lock = redissonClient.getLock(lockKey);try {return lock.tryLock(waitTime, leaseTime, unit);} catch (InterruptedException e) {return false;}}public boolean unlock(String lockKey){RLock lock = redissonClient.getLock(lockKey);try {if (lock.isLocked()) {lock.unlock();log.info("释放锁[{}]成功",lockKey);}return true;} catch (IllegalMonitorStateException localIllegalMonitorStateException) {log.error("释放锁[{}]失败",lockKey,localIllegalMonitorStateException);return false;}}}
/** Name of the distributed lock guarding the heartbeat-check task. */
private static final String CHECK_HEART_BEAT_TASK = "checkHeartbeatTask";

@Autowired
private DistributedLock distributedLock;

// Excerpt from the task body; the enclosing method and the declarations of
// startTime / heartbeatCheckTime are not part of this snippet.
// Tracks whether this invocation acquired the lock, so that the finally block
// never tries to release a lock it does not hold.
boolean locked = false;
try {
    startTime = System.currentTimeMillis();
    locked = distributedLock.tryLock(CHECK_HEART_BEAT_TASK);
    if (locked) {
        handleCheckHeartbeatTask();
    } else {
        log.info("检查心跳任务获取分布式锁失败!");
    }
} catch (Exception e) {
    // Pass the exception itself as the last argument so the stack trace is logged.
    log.error("检查心跳任务异常中断 {}", e.getMessage(), e);
} finally {
    if (locked && !distributedLock.unlock(CHECK_HEART_BEAT_TASK)) {
        log.warn("检查心跳任务释放分布式锁失败!");
    }
    long endTime = System.currentTimeMillis();
    long costTime = endTime - startTime;
    if (costTime > heartbeatCheckTime.longValue()) {
        log.info("检查心跳任务耗时:{}ms", costTime);
    }
}