前言
本节内容使用zookeeper实现分布式锁,完成并发访问“超卖”问题的解决。相对于redis分布式锁,zookeeper能够保证足够的安全性。关于zookeeper的安装内容这里不做介绍,开始本节内容之前先自行安装好zookeeper中间键服务。这里我们利用创建zookeeper路径节点的唯一性实现分布式锁。并同时演示如何使用Curator工具包,完成分布式锁。
正文
- 在项目中添加zookeeper的pom依赖
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.7.2</version>
</dependency>
- 创建zookeeper客户端工具,实现加锁和解锁方法
- 创建ZookeeperClient客户端工具
package com.ht.atp.plat.util;import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.springframework.stereotype.Component;import javax.annotation.PostConstruct; import javax.annotation.PreDestroy;@Component public class ZookeeperClient {/*** zookeeper连接地址*/private static final String connectString = "192.168.110.88:2181";/*** 分布式锁根路径*/private static final String ROOT_PATH = "/distributed";/*** zookeeper客户端*/private ZooKeeper zooKeeper;/*** 初始化zookeeper客户端*/@PostConstructpublic void init() {try {// 连接zookeeper服务器this.zooKeeper = new ZooKeeper(connectString, 30000, event -> System.out.println("获取链接成功!!"));// 创建分布式锁根节点if (this.zooKeeper.exists(ROOT_PATH, false) == null) {this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} catch (Exception e) {System.out.println("获取链接失败!");e.printStackTrace();}}/*** 销毁zookeeper客户端*/@PreDestroypublic void destroy() {try {if (zooKeeper != null) {zooKeeper.close();}} catch (InterruptedException e) {e.printStackTrace();}}/*** 加锁** @param lockName*/public void lock(String lockName) {try {zooKeeper.create(ROOT_PATH + "/" + lockName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);} catch (Exception e) {// 重试try {Thread.sleep(200);lock(lockName);} catch (InterruptedException ex) {ex.printStackTrace();}}System.out.println("----------加锁成功------------");}/*** 解锁** @param lockName*/public void unlock(String lockName) {try {this.zooKeeper.delete(ROOT_PATH + "/" + lockName, 0);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}System.out.println("----------解锁成功------------");} }
- 使用临时节点EPHEMERAL加锁,这里使用临时节点是方便锁的自动释放,避免发生死锁问题
- 业务执行完成,删除临时节点,解锁
- 实现“超卖”的业务方法,使用自定义的zookeeper工具类加锁
@AutowiredZookeeperClient zookeeperClient;@Overridepublic void checkAndReduceStock() {zookeeperClient.lock("lock");// 查询库存WmsStock wmsStock = baseMapper.selectById(1L);// 验证库存大于0再扣减库存if (wmsStock != null && wmsStock.getStockQuantity() > 0) {wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);baseMapper.updateById(wmsStock);}// 释放锁zookeeperClient.unlock("lock");}
- 将数据库库存表的库存恢复为10000,分别启动7000,7001,7002服务
- 启动jmeter压测工具,压测库存扣减接口,查看结果
- 库存扣减为0
- jmeter压测结果,平均访问时间502ms,请求吞吐量为每秒161
- 不存在并发“超卖问题”,但是接口访问吞吐量较低。由于在加锁过程中,获取不到锁,会无限自旋去获取锁,导致性能下降。
- 优化分布式锁,使用zk的临时序列化节点实现分布式锁,避免锁的自旋操作
- 优化代码
package com.ht.atp.plat.util;import org.apache.commons.lang3.StringUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils;import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.List;@Component public class ZookeeperClientNoBlock {/*** zookeeper连接地址*/private static final String connectString = "192.168.110.88:2181";/*** 分布式锁根路径*/private static final String ROOT_PATH = "/distributed";/*** zookeeper客户端*/private ZooKeeper zooKeeper;/*** 初始化zookeeper客户端*/@PostConstructpublic void init() {try {// 连接zookeeper服务器this.zooKeeper = new ZooKeeper(connectString, 30000, event -> System.out.println("获取链接成功!!"));// 创建分布式锁根节点if (this.zooKeeper.exists(ROOT_PATH, false) == null) {this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} catch (Exception e) {System.out.println("获取链接失败!");e.printStackTrace();}}/*** 销毁zookeeper客户端*/@PreDestroypublic void destroy() {try {if (zooKeeper != null) {zooKeeper.close();}} catch (InterruptedException e) {e.printStackTrace();}}/*** 创建锁** @param lockName*/public String lock(String lockName) {try {String realLock = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);return realLock;} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("----------加锁成功------------");return null;}/*** 检查锁** @param lockName*/public void checkLock(String lockName) {String preNode = getPreNode(lockName);// 如果该节点没有前一个节点,说明该节点时最小节点,放行执行业务逻辑if (StringUtils.isEmpty(preNode)) {return;}// 重新检查。是否获取到锁try {Thread.sleep(20);} catch (InterruptedException ex) {ex.printStackTrace();}checkLock(lockName);}/*** 获取指定节点的前节点** @param path* @return*/private String getPreNode(String path) {try {// 获取当前节点的序列化号Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));// 获取根路径下的所有序列化子节点List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);// 判空if (CollectionUtils.isEmpty(nodes)) {return null;}// 获取前一个节点Long flag = 0L;String preNode = null;for (String node : nodes) {// 获取每个节点的序列化号Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));if (serial < curSerial && serial > flag) {flag = serial;preNode = node;}}return preNode;} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}return null;}/*** 解锁** @param lockName*/public void unlock(String lockName) {try {this.zooKeeper.delete(lockName, 0);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}System.out.println("----------解锁成功------------");} }
- 使用临时序列化节点EPHEMERAL_SEQUENTIAL加锁,保证每个节点都可以加锁成功,避免自旋操作
- 通过判断当前节点是否是第一个节点,如果是第一个节点,才可执行后续的业务
- 解锁操作
- 扣减库存业务实现方法
public void checkAndReduceStock() {//加锁String lock = zookeeperClientNoBlock.lock("lock");//检查锁zookeeperClientNoBlock.checkLock(lock);// 查询库存WmsStock wmsStock = baseMapper.selectById(1L);// 验证库存大于0再扣减库存if (wmsStock != null && wmsStock.getStockQuantity() > 0) {wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);baseMapper.updateById(wmsStock);}// 释放锁zookeeperClientNoBlock.unlock(lock);}
- 将扣减库存恢复为10000,重新启动7000,7001,7002服务,使用jmeter压测工具再次压测
- 库存扣减为0
- 压测结果:平均访问时间1597ms,请求访问吞吐量为每秒62
- 从优化结果来看,此种方式更加耗时,性能更差。将加锁操作改为非自旋操作,虽然加锁不在耗时,但是会自旋判断自己是否是最小的节点,依然存在耗时操作,且逻辑更为复杂。
- 优化分布式锁,通过使用zookeeper的Watcher监听来实现阻塞锁
- 实现代码
package com.ht.atp.plat.util;import org.apache.commons.lang3.StringUtils; import org.apache.zookeeper.*; import org.apache.zookeeper.proto.WatcherEvent; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils;import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.List; import java.util.concurrent.CountDownLatch;@Component public class ZookeeperClientBlockWatch {/*** zookeeper连接地址*/private static final String connectString = "192.168.110.88:2181";/*** 分布式锁根路径*/private static final String ROOT_PATH = "/distributed";/*** zookeeper客户端*/private ZooKeeper zooKeeper;/*** 初始化zookeeper客户端*/@PostConstructpublic void init() {try {// 连接zookeeper服务器this.zooKeeper = new ZooKeeper(connectString, 30000, event -> System.out.println("获取链接成功!!"));// 创建分布式锁根节点if (this.zooKeeper.exists(ROOT_PATH, false) == null) {this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} catch (Exception e) {System.out.println("获取链接失败!");e.printStackTrace();}}/*** 销毁zookeeper客户端*/@PreDestroypublic void destroy() {try {if (zooKeeper != null) {zooKeeper.close();}} catch (InterruptedException e) {e.printStackTrace();}}/*** 创建锁** @param lockName*/public String lock(String lockName) {try {String realLock = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);return realLock;} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("----------加锁成功------------");return null;}/*** 检查锁** @param lockName*/public void checkLock(String lockName) {try {String preNode = getPreNode(lockName);// 如果该节点没有前一个节点,说明该节点时最小节点,放行执行业务逻辑if (!StringUtils.isEmpty(preNode)) {CountDownLatch countDownLatch = new CountDownLatch(1);if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("监控:"+ watchedEvent.getPath());countDownLatch.countDown();}}) == null) {return;}// 阻塞,减少自旋检查countDownLatch.await();}return;} catch (Exception e) {e.printStackTrace();// 重新检查。是否获取到锁try {Thread.sleep(20);} catch (InterruptedException ex) {ex.printStackTrace();}checkLock(lockName);}}/*** 获取指定节点的前节点** @param path* @return*/private String getPreNode(String path) {try {// 获取当前节点的序列化号Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));// 获取根路径下的所有序列化子节点List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);// 判空if (CollectionUtils.isEmpty(nodes)) {return null;}// 获取前一个节点Long flag = 0L;String preNode = null;for (String node : nodes) {// 获取每个节点的序列化号Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));if (serial < curSerial && serial > flag) {flag = serial;preNode = node;}}return preNode;} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}return null;}/*** 解锁** @param lockName*/public void unlock(String lockName) {try {this.zooKeeper.delete(lockName, 0);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}System.out.println("----------解锁成功------------");} }
- 检查加锁,加入前一个节点的监控判断,如果前一个节点的锁还没有释放,就使用CountDownLatch计数器阻塞程序,减少自旋检查,当监控到前一个节点的锁释放,则当前节点获取到锁,开始执行业务逻辑
- 修改扣减库存业务实现方法为监控阻塞的方式
@AutowiredZookeeperClientBlockWatch zookeeperClientBlockWatch;public void checkAndReduceStock() {//加锁String lock = zookeeperClientBlockWatch.lock("lock");//检查锁zookeeperClientBlockWatch.checkLock(lock);// 查询库存WmsStock wmsStock = baseMapper.selectById(1L);// 验证库存大于0再扣减库存if (wmsStock != null && wmsStock.getStockQuantity() > 0) {wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);baseMapper.updateById(wmsStock);}// 释放锁zookeeperClientBlockWatch.unlock(lock);}
- 将扣减库存恢复为10000,重新启动7000,7001,7002服务,使用jmeter压测工具再次压测
- 库存扣减为了0
- jmeter压测结果:平均访问时间441ms,请求访问吞吐量为每秒223
- 从优化结果来看,使用带监控的阻塞锁吞吐量更高,平均访问时间更小,性能比自旋加锁的方式性能更优。
- 使用ThreadLocal优化分布式锁,将zookeeper的Watcher监听来实现阻塞锁优化为可重入分布式锁
- 优化代码
package com.ht.atp.plat.util;import org.apache.commons.lang3.StringUtils; import org.apache.zookeeper.*; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils;import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.List; import java.util.concurrent.CountDownLatch;@Component public class ZookeeperClientBlockWatchReentrant {/*** zookeeper连接地址*/private static final String connectString = "192.168.110.88:2181";/*** 分布式锁根路径*/private static final String ROOT_PATH = "/distributed";/*** zookeeper客户端*/private ZooKeeper zooKeeper;/*** ThreadLocal本地线程*/private static final ThreadLocal<Integer> THREAD_LOCAL = new ThreadLocal<>();/*** 初始化zookeeper客户端*/@PostConstructpublic void init() {try {// 连接zookeeper服务器this.zooKeeper = new ZooKeeper(connectString, 30000, event -> System.out.println("获取链接成功!!"));// 创建分布式锁根节点if (this.zooKeeper.exists(ROOT_PATH, false) == null) {this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} catch (Exception e) {System.out.println("获取链接失败!");e.printStackTrace();}}/*** 销毁zookeeper客户端*/@PreDestroypublic void destroy() {try {if (zooKeeper != null) {zooKeeper.close();}} catch (InterruptedException e) {e.printStackTrace();}}/*** 创建锁** @param lockName*/public String lock(String lockName) {try {if (THREAD_LOCAL.get() == null || THREAD_LOCAL.get() == 0){String realLock = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);return realLock;}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("----------加锁成功------------");return null;}/*** 检查锁** @param lockName*/public void checkLock(String lockName) {Integer flag = THREAD_LOCAL.get();if (flag != null && flag > 0) {THREAD_LOCAL.set(flag + 1);return;}try {String preNode = getPreNode(lockName);// 如果该节点没有前一个节点,说明该节点时最小节点,放行执行业务逻辑if (!StringUtils.isEmpty(preNode)) {CountDownLatch countDownLatch = new CountDownLatch(1);if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("监控:"+ watchedEvent.getPath());countDownLatch.countDown();}}) == null) {THREAD_LOCAL.set(1);return;}// 阻塞,减少自旋检查countDownLatch.await();}THREAD_LOCAL.set(1);return;} catch (Exception e) {e.printStackTrace();// 重新检查。是否获取到锁try {Thread.sleep(20);} catch (InterruptedException ex) {ex.printStackTrace();}checkLock(lockName);}}/*** 获取指定节点的前节点** @param path* @return*/private String getPreNode(String path) {try {// 获取当前节点的序列化号Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));// 获取根路径下的所有序列化子节点List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);// 判空if (CollectionUtils.isEmpty(nodes)) {return null;}// 获取前一个节点Long flag = 0L;String preNode = null;for (String node : nodes) {// 获取每个节点的序列化号Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));if (serial < curSerial && serial > flag) {flag = serial;preNode = node;}}return preNode;} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}return null;}/*** 解锁** @param lockName*/public void unlock(String lockName) {try {THREAD_LOCAL.set(THREAD_LOCAL.get() - 1);if (THREAD_LOCAL.get() == 0) {this.zooKeeper.delete(lockName, 0);THREAD_LOCAL.remove();}} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}System.out.println("----------解锁成功------------");} }
- 使用ThreadLocal存储当前线程的操作
- 加锁:先检测本地线程是否存在或者值是否为0,如果不存在或者为0,则创建锁,避免重复创建
- 检查加锁:如果值大于0,代表可重入,值加1;如果不存在或者等于0,将值设置为1,代表第一次获取到锁
- 解锁:获取线程的值,并减少1,如果减少后的值变为0,代表锁已经不在占用,此时才释放锁,并且将当前线程的值存储移除;如果减少的值还是大于0,代表此时锁还在占用
- 扣减库存业务代码
@AutowiredZookeeperClientBlockWatchReentrant zookeeperClientBlockWatchReentrant;public void checkAndReduceStock() {//加锁String lock = zookeeperClientBlockWatchReentrant.lock("lock");//检查锁zookeeperClientBlockWatchReentrant.checkLock(lock);// 查询库存WmsStock wmsStock = baseMapper.selectById(1L);// 验证库存大于0再扣减库存if (wmsStock != null && wmsStock.getStockQuantity() > 0) {wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);baseMapper.updateById(wmsStock);}// 释放锁zookeeperClientBlockWatchReentrant.unlock(lock);}
- 将扣减库存恢复为10000,重新启动7000,7001,7002服务,使用jmeter压测工具再次压测
- 库存扣减为0
- jmeter压测结果:平均访问时间426ms,吞吐量每秒231
- 使用zookeeper的Curator工具包实现分布式锁
- 引入pom依赖
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --> <dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.1.0</version> </dependency>
- 创建CuratorFramework的bean工具类
package com.ht.atp.plat.config;import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.WatchedEvent; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** CuratorFramework工具类配置*/ @Slf4j @Configuration public class ZookeeperConfig {@Beanpublic CuratorFramework curatorFramework() {// ExponentialBackoffRetry是种重连策略,每次重连的间隔会越来越长,1000毫秒是初始化的间隔时间,3代表尝试重连次数。ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);// 创建客户端CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.110.88:2181", retryPolicy);// 添加watched 监听器curatorFramework.getCuratorListenable().addListener((CuratorFramework client, CuratorEvent event) -> {CuratorEventType type = event.getType();if (type == CuratorEventType.WATCHED) {WatchedEvent watchedEvent = event.getWatchedEvent();String path = watchedEvent.getPath();log.info(watchedEvent.getType() + " ----------------------------> " + path);// 重新设置该节点监听if (null != path) {client.checkExists().watched().forPath(path);}}});// 启动客户端curatorFramework.start();return curatorFramework;} }
- 使用可重入锁,扣减库存业务方法
@Autowiredprivate CuratorFramework curatorFramework;public void checkAndReduceStock() throws Exception {//加锁InterProcessMutex mutex = new InterProcessMutex(curatorFramework, "/curator/lock");try {// 加锁mutex.acquire();// 查询库存WmsStock wmsStock = baseMapper.selectById(1L);// 验证库存大于0再扣减库存if (wmsStock != null && wmsStock.getStockQuantity() > 0) {wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);baseMapper.updateById(wmsStock);}this.testSub(mutex);} catch (Exception e) {e.printStackTrace();} finally {// 释放锁mutex.release();}}public void testSub(InterProcessMutex mutex) throws Exception {try {mutex.acquire();System.out.println("测试可重入锁。。。。");} catch (Exception e) {e.printStackTrace();}finally {// 释放锁mutex.release();}}
- 将扣减库存恢复为10000,重新启动7000,7001,7002服务,使用jmeter压测工具再次压测
- 库存扣减为0
- jmeter压测结果:平均访问时间497,吞吐量每秒198
- 能够解决并发访问“超卖问题”
结语
关于使用zookeeper分布式锁解决“超卖”问题的内容到这里就结束了,我们下期见。。。。。。