大数据Zookeeper--案例

文章目录

  • 服务器动态上下线监听案例
    • 需求
    • 需求分析
    • 具体实现
    • 测试
  • Zookeeper分布式锁案例
    • 原生Zookeeper实现分布式锁
    • Curator框架实现分布式锁
  • Zookeeper面试重点
    • 选举机制
    • 生产集群安装多少zk合适
    • zk常用命令

服务器动态上下线监听案例

需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知
到主节点服务器的上下线。

需求分析

服务器动态上下线

具体实现

1)先在集群上创建/servers节点

[zk: localhost:2181(CONNECTED) 10] create /servers "servers" 
Created /servers

2)在Idea中创建包名:com.yudan.case1

3)服务器端向Zookeeper注册代码

import org.apache.zookeeper.*;import java.io.IOException;public class DistributeServer {private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";private int sessionTime = 100000;private ZooKeeper zk;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DistributeServer server = new DistributeServer();// 1、获取zk连接server.getConnect();// 2、注册服务器到zk集群server.regist(args[0]);// 3、启动 业务逻辑(睡觉)server.business();}// 创建到 zk 的客户端连接private void getConnect() throws IOException {zk = new ZooKeeper(connectString, sessionTime, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});}// 注册到服务器private void regist(String hostname) throws InterruptedException, KeeperException {String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(hostname + " " + "is online");}// 业务功能private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}
}

4)客户端代码

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;public class DistributeClient {private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";private int sessionTime = 100000;private ZooKeeper zk;public static void main(String[] args) throws IOException, InterruptedException, KeeperException {DistributeClient client = new DistributeClient();// 1、获取zk连接client.getConnect();// 2、监听/servers下面子节点的增加和删除client.getServersList();// 3、业务逻辑(睡觉)client.business();}// 创建到 zk 的客户端连接private void getConnect() throws IOException {zk = new ZooKeeper(connectString, sessionTime, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {// 再次启动监听try {getServersList();} catch (InterruptedException e) {throw new RuntimeException(e);} catch (KeeperException e) {throw new RuntimeException(e);}}});}// 获取服务器列表信息private void getServersList() throws InterruptedException, KeeperException {// 获取服务器子节点信息,并且对父节点进行监听List<String> children = zk.getChildren("/servers", true);// 存储服务器信息列表 ArrayList<String> servers = new ArrayList<>();// 遍历所有节点,获取节点中的主机名称信息 for (String child : children) {byte[] data = zk.getData("/servers/" + child, false, null);servers.add(new String(data));}// 打印System.out.println(servers);}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);}
}

测试

1)在Linux命令行上操作增加减少服务器

(1)启动DistributeClient 客户端

(2)在hadoop102上zk的客户端/servers目录上创建临时带序号节点

[zk: localhost:2181(CONNECTED) 1]  create -e -s /servers/hadoop102 "hadoop102" 
[zk: localhost:2181(CONNECTED) 2]  create -e -s /servers/hadoop103 "hadoop103"

(3)观察Idea控制台变化

[hadoop102, hadoop103]

(4)执行删除操作

[zk: localhost:2181(CONNECTED) 8]  delete /servers/hadoop1020000000000 

(5)观察Idea控制台变化

[hadoop103] 

2)在Idea上操作增加减少服务器

(1)启动DistributeClient 客户端(如果已经启动过,不需要重启)

(2)启动DistributeServer 服务

  • 点击Edit Configurations…
    在这里插入图片描述
  • 在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102
    在这里插入图片描述
  • 回到DistributeServer的main方法,右键,在弹出的窗口中点击Run “DistributeServer.main()”
  • 观察DistributeServer控制台,提示hadoop102 is online
  • 观察DistributeClient控制台,提示hadoop102已经上线

Zookeeper分布式锁案例

什么叫做分布式锁呢?

比如说"进程1"在使用该资源的时候,会先去获得锁,"进程1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
分布式锁

原生Zookeeper实现分布式锁

1)分布式锁实现

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;public class DistributeLock {private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";private final int sessionTime = 100000;private final ZooKeeper zk;// 当前client等待的子节点private String waitPath;// zookeeper节点等待private CountDownLatch waitLatch = new CountDownLatch(1);// zookeeper连接private CountDownLatch connectLatch = new CountDownLatch(1);// 当前client创建的子节点private String currentMode;// 和 zk 服务建立连接,并创建根节点public DistributeLock() throws IOException, InterruptedException, KeeperException {// 1、获取连接zk = new ZooKeeper(connectString, sessionTime, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {// connectLatch 如果连接上zk 可以释放// 连接建立时, 打开latch, 唤醒wait在该latch上的线程if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {connectLatch.countDown();}// waitLatch 需要释放// 发生了waitPath的删除事件if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {waitLatch.countDown();}}});// 等待 zookeeper正常连接后,往下走程序connectLatch.await();// 2、判断根节点/locks是否存在Stat stat = zk.exists("/locks", false);if (stat == null) {// 创建一下根节点zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}}// 对zk加锁public void zkLock() {// 创建对应的临时带序号节点try {currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是, 监听他序号的前一个节点List<String> children = zk.getChildren("/locks", false);// 如果children 只有一个值,那就直接获取锁;如果有多个节点,需要判断,哪个节点最小if (children.size() == 1) {return;} else {// 对children集合内的节点进行排序Collections.sort(children);// 获取节点名称 seq-String thisNode = currentMode.substring("/locks/".length());// 通过seq- 获取到该节点在children集合中的位置int index = children.indexOf(thisNode);// 判断if (index == -1) {System.out.println("数据异常");} else if (index == 0) {// 就一个节点,可以获取锁了return;}else {// 需要监听前一个节点waitPath = "/locks/" + children.get(index-1);zk.getData(waitPath,true,null);// 等待监听waitLatch.await();return;}}} catch (KeeperException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}}// 对zk解锁public void unzkLock() {// 删除节点try {zk.delete(currentMode,-1);} catch (InterruptedException e) {throw new RuntimeException(e);} catch (KeeperException e) {throw new RuntimeException(e);}}
}

2)分布式锁测试

(1)创建两个线程

import org.apache.zookeeper.KeeperException;import java.io.IOException;public class DistributeLockTest {public static void main(String[] args) throws IOException, InterruptedException, KeeperException {// 创建分布式锁1final DistributeLock lock1 = new DistributeLock();// 创建分布式锁2final DistributeLock lock2 = new DistributeLock();new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {lock1.zkLock();System.out.println("线程1 启动,获取到锁");Thread.sleep(5 * 1000);lock1.unzkLock();System.out.println("线程1 释放锁");} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {lock2.zkLock();System.out.println("线程2 启动,获取到锁");Thread.sleep(5 * 1000);lock2.unzkLock();System.out.println("线程2 释放锁");} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();}
}

(2)观察控制台变化

线程1获取锁 
线程1释放锁 
线程2获取锁 
线程2释放锁

Curator框架实现分布式锁

1)原生的Java API开发存在的问题

(1)会话连接是异步的,需要自己去处理。比如使用CountDownLatch

(2)Watch需要重复注册,不然就不能生效

(3)开发的复杂性还是比较高的

(4)不支持多节点删除和创建。需要自己去递归

2)Curator是一个专门解决分布式锁的框架,解决了原生Java API开发分布式遇到的问题。

详情请查看官方文档:https://curator.apache.org/index.html

3)Curator 案例实操

(1)添加依赖

<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> 
</dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> 
</dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version> 
</dependency> 

(2)代码实现

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;public class CuratorLockTest {public static void main(String[] args) {// 创建分布式锁1InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");// 创建分布式锁2InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");new Thread(new Runnable() {@Overridepublic void run() {try {lock1.acquire();System.out.println("线程1 获取到锁");lock1.acquire();System.out.println("线程1 再次获取到锁");Thread.sleep(5 * 1000);lock1.release();System.out.println("线程1 释放锁");lock1.release();System.out.println("线程1 再次释放锁");} catch (Exception e) {throw new RuntimeException(e);}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {lock2.acquire();System.out.println("线程2 获取到锁");lock2.acquire();System.out.println("线程2 再次获取到锁");Thread.sleep(5 * 1000);lock2.release();System.out.println("线程2 释放锁");lock2.release();System.out.println("线程2 再次释放锁");} catch (Exception e) {throw new RuntimeException(e);}}}).start();}// 分布式锁初始化private static CuratorFramework getCuratorFramework() {// 重试策略,初始时间3秒,重试3次ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("hadoop102:2181,hadoop103:2181,hadoop104:2181").connectionTimeoutMs(100000).sessionTimeoutMs(100000).retryPolicy(policy).build();// 启动客户端client.start();System.out.println("zookeeper 启动成功!");return client;}
}

(2)观察控制台变化:

线程1获取锁 
线程1再次获取锁 
线程1释放锁 
线程1再次释放锁 
线程2获取锁 
线程2再次获取锁 
线程2释放锁 
线程2再次释放锁

Zookeeper面试重点

选举机制

半数机制,超过半数的投票通过,即通过。

(1)第一次启动选举规则:

投票过半数时,服务器id大的胜出

(2)第二次启动选举规则:

①EPOCH大的直接胜出

②EPOCH相同,事务id大的胜出

③事务id相同,服务器id大的胜出

生产集群安装多少zk合适

安装奇数台。

生产经验:

  • 10台服务器:3台zk;
  • 20台服务器:5台zk;
  • 100台服务器:11台zk;
  • 200台服务器:11台zk

zk常用命令

ls、get、create、delete

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/252202.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux【docker 设置阿里源】

文章目录 一、查看本地docker的镜像配置二、配置阿里镜像三、检查配置 一、查看本地docker的镜像配置 docker info一般没有配置过是不会出现Registry字段的 二、配置阿里镜像 直接执行下面代码即可&#xff0c;安装1.10.0以上版本的Docker客户端都会有/etc/docker 1.建立配置…

docker部署笔记系统flatnotes

效果 安装 创建目录 mkdir -p /opt/flatnotes/data && cd /opt/flatnotes/ chmod -R 777 /opt/flatnotes/ 创建并启动容器(可以自己修改账户和密码) docker run -d \ --restart unless-stopped \ --name flatnotes \ -p "10040:8080" \ -v "/dat…

【高质量精品】2024美赛B题22页word版高质量半成品论文+多版保奖思路+数据+前四问思路代码等(后续会更新)

一定要点击文末的卡片&#xff0c;进入后&#xff0c;获取完整论文&#xff01;&#xff01; B 题整体模型构建 1. 潜水器动力系统失效&#xff1a;模型需要考虑潜水器在无推进力情况下的行为。 2. 失去与主船通信&#xff1a;考虑无法从主船接收指令或发送位置信息的情况。…

爱上算法:每日算法(24-2月4号)

&#x1f31f;坚持每日刷算法&#xff0c;&#x1f603;将其变为习惯&#x1f91b;让我们一起坚持吧&#x1f4aa; 文章目录 [232. 用栈实现队列](https://leetcode.cn/problems/implement-queue-using-stacks/)思路CodeJavaC 复杂度 [225. 用队列实现栈](https://leetcode.cn/…

【人工智能】文本嵌入:向量存储与数据查询的智慧交织(12)

在当今信息激增的时代&#xff0c;将中文存储到向量数据库&#xff08;如Redis等&#xff09;并实现向量检索&#xff0c;正成为解决日常应用中文信息处理难题的关键利器。这项技术不仅赋予计算机对中文语义的理解能力&#xff0c;更让我们能够以更智能、高效的方式处理和检索中…

BUUCTF-Real-[ThinkPHP]IN SQL INJECTION

目录 漏洞描述 漏洞分析 漏洞复现 漏洞描述 漏洞发现时间&#xff1a; 2018-09-04 CVE 参考&#xff1a;CVE-2018-16385 最高严重级别&#xff1a;低风险 受影响的系统&#xff1a;ThinkPHP < 5.1.23 漏洞描述&#xff1a; ThinkPHP是一款快速、兼容、简单的轻量级国产P…

【Flink入门修炼】1-1 为什么要学习 Flink?

流处理和批处理是什么&#xff1f; 什么是 Flink&#xff1f; 为什么要学习 Flink&#xff1f; Flink 有什么特点&#xff0c;能做什么&#xff1f; 本文将为你解答以上问题。 一、批处理和流处理 早些年&#xff0c;大数据处理还主要为批处理&#xff0c;一般按天或小时定时处…

鸿蒙ArkUI实现开关switch组件

鸿蒙ArkUI官方提供的toggle组件实现了开关的样式&#xff0c;但在使用过程中还是非常的不方便。 DIY可视化对鸿蒙ArkUI实现开关switch组件扩展后满足基本的switch需求&#xff0c;支持绑定值、设置标题文本、整个背景样式等。 /*** 开关*/ Component export default struct Di…

【发票识别】新增针对图片发票的识别(升级中)

说明 为了完善发票识别的功能&#xff0c;目前发票识别支持发票图片格式的识别&#xff0c;增加可用性。 体验 体验地址&#xff1a;https://invoice.behappyto.cn/invoice-service/ 体验地址上面有示例的发票&#xff0c;可以下载上传识别或者复制url地址进行识别。 技术栈…

Java 获取操作时区 ZonedDateTime

Java 获取操作时区 ZonedDateTime package com.zhong.timeaddress;import java.time.Clock; import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.Set;public class TimeAddress {public static void main(String[] args) {// 获取系统默认时区ZoneId…

下载已编译的 OpenCV 包在 Visual Studio 下实现快速配置

自己编译 OpenCV 挺麻烦的&#xff0c;配置需要耗费很长时间&#xff0c;编译也需要很长时间&#xff0c;而且无法保证能全部编译通过。利用 OpenCV 官网提供的已编译的 OpenCV 库可以节省很多时间。下面介绍安装配置方法。 1. OpenCV 官网 地址是&#xff1a;https://opencv…

7.0 Zookeeper 客户端基础命令使用

zookeeper 命令用于在 zookeeper 服务上执行操作。 首先执行命令&#xff0c;打开新的 session 会话&#xff0c;进入终端。 $ sh zkCli.sh 下面开始讲解基本常用命令使用&#xff0c;其中 acl 权限内容在后面章节详细阐述。 ls 命令 ls 命令用于查看某个路径下目录列表。…

MySQL 架构和性能优化

重点&#xff1a; 视图&#xff0c;函数&#xff0c;存储过程&#xff0c;触发器&#xff0c;事件&#xff08; 了解 &#xff09; 用户管理&#xff0c;密码管理 grant revoke 权限管理 MySQL 架构&#xff08; 了解 &#xff09; 存储引擎&#xff1a;MyISAM 和 InnoDB …

PyTorch识别验证码

## 一、生成测试集数据pip install captcha common.py import random import time captcha_array list("0123456789abcdefghijklmnopqrstuvwxyz") captcha_size 4from captcha.image import ImageCaptchaif __name__ __main__:for i in range(10):image ImageC…

为后端做准备

这里写目录标题 flask 文件上传与接收flask应答&#xff08;接收请求&#xff08;文件、数据&#xff09;flask请求&#xff08;上传文件&#xff09;传递参数和文件 argparse 不从命令行调用参数1、设置default值2、"从命令行传入的参数".split()3、[--input,内容] …

肿瘤免疫分型

Elements of cancer immunity and the cancer-immune set point - PubMed (nih.gov) Daniel S Chen , Ira Mellman 人类的抗癌免疫可分为三种主要表型&#xff1a;免疫沙漠表型&#xff08;棕色&#xff09;、免疫排除表型&#xff08;蓝色&#xff09;和免疫炎症型&#xff0…

深刻理解树状数组--树状数组构造定义与动态维护区间和的合理性证明

文章目录 一.树状数组概览二.树状数组构造定义lowbit运算树状数组的结点值的定义树状数组结点层次的定义树状数组父子结点关系定义 三.关于树状数组结构的重要证明引理1引理2树状数组模板题 一.树状数组概览 树状数组的下标从1开始标识,其物理结构是线性表,逻辑结构是一颗多叉…

c++入门学习④——对象的初始化和清理

目录 对象的初始化和清理&#xff1a; why? 如何进行初始化和清理呢&#xff1f; 使用构造函数和析构函数​编辑 构造函数语法: 析构函数语法: 构造函数的分类&#xff1a; 两种分类方式&#xff1a; 三种调用方法&#xff1a; 括号法&#xff08;默认构造函数调用&…

Meta开源大模型LLaMA2的部署使用

LLaMA2的部署使用 LLaMA2申请下载下载模型启动运行Llama2模型文本补全任务实现聊天任务LLaMA2编程Web UI操作 LLaMA2 申请下载 访问meta ai申请模型下载&#xff0c;注意有地区限制&#xff0c;建议选其他国家 申请后会收到邮件&#xff0c;内含一个下载URL地址&#xff0c;…

Redis -- set集合

挑战自己&#xff0c;每天进步一点点&#xff0c;成就将属于不停止脚步的你。 目录 Redis集合&#xff1f; 集合基本命令 sadd smembers sismember scard spop srandmember smove srem 集合间操作 sinter sinterstore sunion sdiff sdiifstore Redis集合&#…