常见分布式锁4:zookeeper 瞬时znode节点 + watcher监听机制,ChatGPT回复的解决死锁的方案

原文地址在这里

临时节点具备数据自动删除的功能。当client与ZooKeeper连接和session断掉时,相应的临时节点就会被删除。zk有瞬时和持久节点,瞬时节点不可以有子节点。会话结束之后瞬时节点就会消失,基于zk的瞬时有序节点实现分布式锁:

多线程并发创建瞬时节点的时候,得到有序的序列,序号最小的线程可以获得锁;

其他的线程监听自己序号的前一个序号。前一个线程执行结束之后删除自己序号的节点;

下一个序号的线程得到通知,继续执行;

以此类推,创建节点的时候,就确认了线程执行的顺序。

<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.14</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions>
</dependency>

zk 的观察器只可以监控一次,数据发生变化之后可以发送给客户端,之后需要再次设置监控。exists、create、getChildren三个方法都可以添加watcher ,也就是在调用方法的时候传递true就是添加监听。注意这里Lock 实现了Watcher和AutoCloseable:

当前线程创建的节点是第一个节点就获得锁,否则就监听自己的前一个节点的事件:

/*** 自己本身就是一个 watcher,可以得到通知* AutoCloseable 实现自动关闭,资源不使用的时候*/
@Slf4j
public class ZkLock implements AutoCloseable, Watcher {
​private ZooKeeper zooKeeper;
​/*** 记录当前锁的名字*/private String znode;
​public ZkLock() throws IOException {this.zooKeeper = new ZooKeeper("localhost:2181",10000,this);}
​public boolean getLock(String businessCode) {try {//创建业务 根节点Stat stat = zooKeeper.exists("/" + businessCode, false);if (stat==null){zooKeeper.create("/" + businessCode,businessCode.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}
​//创建瞬时有序节点  /order/order_00000001znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
​//获取业务节点下 所有的子节点List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);//获取序号最小的(第一个)子节点Collections.sort(childrenNodes);String firstNode = childrenNodes.get(0);//如果创建的节点是第一个子节点,则获得锁if (znode.endsWith(firstNode)){return true;}//如果不是第一个子节点,则监听前一个节点String lastNode = firstNode;for (String node:childrenNodes){if (znode.endsWith(node)){zooKeeper.exists("/"+businessCode+"/"+lastNode,true);break;}else {lastNode = node;}}synchronized (this){wait();}return true;} catch (Exception e) {e.printStackTrace();}return false;}
​@Overridepublic void close() throws Exception {zooKeeper.delete(znode,-1);zooKeeper.close();log.info("我已经释放了锁!");}
​@Overridepublic void process(WatchedEvent event) {if (event.getType() == Event.EventType.NodeDeleted){synchronized (this){notify();}}}
}

这段代码实现了一个基于 ZooKeeper 的分布式锁,以下是它的实现步骤:

  1. 首先创建 ZooKeeper 客户端,并实现 Watcher 接口,在自身上注册监听器。

  2. 在 ZooKeeper 上创建一个业务根节点,例如 /businessCode,表示该业务下所有的分布式锁。

  3. 使用 ZooKeeper 的临时有序节点创建子节点,例如 /businessCode/businessCode_00001,表示当前节点占用了分布式锁,并记录在 znode 中。

  4. 获取业务节点下所有的子节点,并按节点名称排序。如果当前节点的名称是所有子节点中最小的,则获取分布式锁。

  5. 如果当前节点不是所有子节点中最小的,则监听前一个子节点的删除事件,等待前一个子节点释放锁。

  6. 当前一个子节点被删除时,重新执行第四步,即再次检查当前节点是否为所有子节点中最小的。

  7. 如果当前节点成功获取到锁,则执行业务操作;否则等待锁释放,重新获取锁。

  8. 业务操作完成后,执行 close() 方法释放锁,删除当前节点。

总体来说,这段代码实现了一种基本的分布式锁机制,通过 ZooKeeper 的临时有序节点和 Watcher 监听机制来实现。需要注意的是,此实现方式还可能存在死锁问题,当持有锁的节点出现网络故障或宕机时,会导致整个分布式锁失效。因此,在实际应用中,还需要综合考虑各种异常情况,确保分布式锁的正确性和高可用性。

 针对死锁ChatGPT3.5的解决方案:

那段代码中存在潜在的死锁问题,即如果持有锁的节点出现异常,可能导致整个分布式锁失效。为了解决这个问题,可以考虑以下两种方案:

  1. 引入超时机制

在获取锁的过程中,可以引入超时机制,例如在等待前一个节点释放锁的过程中,设定一个超时时间,如果超过了这个时间仍未收到通知,则当前节点主动删除自己的临时节点,释放锁,并尝试重新获取锁。

  1. 心跳检测机制

对于已经获取到分布式锁的节点,可以在一定时间间隔内向 ZooKeeper 发送心跳消息,以检测自身是否还处于活跃状态。如果该节点长时间没有发送心跳消息或者 ZooKeeper 无法接收到该节点的心跳消息,就认为该节点出现了异常,需要主动删除自身的临时节点,释放锁,并尝试重新获取锁。

在前面提到的两种方案中,我选择了第一种方案,即引入超时机制来解决死锁问题。具体实现方式如下:

在获取锁的过程中,我们可以增加一个等待时间的参数 waitTime,表示等待前一个节点释放锁的最长时间。在设置监听器之后,使用 CountDownLatch 等待前一个节点的删除事件通知,当出现以下情况之一时,即可跳出等待循环:

  1. 在等待时间内成功获取到锁,即当前节点为业务节点下所有临时节点中的最小节点。

  2. 在等待时间内等待到超时时间,说明前一节点无法正常释放锁,因此当前节点放弃等待,主动删除自己的临时节点,重新尝试获取锁。

修改后的代码如下所示:

public class DistributedLock implements AutoCloseable {private static final String ROOT_NODE = "/businessCode"; // 业务根节点private final ZooKeeper zkClient; // ZooKeeper 客户端private final String lockName; // 分布式锁名称private String currentLockNodePath; // 当前节点的 znode 路径public DistributedLock(String zkServers, String lockName) throws IOException {this.zkClient = new ZooKeeper(zkServers, 3000, event -> {});this.lockName = lockName;ensureRootNodeExists();}// 确保业务根节点存在private void ensureRootNodeExists() {try {if (zkClient.exists(ROOT_NODE, false) == null) {zkClient.create(ROOT_NODE, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} catch (Exception e) {throw new RuntimeException("Failed to ensure root node exists: " + ROOT_NODE, e);}}// 获取锁public void lock(long waitTime) {try {// 创建当前节点currentLockNodePath = zkClient.create(ROOT_NODE + "/" + lockName + "_",null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);// 等待前一个节点释放锁List<String> children = zkClient.getChildren(ROOT_NODE, false);String[] sortedNodes = children.stream().sorted().toArray(String[]::new);int currentNodeIndex = Arrays.binarySearch(sortedNodes, currentLockNodePath.substring(currentLockNodePath.lastIndexOf("/") + 1));if (currentNodeIndex == 0) {// 当前节点为业务节点下所有临时节点中的最小节点,获取锁return;}// 等待前一个节点的删除事件通知,如果超时仍未收到,则主动删除自己的临时节点CountDownLatch latch = new CountDownLatch(1);String previousNodePath = ROOT_NODE + "/" + sortedNodes[currentNodeIndex - 1];Stat stat = zkClient.exists(previousNodePath, event -> {if (event.getType() == Watcher.Event.EventType.NodeDeleted) {latch.countDown();}});if (stat != null) {if (latch.await(waitTime, TimeUnit.MILLISECONDS)) {// 等待成功,重新执行获取锁操作lock(waitTime);} else {// 超时未等到通知,主动删除自己的临时节点,放弃获取锁zkClient.delete(currentLockNodePath, -1);throw new RuntimeException("Failed to acquire lock within wait time: " + waitTime);}}} catch (Exception e) {throw new RuntimeException("Failed to acquire lock: " + lockName, e);}}// 释放锁@Overridepublic void close() {try {System.out.println("Releasing lock: " + lockName);zkClient.delete(currentLockNodePath, -1);zkClient.close();} catch (Exception e) {throw new RuntimeException("Failed to release lock: " + lockName, e);}}
}

lock() 方法中增加了一个参数 waitTime,表示等待前一个节点释放锁的最长时间。在方法最后,如果超时未收到通知,则主动删除当前节点,放弃获取锁,并抛出异常提示用户。

需要注意的是,在使用该类时,需要确保所有节点设置的 waitTime 值是一致的。另外,为了让线程能够跳出等待循环,应该每隔一段时间检查一下当前的线程状态,例如通过 Thread.currentThread().isInterrupted() 判断线程是否被打断,如果被打断则立即跳出循环。

下面是使用心跳检测机制的代码实例:

public class DistributedLock implements AutoCloseable {private static final String ROOT_NODE = "/businessCode"; // 业务根节点private static final long HEARTBEAT_RATE = 1000L; // 心跳检测频率,单位毫秒private final ZooKeeper zkClient; // ZooKeeper 客户端private final String lockName; // 分布式锁名称private volatile boolean locked = false; // 是否已经获取到了锁private Thread heartbeatThread; // 心跳线程private String currentLockNodePath; // 当前节点的 znode 路径public DistributedLock(String zkServers, String lockName) throws IOException {this.zkClient = new ZooKeeper(zkServers, 3000, event -> {});this.lockName = lockName;ensureRootNodeExists();}// 确保业务根节点存在private void ensureRootNodeExists() {try {if (zkClient.exists(ROOT_NODE, false) == null) {zkClient.create(ROOT_NODE, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}} catch (Exception e) {throw new RuntimeException("Failed to ensure root node exists: " + ROOT_NODE, e);}}// 获取锁public void lock() {try {// 创建当前节点currentLockNodePath = zkClient.create(ROOT_NODE + "/" + lockName + "_",null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);// 等待前一个节点释放锁while (!locked) {List<String> children = zkClient.getChildren(ROOT_NODE, false);String[] sortedNodes = children.stream().sorted().toArray(String[]::new);int currentNodeIndex = Arrays.binarySearch(sortedNodes, currentLockNodePath.substring(currentLockNodePath.lastIndexOf("/") + 1));if (currentNodeIndex == 0) {// 当前节点为业务节点下所有临时节点中的最小节点,获取锁locked = true;startHeartbeatThread(); // 启动心跳线程break;}// 等待前一个节点的删除事件通知String previousNodePath = ROOT_NODE + "/" + sortedNodes[currentNodeIndex - 1];CountDownLatch latch = new CountDownLatch(1);Stat stat = zkClient.exists(previousNodePath, event -> {if (event.getType() == Watcher.Event.EventType.NodeDeleted) {latch.countDown();}});if (stat != null) {latch.await();}}} catch (Exception e) {throw new RuntimeException("Failed to acquire lock: " + lockName, e);}}// 释放锁@Overridepublic void close() {try {System.out.println("Releasing lock: " + lockName);zkClient.delete(currentLockNodePath, -1);stopHeartbeatThread(); // 停止心跳线程zkClient.close();} catch (Exception e) {throw new RuntimeException("Failed to release lock: " + lockName, e);}}// 启动心跳线程private void startHeartbeatThread() {heartbeatThread = new Thread(() -> {while (true) {try {zkClient.setData(currentLockNodePath, null, -1);Thread.sleep(HEARTBEAT_RATE);} catch (Exception e) {System.out.println("Failed to send heartbeat signal: " + lockName);}}});heartbeatThread.start();}// 停止心跳线程private void stopHeartbeatThread() {if (heartbeatThread != null) {heartbeatThread.interrupt();heartbeatThread = null;}}
}

在该代码实例中,我们创建了一个布尔型变量 locked,表示当前是否已经获取到了锁。在获取锁的过程中,如果当前节点为业务节点下所有临时节点中的最小节点,则设置 lockedtrue,同时启动心跳线程。心跳线程每隔一段时间就向 ZooKeeper 发送一次空数据以保持会话,从而保证自己的临时节点不会过期。

在释放锁的过程中,停止心跳线程即可。需要注意的是,心跳线程的 while 循环不能被打断,因为一旦被打断,线程就会退出,从而不再发送心跳信号,导致临时节点过期。因此,在捕获 InterruptedException 异常时只是简单地输出日志,并继续下一轮循环。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/13564.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何用ChatGPT设计出可控制的代码?

【编者按】如何训练 ChatGPT 实现自己想要的代码与功能&#xff0c;秘诀就是编写更具交互性和前瞻性的设计提示&#xff0c;本文作者分享了如何利用 ChatGPT 设计出可控制的代码步骤。 原文链接&#xff1a;https://www.friendlyskies.net/maybe/a-quick-way-to-get-more-creat…

使用ChatGPT和EZDML迅速高效生成可运行的软件系统原型

ChatGPT最近很热&#xff0c;其对程序员可以说影响极大&#xff0c;是不得不跟的潮流趋势&#xff0c;因此EZDML新版也把ChatGPT的支持加上了&#xff0c;可以在几分钟内按您的意思生成一个数据模型&#xff0c;再搭载使用EZDML自带的代码模板&#xff0c;能快速生成可真正运行…

2023年最新CSA研讨会-ChatGPT的安全影响白皮书下载

万物相生相成。以ChatGPT为代表的生成式人工智能&#xff0c;为网络安全领域曾经难解的问题&#xff0c;提供全新解决思路&#xff0c;也伴生出令人担忧的安全问题。 出现矛盾意味着新的突破正要诞生。对于网络安全厂商来说&#xff0c;如何在保证数据安全、合规的基础上&#…

文艺一言 VS chatGPT

最近百度文心一言新闻发布会召开&#xff0c;会上正式推出了百度版ChatGPT——文心一言。号称中国版的chatgpt&#xff0c;好不好用我不知道&#xff0c;毕竟现在还是内测期间&#xff0c;得有邀请码才能用。但是从发布会开始后&#xff0c;百度集团股价出现持续下挫&#xff0…

chatGPT真的完美吗?“翻车”现场频发,它的智商是9岁小孩儿?

前面我们说到&#xff0c;一款基于chatGPT的插件【Merlin】&#xff0c;聊天式感受强大AI能力 &#xff0c;在我们惊呼chatGPT强大能力的时候&#xff0c;它就真的很完美吗&#xff1f; NO&#xff01;&#xff01; 尽管已经chatGPT已经开始进入商用阶段&#xff0c;各大互联…

震惊科学界!微软最新研究刷屏:GPT-4能力接近人类?

来源&#xff1a;新智元 GPT-4会演变为通用人工智能吗&#xff1f; Meta首席人工智能科学家、图灵奖得主Yann LeCun对此表示质疑。 在他看来&#xff0c;大模型对于数据和算力的需求实在太大&#xff0c;学习效率却不高&#xff0c;因此学习「世界模型」才能通往AGI之路。 不过…

微软豪华力作,称GPT-4已具备人类心智,LeCun质疑

源 | 新智元 在通往AGI的路上我们还有多远&#xff1f;微软豪华作者团队发布的154页论文指出&#xff0c;GPT-4已经初具通用人工智能的雏形。 GPT-4会演变为通用人工智能吗&#xff1f; Meta首席人工智能科学家、图灵奖得主Yann LeCun对此表示质疑。 在他看来&#xff0c;大模型…

OceanBase CTO杨传辉:万字解读,打造开发者友好的分布式数据库

欢迎访问 OceanBase 官网获取更多信息&#xff1a;https://www.oceanbase.com/ 3 月 25 日&#xff0c;第一届 OceanBase 开发者大会在北京举行&#xff0c;OceanBase CTO 杨传辉在主论坛进行了《打造开发者友好的分布式数据库》的分享。 以下为演讲实录&#xff1a; 各位 Oc…

Dora全家桶到底是怎样的味道

Android APP项目大体可以分为基础架构的开发、UI布局和接口调试&#xff0c;其他业务逻辑我这里无法预知&#xff0c;要不然我不成神了&#xff1f;所以我开源了三条路线的框架&#xff0c;dora、dview和dcache。本篇我只讲解基础架构dora是如何使用的。 Get Started // Add …

Mac、iPad卖不动,苹果靠iPhone续命

深燃&#xff08;shenrancaijing&#xff09;原创 作者 | 王敏 编辑 | 金玙璠 北京时间5月5日凌晨&#xff0c;苹果交出了最新一季度的成绩单。 根据财报&#xff0c;在2023年一季度&#xff08;截至4月1日的2023财年第二财季&#xff09;&#xff0c;苹果营收948.4亿美元&…

【历史上的今天】6 月 28 日:马斯克诞生;微软推出 Office 365;蔡氏电路的发明者出生

整理 | 王启隆 透过「历史上的今天」&#xff0c;从过去看未来&#xff0c;从现在亦可以改变未来。 今天是 2023 年 6 月 28 日&#xff0c;在 2005 年的今天&#xff0c;腾讯 Q 店曝光&#xff0c;标志着腾讯进军 C2C 领域。当时国内 C2C 市场一片混战&#xff0c;淘宝、易趣…

Scale AI:大模型还需要数据标注吗?

我们在 2021 年 7 月编译过一篇关于 Scale AI 的文章&#xff0c;但在过去一段时间&#xff0c;AI 行业每天都在发生十级地震&#xff0c;行业价值链也发生变化&#xff0c;因此我们认为有必要重新审视此前研究过的重要公司&#xff0c;所以把 Scale AI 拿出来重新研究。 Scal…

聚观早报 |必应成为中国第一大桌面搜索引擎;快手上市后首次盈利

今日要闻&#xff1a;必应成为中国第一大桌面搜索引擎&#xff1b;快手上市后集团层面首次盈利&#xff1b;ChatGPT相关诈骗攻击与日俱增&#xff1b;比亚迪回应法国建厂传闻&#xff1b;薇娅夫妇半年收获两家上市公司 必应成为中国第一大桌面搜索引擎 5 月 22 日消息&#xf…

一周 AIGC 丨白宫宣布首个 AI 监管计划,中国 AI 领域或面临美国全面投资禁令...

世界经济论坛&#xff08;WEF&#xff09;最新发布的报告显示&#xff0c;未来几年&#xff0c;因人工智能&#xff08;AI&#xff09;和经济增长放缓等因素&#xff0c;全球就业形势将受到严重冲击。全球近四分之一的工作岗位将发生变化&#xff0c;其中一些将被淘汰&#xff…

技术动态 | 基于GPT-4的知识图谱构建能力评测

一、摘要 知识图谱是一种用图模型来描述知识和建模世界万物之间关联关系的大规模语义网络&#xff0c;是大数据时代知识表示的重要方式之一。而大型语言模型&#xff0c;如OpenAI发布的GPT-4 &#xff0c;通过在大量文本等数据上进行预训练&#xff0c;展示出了极其强大的通识知…

微软放弃收购雅虎 | 历史上的今天

整理 | 王启隆 透过「历史上的今天」&#xff0c;从过去看未来&#xff0c;从现在亦可以改变未来。 今天是 2023 年 5 月 4 日&#xff0c;青年节。在 1995 年的今天&#xff0c;德国公司收购 Commodore。作为个人计算机行业的先驱&#xff0c;Commodore 于 1994 年停止生产并宣…

鼠标之父诞生 | 历史上的今天

整理 | 王启隆 透过「历史上的今天」&#xff0c;从过去看未来&#xff0c;从现在亦可以改变未来。 今天是 2022 年 1 月 30 日&#xff0c;在 203 年前的今天&#xff0c;1820 年 1 月 30 日&#xff0c;南极洲被发现。南极洲又称第七大陆&#xff0c;是是人类最后到达的大陆、…

关于大型语言模型的争论和局限

以色列巴伊兰大学教授Yoav Goldberg分享了他对大型语言模型的能力和局限性的看法&#xff0c;以及在语言理解方面的立场。&#xff08;以下内容经授权后由OneFlow编译发布&#xff0c;译文转载请联系OneFlow获得授权。原文&#xff1a;https://gist.github.com/yoavg/59d174608…

ChatGPT4常用插件-Wolfram数学神器

介绍 Wolfram插件通过WolframlAlpha和Wolfram语言使ChatGPT能够访问强大的计算、准确的数学、精选知识、实时数据和可视化&#xff0c;从而使它变得更加智能。 提供从化学到地理、天文学到流行文化、营养到工程学以及算法代码执行的广泛而深入的报道。 安装方式 ChatGPT 用户…

ChatGPT为啥这么强:万字长文详解 by WolframAlpha之父

来源&#xff1a;量子位 本文约7500字&#xff0c;建议阅读10分钟 本文介绍了“ChatGPT是什么”和“为什么它能这么有效”两个问题。 Wolfram语言之父Stephen Wolfram&#xff0c;又来给ChatGPT背书了。 1月&#xff0c;他还专门写过一篇文章&#xff0c;力荐自家的计算知识搜索…