文章目录
- 一、概述
- 二、导入依赖包
- 三、与 Zookeeper 建立连接
- 四、判断 ZooKeeper 节点是否存在
- 四、创建 ZooKeeper 节点数据
- 五、获取 ZooKeeper 节点数据
- 六、修改 ZooKeeper 节点数据
- 七、异步获取 ZooKeeper 节点数据
- 八、完整示例
如果您还没有安装Zookeeper请看ZooKeeper 安装说明,Zookeeper 命令使用方法和数据说明。
一、概述
-
ZooKeeper是一个开源的、分布式的协调服务,它主要用于分布式系统中的数据管理和协调任务。它提供了一个具有高可用性的分布式环境,用于存储和管理小规模数据,例如配置信息、命名服务、分布式锁等。
-
本文主要介绍如何使用 Java 与 ZooKeeper 建立连接,进行数据创建、修改、读取、删除等操作。
-
源码地址:https://github.com/apache/zookeeper
二、导入依赖包
-
在 pom.xml 文件中导入 Zookeeper 包,注意一般这个包的版本要和您安装的 Zookeeper 服务端版本一致。
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.2</version> </dependency>
三、与 Zookeeper 建立连接
- 与ZooKeeper集群建立连接使用 ZooKeeper 类,传递三个参数,分别是
- connectionString ,是ZooKeeper 集群地址(没连接池的概念,是Session的概念)
- sessionTimeout , 是ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
- watcher, ZooKeeper Session 级别监听器( Watcher),(Watch只发生在读方法上,如 get、exists等)
private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {final CountDownLatch countDownLatch = new CountDownLatch(1);// ZooKeeper 集群地址(没连接池的概念,是Session的概念)String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";// ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。Integer sessionTimeout = 3000;// ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {Event.KeeperState state = watchedEvent.getState();Event.EventType type = watchedEvent.getType();String path = watchedEvent.getPath();switch (state) {case Unknown:break;case Disconnected:break;case NoSyncConnected:break;case SyncConnected:countDownLatch.countDown();break;case AuthFailed:break;case ConnectedReadOnly:break;case SaslAuthenticated:break;case Expired:break;case Closed:break;}switch (type) {case None:break;case NodeCreated:break;case NodeDeleted:break;case NodeDataChanged:break;case NodeChildrenChanged:break;case DataWatchRemoved:break;case ChildWatchRemoved:break;case PersistentWatchRemoved:break;}System.out.println("Session watch state=" + state);System.out.println("Session watch type=" + type);System.out.println("Session watch path=" + path);} catch (Exception e) {e.printStackTrace();}}});// 由于建立连接是异步的,这里先阻塞等待连接结果countDownLatch.await();ZooKeeper.States state = zooKeeper.getState();switch (state) {case CONNECTING:break;case ASSOCIATING:break;case CONNECTED:break;case CONNECTEDREADONLY:break;case CLOSED:break;case AUTH_FAILED:break;case NOT_CONNECTED:break;}System.out.println("ZooKeeper state=" + state);return zooKeeper;}
四、判断 ZooKeeper 节点是否存在
- 创建节点数据使用 exists 方法,传递四个参数
- path , 表示节点目录名称
- watch, 表示监听器(只对该路径有效)
- stat, 判断结果回调函数
- context, 自定义上下文对象
private static void testExists(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 判断 ZooKeeper 节点是否存在Object context = new Object();zooKeeper.exists("/yiqifu", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}}, new AsyncCallback.StatCallback() {@Overridepublic void processResult(int i, String s, Object o, Stat stat) {if(null != stat){System.out.println("ZooKeeper /yiqifu 节点存在");}else {System.out.println("ZooKeeper /yiqifu 节点不存在");}}}, context);}
四、创建 ZooKeeper 节点数据
- 创建节点数据使用 create 方法,传递四个参数
- path , 表示节点目录名称
- data , 表示节点数据
private static void testCreateNode(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 在 ZooKeeper 中创建节点String nodeName = zooKeeper.create("/yiqifu", "test create data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);System.out.println("ZooKeeper 创建节点成功:" + nodeName);}
五、获取 ZooKeeper 节点数据
- 获取 ZooKeeper 节点数据使用 getData 方法,传递三个参数
- path , 表示节点目录名称
- watch, 表示路径级别的监听器,这个监听器只对该路径下的数据操作监听生效。
private static void testGetdata(final ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 获取 ZooKeeper 节点数据,这里设置了Path级Watchfinal Stat stat = new Stat();byte[] nodeData = zooKeeper.getData("/yiqifu", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {Event.KeeperState state = watchedEvent.getState();Event.EventType type = watchedEvent.getType();String path = watchedEvent.getPath();System.out.println("Path watch state=" + state);System.out.println("Path watch type=" + type);System.out.println("Path watch path=" + path);//zooKeeper.getData("/yiqifu", true, stat); // 这里会使用Session级WatchzooKeeper.getData("/yiqifu", this, stat);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}, stat);System.out.println("ZooKeeper 同步获取节点数据:" + new String(nodeData));}
六、修改 ZooKeeper 节点数据
- 修改 ZooKeeper 节点数据使用 setData 方法,传递三个参数
- path , 表示节点目录名称。
- data, 表示新数据。
- version, 表示数据版本。
private static void testSetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 更新 ZooKeeper 节点数据(修改数据会触发Watch)zooKeeper.setData("/yiqifu", "test modify data 1 ".getBytes(), 0);zooKeeper.setData("/yiqifu", "test modify data 2 ".getBytes(), 1);}
七、异步获取 ZooKeeper 节点数据
-
修改 ZooKeeper 节点数据使用 getData 方法,传递三个参数
-
path , 表示节点目录名称。
-
watch, 表示是否触发监听器。
-
dataCallback, 表示异步获取数据的回调函数。
-
private static void testAsyncGetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 获取 ZooKeeper 节点数据(使用异步回调方式)Object context = new Object();zooKeeper.getData("/yiqifu", false, new AsyncCallback.DataCallback() {@Overridepublic void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {System.out.println("ZooKeeper 同步获取节点数据的目录:"+s);System.out.println("ZooKeeper 异步获取节点数据:"+new String(bytes));}}, context);}
八、完整示例
package top.yiqifu.study.p131;import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.concurrent.CountDownLatch;public class Test01_Zookeeper {public static void main(String[] args) {try {// 创建 ZooKeeper 对象ZooKeeper zooKeeper = testCreateZookeeper();// 在 ZooKeeper 创建数据节点testCreateNode(zooKeeper);// 在 ZooKeeper 中同步获取节点数据testGetdata(zooKeeper);// 在 ZooKeeper 中更新节点数据testSetdata(zooKeeper);// 在 ZooKeeper 异步获取节点数据testAsyncGetdata(zooKeeper);Thread.sleep(3000);} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {final CountDownLatch countDownLatch = new CountDownLatch(1);// ZooKeeper 集群地址(没连接池的概念,是Session的概念)String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";// ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。Integer sessionTimeout = 3000;// ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {Event.KeeperState state = watchedEvent.getState();Event.EventType type = watchedEvent.getType();String path = watchedEvent.getPath();switch (state) {case Unknown:break;case Disconnected:break;case NoSyncConnected:break;case SyncConnected:countDownLatch.countDown();break;case AuthFailed:break;case ConnectedReadOnly:break;case SaslAuthenticated:break;case Expired:break;case Closed:break;}switch (type) {case None:break;case NodeCreated:break;case NodeDeleted:break;case NodeDataChanged:break;case NodeChildrenChanged:break;case DataWatchRemoved:break;case ChildWatchRemoved:break;case PersistentWatchRemoved:break;}System.out.println("Session watch state=" + state);System.out.println("Session watch type=" + type);System.out.println("Session watch path=" + path);} catch (Exception e) {e.printStackTrace();}}});countDownLatch.await();ZooKeeper.States state = zooKeeper.getState();switch (state) {case CONNECTING:break;case ASSOCIATING:break;case CONNECTED:break;case CONNECTEDREADONLY:break;case CLOSED:break;case AUTH_FAILED:break;case NOT_CONNECTED:break;}System.out.println("ZooKeeper state=" + state);return zooKeeper;}private static void testCreateNode(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 在 ZooKeeper 中创建节点String nodeName = zooKeeper.create("/yiqifu", "test create data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);System.out.println("ZooKeeper 创建节点成功:" + nodeName);}private static void testGetdata(final ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 获取 ZooKeeper 节点数据,这里设置了Path级Watchfinal Stat stat = new Stat();byte[] nodeData = zooKeeper.getData("/yiqifu", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {try {Event.KeeperState state = watchedEvent.getState();Event.EventType type = watchedEvent.getType();String path = watchedEvent.getPath();System.out.println("Path watch state=" + state);System.out.println("Path watch type=" + type);System.out.println("Path watch path=" + path);//zooKeeper.getData("/yiqifu", true, stat); // 这里会使用Session级WatchzooKeeper.getData("/yiqifu", this, stat);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}, stat);System.out.println("ZooKeeper 同步获取节点数据:" + new String(nodeData));}private static void testSetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 更新 ZooKeeper 节点数据(修改数据会触发Watch)zooKeeper.setData("/yiqifu", "test modify data 1 ".getBytes(), 0);zooKeeper.setData("/yiqifu", "test modify data 2 ".getBytes(), 1);}private static void testAsyncGetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {// 获取 ZooKeeper 节点数据(使用异步回调方式)Object context = new Object();zooKeeper.getData("/yiqifu", false, new AsyncCallback.DataCallback() {@Overridepublic void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {System.out.println("ZooKeeper 同步获取节点数据的目录:"+s);System.out.println("ZooKeeper 异步获取节点数据:"+new String(bytes));}}, context);}
}