zookeeper案例

目录

案例一:服务器动态上下线

服务端:

(1)先获取zookeeper连接

(2)注册服务器到zookeeper集群:

(3)业务逻辑(睡眠):

服务端代码如下:

客户端:

(1)获取zookeeper的连接:

(2)监听/servers下边的子节点的增减:

客户端代码如下:

案例二:ZooKeeper 分布式锁

分布式锁是什么?

锁的实现:

构造函数:

加锁函数:

解锁函数:

整体代码:

测试类代码 :

Curator 框架实现分布式锁案例:

实现步骤:

代码如下:


该案例主要也是客户端监听原理,客户端监听服务器的上下线情况

先在集群上创建/servers 节点(用于存储连接的服务器的主机和该服务器的节点数)相当于zookeeper集群

案例一:服务器动态上下线

服务端:

(1)先获取zookeeper连接

        创建类对象

该类为我们创建的服务端类:

        DistributeServer server = new DistributeServer();

        获取zookeeper连接:

自己创建连接方法:

    private void getconnect() throws IOException {zk = new ZooKeeper(connectstring, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}

 让后server对象在main函数中调用

(2)注册服务器到zookeeper集群:

注册是需要注册到zookeeper集群的/servers路径下,需要指定参数进行创建

private void regestServer(String hostname) throws InterruptedException, KeeperException {
zk.create(parentNode+"/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//  需要创建有序的临时节点所以-e(暂时) -s(有序)System.out.println("服务器"+hostname+"已注册连接");}

(3)业务逻辑(睡眠):

    private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}

服务端代码如下:

package com.tangxiaocong.case1;
import org.apache.zookeeper.*;
import java.io.IOException;
/*** @Date 2023/8/10 19:06* @Author */
public class DistributeServer {private static String connectstring="hadoop102:2181,hadoop103:2181,hadoop104:2181";private static int sessionTimeout=2000;private ZooKeeper zk =null;private String parentNode = "/servers";public static void main(String[] args) throws IOException, InterruptedException, KeeperException {//获取zk连接//创建DistributeServer server = new DistributeServer();server.getconnect();//注册服务器到zk集群//注册是需要在/servers节点下创建所开启的服务器的路径server.regestServer(args[0]);//业务逻辑(实际是延时让它睡觉---不然会注册完成就关闭)server.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void regestServer(String hostname) throws InterruptedException, KeeperException {
zk.create(parentNode+"/"+hostname,hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);//  需要创建有序的临时节点所以-e(暂时) -s(有序)System.out.println("服务器"+hostname+"已注册连接");}private void getconnect() throws IOException {zk = new ZooKeeper(connectstring, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}
}

客户端:

(1)获取zookeeper的连接:

        先创建客户端对象,在进行构建获取zookeeper连接的方法,本方法对process方法进行了重写,填写了再发生上下线的运行逻辑

 private void getConnect() throws IOException {zk= new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {getServerList();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}});}

(2)监听/servers下边的子节点的增减:

        构建方法client.getServerList()来进行监听:

代码逻辑就是通过getChildren()方法获取指定目录下的所有子目录并开启监听

再进行遍历,把遍历结果封装到一个集合中,最后进行输出

 private void getServerList() throws InterruptedException, KeeperException {List<String> children = zk.getChildren("/servers", true);//该方法会获取指定路径下的所有子节点//true 会走初始化中的watch 也可以自己创建watch//把所有的服务器都封装到一个集合ArrayList<String> list = new ArrayList<>();for (String child : children) {byte[] data = zk.getData("/servers" +"/"+ child, false, null);//上边已经便利到一个服务器对象,再进行添加list.add(new String(data));}System.out.println(list);}

(3)业务逻辑同服务端不在赘述。

客户端代码如下:

package com.tangxiaocong.case1;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/*** @Date 2023/8/10 21:27* @Author * 客户端的监听功能*/
public class DistributeClient {
private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";private int sessionTimeout=2000;private ZooKeeper zk=null;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {//获取zk连接DistributeClient client = new DistributeClient();client.getConnect();//监听/servers下边的子节点的增减client.getServerList();//业务逻辑(睡眠)client.business();}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}private void getServerList() throws InterruptedException, KeeperException {List<String> children = zk.getChildren("/servers", true);//该方法会获取指定路径下的所有子节点//true 会走初始化中的watch 也可以自己创建watch//把所有的服务器都封装到一个集合ArrayList<String> list = new ArrayList<>();for (String child : children) {byte[] data = zk.getData("/servers" +"/"+ child, false, null);//上边已经便利到一个服务器对象,再进行添加list.add(new String(data));}System.out.println(list);}private void getConnect() throws IOException {zk= new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {getServerList();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}});}
}

案例二:ZooKeeper 分布式锁

分布式锁是什么?

日常使用计算机的时候,我们的电脑不会只开一个进程,但是当“进程1”在访问某些资源的时候,不能被其他进程所访问,它就会去获得锁,把她所访问的资源进行锁上,对该资源进行独占。"进程 1"用完该资源以后就将锁释放掉,让其 他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的 访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

锁的实现:

构造函数:

在该类中首先要实现构造方法,构造方法与类名相同,在该方法中需要获取连接,重写process方法,在该方法中实现释放CountDownLatch的类对象,有两种情况,正常连接释放一种,不是正常连接状态,则释放另一种。在构造方法中还要判断是否存在“/locks”路径,存在则正常退出,不存在则创建该路径。

加锁函数:

使用ZooKeeper对象进行创建节点(临时有序),让后获取“/locks”路径下的所有节点序号,对结果进行判断,如果返回的List集合只有一个节点,则直接返回,默认加锁,不用再做监听工作。如果不是只有一个节点,则对List集合进行排序,再获取他的节点名称,通过indexOf函数来获取该名称节点的下标。如果为-1,则数据异常,为0 则为最小节点,则直接退出,进行加锁不需要设置监听,结果为其他则需要设置监听,先设置监听字符串,当状态不发生改变会一致阻塞,只有上锁节点让位后会调用process方法进行释放。

解锁函数:

解锁就是直接删除节点即可

整体代码:

package com.tangxiaocong.case2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/*** @Date 2023/8/12 19:56* @Author */
public class DistributedLock {final    private String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";final  private int sessionTimeout=2000;final    private   ZooKeeper zk;private String waitPath;private String currentModu;//为了程序的健壮性,创建该对象   等待操作final   private CountDownLatch waitLach=new CountDownLatch(1);final   private CountDownLatch countDownLatch=new CountDownLatch(1);public DistributedLock() throws IOException, InterruptedException, KeeperException {//获取连接zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {//  connectLatch  如果正常连接zk  可以释放if (watchedEvent.getState()==Event.KeeperState.SyncConnected){countDownLatch.countDown();}//检测到删除节点并且是前一个节点则释放waitlatchif (watchedEvent.getType()==Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){waitLach.countDown();}}});//等待是否正常连接  正常(已)连接会释放  否则阻塞countDownLatch.await();// 判断是否存在lock锁Stat stat = zk.exists("/locks", false);if (stat==null){//创建该节点String s = zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.CONTAINER);}}//对zk加锁public void zkLock()  {//创建临时的带序号的节点try {currentModu = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);List<String> children = zk.getChildren("/locks", false);//如果只有一个节点   则直接获取if(children.size()==1){return;}else {//排序Collections.sort(children);//直接从s后边开始   开始的下标就是length的长度String substring = currentModu.substring("/locks/".length());//通过substring来获取在List集合中的下标位置int index = children.indexOf(substring);if (index==-1){System.out.println("数据异常");}else if (index==0){return;}else {//  需要监听上一个节点waitPath="/locks/"+children.get(index-1);zk.getData(waitPath,true,new Stat());//等待监听waitLach.await();return;}}} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}//判断创建的节点是否是最小序号的节点 如果是则获取锁  不是则监听他的前一个节点}//对zk解锁public void unzkLock(){
//删除节点try {//-1  是版本号zk.delete(this.currentModu,-1);} catch (InterruptedException  | KeeperException e) {e.printStackTrace();}}
}

测试类代码 :

package com.tangxiaocong.case2;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
/*** @Date 2023/8/12 22:31* @Author 唐晓聪*/
public class DistributedLockTest
{public static void main(String[] args) throws IOException, InterruptedException, KeeperException {//创建两个客户端对象final    DistributedLock lock1 = new DistributedLock();final   DistributedLock lock2 = new DistributedLock();new Thread(new Runnable() {@Overridepublic void run() {try {  lock1.zkLock();System.out.println("线程1启动获得锁");Thread.sleep(5*1000);lock1.unzkLock();System.out.println("线程1释放锁");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.zkLock();System.out.println("线程2启动获得锁");Thread.sleep(5*1000);lock2.unzkLock();System.out.println("线程2释放锁");} catch (Exception e) {e.printStackTrace();}}}).start();}
}

Curator 框架实现分布式锁案例:

该案例是直接使用API进行实现分布式锁

实现步骤:

创建分布式锁对象,new InterProcessMutex(),参数1为所要连接的客户端,参数2为监听路径

参数1传入的为getCuratorFramework()自定义函数,

该函数通过工厂类的方式进行建立连接,返回创建好的客户端,让后start启动客户端

创建完分布式锁对象后创建两个线程,在线程中进行获得锁,释放锁的操作。

代码如下:

package com.tangxiaocong.case3;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
/*** @Date 2023/8/13 20:07* @Author */
public class CuratorLockTest {public static void main(String[] args) {//创建分布式锁1//参数1   所连接的客户端 参数2 监听路径InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");//创建分布式锁2InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");//创建线程new Thread(new Runnable() {@Overridepublic void run() {try {lock1.acquire();System.out.println("thread 1 acquire lock");lock1.acquire();System.out.println("thread 1 again acquire lock");Thread.sleep(5*1000);lock1.release();System.out.println("thread 1 relax lock");lock1.release();System.out.println("thread 1 again relax lock");System.out.println();} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.acquire();System.out.println("thread 2 acquire lock");lock2.acquire();System.out.println("thread 2 again acquire lock");Thread.sleep(5*1000);lock2.release();System.out.println("thread 2 relax lock");lock2.release();System.out.println("thread 2 again relax lock");} catch (Exception e) {e.printStackTrace();}}}).start();}private static CuratorFramework getCuratorFramework() {ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);//通过工厂类的方式进行建立连接CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop102:2181,hadoop104:2181").connectionTimeoutMs(2000).sessionTimeoutMs(2000).retryPolicy(policy)//连接失败后  间隔多少秒下次间隔.build();client.start();System.out.println("zookeeper  success start  !!!!!");return client;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/88256.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RHEL 7配置HAProxy实现Web负载均衡

一、测试环境HAProxy&#xff1a; 主机名&#xff1a;RH7-HAProxy IP地址&#xff1a;192.168.10.20 操作系统&#xff1a;Red Hat Enterprise Linux Server release 7.2 (Maipo)最小化安装 防火墙与SELinux&#xff1a;关闭 安装的服务&#xff1a;HAProxy-1.5.14 WEB01: 主…

Javascript异步编程的4种方法

你可能知道&#xff0c;Javascript语言的执行环境是"单线程"&#xff08;single thread&#xff09;。 所谓"单线程"&#xff0c;就是指一次只能完成一件任务。如果有多个任务&#xff0c;就必须排队&#xff0c;前面一个任务完成&#xff0c;再执行后面一…

msvcp110.dll丢失怎样修复,msvcp110.dll丢失修复方法

msvcp110.dll是Microsoft C库的一部分&#xff0c;它是运行依赖于该库的程序所必需的动态链接库文件。它的作用是提供C运行时库函数的实现&#xff0c;这些函数用于处理程序的内存管理、异常处理、多线程支持等。当系统中缺少或损坏了msvcp110.dll文件时&#xff0c;请及时修复…

Python-OpenCV中的图像处理-图像梯度

Python-OpenCV中的图像处理-图像梯度 图像梯度Sobel 算子和 Scharr 算子Laplacian 算子 图像梯度 图像梯度&#xff0c;图像边界等使用到的函数有&#xff1a; cv2.Sobel()&#xff0c; cv2.Scharr()&#xff0c; cv2.Laplacian() 等原理&#xff1a;梯度简单来说就是求导。Op…

dialogbot:开箱即用的对话机器人解决方案,涵盖问答型对话、任务型对话和聊天型对话等多种场景,为您提供全方位的对话交互体验。

dialogbot&#xff1a;开箱即用的对话机器人解决方案&#xff0c;涵盖问答型对话、任务型对话和聊天型对话等多种场景&#xff0c;支持网络检索问答、领域知识问答、任务引导问答和闲聊问答&#xff0c;为您提供全方位的对话交互体验。 人机对话系统一直是AI的重要方向&#xf…

Mongodb 常用操作

// 查询 user_id 是否存在 db.getCollection("t_mongo_user").find({"user_id" : { $exists: true }}) // 查询 user_id 10 的记录 db.getCollection("t_mongo_user").find({"user_id" : 10}) // 排序 -1&#xff0c;按照 _id 倒…

python之matplotlib入门初体验:使用Matplotlib进行简单的图形绘制

目录 绘制简单的折线图1.1 修改标签文字和线条粗细1.2 校正图形1.3 使用内置样式1.4 使用scatter()绘制散点图并设置样式1.5 使用scatter()绘制一系列点1.6 python循环自动计算数据1.7 自定义颜色1.8 使用颜色映射1.9 自动保存图表练习题 绘制简单的折线图 绘制一个简单折线图…

每期一个小窍门: 玩转go mod 命令

看完本期小窍门 你将学会 go下载/更新 包的命令如何挎包调用/路径名称约定init()函数的作用和一些细节 本文涉及到的目录结构如下 关于 go.mod go.sum 这个demo依赖 github.com/bytedance/sonic 可以使用下面两个命令来确保依赖正常加载 go get github.com/bytedance/son…

2023年上半年数学建模竞赛题目汇总与难度分析

2023年上半年数学建模竞赛题目汇总与难度分析 ​由于近年来国赛ABC题出题方式漂浮不定&#xff0c;没有太大的定性&#xff0c;目前总体的命题方向为&#xff0c;由之前的单一模型问题变为数据分析评价优化或者预测类题目是B、C题的主要命题方向。为了更好地把握今年命题的主方…

【源码分析】XXL-JOB的执行器的注册流程

目的&#xff1a;分析xxl-job执行器的注册过程 流程&#xff1a; 获取执行器中所有被注解(xxlJjob)修饰的handler执行器注册过程执行器中任务执行过程 版本&#xff1a;xxl-job 2.3.1 建议&#xff1a;下载xxl-job源码&#xff0c;按流程图debug调试&#xff0c;看堆栈信息…

gcc/g++ 编译选项详解

gcc/g 编译选项详解 文章目录 gcc/g 编译选项详解编译步骤gcc 与 g 区别gcc 命令的常用选项编译优化选项-O 优化-O1优化-O2-O0-Os-Ofast-Og-Oz-O 选项控制特定的优化 WarningsReference>>>>> 欢迎关注公众号【三戒纪元】 <<<<< 编译步骤 gcc 、…

Shell编程之正则表达式(非常详细)

正则表达式 1.通配符和正则表达式的区别2.基本正则表达式2.1 元字符 &#xff08;字符匹配)2.2 表示匹配次数2.4 位置锚定2.5 分组 和 或者 3.扩展正则表达式4.部分文本处理工具4.1 tr 命令4.2 cut命令4.3 sort命令4.4 uniq命令 1.通配符和正则表达式的区别 通配符一般用于文件…

Ansible从入门到精通【六】

大家好&#xff0c;我是早九晚十二&#xff0c;目前是做运维相关的工作。写博客是为了积累&#xff0c;希望大家一起进步&#xff01; 我的主页&#xff1a;早九晚十二 专栏名称&#xff1a;Ansible从入门到精通 立志成为ansible大佬 ansible templates 模板&#xff08;templa…

闭环控制方法及其应用:优缺点、场景和未来发展

闭环控制是一种基本的控制方法&#xff0c;它通过对系统输出与期望值之间的误差进行反馈&#xff0c;从而调整系统输入&#xff0c;使系统输出更加接近期望值。闭环控制的主要目标是提高系统的稳定性、精确性和鲁棒性。在实际应用中&#xff0c;闭环控制有多种方法&#xff0c;…

释放AI创作潜能:从大模型训练到高产力应用

文章目录 每日一句正能量前言什么是人工智能生成内容&#xff08;AIGC&#xff09;人工智能生成内容&#xff08;AIGC&#xff09;能做什么为什么要用人工智能生成内容&#xff08;AIGC&#xff09;创作成果用Java实现冒泡排序算法学生信息收集系统学生请假管理系统需求分析教务…

苹果电脑图像元数据编辑器:MetaImage for Mac

MetaImage for Mac是一款功能强大的照片元数据编辑器&#xff0c;它可以帮助用户编辑并管理照片的元数据信息&#xff0c;包括基本信息和扩展信息。用户可以根据需要进行批量处理&#xff0c;方便快捷地管理大量照片。 MetaImage for Mac还提供了多种导入和导出格式&#xff0…

东南大学齿轮箱故障诊断(Python代码,MSCNN结合LSTM结合注意力机制模型,代码有注释)

运行代码要求&#xff1a; 代码运行环境要求&#xff1a;Keras版本>2.4.0&#xff0c;python版本>3.6.0 1.东南大学采集数据平台&#xff1a; 数据 该数据集包含2个子数据集&#xff0c;包括轴承数据和齿轮数据&#xff0c;这两个子数据集都是在传动系动力学模拟器&am…

基于Matlab实现心电信号小波特征提取和对应疾病识别仿真(附上源码+数据集)

本文基于Matlab平台&#xff0c;研究了心电信号的小波特征提取方法&#xff0c;并应用于心电信号疾病识别仿真实验中。首先&#xff0c;介绍了心电信号的基本特征和常见的心电疾病。然后&#xff0c;详细阐述了小波变换的原理和方法&#xff0c;并提出了一种基于小波分解和小波…

运维监控学习笔记3

DELL的IPMI页面的登录&#xff1a; 风扇的状态&#xff1a; 电源温度&#xff1a;超过70度就告警&#xff1a; 日志信息&#xff1a; 可以看到更换过磁盘。 iDRAC的设置 虚拟控制台&#xff1a;启动远程控制台&#xff1a; 可以进行远程控制。 机房工程师帮我们接远程控制&…

如何让ES低成本、高性能?滴滴落地ZSTD压缩算法的实践分享

前文分别介绍了滴滴自研的ES强一致性多活是如何实现的、以及如何提升ES的性能潜力。由于滴滴ES日志场景每天写入量在5PB-10PB量级&#xff0c;写入压力和业务成本压力大&#xff0c;为了提升ES的写入性能&#xff0c;我们让ES支持ZSTD压缩算法&#xff0c;本篇文章详细展开滴滴…