分布式锁之redis实现

docker安装redis

拉取镜像

docker pull redis:6.2.6

查看镜像

87d429bb8dfa467baedf8733e62ac37b.png

启动容器并挂载目录

需要挂在的data和redis.conf自行创建即可

docker run --restart always -d -v /usr/local/docker/redis/redis.conf:/usr/local/etc/redis/redis.conf -v /usr/local/docker/redis/data:/data --name redis -p 6379:6379 redis:6.2.6 redis-server /usr/local/etc/redis/redis.conf

查看运行状态 

不要忘记开放端口6379

b8ff2272d9354ac39d198b5819e62aef.png

进入容器内部使用redis-cli

docker exec -it 13829d3f335a /bin/bashredis-cli

[可选]用密码登录 

修改redis.conf配置文件,设置 requirepass xxxxx

spring boot 集成redis

添加依赖

      <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

添加redis配置 


server.port= 10010spring.datasource.driver-class-name= com.mysql.cj.jdbc.Driver
spring.datasource.url= jdbc:mysql://39.106.53.30:3306/lock_db?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root123456spring.redis.host=39.106.53.30
spring.redis.port=6379

使用StringRedisTemplate

如果直接使用RedisTemplate使用的序列化器是jdk的,存的是二进制,使用StringRedisTemplate默认初始化序列化器就是String类型

    public StringRedisTemplate() {this.setKeySerializer(RedisSerializer.string());this.setValueSerializer(RedisSerializer.string());this.setHashKeySerializer(RedisSerializer.string());this.setHashValueSerializer(RedisSerializer.string());}

redis演示超卖问题

执行票数存入redis指令

set ticket 5000

 编写代码演示超卖问题

/*** @Author sl*/
@Servicepublic class TicketServiceImpl implements TicketService {@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic  void sellTicket(){//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 扣减票数Integer integer = Integer.valueOf(ticket);if(integer >0){redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));}}}
}

 5000请求压测,结果为4895,发生了超卖问题

a7fc4d7f1ee548169fd7f08b0fc6ca0a.png
e1d1f4af1d704938b8188b32b6ccbc36.png
redis解决超卖问题 

解决方案

解决方案

  •         本地jvm锁(这种情况仅限单机,不做介绍)
  •         redis乐观锁 watch  multi exec(性能低)
  •         分布式锁(redis+lua手动实现或者通过redission实现)

redis乐观锁实现 

watch: 监控一个或者多个key,如果这些key在提交事务(exec)之前被其他用户修改过,那么事务将执行失败,需要重新获取最新数据重头操作

multi: 开启事务,使用该命令,标记一个事务块的开始,redis会将这些操作放入队列中

exec: 执行事务

720c867464d145e68e27ad877dc0f155.png

 乐观锁的代码需要包在SessionCallback中实现

package com.test.lockservice.service.impl;import com.test.lockservice.service.TicketService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;import java.util.List;/*** @Author sl*/
@Servicepublic class TicketServiceImpl implements TicketService {@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic  void sellTicket(){redisTemplate.execute(new SessionCallback<Object>() {@Overridepublic  Object execute(RedisOperations redisOperations) throws DataAccessException {// 开启监听redisOperations.watch("ticket");//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 开启事务redisOperations.multi();Integer integer = Integer.valueOf(ticket);// 扣减票数redisOperations.opsForValue().set("ticket",String.valueOf(--integer));// 提交事务List exec = redisOperations.exec();// 如果获取锁失败 ,重试if(exec == null || exec.size() == 0){try {// 减少锁争抢,避免栈内存溢出Thread.sleep(40);sellTicket();} catch (InterruptedException e) {e.printStackTrace();}}}return null;}});}
}

 1000请求压测,结果为4000,没有发生超卖,但性能极低

b975be5edee2453597b57ccd66d557f1.pngredis实现分布式锁

分布式锁的实现方案中redis的实现主要思想就是独占排他使用,在redis中可以使用setnx命令进行独占排他使用

  • 加锁 setnx 
  • 解锁 del
  • 重试:递归(容易造成栈内存溢出),这里使用循环

 

package com.test.lockservice.service.impl;import com.test.lockservice.service.TicketService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;/*** @Author sl*/
@Servicepublic class TicketServiceImpl implements TicketService {@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic  void sellTicket(){// setnx 排他使用,如果获取锁不成功,则重试while(!redisTemplate.opsForValue().setIfAbsent("lock", "111")){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}}try {//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 扣减票数Integer integer = Integer.valueOf(ticket);if(integer >0){redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));}}} finally {// 解锁操作redisTemplate.delete("lock");}}
}

压测1000,显示无超卖现象 

5629d991544d4b7c9b77472abca1c682.png042eaa1342124922a9d528dc660a16ec.png

添加过期时间防止死锁问题

当前代码存在问题,假如现在有4台服务器争抢锁,编号为1的服务器抢到了锁,但是没来得及释放锁,就宕机啦,其他2,3,4服务器就永远拿不到锁,这就是产生的死锁问题,解决方案是给锁添加过期时间来解决

4affdb3b239141e78d695451512263fb.png

要保证枷锁和设置过期时间具有原子性,否则加了锁,没来得及给过期时间就宕机啦,又会产生死锁问题

expire key 20指令和枷锁指令是两条指令不具有原子性,在这里使用 set key ex 20 nx命令设置过期时间来保证原子性

9d6becfe33a84ebf93433896a4d65bcf.png

添加过期时间和获取锁的原子性

redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.SECONDS)

 // setnx 排他使用,如果获取锁不成功,则重试while(!redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.SECONDS)){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}}

通过UUID防止误删

因为已经加了过期时间,如果加了3秒过期时间,第一个请求到了第3秒还没执行完毕,锁就失效了,这时第二个请求获取锁,执行1s的时候,第一个请求执行到del指令,就把第二个锁删除掉啦(误删)

解决方案:通过uuid标识是自己的锁,通过判断是自己的锁,在删除

84e258e5fb574c91a78b4244a41923d9.png

添加uuid防止误删

    public  void sellTicket(){String uuid = UUID.randomUUID().toString();// setnx 排他使用,如果获取锁不成功,则重试while(!redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}}try {//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 扣减票数Integer integer = Integer.valueOf(ticket);if(integer >0){redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));}}} finally {// 判断是自己的锁在删除if(uuid.equals(redisTemplate.opsForValue().get("lock"))){redisTemplate.delete("lock");}}}

使用Lua脚本解决防误删的原子性问题

判断和删除锁之间需要保证原子性第一个请求因为如果判断的时候,发现是自己的锁,然后此时锁超过了过期时间,此时,第二个请求获取到锁,第一个请求执行del指令,删除的是第二个请求的锁,所以需要在判断和删除锁之间保持原子性

解决方案:使用Lua脚本保证原子性,Lua脚本将多条命令一次性发给redis,redis单线程的特性可以保证原子性操作

Lua脚本介绍和redis执行Lua脚本 

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能,编译后仅仅一百余K,可以很方便的嵌入别的程序里

菜鸟地址:https://www.runoob.com/lua/lua-variables.html

Lua脚本流程控制和变量定义

--[ 定义全局变量a 局部变量用local a --]
a = 100;
--[ 检查条件 --]
if( a < 20 )
then--[ if 条件为 true 时执行该语句块 --]print("a 小于 20" )
else--[ if 条件为 false 时执行该语句块 --]print("a 大于 20" )
end
print("a 的值为 :", a)

在redis中执行Lua脚本

redis中继承了Lua脚本,lua-time-limit参数现在脚本最长运行时间,默认是5秒,执行指令为:

eval script numkeys key [key ...] arg [arg ...]

numkeys:标识key的数量 不能省略

hello word

eval "return 'hello world'" 0

分支语句KEYS和ARGV必须大写

eval "if KEYS[1]==1 then return KEYS[1] else return  ARGV[1] end" 1 0 3 

4c5257c42e834f25958f88987fdfb151.png解决判断和删除之间的原子性问题

// 如果是自己的锁,则删除,否则返回0为false
if redis.call('get',KEYS[1]) == ARGV[1]
thenreturn redis.call('del',KEYS[1])
elsereturn 0
endkeys:lockargv: uuid
 public  void sellTicket(){String uuid = UUID.randomUUID().toString();// setnx 排他使用,如果获取锁不成功,则重试while(!redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}}try {//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 扣减票数Integer integer = Integer.valueOf(ticket);if(integer >0){redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));}}} finally {String script="if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";this.redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Collections.singletonList("lock"),uuid);}}

 压测1000 显示无超卖现象

daf44ce416a4420294d1bd923028b763.png

4b48b5d2be32427c9ec2bf1033598291.pnghash+Lua解决锁的可重复入问题

由于上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加 锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行

第一个就是锁的重入问题

当前方法a获取锁,在方法之中调用b方法,b方法也需要获取锁,这个时候造成了死锁问题,采用hash+Lua脚本解决

第二个就是锁的自动续期问题:后续会解决续期问题

探讨ReentrantLock的可重入原理

ReentrantLock继承了aqs,aqs是锁的基石

可重入锁加锁流程

  • CAS获取锁,如果没有线程占用锁(state==0),加锁成功并记录当前线程是有锁线程
  • 如果state的值不为0,说明锁已经被占用。则判断当前线程是否是有锁线程,如果是则重入 (state + 1)
  • 否则加锁失败,入队等待

可重入锁解锁流程

  • 判断当前线程是否是有锁线程,不是则抛出异常
  • 对state的值减1之后,判断state的值是否为0,为0则解锁成功,返回true
  • 如果减1后的值不为0,则返回false

hash+Lua实现可重复入锁

参照ReentrantLock中的非公平可重入锁实现分布式可重入锁: hash + lua脚本
加锁

  •     判断锁是否存在 (exists),则直接获取锁 hset key field value
  •     如果锁存在则判断是否自己的锁 (hexists),如果是自己的锁则重入: hincrby key field increment
  •     否则重试:递归 循环
加锁
如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次
数加1if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 
then redis.call('hincrby',KEYS[1], ARGV[1], 1)redis.call('expire',KEYS[1],ARGV[2])return 1
else return 0
endkeys lock
argv uuid 30解锁
判断 hash set 可重入 key 的值是否等于 0
如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败
如果为 0 代表 可重入次数被减 1
如果为 1 代表 该可重入 key 解锁成功
1 代表解锁成功,锁被释放
0 代表可重入次数被减 1
null 代表其他线程尝试解锁,解锁失败
if redis.call('hexists',KEYS[1],ARGV[1])==0 
then return nil 
elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 
then return 0 
else redis.call('del',KEYS[1]) return 1 
endkeys lock
argv uuid

exists判断lock是否存在,hexists lock uuid 判断filed是否存在

通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次数加1,hincrby命令,如果增加的key filed 不存在则新增并加1

ef39ebc7bb9840cd8169db5e279dc80a.png

加锁工具类

package com.test.lockservice.utils;import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.Collections;
import java.util.UUID;public class RedisLock {private StringRedisTemplate redisTemplate;private String lockName;private String uuid;private Integer expire = 30;private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();public RedisLock(StringRedisTemplate redisTemplate, String lockName) {this.redisTemplate = redisTemplate;this.lockName = lockName;this.uuid = THREAD_LOCAL.get();if (uuid == null) {this.uuid = UUID.randomUUID().toString();THREAD_LOCAL.set(uuid);}this.expire = expire;}public void lock(){this.lock(expire);}public void lock(Integer expire){this.expire = expire;String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 then redis.call('hincrby',KEYS[1], ARGV[1], 1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";System.out.println(script);if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Collections.singletonList(lockName), uuid, expire.toString())){try {// 没有获取到锁,重试Thread.sleep(60);lock(expire);} catch (InterruptedException e) {e.printStackTrace();}}}public void unlock(){String script = "if redis.call('hexists',KEYS[1],ARGV[1])==0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 then return 0 else redis.call('del',KEYS[1]) return 1 end";/*** 如果返回值没有使用Boolean,spring-data-redis 进行类型转换时将会把 null 转为 false,这就会影响我们逻辑判断* 所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功*/Long result = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),  Collections.singletonList(lockName), uuid);// 如果未返回值,代表尝试解其他线程的锁if (result == null) {throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: " + lockName + " with request: " + uuid);} else if (result == 1) {THREAD_LOCAL.remove();}}
}

测试可重入 

@Overridepublic  void checkAndLock(){RedisLock lock = new RedisLock(redisTemplate, "lock");lock.lock();// 查询票数Ticket ticket = ticketMapper.selectOne(new QueryWrapper<Ticket>().eq("sell_company", "12306"));// 判断不为空和票数大于0if(ticket!=null&& ticket.getCount() > 0){ticket.setCount(ticket.getCount()-1);ticketMapper.updateById(ticket);}// 测试可重入testRepeatEntry();lock.unlock();}public void testRepeatEntry(){RedisLock lock = new RedisLock(redisTemplate, "lock");lock.lock();System.out.println("redis分布式锁测试可重入");lock.unlock();}

 压测1000,未发现超卖问题,并解决可重入的问题

7dfd3282120641e2a34d09da730a07c6.png

d920dd12a6cb4e7dbce13ea0c1b15000.png锁的自动续期

如果在锁还在使用过程中,锁还未使用完,就失效了,也就产生了锁如何自动添加过期时间的问题 

实现方案: 定时器 + Lua脚本定时续期


自动续期if redis.call('hexists',KEYS[1],ARGV[1])==1
thenredis.call('expire',KEYS[1],ARGV[2]) return 1 
else return 0 
end

这里没有选用线程池的原因在于释放锁之后没有取消定时任务的方法,所以选用jdk自带的

Timer作为定时任务 

package com.test.lockservice.utils;import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.*;public class RedisLock {private StringRedisTemplate redisTemplate;private String lockName;private String uuid;private Integer expire = 30;@SuppressWarnings("all")private static final Timer timer = new Timer();private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();public RedisLock(StringRedisTemplate redisTemplate, String lockName) {this.redisTemplate = redisTemplate;this.lockName = lockName;this.uuid = THREAD_LOCAL.get();if (uuid == null) {this.uuid = UUID.randomUUID().toString();THREAD_LOCAL.set(uuid);}this.expire = expire;}public void lock(){this.lock(expire);}public void lock(Integer expire){this.expire = expire;String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 then redis.call('hincrby',KEYS[1], ARGV[1], 1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Collections.singletonList(lockName), uuid, expire.toString())){try {// 没有获取到锁,重试Thread.sleep(60);lock(expire);} catch (InterruptedException e) {e.printStackTrace();}}// 自动续期renewExpire();}public void unlock(){String script = "if redis.call('hexists',KEYS[1],ARGV[1])==0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 then return 0 else redis.call('del',KEYS[1]) return 1 end";/*** 如果返回值没有使用Boolean,spring-data-redis 进行类型转换时将会把 null 转为 false,这就会影响我们逻辑判断* 所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功*/Long result = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),  Collections.singletonList(lockName), uuid);// 如果未返回值,代表尝试解其他线程的锁if (result == null) {throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: " + lockName + " with request: " + uuid);} else if (result == 1) {THREAD_LOCAL.remove();}// 释放锁成功this.uuid = null;}@SuppressWarnings("all")private void renewExpire() {String script = "if redis.call('hexists',KEYS[1],ARGV[1])==1 then redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";timer.schedule(new TimerTask() {@Overridepublic void run() {if (uuid != null) {redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), RedisLock.this.uuid, expire.toString());renewExpire();}}},expire * 1000 / 3);}
}

红锁算法 

利用红锁算法解决集群下锁的问题:

  • 1、应用程序获取当前系统时间
  • 2、应用程序以相同的kv值依次从多个redis实例中获取锁,如果某一个节点超过了一定时间(小于过期时间)没有获取到锁,则放弃,尽快从其他节点获取锁,避免一个节点宕机阻塞
  • 3、计算锁的消耗时间= 客户端当前时间-step1中的事件,获取锁的时间小于总的锁定时间,并且半数以上节点获取锁成功,认为获取锁成功
  • 4、如果获取锁失败,对所有节点释放锁

redis分布式锁小结

redis分布式锁最开始采用setnex+Lua脚本的方式,我们发现存在不可重入的问题,于是使用hash+Lua脚本解决可重入问题,并解决了自动续期问题,但是还存在一个重要问题,就是redis集群部署所带来的并发问题,所以使用Redission作为最终的分布式锁解决方案

redis集群状态下的问题:
  • 客户端A从master获取到锁
  • 在master将锁同步到slave之前,master宕掉了
  • slave节点被晋级为master节点
  • 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁

redisson中的分布式锁  

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅 提供了一系列的分布式的Java常用对象,还提供了许多分布式服务
Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上

Redisson引入依赖

 <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.11.2</version>
</dependency>

Redission配置 

package com.test.lockservice.config;import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author sl*/
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient(){Config config = new Config();
//        config.useClusterServers()config.useSingleServer().setAddress("redis://39.106.53.30:6379").setPassword("12345");return Redisson.create(config);}
}

Redission使用

@Autowiredprivate RedissonClient redissonClient;public void userRedisson(){// 获取锁RLock lock = redissonClient.getLock("lock");try {// 加锁lock.lock();//获取redis中的票数String ticket = redisTemplate.opsForValue().get("ticket");if(ticket!= null && ticket.length() != 0){// 扣减票数Integer integer = Integer.valueOf(ticket);if(integer >0){redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));}}} finally {// 解锁lock.unlock();}}

1000并发压测,发现并无超卖问题 

60440ac508af4380ba2ecc5de16754fa.png

c017a7803ee2469c9e89014ef00362c7.png

RLock原理

 RLock对象实现了 java.util.concurrent.locks.Lock 接口,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗检查锁的超时时间 是30秒钟,也可以通过修改 Config.lockWatchdogTimeout 来另行指定

  • RLock 对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出 IllegalMonitorStateException 错误
  • 另外Redisson还通过加锁的方法提供了 leaseTime 的参数来指定加锁的时间。超过这个时间后锁便自动解开了

其实Redisson底层的实现思路同样是hash+Lua脚本的实现方式,在源码中可以看到,下面列举一下加锁的源码

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}

 公平锁 

基于Redis的Redisson分布式可重入公平锁也是实现了 java.util.concurrent.locks.Lock 接口的一 种 RLock 对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了 当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队 列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个 线程都处于等待状态,那么后面的线程会等待至少25秒
public void useFairLock() {RLock fairLock = redissonClient.getFairLock("fairLock");
//        fairLock.lock();// 10秒钟以后自动解锁// 无需调用unlock方法手动解锁fairLock.lock(10, TimeUnit.SECONDS);System.out.println("加锁成功"+Thread.currentThread().getName());// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
//        boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
//        fairLock.unlock();}加锁成功http-nio-10010-exec-5
加锁成功http-nio-10010-exec-10

可以看到,公平锁会维护一个队列,按发送顺序依次加锁

22641eac255c4cd78315c6cda75863ee.png 

联锁

   在多个redis实例上获取锁,联锁所有的锁都上锁成功才算成功

  @Overridepublic void useMutiLock() {RLock lock1 = redissonClient.getLock("lock1");
//        RLock lock2 = redissonClient.getLock("lock2");//联锁所有的锁都上锁成功才算成功RedissonMultiLock redissonMultiLock = new RedissonMultiLock(lock1);redissonMultiLock.lock();System.out.println("业务内容");redissonMultiLock.unlock();}

红锁

在多个节点上加锁,大部分节点获取锁成功就算成功

public void useRedLock() {RLock lock1 = redissonClient.getLock("lock1");
//        RLock lock2 = redissonClient.getLock("lock2");RedissonRedLock readLock = new RedissonRedLock(lock1);// 红锁在大部分节点上加锁成功就算成功readLock.lock();System.out.println("业务内容");readLock.unlock();}

读写锁

对读和写上锁,RReadWriteLock实现了java.util.concurrent.locks.ReadWriteLock接口,读-读不阻塞

 public void useReadWriteLock() {/*** 读-读 不阻塞 读-写 阻塞 写-写 阻塞* RReadWriteLock实现了java.util.concurrent.locks.ReadWriteLock接口*/RReadWriteLock rwlock = redissonClient.getReadWriteLock("readWrite");// 最常见的读锁rwlock.readLock().lock();// 写锁rwlock.writeLock().lock();// 10秒钟以后自动解锁无需调用unlock方法手动解锁rwlock.readLock().lock(10, TimeUnit.SECONDS);rwlock.writeLock().lock(10, TimeUnit.SECONDS);// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁// boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);rwlock.readLock().unlock();rwlock.writeLock().unlock();}

 信号量

资源限流并发工具类,java.util.concurrent.semaphore是单机版限流,RSemaphore是分布式限流,下面的Semaphore会始终限流3个资源

单机版 

package com.test.lockservice.service.impl;import java.util.concurrent.Semaphore;/*** @Author sl*/
public class SemaphoreTest {public static void main(String[] args) {// 3个有限资源Semaphore semaphore = new Semaphore(3);for (int i = 0; i < 6; i++) {new Thread(()->{try{// 获取资源semaphore.acquire();System.out.println(Thread.currentThread().getName() + "抢到车位");Thread.sleep(1000);System.out.println(Thread.currentThread().getName()  +"离开车位");}catch (Exception e){e.printStackTrace();}finally {// 释放资源semaphore.release();}}).start();}}
}Thread-1抢到车位
Thread-0抢到车位
Thread-4抢到车位
Thread-0离开车位
Thread-1离开车位
Thread-4离开车位
Thread-3抢到车位
Thread-5抢到车位
Thread-2抢到车位
Thread-5离开车位
Thread-3离开车位
Thread-2离开车位

分布式版

  public void useSemaphore() {/*** RSemaphore 采用了与java.util.concurrent.semaphore相似的接口* 资源限流信号量, 3个资源 6个线程, semaphore是单机版限流,RSemaphore是分布式限流*/RSemaphore semaphore = redissonClient.getSemaphore("semaphore");try{semaphore.acquire();}catch(Exception e){e.printStackTrace();}finally {semaphore.release();}}

闭锁(CountDownLatch

CountDownLatch并发工具类,一个线程等待一组线程结束是一个做减法的倒计时器,RCountDownLatch 采用了与java.util.concurrent.CountDownLatch 相似的接口和用法,

 单机版

package com.test.lockservice.service.impl;import java.util.concurrent.CountDownLatch;/*** @Author sl*/
public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(6);for (int i = 1; i <= 6; i++) {new Thread(()->{System.out.println(Thread.currentThread().getName() + "\t上完自习");countDownLatch.countDown();},String.valueOf(i)).start();}// 班长等待所有线程同学走完在锁门countDownLatch.await();System.out.println(Thread.currentThread().getName() + "\t班长离开,锁门");}
}1	上完自习
3	上完自习
4	上完自习
5	上完自习
2	上完自习
6	上完自习
main	班长离开,锁门

顺道介绍一下CyclicBarrier并发工具类,与CountDownLatch正好相反,它做的是加法

package com.test.lockservice.service.impl;import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;/*** @Author sl*/
public class CyclicBarrierTest {public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{System.out.println("集齐了卡片,开始召唤神龙");});for (int i = 0; i < 7; i++) {String s = String.valueOf(i);new Thread(()->{System.out.println(Thread.currentThread().getName() + "\t 收集到第"+s+"卡片");try {cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}},String.valueOf(i)).start();}}
}0	 收集到第0卡片
6	 收集到第6卡片
2	 收集到第2卡片
1	 收集到第1卡片
5	 收集到第5卡片
4	 收集到第4卡片
3	 收集到第3卡片
集齐了卡片,开始召唤神龙

分布式版

 public void useCountDownLatch() {/*** RCountDownLatch 采用了与java.util.concurrent.CountDownLatch 相似的接口和用法* 一个线程 等待一组线程完事* 班长等待所有同学走出门口在锁门 CountDownLatch是单机版 RCountDownLatch是分布式版*/RCountDownLatch latch = redissonClient.getCountDownLatch("anyCountDownLatch");latch.trySetCount(6);latch.countDown();try{latch.await();}catch (Exception e){e.printStackTrace();}}

关于zookeeper实现分布式锁,在本专栏zookeeper章节中做了简单介绍,就是创建临时顺序节点,值最小的就是锁。 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/128326.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ClickHouse进阶(七):Clickhouse数据查询-1

进入正文前&#xff0c;感谢宝子们订阅专题、点赞、评论、收藏&#xff01;关注IT贫道&#xff0c;获取高质量博客内容&#xff01; &#x1f3e1;个人主页&#xff1a;含各种IT体系技术,IT贫道_Apache Doris,大数据OLAP体系技术栈,Kerberos安全认证-CSDN博客 &#x1f4cc;订阅…

微信小程序云开发数据懒加载+打破云数据库返回数据条数限制

目录 数据懒加载 打破数据表返回条数限制 数据懒加载 show.wxml <view wx:for="{{Adata}}" wx:key="index" style="padding: 80rpx 10rpx 140rpx;border-bottom: rgb(109, 134, 134) 2px solid;"><view style="margin-left: 20…

springMVC基础技术使用

目录 1.常用注解 1.1RequestMapping 1.2.RequestParam 1.3.RequestBody 1.4.PathVariable 2.参数传递 2.1 slf4j-----日志 2.2基础类型 2.3复杂类型 2.4RequestParam 2.5PathVariable 2.6RequestBody 2.7请求方法&#xff08;增删改查&#xff09; 3.返回值 3.1void …

FPGA基本算术运算

FPGA基本算术运算 FPGA基本算术运算1 有符号数与无符号数2 浮点数及定点数I、定点数的加减法II、定点数的乘除法 3 仿真验证i、加减法验证ii、乘除法验证 FPGA基本算术运算 FPGA相对于MCU有并行计算、算法效率较高等优势&#xff0c;但同样由于没有成型的FPU等MCU内含的浮点数运…

python创建exe文件

1、搭建环境 pip install pyinstaller 2、准备测试代码 exe_test.py import timeprint("hello") print("hello") print("hello") print("hello")time.sleep(5) 注&#xff1a;添加sleep以便在执行exe文件的时候能看到结果 3、生…

个人能做股票期权吗?个人期权交易开户条件新规

个人投资者是可以交易股票期权的&#xff0c;不过期权交易通常需要投资者具备一定的投资经验和风险承受能力&#xff0c;因为期权交易涉及较高的风险和复杂性&#xff0c;下文为大家介绍个人能做股票期权吗&#xff1f;个人期权交易开户条件新规的内容。本文来自&#xff1a;期…

索尼 toio™ 应用创意开发征文|探索创新的玩乐世界——索尼 toio™

导语&#xff1a; 在技术的不断进步和发展中&#xff0c;玩具也逐渐融入了智能化的潮流。索尼 toio™作为一款前沿的智能玩具&#xff0c;给孩子和成人带来了全新的游戏体验。本文将介绍索尼 toio™的特点、功能和应用场景&#xff0c;让读者了解这个令人兴奋的创新产品。 1. 了…

【计算机网络】 ARP协议和DNS协议

文章目录 数据包在传输过程中的变化过程单播组播和广播ARP协议ARP代理免费ARP路由数据转发过程DNS协议 数据包在传输过程中的变化过程 在说ARP和DNS之前&#xff0c;我们需要知道数据包在传输过程的变化过程 从图片中可以看到&#xff0c;发送方的原数据最开始是在应用层&…

SpringAOP面向切面编程

文章目录 一. AOP是什么&#xff1f;二. AOP相关概念三. SpringAOP的简单演示四. SpringAOP实现原理 一. AOP是什么&#xff1f; AOP&#xff08;Aspect Oriented Programming&#xff09;&#xff1a;面向切面编程&#xff0c;它是一种编程思想&#xff0c;是对某一类事情的集…

最经典的解析LSA数据库(第六课)

初步认识OSPF的大致内容(第三课)_IHOPEDREAM的博客-CSDN博客 1 OSPF 工作过程 建立领居表 同步数据库 今天来 说一说数据库概念 计算路由表 2 什么是数据库&#xff1f; 数据库是一个组织化的数据集合&#xff0c;用于存储、管理和检索数据。它是一个可访问的集合&#x…

OpenCV项目实战(1)— 如何去截取视频中的帧

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。针对一段视频&#xff0c;如何去截取视频中的帧呢&#xff1f;本节课就给大家介绍两种方式&#xff0c;一种方式是按一定间隔来截取视频帧&#xff0c;另一种方式是截取视频的所有帧。希望大家学习之后能够有所收获&#x…

基于springboot实现的rabbitmq消息确认

概述 RabbitMQ的消息确认有两种。 一种是消息发送确认。这种是用来确认生产者将消息发送给交换器&#xff0c;交换器传递给队列的过程中&#xff0c;消息是否成功投递。发送确认分为两步&#xff0c;一是确认是否到达交换器&#xff0c;二是确认是否到达队列。 第二种是消费接…

入门力扣自学笔记277 C++ (题目编号:42)(动态规划)

42. 接雨水 题目&#xff1a; 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图&#xff0c;计算按此排列的柱子&#xff0c;下雨之后能接多少雨水。 示例 1&#xff1a; 输入&#xff1a;height [0,1,0,2,1,0,1,3,2,1,2,1] 输出&#xff1a;6 解释&#xff1a;上面是由数组…

Redis——Java中的客户端和API

Java客户端 在大多数的业务实现中&#xff0c;我们还是使用编码去操作Redis&#xff0c;对于命令的学习只是知道这些数据库可以做什么操作&#xff0c;以及在后面学习到了Java的API之后知道什么方法对应什么命令即可。 官方推荐的Java的客户端网页链接如下&#xff1a; 爪哇…

强大易用的开源 建站工具Halo

特点 可插拔架构 Halo 采用可插拔架构&#xff0c;功能模块之间耦合度低、灵活性提高。支持用户按需安装、卸载插件&#xff0c;操作便捷。同时提供插件开发接口以确保较高扩展性和可维护性。 ☑ 支持在运行时安装和卸载插件 ☑ 更加方便地集成三方平台 ☑ 统一的可配置设置表…

Pytest系列-fixture的详细使用和结合conftest.py的详细使用(3)

介绍 前面一篇讲了setup、teardown可以实现在执行用例前或结束后加入一些操作&#xff0c;但这种都是针对整个脚本全局生效的。 Fixture是pytest的非常核心功能之一&#xff0c;在不改变被装饰函数的前提下对函数进行功能增强&#xff0c;经常用于自定义测试用例前置和后置工作…

网络原理

网络原理 传输层 UDP 特点 特点&#xff1a;无连接&#xff0c;不可靠&#xff0c;面向数据报&#xff0c;全双工 格式 怎么进行校验呢&#xff1f; 把UDP数据报中的源端口&#xff0c;目的端口&#xff0c;UDP报文长度的每个字节&#xff0c;都依次进行累加 把累加结果&a…

Kafka源码分析之网络通信

1、生产者网络设计 架构设计图 2、生产者消息缓存机制 1、RecordAccumulator 将消息缓存到RecordAccumulator收集器中, 最后判断是否要发送。这个加入消息收集器&#xff0c;首先得从 Deque 里找到自己的目标分区&#xff0c;如果没有就新建一个批量消息 Deque 加进入 2、消…

excel中的引用与查找函数篇1

1、COLUMN(reference)&#xff1a;返回与列号对应的数字 2、ROW(reference)&#xff1a;返回与行号对应的数字 参数reference表示引用/参考单元格&#xff0c;输入后引用单元格后colimn()和row()会返回这个单元格对应的列号和行号。若参数reference没有引用单元格&#xff0c;…

【APUE】标准I/O库

目录 1、简介 2、FILE对象 3、打开和关闭文件 3.1 fopen 3.2 fclose 4、输入输出流 4.1 fgetc 4.2 fputc 4.3 fgets 4.4 fputs 4.5 fread 4.6 fwrite 4.7 printf 族函数 4.8 scanf 族函数 5、文件指针操作 5.1 fseek 5.2 ftell 5.3 rewind 6、缓冲相关 6.…