zookeeper作为一个分布式协调框架,它的创建就是为了方便或者简化分布式应用的开发。除了服务注册与发现之外,它还能够提供更多的功能,但是对于入门来说,看这一篇就够了。后续会讲zookeeper的架构设计与原理,比如zookeeper的原子协议,leader选举算法等。欢迎关注
目录
zookeeper安装与启动
一、zookeeper下载
二、安装zookeeper
三、linux下启动zookeeper
四、windows下启动zookeeper
zookeeper基本概念
一、zookeeper的存储结构
二、什么是znode
三、znode节点的四种类型
四、权限控制ACL(Access Control List)
五、事件监听watcher
zookeeper的基本操作
一、节点的增删改查
二、zookeeper的其他命令
2.1 ls path:列出path下的文件
2.2 stat path:查看节点状态
2.3 ls2 path:列出path节点的子节点及状态
三、其他
在java客户端中操作zookeeper
用zookeeper实现服务注册与发现中心
zookeeper安装与启动
一、zookeeper下载
镜像站下载:http://mirrors.hust.edu.cn/apache/zookeeper/
记住选择带bin的。从版本3.5.5开始,带有bin名称的包才是我们想要的下载可以直接使用的里面有编译后的二进制的包,而之前的普通的tar.gz的包里面是只是源码的包无法直接使用。不然会爆:
错误: 找不到或无法加载主类 org.apache.zookeeper.server.quorum.QuorumPeerMain
下载后解压到自己的电脑位置,比如:D:\apache-zookeeper-3.5.8-bin
若用wsl,请将apache-zookeeper-3.5.8-bin.tar.gz拷贝到wsl下面后再解压,可以参考WSL访问windows下的文件
解压后目录结构:
-
bin目录
- zk的可执行脚本目录,包括zk服务进程,zk客户端,等脚本。其中,.sh是Linux环境下的脚本,.cmd是Windows环境下的脚本。
- conf目录
配置文件目录。zoo_sample.cfg为样例配置文件,需要修改为自己的名称,一般为zoo.cfg。log4j.properties为日志配置文件。 - lib
zk依赖的包。 - contrib目录
一些用于操作zk的工具包。 - recipes目录
zk某些用法的代码示例
二、安装zookeeper
ZooKeeper的安装包括单机模式安装,以及集群模式安装。
开发情况下由于资源有限一般用单机模式,我们先讲单机模式,让zookeeper跑起来。后面实践案例再讲集群模式。
在启动zookeeper之前,我们需要先修改zookeeper的配置信息,我们先进入zookeeper-3.5.8-bin/conf目录,修改zoo_sample.cfg文件为:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper(修改为自己的目录)dataLogDir=/tmp/zookeeper(修改为自己的目录)
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
主要修改项为dataDir和dataLogDir,dataDir是zookeeper存放数据的地方,dataLogDir是存放zookeeper日志的地方。
如果只配置dataDir,则数据和日志都会创建在dataDir目录下。默认情况下zookeeper会占有8080端口,如果你不想8080端口被占用,增加一行admin.serverPort=8082,指定你自己的端口。
其他配置项的意思我们留到后面再说。
注意:如果你是在windows下使用zookeeper,需要将zoo_sample.cfg改名为zoo.cfg
三、linux下启动zookeeper
我们需要先启动zookeeper服务端,再启动客户端。
首先进入 zookeeper-3.5.8-bin/bin目录
输入命令 ./zkServer.sh start (我之前安装的是zookeeper-3.4.13版本,所以图里的版本和文章的版本不一致,不影响)
可以看到STARTED,zookeeper服务端启动成功了。
接下来启动客户端。输入命令 ./zkCli.sh -server 127.0.0.1:2181 (-server参数就代表我们要连接哪个zookeeper服务端)
连接成功出现:
这样就算启动成功了。如果不放心,可以输入下面两条命令(创建节点和获取节点)测试一下。
四、windows下启动zookeeper
windows和linux大同小异。只不过执行文件从zkServer.sh替换成zkServer.cmd,zkCli.sh替换成zkCli.cmd。
如果你前面没有改名的话,需要将conf目录下的zoo_sample.cfg改名为zoo.cfg
用cmd进入我们zookeeper的bin目录。
输入zkServer.cmd
双击zkCli.cmd
出现:
同样输入create /zk "test" 和get /zk测试一下
至此,zookeeper安装与启动完成。
zookeeper基本概念
一、zookeeper的存储结构
zookeeper的存储结构极其类似于文件系统,都是树形结构,如下图所示。
与文件系统不同的是,文件系统分为目录和文件,目录是没有数据的。而zookeeper则全部称为节点(znode),每个节点既能保存数据又有孩子节点。
zookeeper的根节点都是“/"。
每一个节点(znode)的命名空间(类似于java中的包名)都由其路径组成。zookeeper称上面这种结构为分层命名空间(Hierarchical Namespace)。
例如,根节点的命名空间为“/",第二层左节点的命名空间为”/app1",右节点的命名空间为“/app2”。
节点的命名空间我们又可以理解为是每个节点的标识符,程序能够根据名称定位到具体是哪个节点。
二、什么是znode
上图中的每个节点在zookeeper中称为znode。在zookeeper推荐在znode中存储的数据不超过1M,这是从性能和效率的角度出发。zookeeper作为协调分布式应用的服务中心,一般是存储状态信息、配置信息和本地数据等等。从设计的初衷上看也不是为了存储大量数据准备的。如果真的要存储大数据,应该把数据存储在别的地方比如数据库上,然后在znode上存储他们的引用。
znode在每次更新数据时,都是全量更新,直接覆盖以前的值,不存在追加或者修改其中某个地方的操作。读取数据也是全部读取。同时,znode的读取和写入都是原子操作。
znode还存储了znode版本信息有三个版本号dataversion(数据版本号)、cversion(子节点版本号)、aclversion(节点所拥有的ACL版本号)。每个版本号其实是一个数字,每次修改对应的版本号就会增加。
比如我们创建一个节点,create /zk "test"后,在linux下用get /zk后返回的信息如下:
test #znode的数据
cZxid = 0x1e #znode的创建事务id
ctime = Tue Sep 29 06:45:54 CST 2020 #znode的创建时间
mZxid = 0x1e #znode的修改事务id
mtime = Tue Sep 29 06:45:54 CST 2020 #修改时间
pZxid = 0x1e #该节点的子节点列表最后一次修改的版本号,添加子节点或删除子节点就会影响子节点列表,但是修改子节点的数据内容则不影响该值,孙子节点的操作也不影响
cversion = 0 #children节点的版本号,每次子节点修改加1,下同
dataVersion = 0 #数据版本号
aclVersion = 0 #ACL(权限控制列表)版本号
ephemeralOwner = 0x0 #如果节点为临时节点,那么它的值为这个节点拥有者的session ID;如果该节点不是ephemeral节点, ephemeralOwner值为0.
dataLength = 4 #数据长度
numChildren = 0 #孩子节点的数量
版本号的作用
Zookeeper里面的版本号和我们理解的版本号不同,它表示的是对数据节点的内容、子节点列表或者ACL信息的修改次数。节点创建时dataversion、aclversion,cversion都为0,每次修改响应内容其对应的版本号加1。
这个版本号的用途就和分布式场景的一个锁概念有关。比如演出售票中的一个座位,显然每个场次中的每个座位都只有一个,不可能卖出2次。如果A下单的时候显示可售,他想买,那么为了保证他可以下单成功,此时别人就不能买。这时候就需要有一种机制来保证同一时刻只能有一个人去修改该座位的库存。这就用到了锁。锁有悲观锁和乐观锁。
-
悲观锁:它会假定所有不同事务的处理一定会出现干扰,数据库中最严格的并发控制策略,如果一个事务A正在对数据处理,那么在整个事务过程中,其他事务都无法对这个数据进行更新操作,直到A事务释放了这个锁。
-
乐观锁:它假定所有不同事务的处理不一定会出现干扰,所以在大部分操作里不许加锁,但是既然是并发就有出现干扰的可能,如何解决冲突就是一个问题。在乐观锁中当你在提交更新请求之前,你要先去检查你读取这个数据之后该数据是否发生了变化,如果有那么你此次的提交就要放弃,如果没有就可以提交。
Zookeeper中的版本号就是乐观锁,你修改节点数据之前会读取这个数据并记录该数据版本号,当你需要更新时会携带这个版本号去提交,如果你此时携带的版本号(就是你上次读取出来的)和当前节点的版本号相同则说明该数据没有被修改过,那么你的提交就会成功,如果提交失败说明该数据在你读取之后和提交之前这段时间内被修改了。
三、znode节点的四种类型
zookeeper有四种节点,临时节点,临时顺序节点,持久节点和持久顺序节点。
3.1 临时节点
当zookeeper的客户端申请zookeeper服务端创建临时节点时,节点的ephemeralOwner为此客户端与服务端的sessionId。当客户端与服务端断开连接时,临时节点也会被删除。
3.2 临时顺序节点
当创建的是临时顺序节点时,会在节点名称后面增加序号,不断递增。
举个例子。比如申请创建的是/zk临时顺序节点,如果此时服务端没有/zk的节点,那么就会创建/zk-1节点。这时第二个请求过来了,也是创建临时顺序节点/zk,那么服务端就会创建/zk-2,依次是/zk-3,/zk-4......不断递增下去,一直到2^32。与临时节点一样的是,当客户端断开链接时,临时顺序节点也会被删除。
3.3 持久节点
顾名思义,就是创建后除非主动删除,否则会一直存在的节点。
3.4 持久顺序节点
当创建的是持久顺序节点时,举个例子。比如申请创建的是/zk临时顺序节点,如果此时服务端没有/zk的节点,那么就会创建/zk-1节点。这是有第二个请求过来了,也是创建临时顺序节点/zk,那么服务端就会创建/zk-2,不断递增下去,一直到2^32。与持久节点一样的是,创建后除非主动删除,否则会一直存在。
四、权限控制ACL(Access Control List)
ZooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限。
zookeeper每个节点的权限类型有五种:create、read、delete、write、admin
CREATE:创建子节点的权限
READ:读节点数据的权限包括获取它的子节点列表的权限
DELETE:有删除子节点的权限
WRITE:写节点数据的权限
ADMIN:可以设置节点访问控制列表权限
zookeeper的授权策略(Scheme)有5种:world、auth、digest、ip、x509(有一些博客写了Super即超级管理员模式这个类型,可能是老版本,从3.4开始,官方文档介绍ACL的时候就没看到super了)
world:默认方式,全部都能访问,
auth:认证用户可以使用,(cli中可以通过addauth digest user:pwd 来添加当前上下文中的授权用户)
digest:即用户名:密码这种方式认证,这也是业务系统中最常用的。用 username:password 字符串来产生一个MD5串,然后该串被用来作为ACL ID。认证是通过明文发送username:password 来进行的,当用在ACL时,表达式为username:base64 ,base64是password的SHA1摘要的编码。
ip:使用客户端的主机IP作为ACL ID 。这个ACL表达式的格式为addr/bits ,此时addr中的有效位与客户端addr中的有效位进行比对。
x509 :uses the client X500 Principal as an ACL ID identity. The ACL expression is the exact X500 Principal name of a client. When using the secure port, clients are automatically authenticated and their auth info for the x509 scheme is set.
ACL 权限控制,使用:scheme:id:perm 来标识,主要涵盖 3 个方面:权限模式(Scheme):授权的策略 授权对象(ID):授权的对象 权限类型(Permission):授予的权限。权限模式和权限类型就是上文所讲,授权对象(ID)很好理解,就是权限赋予的用户或者一个实体,例如:IP 地址或者机器。
五、事件监听watcher
zookeeper可以向节点注册一个watcher,用来监听节点的变化。当节点状态发生变化时,比如说被删除或者修改,那么它就会发送通知到监听这个节点的客户端,客户端因此而做出自己的操作。
每个注册仅使用一次,也就是当发生一次节点改变,通知完客户端之后,如果你需要这个节点下次发生改变时也发送通知到这个客户端,那么就需要再注册一次监听。
zookeeper的基本操作
在zookeeper的bin目录下,输入./zkServer.sh start和./zkCli.sh启动服务端和客户端,然后我们就可以进行zookeeper的基本操作了
一、节点的增删改查
zookeeper节点的增删改查命令很简单,唯一需要注意的是create命令有两个参数,-s代表顺序节点,-e代表临时节点。
我们先用create /temp 123命令创建一个名为temp的znode节点,123是这个节点保存的data值。
用get /temp命令查看
如果你输入的是get temp,那么会出现Command failed: java.lang.IllegalArgumentException: Path must start with /character错误,这是因为在zookeeper中,没有相对路径的概念,所有的节点都需要用绝对路径表示,也即所有节点名称都会以“/”开头。
如果我们要创建临时节点,则给create命令增加一个-e的参数。
修改temp节点的数据,set /temp 456
再用get /temp查看
可以看到,/temp节点的数据已经从123变成了456。在zookeeper中,对于数据的修改都是全量修改,没有只修改某一部分这种说法。这也是为什么zookeeper的修改命令是set而不是update的原因。
需要注意的是,但我们修改了数据之后,可以看到mZxid(修改事务id)已经从原来的0X26变成了0x27。
如果要删除某个节点,则用delete path即可。
需要注意的是:
set path data [version]命令,如果我们多次修改,会发现 dataVersion ,也就是数据版本,在不停得发生变化(自增)如果我们在set的时候手动去指定了版本号,就必须和上一次查询出来的结果一致,否则 就会报错。
这个可以用于我们在修改节点数据的时候,保证我们修改前数据没被别人修改过。因为如果别人修改过了,我们这次修改是不会成功的
delete path [version]
删除指定节点数据,其version参数的作用于set指定一致
二、zookeeper的其他命令
2.1 ls path:列出path下的文件
与linux命令类似,ls命令用于列出给定路径下的zookeeper节点
2.2 stat path:查看节点状态
我们查看持久节点temp和临时节点short的状态,主要不同是画红线部分,从ephemeralOwner的值可以判断这个节点是持久节点还是临时节点。0X0代表的是持久节点。如果节点为临时节点,那么它的值为这个节点拥有者的session ID。
2.3 ls2 path:列出path节点的子节点及状态
我们先给temp节点创建一个child1的子节点,接着用ls2 /temp查看它的子节点和状态。
三、其他
我们可以输入-h获取zookeeper的所有命令
在java客户端中操作zookeeper
先启动zookeeper服务端。
在maven引入zookeeper依赖。
<dependency><groupId>org.apache.hadoop</groupId><artifactId>zookeeper</artifactId><version>3.3.1</version> </dependency>
org.apache.zookeeper.Zookeeper是客户端入口主类,负责建立与server的会话。它提供了以下 所示几类主要方法。
在java中启动客户端,注册一个watcher监听链接的建立。
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;public class ZookeeperClient {private static final String connectString = "127.0.0.1:2181";private static final int sessionTimeout = 2000;private static ZooKeeper zkClient = null;public void init() throws Exception {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {// 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)System.out.println("zookeeper链接建立");}});}
}
在main方法里测试我们的init方法,用Thread.sleep方法等待zookeeper连接创建,不然在zookeeper客户端建立连接之前主线程就已经退出。
public static void main(String[] args)throws Exception{new ZookeeperClient().init();Thread.sleep(5000);}
控制台输出:
接下来我们创建一个名为“/java”的节点,节点数据为“data”,ZooDefs.Ids.OPEN_ACL_UNSAFE的意思是不节点能被所有人访问,CreateMode.PERSISTENT:节点的类型为持久节点。
public void createZnode()throws Exception{zkClient.create("/java", "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}
判断节点是否存在,false代表不注册监听事件,如果是true,则注册我们在new zookeeper方法里面传递的watcher。
public void testExist() throws Exception{Stat stat = zkClient.exists("/java", false);System.out.println(stat==null?"节点不存在":"节点存在");}
测试一下我们的方法。
public static void main(String[] args)throws Exception{new ZookeeperClient().init();Thread.sleep(5000);new ZookeeperClient().createZnode();new ZookeeperClient().testExist();}
获取节点的数据。false和上面exists方法参数含义一样,表示不注册连接建立时的watcher,第三个stat对象则存储了除了节点数据之外的其他信息,如czxid、mzxid等。如果为null则表示不保存节点的这些信息。
public void getNodeData()throws Exception{byte[] res = zkClient.getData("/java",false,new Stat());System.out.println(new String(res));}
同样测试我们的方法。
public static void main(String[] args)throws Exception{new ZookeeperClient().init();Thread.sleep(3000);new ZookeeperClient().getNodeData();}
获取ACL控制列表
public void getACl()throws Exception{List<ACL> res = zkClient.getACL("/java",new Stat());for(ACL acl : res){System.out.println(acl.getId().toString()+acl.getPerms());}}
测试:
public static void main(String[] args)throws Exception{new ZookeeperClient().init();Thread.sleep(3000);new ZookeeperClient().getACl();}
在/java下创建子节点,获取子节点列表。
public void createChildZnode()throws Exception{zkClient.create("/java/child", "child data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);zkClient.create("/java/child2", "child2 data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}public void getChildNode()throws Exception{List<String> res = zkClient.getChildren("/java",false);for(String s : res){System.out.println(s);}}
测试:
public static void main(String[] args)throws Exception{new ZookeeperClient().init();Thread.sleep(3000);new ZookeeperClient().createChildZnode();new ZookeeperClient().getChildNode();}
全部代码如下:
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;import java.util.List;public class ZookeeperClient {private static final String connectString = "127.0.0.1:2181";private static final int sessionTimeout = 2000;private static ZooKeeper zkClient = null;public void init() throws Exception {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {// 收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)System.out.println("zookeeper链接建立");}});}public void createZnode()throws Exception{zkClient.create("/java", "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}public void createChildZnode()throws Exception{zkClient.create("/java/child", "child data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);zkClient.create("/java/child2", "child2 data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}public void getChildNode()throws Exception{List<String> res = zkClient.getChildren("/java",false);for(String s : res){System.out.println(s);}}public void testExist() throws Exception{Stat stat = zkClient.exists("/java", false);System.out.println(stat==null?"节点不存在":"节点存在");}public void getNodeData()throws Exception{byte[] res = zkClient.getData("/java",false,new Stat());System.out.println(new String(res));}public void getACl()throws Exception{List<ACL> res = zkClient.getACL("/java",new Stat());for(ACL acl : res){System.out.println(acl.getId().toString()+acl.getPerms());}}public static void main(String[] args)throws Exception{new ZookeeperClient().init();Thread.sleep(3000);new ZookeeperClient().createChildZnode();new ZookeeperClient().getChildNode();}}
用zookeeper实现服务注册与发现中心
经过前面的讲解,我们已经对zookeeper建立起初步的概念,这里就来做一个小小的实践,用zookeeper实现一个简单版的服务注册与发现中心。
zookeeper的一个常见功能就是作为服务注册与发现中心。
我们先创建一个节点/services。
Stat stat = zkClient.exists("/services",false);if (stat == null ){zkClient.create("/services","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}
每当有一个服务上线时,我们就向我们的服务注册与发现中心zookeeper注册我们的应用。
比如,我们注册一个user服务,服务地址是localhost:8080,那么我们就在/services下面建立一个user子节点,子节点数据为user服务的真实url地址,比如localhost:8080,子节点类型为临时节点。
public void registerService()throws Exception{zkClient.create("/services/user","localhost:8080".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);}
当我们向user请求服务时,首先通过/services节点获取user服务,判断user服务是否存在。进而获取它的地址,发起真正的请求。同时,我们注册一个监听事件,监听节点的状态变化。当user服务出现故障或其他因素而下线时,/services/user节点会被删除,zookeeper server会通知到监听这个节点的客户端,从而使客户端做出自己的响应,同样的,当user服务上线或地址修改,客户端也能收到通知。
public void invokeUserService()throws Exception{Stat stat = zkClient.exists("/services/user",false);if (stat == null){System.out.println("未能找到user服务,服务未注册或已下线");}byte[] url = zkClient.getData("/services/user", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {if (watchedEvent.getType() == Event.EventType.NodeDeleted){System.out.println("服务下线");
// 处理业务逻辑}if (watchedEvent.getType() == Event.EventType.NodeCreated){System.out.println("服务上线");
// 处理业务逻辑}if (watchedEvent.getType() == Event.EventType.NodeDataChanged){System.out.println("服务地址修改了");}}}, null);
// 处理业务逻辑System.out.println("向"+new String(url)+"发起请求");}
如果对前面有印象的话,应该记得zookeeper的watcher只触发一次,当节点状态改变一次之后,节点状态的第二次改变就不能监听到了。为了能够持续监听,我们需要修改一下我们的代码。
我们把判断服务上线的代码挪到上面来,并且在下面的监听事件里回调invokeUserService方法,实现持续监听的功能。
为了简单易懂,这里代码写得并不够好,如果是实际项目,需要再做点拆分与封装。
public void invokeUserService()throws Exception{Stat stat = zkClient.exists("/services/user",false);if (stat == null){System.out.println("未能找到user服务,服务未注册或已下线");zkClient.exists("/services/user", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {if (watchedEvent.getType() == Event.EventType.NodeCreated){System.out.println("服务上线");
// 处理业务逻辑}}});}else{byte[] url = zkClient.getData("/services/user", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {if (watchedEvent.getType() == Event.EventType.NodeDeleted){System.out.println("服务下线");
// 处理业务逻辑}if (watchedEvent.getType() == Event.EventType.NodeDataChanged){System.out.println("服务地址修改了");}try {invokeUserService();}catch (Exception e){}}}, null);
// 处理业务逻辑System.out.println("向"+new String(url)+"发起请求");}}