Zookeeper Java 开发,自定义分布式锁示例

文章目录

    • 一、概述
    • 二、导入依赖包
    • 三、创建锁的过程
      • 3.1 通过 create 创建节点信息
      • 3.2 AsyncCallback.StringCallback 回调函数
      • 3.3 AsyncCallback.Children2Callback 的回调函数
      • 3.4 Watcher 的回调函数
    • 四、完整示例
      • 4.1 完整分布式锁代码
      • 4.2 测试类

如果您还没有安装Zookeeper请看ZooKeeper 安装说明,Zookeeper 命令使用方法和数据说明,Zookeeper Java 开发入门。

一、概述

  • 情景:假设有10个客户端(分散的10台主机)要执行一个任务,这个任务某些过程需要保持原子性。那么我们就需要一个分布式锁。

  • 原理:通过在Zookeeper中创建序列节点来实现获得锁,删除节点来释放锁。其实质是一个按先来后到的排序过程,实现过程如下:

    • 客户端发起请求,创建锁序列节点(/lock/xxxxxxxx)

    • 获取所有锁节点,判断自己是否为最小节点

      • 如果自己是最小序列节点,则立即获得锁
      • 否则不能获得锁,但要监控前一个序列节点的状态
    • 获得锁的客户端开始执行任务。

    • 执行完任务后释放锁。

      • 由于后一个节点监控了前一个节点,当前一个节点删除时,后一个客户端会收到回调。

      • 在这个回调中再获取所有锁节点,判断自己是否为最小节点。

      • 以此类推,直到全部结束。

  • 流程如下

在这里插入图片描述

  • 如果您对没有做过 Zookeeper 开发,强烈建立先看 Zookeeper Java 开发入门。

二、导入依赖包

  • 在 pom.xml 文件中导入 Zookeeper 包,注意一般这个包的版本要和您安装的 Zookeeper 服务端版本一致。

    <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.2</version>
    </dependency>
    

三、创建锁的过程

3.1 通过 create 创建节点信息

  • 通过 create 创建序列节点信息。他是异步方式,创建成功后会调用 AsyncCallback.StringCallback.processResult 回调函数。
    public void lock() throws InterruptedException, LockException {zooKeeper.create("/lock", "xxx".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, context);countDownLatch.await();if(StringUtils.isEmpty(this.lockNodePath)){throw new LockException("创建锁失败");}System.out.println(this.appName + " 获得锁");}

3.2 AsyncCallback.StringCallback 回调函数

  • 在 AsyncCallback.StringCallback 的回调函数中通过 getChildren 方法获取 ZooKeeper 锁节点下的所有节点信息。这个方法是异步的,调用成功后会调用 AsyncCallback.Children2Callback.processResult 回调函数。
    // AsyncCallback.StringCallback@Overridepublic void processResult(int i, String s, Object o, String s1) {if(StringUtils.isEmpty(s1)){// 这里是创建锁失败的情况。this.countDownLatch.countDown();return;}System.out.println(this.appName + " create lock node="+s1);this.lockNodePath = s1;// 获取 ZooKeeper 锁节点下的所有节点信息,以此来判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。zooKeeper.getChildren("/", false, this, context);}

3.3 AsyncCallback.Children2Callback 的回调函数

  • 在 AsyncCallback.Children2Callback 的回调函数判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。监控前一个节点信息使用 exists 方法,这个方法设置了 Watcher 的 processResult 回调函数
    // AsyncCallback.Children2Callback@Overridepublic void processResult(int i, String s, Object o, List<String> list, Stat stat) {Collections.sort(list);//        for (String s1 : list) {
//            System.out.println("\t "+this.lockNodePath+" previous lock node="+s1);
//        }int index = list.indexOf(lockNodePath.substring(1));if(0 == index){// 如果我现在是第一个节点,则获得锁try {zooKeeper.setData("/", this.lockNodePath.getBytes(), -1);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}this.countDownLatch.countDown();}else {// 我不是第一个节点,监控前一个节点信息(等他删除后我就是可能是第一个了)String watchNodePath = "/" + list.get(index - 1);System.out.println("\t "+this.lockNodePath+" watch node:"+ watchNodePath);zooKeeper.exists(watchNodePath, this, new StatCallback() {@Overridepublic void processResult(int i, String s, Object o, Stat stat) {}}, context);}}

3.4 Watcher 的回调函数

  • 在 Watcher 的回调函数,我们通过判断 watchedEvent.getType() 为 NodeDeleted 类型时,重新获取 ZooKeeper 锁节点下的所有节点信息,这使得消息回到了 “3.3”步,判断谁是第一个节点,然后获得得,完成整个流程。
    // Watcher@Overridepublic void process(WatchedEvent watchedEvent) {switch (watchedEvent.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zooKeeper.getChildren("/", false, this, context);break;case NodeDataChanged:break;case NodeChildrenChanged:break;case DataWatchRemoved:break;case ChildWatchRemoved:break;case PersistentWatchRemoved:break;}}

四、完整示例

4.1 完整分布式锁代码

package top.yiqifu.study.p131;import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class ZookeeperLock implements Watcher, AsyncCallback.StringCallback,AsyncCallback.Children2Callback {private String appName;private ZooKeeper zooKeeper;private Object context;private String lockNodePath;private CountDownLatch countDownLatch = new CountDownLatch(1);public ZookeeperLock(String name, ZooKeeper zk){this.appName = name;this.zooKeeper = zk;this.context = this;}public void lock() throws InterruptedException, LockException {zooKeeper.create("/lock", "xxx".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, context);countDownLatch.await();if(StringUtils.isEmpty(this.lockNodePath)){throw new LockException("创建锁失败");}System.out.println(this.appName + " 获得锁");}public void unlock() throws KeeperException, InterruptedException, LockException {if(StringUtils.isEmpty(this.lockNodePath)){throw new LockException("没有获得锁,无法释放锁");}zooKeeper.delete(lockNodePath, -1);System.out.println(this.appName + " 释放锁");}// AsyncCallback.StringCallback@Overridepublic void processResult(int i, String s, Object o, String s1) {if(StringUtils.isEmpty(s1)){// 这里是创建锁失败的情况。this.countDownLatch.countDown();return;}System.out.println(this.appName + " create lock node="+s1);this.lockNodePath = s1;// 获取 ZooKeeper 锁节点下的所有节点信息,以此来判断我是不是第一个创建节点,如果就获得锁,否则监控前一个节点信息。zooKeeper.getChildren("/", false, this, context);}// AsyncCallback.Children2Callback@Overridepublic void processResult(int i, String s, Object o, List<String> list, Stat stat) {Collections.sort(list);//        for (String s1 : list) {
//            System.out.println("\t "+this.lockNodePath+" previous lock node="+s1);
//        }int index = list.indexOf(lockNodePath.substring(1));if(0 == index){// 如果我现在是第一个节点,则获得锁try {zooKeeper.setData("/", this.lockNodePath.getBytes(), -1);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}this.countDownLatch.countDown();}else {// 我不是第一个节点,监控前一个节点信息(等他删除后我就是可能是第一个了)String watchNodePath = "/" + list.get(index - 1);System.out.println("\t "+this.lockNodePath+" watch node:"+ watchNodePath);zooKeeper.exists(watchNodePath, this, new StatCallback() {@Overridepublic void processResult(int i, String s, Object o, Stat stat) {}}, context);}}// Watcher@Overridepublic void process(WatchedEvent watchedEvent) {switch (watchedEvent.getType()) {case None:break;case NodeCreated:break;case NodeDeleted:zooKeeper.getChildren("/", false, this, context);break;case NodeDataChanged:break;case NodeChildrenChanged:break;case DataWatchRemoved:break;case ChildWatchRemoved:break;case PersistentWatchRemoved:break;}}public class LockException extends  Exception{public LockException(String message){super(message);}}
}

4.2 测试类

package top.yiqifu.study.p131;import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class Test06_ZookeeperLock {public static void main(String[] args) {try {// 创建 ZooKeeper 对象final ZooKeeper zooKeeper = testCreateZookeeper();int clientCount = 10;final CountDownLatch countDownLatch = new CountDownLatch(clientCount);for (int i = 0; i < clientCount; i++) {new Thread(new Runnable(){@Overridepublic void run() {TestLock(zooKeeper);countDownLatch.countDown();}}).start();}countDownLatch.await();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}private static void  TestLock(ZooKeeper zooKeeper){try {String appName = Thread.currentThread().getName();ZookeeperLock zookeeperLock = new ZookeeperLock(appName, zooKeeper);// 加锁(获得分布式锁)zookeeperLock.lock();System.out.println(appName + " 执行任务");Thread.sleep(1000);// 释放锁zookeeperLock.unlock();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();} catch (ZookeeperLock.LockException e) {e.printStackTrace();}}private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {final CountDownLatch countDownLatch = new CountDownLatch(1);// ZooKeeper 集群地址(没连接池的概念,是Session的概念)//String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181/aaa";// ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。Integer sessionTimeout = 3000;// ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {Event.KeeperState state = watchedEvent.getState();Event.EventType type = watchedEvent.getType();String path = watchedEvent.getPath();switch (state) {case Unknown:break;case Disconnected:break;case NoSyncConnected:break;case SyncConnected:countDownLatch.countDown();break;case AuthFailed:break;case ConnectedReadOnly:break;case SaslAuthenticated:break;case Expired:break;case Closed:break;}switch (type) {case None:break;case NodeCreated:break;case NodeDeleted:break;case NodeDataChanged:break;case NodeChildrenChanged:break;case DataWatchRemoved:break;case ChildWatchRemoved:break;case PersistentWatchRemoved:break;}System.out.println("Session watch state=" + state);System.out.println("Session watch type=" + type);System.out.println("Session watch path=" + path);} catch (Exception e) {e.printStackTrace();}}});countDownLatch.await();ZooKeeper.States state = zooKeeper.getState();switch (state) {case CONNECTING:break;case ASSOCIATING:break;case CONNECTED:break;case CONNECTEDREADONLY:break;case CLOSED:break;case AUTH_FAILED:break;case NOT_CONNECTED:break;}System.out.println("ZooKeeper state=" + state);return zooKeeper;}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/196334.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

让你的Mac体验更便捷,快速启动工具Application Wizard为你助力!

亲爱的Mac用户们&#xff0c;你是否经常感到在繁琐的软件启动过程中浪费了太多时间&#xff1f;你是否希望能够以更快的速度找到并启动你所需的应用程序&#xff1f;如果是的话&#xff0c;那么不要犹豫&#xff0c;让我们来介绍一款强大的软件快速启动工具——Application Wiz…

uniapp app tabbar 页面默认隐藏

1.在page.json 中找到tabbar visible 默认为true,设为false则是不显示 uni.setTabBarItem({ index: 1, //列表索引 visible:true //显示或隐藏 })

12-1- GAN -简单网络-线性网络

功能 随机噪声→生成器→MINIST图像。 训练方法 0 损失函数:gan的优化目标是一个对抗损失,是二分类问题,用BCELoss 1 判别器的训练,首先固定生成器参数不变,其次判别器应当将真实图像判别为1,生成图像判别为0 loss=loss(real_out, 1)+loss(fake_out, 0) 2 生成器的…

开源简历生成器OpenResume

什么是 OpenResume &#xff1f; OpenResume 是一个功能强大的开源简历生成器和简历解析器。OpenResume 的目标是为每个人提供免费的现代专业简历设计&#xff0c;让任何人都能充满信心地申请工作。 OpenResume 有 5 个核心特点&#xff1a; 特征描述1. 实时UI更新当您输入简历…

高德地图系列(一):vue项目如何使用高德地图、入门以及基本控件使用

目录 第一章 前言 第二章 准备工作 2.1 账号注册 2.2 高德地图开发平台文档 2.3 创建应用 第三章 使用地图 3.1 地图使用步骤 3.2 理解几个地图基础控件 3.3 基础类理解 第一章 前言 小编都是在vue项目中使用高德地图的&#xff0c;每一个功能都会亲测可用之后才会…

【机器学习10】循环神经网络

1循环神经网络 RNN通过将神经元串行起来处理序列化的数据。 由于每个神经元能用它的内部变量保存之前输入的序列信息&#xff0c;因此整个序列被浓缩成抽象的表示&#xff0c; 并可以据此进行分类或生成新的序列。 2 循环神经网络的梯度消失或梯度爆炸问题 传统的循环神经网…

WPF xaml Command用法介绍

WPF (Windows Presentation Foundation) 中的命令设计模式是一种用于分离用户界面逻辑和业务逻辑的方法。在WPF中&#xff0c;这种模式通过命令接口&#xff08;如 ICommand&#xff09;实现&#xff0c;使得用户界面组件&#xff08;如按钮、菜单项等&#xff09;可以触发不直…

k8s ingress高级用法一

前面的文章中&#xff0c;我们讲述了ingress的基础应用&#xff0c;接下来继续讲解ingress的一些高级用法 一、ingress限流 在实际的生产环境中&#xff0c;有时间我们需要对服务进行限流&#xff0c;避免单位时间内访问次数过多&#xff0c;常用的一些限流的参数如下&#x…

深度学习(五)softmax 回归之:分类算法介绍,如何加载 Fashion-MINIST 数据集

Softmax 回归 基本原理 回归和分类&#xff0c;是两种深度学习常用方法。回归是对连续的预测&#xff08;比如我预测根据过去开奖列表下次双色球号&#xff09;&#xff0c;分类是预测离散的类别&#xff08;手写语音识别&#xff0c;图片识别&#xff09;。 现在我们已经对回…

JAVAEE 初阶 多线程基础(一)

多线程基础 一.线程的概念二.为什么要有线程三.进程和线程的区别和关系四.JAVA的线程和操作系统线程的关系五.第一个多线程程序1.继承Thread类 一.线程的概念 一个线程就是一个 “执行流”. 每个线程之间都可以按照顺讯执行自己的代码. 多个线程之间 “同时” 执行着多份代码 同…

CV计算机视觉每日开源代码Paper with code速览-2023.11.14

点击CV计算机视觉&#xff0c;关注更多CV干货 论文已打包&#xff0c;点击进入—>下载界面 点击加入—>CV计算机视觉交流群 1.【基础网络架构&#xff1a;Transformer】Aggregate, Decompose, and Fine-Tune: A Simple Yet Effective Factor-Tuning Method for Vision…

流媒体协议

◆ RTP(Real-time Transport Protocol)&#xff0c;实时传输协议。 ◆ RTCP(Real-time Transport Control Protocol)&#xff0c;实时传输控制协议。 ◆ RTSP(Real Time Streaming Protocol)&#xff0c;实时流协议。 ◆ RTMP(Real Time Messaging Protocol)&#xff0c;实时…

【Proteus仿真】【Arduino单片机】LM35温度计

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真Arduino单片机控制器&#xff0c;使用PCF8574、LCD1602液晶、LM35传感器等。 主要功能&#xff1a; 系统运行后&#xff0c;LCD1602显示传感器检测温度。 二、软件设计 /* 作者&a…

单片机的冷启动、热启动、复位

一文看懂STC单片机冷启动和复位有什么区别-电子发烧友网 单片机的冷启动、热启动和复位是不同的启动或重置方式&#xff0c;它们在系统状态和初始化方面有所不同&#xff1a; 1.冷启动&#xff08;Cold Start&#xff09;&#xff1a; 定义&#xff1a; 冷启动是指系统从完全关…

【火炬之光-魔灵装备】

文章目录 装备天赋追忆石板技能魂烛刷图策略 装备 头部胸甲手套鞋子武器盾牌项链戒指腰带神格备注盾牌其余的装备要么是召唤物生命&#xff0c;要么是技能等级&#xff0c;鞋子的闪电技能等级加2不是核心&#xff0c;腰带的话主要是要冷却有冷却暗影的技能是不会断的&#xff…

揭示CDN加速的局限性与探讨其小众化原因

在网络加速领域&#xff0c;CDN&#xff08;内容分发网络&#xff09;被认为是提升性能的关键技术之一。然而&#xff0c;尽管其在某些方面表现出色&#xff0c;CDN在广泛应用中仍然相对小众。本文将从CDN加速的局限性出发&#xff0c;深入探讨为何这项技术尚未迎来大规模的应用…

.NET 8.0 中有哪些新的变化?

1性能提升 .NET 8在整个堆栈中带来了数千项性能改进 。默认情况下会启用一种名为动态配置文件引导优化 (PGO) 的新代码生成器&#xff0c;它可以根据实际使用情况优化代码&#xff0c;并且可以将应用程序的性能提高高达 20%。现在支持的 AVX-512 指令集能够对 512 位数据向量执…

计算机毕业设计选题推荐-掌心办公微信小程序/安卓APP-项目实战

✨作者主页&#xff1a;IT毕设梦工厂✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Py…

竞赛选题 疫情数据分析与3D可视化 - python 大数据

文章目录 0 前言1 课题背景2 实现效果3 设计原理4 部分代码5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 大数据全国疫情数据分析与3D可视化 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff0…

websocket详解

一、什么是Websocket WebSocket 是一种在单个 TCP 连接上进行 全双工 通信的协议&#xff0c;它可以让客户端和服务器之间进行实时的双向通信。 WebSocket 使用一个长连接&#xff0c;在客户端和服务器之间保持持久的连接&#xff0c;从而可以实时地发送和接收数据。 在 Web…