Zookeeper笔记

为什么要使用Zookeeper

dubbo需要一个注册中心,而Zookeeper是我们在使用Dubbo是官方推荐的注册中心
在这里插入图片描述
在这里插入图片描述

Zookeeper介绍

在这里插入图片描述
Zookeeper的集群机制
Zookeeper是为了其他分布式程序提供服务的,所以不能随便就挂了。Zookeeper的集群机制采取的是半数存活机制。也就是整个集群节点中有半数以上的节点存活,那么整个集群环境可用。这也是为什么说zk的集群最好是奇数个节点。

zk的作用

序号功能
1为别的分布式程序服务的
2本身就是一个分布式程序
3主从协调 服务器节点动态上下线 统一配置管理 分布式共享锁 统一名称服务
4管理(存储,读取)用户程序提交的数据 并为用户程序提供数据节点监听服务

Zookeeper 节点的三种角色

Leader
Leader是Zookeeper集群工作的核心,其主要工作是:

  1. 事务请求的唯一调度和处理者,保证集群事务处理的顺序性。
  2. 集群内部各服务器的调度者

Follower
Follower是zookeeper集群的跟随者,主要工作是:

  1. 处理客户端非事务性请求(读取数据),转发事务请求给Leader服务器
  2. 参与事务请求Proposal的投票
  3. 参与Leader选举投票

Observer
Observer充当观察者的角色,观察Zookeeper集群的最小状态变化并将这些状态同步过来,其对于非事务请求可以独立处理,对于事务请求,会转给Leader节点进行处理。Observer不会参与投票,包括事务请求Proposal的投票和Leader选举投票

###Zookeeper 节点的四种状态
在知道了 Zookeeper 中有三种角色后,我们可能会问: Zookeeper 是如何知道自己目前是什么角色的呢?

在 ZAB 协议中定义:通过自身的状态来区分自己的角色的,在运行期间各个进程可能出现以下四种状态之一:

  • Looking:不确定Leader状态,该状态下的服务器认为当前集群中没有Leader,会发起Leader选举
  • Following:跟随者状态,表明当前服务器角色是Follower,并且它知道Leader是谁
  • Leading:领导者状态,表明当前服务器角色是Leader,它会维护与Follower间的心跳
  • Observing:观察者状态,表明当前服务器角色是Observer,与Follower唯一的不同在于不参与选举,也不参与集群写操作时的投票

在组成 ZAB 协议的所有进程启动的时候,初始化状态都是 LOOKING 状态,此时进程组中不存在 Leader,选举之后才有,在进行选举成功后,就进入消息广播模式,此时 Zookeeper 集群中的角色状态就不再是 LOOKING 状态。

集群环境准备

节点的映射关系

每个节点设置相应的ip和主机名的映射关系,方便集群环境的部署
修改hosts配置文件中的信息
在这里插入图片描述

配置免密登录
生成公钥和私钥

ssh-keygen

输入命令后根据提示,四次回车即可
在这里插入图片描述

发送公钥给需要免密登录的节点

ssh-copy-id zk01
ssh-copy-id zk02
ssh-copy-id zk03

在这里插入图片描述
节点和节点发送文件通过scp命令实现

	scp -r b.txt bobo01:/root/

在这里插入图片描述
关闭防火墙
查看防火墙状态

firewall-cmd --state

停止防火墙

systemctl stop firewall.service

禁止开机启动

systemctl disable firewall.service

Zookeeper的选举机制

Leader主要作用是保证分布式数据一致性,即每个节点的存储的数据同步。

服务器初始化时Leader选举

Zookeeper由于自身的性质,一般建议选取奇数个节点进行搭建分布式服务器集群。以3个节点组成的服务器集群为例,说明服务器初始化时的选举过程。启动第一台安装Zookeeper的节点时,无法单独进行选举,启动第二台时,两节点之间进行通信,开始选举Leader。

  1. 每个Server投出一票。第一次他们都投给自己作为Leader,投票内容未(SID,ZXID)。
    SID为Server的id,即启动ZK时配置文件中的myid;
    ZXID为事务id,为节点的更新程序,ZXID越大,代表Server对ZK节点的操作越新。由于服务器初始化,
    每个Sever上的Znode为0,所以Server1投的票为(1,0),Server2为(2,0)。两Server将各自投票发给集群中其他机器。
  2. 每个Server接收来自其他Server的投票。集群中的每个Server先判断投票的有效性,如检查是不是本轮的投票,是不是来Looking状态的服务器投的票。
  3. 对投票结果进行处理。处理规则为:
    • 首先对比ZXID。ZXID大的服务器优先作为Leader
    • 若ZXID系统,如初始化时,每个Server的ZXID都是0
    • 就会比较sid即myid,myid大的选出来做Leader。
      首次选举对于Server而言,他接受到的投票为(2,0),因为自身的票为(1,0),所以此时它会选举Server2为Leader,
      将自己的更新为(2,0)。而Server2收到的投票为Server1的(1,0)由于比他自己小,
      Server2的投票不变。Server1和Server2再次将票投出,投出的票都为(2,0)
  4. 统计投票。每次投票后,服务器都会统计投票信息,如果判定某个Server有过半的票数,俺么该Server就是Leader。首次投票对于Server1和Server2而言,统计出已经有两台机器接收了(2,0)的投票信息,此时认为选出了Leader。
  5. 改变服务器的状态。当确定了Leader之后,每个Server更新自己的状态,
    Leader将状态更新为Leading,Follower将状态更新为Following。
    image.png
    在这里插入图片描述

服务器运行期间的Leader选举

ZK运行期间,如果有新的Server加入,或非Leader节点挂了,那么Leader会同步数据给新的Server或寻找其他备用Server替代宕机的Server。若Leader宕机,此时集群暂停对外服务,开始在内部选举新的Leader。假设当前集群中有Server1、Server2、Server3三台服务器,Server2为当前集群的Leader,由于意外情况,Server2宕机了,便开始进入选举状态。过程如下

1 变更状态。其他非Observer服务器将自己的状态改变成Looking,开始进入Leader选举。
2. 每个Server投出1张票(myid,ZXID),由于集群运行过,所以每个Server的ZXID可能不同。
假设Server1的ZXID为145,Server3的为122,第一轮投票中,Server1和Server3都投自己,
票分别为(1,145)、(3,122),将自己的票发送给集群中所有机器。
3. 每个Server接收接收来自其他Server的投票,接下来的步骤与初始化时相同。

恢复模式的两个原则

当集群正在启动过程中,或 Leader 与超过半数的主机断连后,集群就进入了恢复模式。对于要恢复的数据状态需要遵循两个原则。
已被处理过的消息不能丢失
被丢弃的消息(原Leader未提交的事务)不能再现

1.已被处理过的消息不能丢失(部分follower提交了)
当 Leader 收到超过半数 Follower 的 ACKs 后,就向各个 Follower 广播 COMMIT 消息, 批准各个 Server 执行该写操作事务。当各个 Server 在接收到 Leader 的 COMMIT 消息后就会在本地执行该写操作,然后会向客户端响应写操作成功。

但是如果在非全部 Follower 收到 COMMIT 消息之前 Leader 就挂了,这将导致一种后 果:部分 Server 已经执行了该事务,而部分 Server 尚未收到 COMMIT 消息,所以其并没有执行该事务。当新的 Leader 被选举出,集群经过恢复模式后需要保证所有 Server 上都执行 了那些已经被部分 Server 执行过的事务。

2.被 Leader 提出的但没有被提交的消息不能再现(无follower提交)
当新事务在 Leader 上已经通过,其已经将该事务更新到了本地,但所有 Follower 还都没有收到 COMMIT 之前,Leader 宕机了(比前面叙述的宕机更早),此时,所有 Follower 根本 就不知道该 Proposal 的存在。当新的 Leader 选举出来,整个集群进入正常服务状态后,之 前挂了的 Leader 主机重新启动并注册成为了 Follower。

若那个别人根本不知道的 Proposal 还保留在那个主机,那么其数据就会比其它主机多出了内容,导致整个系统状态的不一致。所以,该 Proposa 应该被丢弃。类似这样应该被丢弃的事务,是不能再次出现在集群中的, 应该被清除。

myid与zxid

myid : 每个 Zookeeper 服务器,都需要在数据文件夹下创建一个名为 myid 的文件,该文件包含整个 Zookeeper 集群唯一的 ID

例如,某个 Zookeeper 集群包含三台服务器,在配置文件中,server.后面的数据即为 myid
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888

zxid : 类似于 RDBMS 中的事务ID,用于标识一个 Proposal ID,为了保证顺序性,ZXID 必须单调递增.

ZAB协议

ZAB(Zookeeper Atomic Broadcast)协议是Apache Zookeeper分布式协调服务中使用的一种原子广播协议。Zookeeper是一个开源的、高性能的分布式协调服务,用于管理分布式系统中的配置信息、命名服务、分布式锁等。ZAB协议是Zookeeper的核心协议,通过确保Zookeeper集群中所有服务器之间的事务广播顺序,它实现了Zookeeper数据的一致性

Zab 协议核心
Zookeeper采用ZAB协议的核心就是只要一台服务器提交了Proposal(事务),就要确保所有服务器最终都能正确提交Proposal,这也是CAP/BASE最终一致性的体现。(选举的核心

从大体上看,Zab 协议的原理可细分为四个阶段:

  1. 选举(Leader Election)
  2. 发现(Discovery)
  3. 同步(Synchronization)
  4. 广播(Broadcast)
    但是在 ZooKeeper 的实现中,将发现和同步两部分内容合成数据恢复一部分,所以按实现划分可以分为三个阶段:(选举的核心
  5. Leader选举(Fast Leader Election)
  6. 数据恢复(Recovery Phase)
  7. 广播(Broadcast)

Zab 协议按照功能的不同,有两种模式:一种是消息广播模式,另一种是崩溃恢复模式。
image.png
消息广播模式用于处理客户端的请求,崩溃恢复模式用于在节点意外崩溃时能够快速恢复,继续对外提供服务,让集群达成高可用状态。

1) 消息广播模式

  • 在Zookeeper集群中数据副本的传递策略就是采用消息广播模式,Zookeeper中的数据副本同步方式与2PC方式相似但却不同,2PC是要求协调者必须等待所有参与者全部反馈ACK确认消息后,再发送commit消息,要求所有参与者要么全成功要么全失败,2PC方式会产生严重的阻塞问题。
  • 而Zookeeper中Leader等待Follower的ACK反馈是指:只要半数以上的Follower成功反馈即可,不需要收到全部的Follower反馈。
    image.png

Zookeeper 中广播消息步骤:

  • 客户端发起一个写操作请求
  • Leader服务器处理客户端请求后将请求转为Proposal(事务),同时为每个Proposal分配一个全局唯一ID,即ZXID
  • Leader服务器与每个Follower之间都有一个队列,Leader将消息发送到该队列
  • Follower机器从队列中取出消息处理完(写入本地事务日志中)后,向LEader服务器发送ACK确认
  • Leader服务器收到半数以上的Follower的ACK后,即可以发送Commit
  • Leader向所有的Follower服务器发送Commit消息

崩溃恢复模式

一旦 Leader 服务器出现崩溃或者由于网络原因导致 Leader 服务器失去了与过半 Follower 的联系,那么就会进入崩溃恢复模式。

Zookeeper 集群中为保证任何进程能够顺序执行,只能是 Leader 服务器接收写请求,其他服务器接收到客户端的写请求,也会转发至 Leader 服务器进行处理。

Zab 协议崩溃恢复需满足以下2个请求:

  • 确保已经被 Leader 提交的 proposal 必须最终被所有的 Follower 服务器提交
  • 确保丢弃已经被 Leader 提出的但没有被提交的 Proposal(事务)

也就是新选举出来的 Leader 不能包含未提交的 Proposal(事务),必须都是已经提交了的 Proposal 的 Follower 服务器节点,新选举出来的 Leader 节点中含有最高的 ZXID,所以,在 Leader 选举时,将 ZXID 作为每个 Follower 投票时的信息依据。这样做的好处是避免了 Leader 服务器检查 Proposal 的提交和丢弃工作。(具体看上面的恢复规则)

Zookeeper是不是强一致性

写操作时是强一致性,读操作则不是强一致性
强一致性

  • 数据的强一致性指的是:一个外部的客户端,去请求一个分布式系统时,一旦这个数据被允许查询,那么任何一个被要求存储该数据的节点上,在任何时候都能查询到这份数据,且数据内容要求一模一样。

顺序一致性

  • 写入操作顺序一致性:当多个客户端同时向ZooKeeper进行写入操作时,这些写入操作会被按照客户端发起的顺序逐个执行。ZooKeeper通过强制所有写入操作都经过Leader节点进行协调,保证了写入操作的顺序一致性(Leader节点与从节点写入的数据一定是一致的)。
  • 读取数据顺序一致性:无论客户端连接到哪个ZooKeeper节点,它们在读取数据时都会获得相同的数据视图。

Zookeeper官方定义:顺序一致性。(Sequential consistency)

  • zookeeper 并不能保证所有客户端在某一时刻读到的数据是一致的,客户端还是有可能读到旧数据的,但可以使用sync()方法向leader节点同步最新的已提交数据。
    在这里插入图片描述
    已上图为例,如果一个zk集群有10000台节点,当进行写入的时候,如果已经有6K个节点写入成功,zk就认为本次写请求成功。但是这时候如果一个客户端读取的刚好是另外4K个节点的数据,那么读取到的就是旧的过期数据。

在zk的官方文档中对此有解释,地址在:https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html

ZK保证了保持长链接的单个客户端,总是能得到自己保持链接的机器上的最新值,这个值不一定是Leader的最新值。如果该机器挂了,会连接到zxid不小于当前节点的机器,从而保证不会读到更旧的值。

Zookeeper顺序一致性怎么做到的?

Leader 发送 proposal 时,其实会为每个 Follower 创建一个队列,都往各自的队列中发送 proposal。
image.png

Zookeeper客户端使用

配置Zookeeper的环境变量
为了简化我们每次操作Zookeeper而不用进入到Zookeeper的安装目录,我们可以将Zookeeper的安装信息配置到系统的环境变量中

vim /etc/profile

添加的内容

export ZOOKEPPER_HOME=/opt/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

执行source命令

source /etc/profile

我们就可以在节点的任意位置操作Zookeeper了,通过scp命令将profile文件发送到其他几个节点上

scp /etc/profile zk02:/etc/

客户端连接
通过bin目录下的zkCli.sh 命令连接即可

	zkCli.sh

在这里插入图片描述
zkCli.sh默认连接的是当前节点的Zookeeper节点,如果我们要连接其他节点执行如下命令即可

	zkCli.sh -timeout 5000 -server zk02:2181

数据操作

Zookeeper的数据结构

  1. 层次化的目录结构,命名符合常规文件系统规范
  2. 每个节点在Zookeeper中叫做znode,并且有一个唯一的路径标识
  3. 节点znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点)
  4. 客户端应用可以在节点上设置监听器
    在这里插入图片描述

节点类型

1).znode有两种类型:

短暂性(ephemeral)(断开连接自己删除)
持久性(persistent)(断开连接不删除)

2).znode有四种形式的目录节点(默认是persistent)如下

序号节点类型描述
1PERSISTENT持久节点
2PERSISTENT_SEQUENTIAL持久有序节点(顺序节点)
3EPHEMERAL短暂节点 (临时节点)
4EPHEMERAL_SEQUENTIAL短暂有序节点 (临时顺序节点)

创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,有父节点维护在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

常用命令
Zookeeper作为Dubbo的注册中心用来保存我们各个服务的节点信息,显示Zookeeper是可以实现输出的存储操作的,我们来看下Zookeeper中存储操作的基本命令

ls
​ ls用来查看某个节点下的子节点信息
在这里插入图片描述
增强的命令,查看节点下的子节点及当前节点的属性信息 ls2或者 ls -s 命令
在这里插入图片描述
create
​ 创建节点信息
在这里插入图片描述
get

​ get命令用来查看节点的数据
在这里插入图片描述
如果要查看节点的属性信息那么我们可以通过get -s 来实现

delete
​ delete只能删除没有子节点的节点要删除非空节点可以通过 rmr 或者 deleteall 命令实现
在这里插入图片描述
set

​ set命令可以用来修改节点的内容。
在这里插入图片描述
事件监听
监听某个节点的数据内容变化,通过get命令 带 -w 参数即可,在3.4版本的Zookeeper中是通过 get path watch 来说实现监控的
在这里插入图片描述
然后我们在其他节点上修改app1节点的数据,会触监听事件

Zookeeper Java API使用

pom

	<dependencies><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.9</version></dependency><dependency><groupId>com.github.sgroschupf</groupId><artifactId>zkclient</artifactId><version>0.1</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency></dependencies>

连接ZK服务,并监听节点变化

@Slf4j
public class ConfigCenter {private final static  String CONNECT_STR="192.168.40.243:2181";private final static Integer  SESSION_TIMEOUT=30*1000;private static ZooKeeper zooKeeper=null;private static CountDownLatch countDownLatch=new CountDownLatch(1);public static void main(String[] args) throws IOException, InterruptedException, KeeperException {zooKeeper=new ZooKeeper(CONNECT_STR, SESSION_TIMEOUT, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getType()== Event.EventType.None&& event.getState() == Event.KeeperState.SyncConnected){log.info("连接已建立");countDownLatch.countDown();}}});countDownLatch.await();MyConfig myConfig = new MyConfig();myConfig.setKey("anykey");myConfig.setName("anyName");ObjectMapper objectMapper=new ObjectMapper();byte[] bytes = objectMapper.writeValueAsBytes(myConfig);String s = zooKeeper.create("/myconfig", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);Watcher watcher = new Watcher() {@SneakyThrows@Overridepublic void process(WatchedEvent event) {if (event.getType()== Event.EventType.NodeDataChanged&& event.getPath()!=null && event.getPath().equals("/myconfig")){log.info(" PATH:{}  发生了数据变化" ,event.getPath());//循环监听//监听结束后,重新设置byte[] data = zooKeeper.getData("/myconfig", this, null);MyConfig newConfig = objectMapper.readValue(new String(data), MyConfig.class);log.info("数据发生变化: {}",newConfig);}}};byte[] data = zooKeeper.getData("/myconfig", watcher, null);MyConfig originalMyConfig = objectMapper.readValue(new String(data), MyConfig.class);log.info("原始数据: {}", originalMyConfig);//        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);}

在这里插入图片描述
standalone版本

@Slf4j
public abstract class  StandaloneBase {private static final String CONNECT_STR="192.168.109.200:2181";private static final int SESSION_TIMEOUT=30 * 1000;private static ZooKeeper zooKeeper =null;private static CountDownLatch countDownLatch = new CountDownLatch(1);private Watcher watcher =new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected&& event.getType()== Event.EventType.None){countDownLatch.countDown();log.info("连接建立");}}};@Beforepublic void init(){try {log.info(" start to connect to zookeeper server: {}",getConnectStr());zooKeeper=new ZooKeeper(getConnectStr(), getSessionTimeout(), watcher);log.info(" 连接中...");countDownLatch.await();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}public static ZooKeeper getZooKeeper() {return zooKeeper;}@Afterpublic void test(){try {TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}}protected String getConnectStr(){return CONNECT_STR;}protected int getSessionTimeout() {return SESSION_TIMEOUT;}
}
private String first_node = "/first-node";/*** 创建节点*/@Testpublic void testCreate() throws KeeperException, InterruptedException {ZooKeeper zooKeeper = getZooKeeper();String s = zooKeeper.create(first_node, "first".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);log.info("Create:{}",s);}/*** 获得节点数据*/@Testpublic void testGetData(){Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getPath()!=null && event.getPath().equals(first_node)&& event.getType()== Event.EventType.NodeDataChanged){log.info(" PATH: {}  发现变化",first_node);try {byte[] data = getZooKeeper().getData(first_node, this, null);log.info(" data: {}",new String(data));} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}};try {byte[] data = getZooKeeper().getData(first_node, watcher, null);  //log.info(" data: {}",new String(data));} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}/*** 异步获取节点数据*/@Testpublic void  asyncTest(){String userId="xxx";getZooKeeper().getData("/test", false, (rc, path, ctx, data, stat) -> {Thread thread = Thread.currentThread();log.info(" Thread Name: {},   rc:{}, path:{}, ctx:{}, data:{}, stat:{}",thread.getName(),rc, path, ctx, data, stat);},"test");log.info(" over .");}/*** 判断节点是否存在*/@Testpublic void exist() throws  Exception{ZooKeeper zooKeeper = getZooKeeper();// true表示的是使用Zookeeper中的watchStat stat = zooKeeper.exists(first_node, true);if(stat != null){System.out.println("节点存在"+ stat.getNumChildren());}else{System.out.println("节点不存在 ....");}}/*** 获取某个节点下面的所有的子节点*/@Testpublic void getChildrens() throws Exception{ZooKeeper zooKeeper = getZooKeeper();List<String> childrens = zooKeeper.getChildren(first_node, true);for (String children : childrens) {// System.out.println(children);// 获取子节点中的数据byte[] data = zooKeeper.getData(first_node+"/" + children, false, null);System.out.println(children+":" + new String(data));}}/*** 修改节点的内容*/@Testpublic void setData() throws Exception{// -1 不指定版本 自动维护Stat stat = zooKeeper.setData(first_node+"/a1", "666666".getBytes(), -1);System.out.println(stat);//  指定版本 自动维护//ZooKeeper zooKeeper = getZooKeeper();//Stat stat = new Stat();//byte[] data = zooKeeper.getData(first_node, false, stat);//int version = stat.getVersion();//版本号//zooKeeper.setData(first_node, "third".getBytes(), version);}/*** 删除节点*/@Testpublic void deleteNode() throws Exception{zooKeeper.delete(first_node,-1);}

事件监听处理

/*** 监听Node节点下的子节点的变化*/@Testpublic void nodeChildrenChange() throws Exception{List<String> list = zooKeeper.getChildren("/app1", new Watcher() {/***              None(-1),*             NodeCreated(1),*             NodeDeleted(2),*             NodeDataChanged(3),*             NodeChildrenChanged(4),*             DataWatchRemoved(5),*             ChildWatchRemoved(6);* @param watchedEvent*/@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("--->"+ watchedEvent.getType());}});for (String s : list) {System.out.println(s);}Thread.sleep(Integer.MAX_VALUE);}/*** 监听节点内容变更*/@Testpublic void nodeDataChanged() throws Exception{byte[] data = zooKeeper.getData("/app1/a1", new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("--->" + watchedEvent.getType());}}, null);System.out.println("--->" + new String(data));Thread.sleep(Integer.MAX_VALUE);}

Curator

Curator 是一套由netflix 公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目
是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使
用,Curator 把我们平时常用的很多 ZooKeeper 服务开发功能做了封装,例如 Leader 选举、
分布式计数器、分布式锁。这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工
作。在会话重新连接、Watch 反复注册、多种异常处理等使用场景中,用原生的 ZooKeeper
处理比较复杂。而在使用 Curator 时,由于其对这些功能都做了高度的封装,使用起来更加简
单,不但减少了开发时间,而且增强了程序的可靠性。

yaml

	<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.0.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-x-discovery</artifactId><version>5.0.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.8</version></dependency>

父类

@Slf4j
public abstract  class CuratorStandaloneBase {private static final String CONNECT_STR = "192.168.109.200:2181";private static final int sessionTimeoutMs = 60*1000;private static final int connectionTimeoutMs = 5000;private static CuratorFramework curatorFramework;@Beforepublic void init() {RetryPolicy retryPolicy = new ExponentialBackoffRetry(5000, 30);curatorFramework = CuratorFrameworkFactory.builder().connectString(getConnectStr()).retryPolicy(retryPolicy).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).canBeReadOnly(true).build();curatorFramework.getConnectionStateListenable().addListener((client, newState) -> {if (newState == ConnectionState.CONNECTED) {log.info("连接成功!");}});log.info("连接中......");curatorFramework.start();}public void createIfNeed(String path) throws Exception {Stat stat = curatorFramework.checkExists().forPath(path);if (stat==null){String s = curatorFramework.create().forPath(path);log.info("path {} created! ",s);}}public static CuratorFramework getCuratorFramework() {return curatorFramework;}/* @Afterpublic void   test(){try {TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}}*/protected   String getConnectStr(){return CONNECT_STR;}
}
// 递归创建子节点@Testpublic void testCreateWithParent() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/node-parent/sub-node-1";String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);log.info("curator create node :{}  successfully.", path);}// protection 模式,防止由于异常原因,导致僵尸节点@Testpublic void testCreate() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String forPath = curatorFramework.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator-node", "some-data".getBytes());log.info("curator create node :{}  successfully.", forPath);}@Testpublic void testGetData() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from  node :{}  successfully.", new String(bytes));}@Testpublic void testSetData() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();curatorFramework.setData().forPath("/curator-node", "changed!".getBytes());byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from  node /curator-node :{}  successfully.", new String(bytes));}@Testpublic void testDelete() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/node-parent";curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);}@Testpublic void testListChildren() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String pathWithParent = "/discovery/example";List<String> strings = curatorFramework.getChildren().forPath(pathWithParent);strings.forEach(System.out::println);}//线程池方式@Testpublic void testThreadPool() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();ExecutorService executorService = Executors.newSingleThreadExecutor();String ZK_NODE="/zk-node";curatorFramework.getData().inBackground((client, event) -> {log.info(" background: {}", event);},executorService).forPath(ZK_NODE);}

Watch机制

ZooKeeper的Watch机制是它的一个重要特性,它允许客户端在ZooKeeper节点发生变化时得到通知。通过Watch机制,客户端可以设置对指定节点的监视,并在节点发生变化(数据更新、节点删除、子节点变化等)时,ZooKeeper会通知相关的客户端。(相当与redis分布式锁中的watchDog机制,是分布式锁中十分重要的机制,用于监听比自己小的节点

Watch机制的主要特点包括:

1.一次性触发:一旦Watch被设置在一个节点上,并且该节点发生了监视的事件(例如数据更新),那么Watch就会触发通知。一次触发后,Watch就会失效,客户端需要重新设置Watch以继续监视节点。

2.轻量级通知:ZooKeeper的Watch机制是轻量级的,因为Watch只是一个通知,不包含实际数据。客户端收到Watch通知后,可以根据需要再次向ZooKeeper请求节点的最新数据。

3.顺序性:Watch通知是有序的。也就是说,如果多个Watch被设置在一个节点上,当该节点发生变化时,通知的顺序是确定的,保证了客户端可以按照特定的顺序处理通知。

4.一次性回调:客户端收到Watch通知后,需要进行一次性回调来处理事件。在处理通知的过程中,如果发生了新的变化,之前的Watch不会再次触发。

需要注意的是,Watch机制并不保证强一致性。由于网络延迟或其他因素,Watch通知可能有一定的延迟,客户端可能会收到旧的数据变更通知。因此,客户端在处理Watch通知时,需要谨慎处理,并考虑可能出现的数据不一致情况。

Watch机制是ZooKeeper实现分布式协作的关键机制之一。通过Watch,客户端可以实时获取ZooKeeper节点的变化,从而在分布式系统中做出相应的处理,实现高效的协作和协调。

ZK分布式锁

创建节点,判断顺序号是否是最小的
在这里插入图片描述
在这里插入图片描述

创建永久节点,在普通节点下创建临时顺序节点,节点之间按顺序依次监听(通过watch机制),当拿到锁的节点处理完事务后,释放锁,后一个节点监听到前一个节点释放锁后,立刻申请获得锁,以此类推
过程解析

  • 第一部分:客户端在Zookeeper集群创建临时顺序节点
  • 第二部分:判断节点是否是当前最小的节点,如果是,获取锁,反之,监听前一个节点
    原生方式实现Zookeeper的分布式锁
/*** 分布式锁 Zookeeper原生API*/
public class DistributedLock {private String connectString = "192.168.58.100:2181";private ZooKeeper client;private CountDownLatch countDownLatch = new CountDownLatch(1);private CountDownLatch waitLatch = new CountDownLatch(1);//当前节点private String currentNode;//要等待的节点private String waitPath;//1.连接Zookeeperpublic DistributedLock() throws Exception {client = new ZooKeeper(connectString, 300000, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {//连接上zk  释放if(watchedEvent.getState() == Event.KeeperState.SyncConnected){countDownLatch.countDown();}// waitLatch 需要释放,节点被删除,并且是前一个节点if(watchedEvent.getType() == Event.EventType.NodeDeleted &&watchedEvent.getPath().equals(waitPath)){waitLatch.countDown();}}});//zk 连接成功,再往下走countDownLatch.await();//2.判断节点是否存在Stat stat = client.exists("/locks", false);if(stat == null){//创建根节点client.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}//3.加锁public void zkLock() throws KeeperException, InterruptedException {//创建临时顺序节点currentNode = client.create("/locks/" + "seq-" ,null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);//判断创建的节点是否是最小序号节点,如果是 就获取锁,不是监听前一个节点List<String> children = client.getChildren("/locks", false);//如果集合中只有一个元素,可以直接获取锁if(children.size() == 1){return;}else{//先排序Collections.sort(children);//获取节点名称String nodeName = currentNode.substring("/locks/".length());//获取节点名称 在集合的位置int index = children.indexOf(nodeName);if(index == -1){System.out.println("数据异常");}else if(index == 0){return;}else{//需要监听前一个节点的变化waitPath = "/locks/" + children.get(index - 1);client.getData(waitPath,true,null);//等待监听执行waitLatch.await();return;}}}//解锁public void unZkLock() throws KeeperException, InterruptedException {//删除节点client.delete(currentNode,-1);}
}

Nginx代理转发
在这里插入图片描述

Curator框架实现分布式锁

实现思路
image.png

InterProcessMutex介绍

Apache Curator 内置了分布式锁的实现: InterProcessMutex

  • InterProcessMutex有两个构造方法
public InterProcessMutex(CuratorFramework client, String path){this(client, path, new StandardLockInternalsDriver());
}public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver){this(client, path, LOCK_NAME, 1, driver);
}
  • 参数说明如下
参数说明
clientcurator中zk客户端对象
path抢锁路径,同一个锁path需一致
driver可自定义lock驱动实现分布式锁
  • 主要方法
//获取锁,若失败则阻塞等待直到成功,支持重入
public void acquire() throws Exception//超时获取锁,超时失败
public boolean acquire(long time, TimeUnit unit) throws Exception//释放锁
public void release() throws Exception
  • 注意点,调用acquire()方法后需相应调用release()来释放锁

代码实现
配置类

@Configuration
public class CuratorCfg {//Curator初始化@Bean(initMethod = "start")public CuratorFramework curatorFramework(){RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.40.240:2181", retryPolicy);//CuratorFramework build = CuratorFrameworkFactory.builder()
//                .connectString("192.168.40.240:2181")
//                .sessionTimeoutMs(60*1000)
//                .connectionTimeoutMs(60*1000)
//                .retryPolicy(new ExponentialBackoffRetry(1000,3))
//                .build();return client;}
}

业务代码

@RestController
public class TestController {@Autowiredprivate OrderService orderService;@Value("${server.port}")private String port;public static final String product = "/product_";@AutowiredCuratorFramework curatorFramework;@PostMapping("/stock/deduct")public Object reduceStock(Integer id) throws Exception {InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, product + id);//互斥锁try {// ...interProcessMutex.acquire();//加锁orderService.reduceStock(id);} catch (Exception e) {if (e instanceof RuntimeException) {throw e;}}finally {interProcessMutex.release();//解锁}return "ok:" + port;}
}

全局异常处理类

@ControllerAdvice
public class ExceptionHandlerController {@ExceptionHandler@ResponseStatus(value = HttpStatus.BAD_REQUEST)@ResponseBodypublic Object exceptionHandler(RuntimeException e){Map<String,Object> result=new HashMap<>(  );result.put( "status","error" );result.put( "message",e.getMessage() );return result;}
}

ZK注册中心

项目较小的话会考虑使用ZK做注册中心,原理:使用临时节点

spring.application.name=product-center1
#zookeeper 连接地址
spring.cloud.zookeeper.connect-string=192.168.40.240:2181
#将本服务注册到zookeeper,如果不希望直接被发现可以配置为false,默认为true
spring.cloud.zookeeper.discovery.register=true
spring.cloud.zookeeper.session-timeout=30000

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/78802.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JVM问题

1. jvm运行时区域划分及每个区域的作用 堆、方法区&#xff08;元空间&#xff09;、虚拟机栈、本地方法栈、程序计数器 2. 堆内存分配策略&#xff1a;新生代&#xff0c;老年代&#xff0c;gc时机 • 对象优先分配在Eden区&#xff0c;如果Eden区没有足够的空间进行分配时&am…

三种方式创建对象的几种方式及new实例化时做了什么?

创建对象的几种方式 利用对象字面量创建对象 const obj {}2.利用 new Object创建对象 const obj new Object()3.使用 构造函数实例化对象 function Fn(name) {this.name name} const obj new Fn(张三) console.log(obj.name); //张三为什么要用构造函数的形式&#xff1…

STM32--综述

文章目录 前言STM32简介STM32F103C8T6系统结构Keil软件安装注意事项新建工程操作流程 前言 本专栏将学习B站江协科技的STM32入门教程&#xff0c;通过自身理解和对老师的总结所写的博客专栏。 STM32简介 STM32是意法半导体&#xff08;STMicroelectronics&#xff09;公司推…

实施高级存储功能

实施高级存储功能 使用Stratis管理分层存储 Stratis 适用于Linux的本地存储管理解决方案。旨在提供更便利的方式执行存储的初始配置&#xff0c;对存储配置进行修改&#xff0c;并使用高级存储功能。 Stratis以管理物理存储设备池的服务形式运行&#xff0c;并透明地为新创…

Popconfirm气泡确认框(antd-design组件库)简单使用

1.Popconfirm气泡确认框 点击元素&#xff0c;弹出气泡式的确认框。 2.何时使用 目标元素的操作需要用户进一步的确认时&#xff0c;在目标元素附近弹出浮层提示&#xff0c;询问用户。 和 confirm 弹出的全屏居中模态对话框相比&#xff0c;交互形式更轻量。 组件代码来自&…

kubernetes 集群搭建(二进制方式)

使用二进制方式搭建Kubernetes集群&#xff0c;可以更加灵活、自由地定制和配置Kubernetes。同时&#xff0c;它还可以实现更高的性能和更小的资源占用。 对于我这个初学者来说&#xff1a; 更加直观地看到Kubernetes的各个组件&#xff0c;了解它们之间的关系和作用。在搭建…

火车头标题伪原创【php源码】

大家好&#xff0c;给大家分享一下python怎么读取文件中的数据&#xff0c;很多人还不知道这一点。下面详细解释一下。现在让我们来看看&#xff01; 火车头采集ai伪原创插件截图&#xff1a; python是一门非常火爆且相对易学的编程语言&#xff0c;应用在各种场景。许多人想学…

GO学习之 网络通信(Net/Http)

GO系列 1、GO学习之Hello World 2、GO学习之入门语法 3、GO学习之切片操作 4、GO学习之 Map 操作 5、GO学习之 结构体 操作 6、GO学习之 通道(Channel) 7、GO学习之 多线程(goroutine) 8、GO学习之 函数(Function) 9、GO学习之 接口(Interface) 10、 文章目录 GO系列前言一、H…

微信小程序tab加列表demo

一、效果 代码复制即可使用&#xff0c;记得把图标替换成个人工程项目图片。 微信小程序开发经常会遇到各种各样的页面组合&#xff0c;本demo为list列表与tab组合&#xff0c;代码如下&#xff1a; 二、json代码 {"usingComponents": {},"navigationStyle&q…

在java中操作redis_Data

1.引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency> 2.配置Redis数据源 redis:host: ${sky.redis.host}port: ${sky.redis.port}password: ${sk…

基于Windows手动编译openssl和直接安装openssl

零、环境 win10-64位 VS2019 一、手动编译 前言&#xff1a;对于一般的开发人员而言&#xff0c;在 openssl 上下载已经编译好的 openssl 库&#xff0c;然后直接拿去用即可&#xff0c;&#xff0c;不用手动编译&#xff0c;{见下文直接安装}。。。对于一些开发人员&#…

【C#学习笔记】装箱和拆箱

文章目录 装箱和拆箱性能消耗装箱拆箱 比较var&#xff0c;object&#xff0c;dynamic&#xff0c;\<T\>varobject\<T\> 泛型dynamic 装箱和拆箱 在讲引用类型object的时候&#xff0c;我们说它是万能的&#xff0c;却没说它万能在哪里。 除了object为每一种变量…

Huggingface使用

文章目录 前置安装Huggingface介绍NLP模块分类transformer流程模块使用详细讲解tokennizermodeldatasetsTrainer Huggingface使用网页直接体验API调用本地调用(pipline)本地调用&#xff08;非pipline&#xff09; 前置安装 anaconda安装 使用conda创建一个新环境并安装pytorc…

针对高可靠性和高性能优化的1200V碳化硅沟道MOSFET

目录 标题&#xff1a;1200V SiC Trench-MOSFET Optimized for High Reliability and High Performance摘要信息解释研究了什么文章创新点文章的研究方法文章的结论 标题&#xff1a;1200V SiC Trench-MOSFET Optimized for High Reliability and High Performance 摘要 本文详…

fishing之第二篇Gophish钓鱼平台搭建

文章目录 一、Gophish介绍二、Gophish部署三、Gophish配置0x01 功能介绍0x02 Sending Profiles(钓鱼邮箱发送配置)0x03 Email Templates(钓鱼邮件模板)0x04 Landing Pages(伪造钓鱼页面)0x05 Users & Groups(用户和组)0x06 Campaigns(钓鱼测试)0x07 Dashboard(仪…

ESP32-C2开发板 ESP8684芯片 兼容ESP32-C3开发

C2是一个芯片采用4毫米x 4毫米封装&#xff0c;与272 kB内存。它运行框架&#xff0c;例如ESP-Jumpstart和ESP造雨者&#xff0c;同时它也运行ESP-IDF。ESP-IDF是Espressif面向嵌入式物联网设备的开源实时操作系统&#xff0c;受到了全球用户的信赖。它由支持Espressif以及所有…

Markdown系列之Flowchat流程图

一.欢迎来到我的酒馆 介绍Markdown的Flowchart流程图语法。 目录 一.欢迎来到我的酒馆二.什么是Flowchart三.更进一步 二.什么是Flowchart 2.1 Flowchart是一款基于javascript的工具&#xff0c;使用它可以用代码创建简单的流程图。具体信息可以查看flowchart官网&#xff1a;…

百度秋招攻略,百度网申笔试面试详解

百度秋招简介 作为行业巨头&#xff0c;百度向社会提供的岗位一直都是非常吃香的&#xff0c;每年也都有很多考生密切关注&#xff0c;百度发布的招聘广告&#xff0c;以尽可能的让自己进入这家企业工作&#xff0c;实现自己的人生价值。那么百度每年的秋招时间是多久&#xf…

【JavaSE】面向对象编程思想之多态(图文详解)

目录 1. 多态的概念 2. 多态实现条件 3. 重写 4. 向上转型和向下转型 4.1 向上转型 4.2 向下转型 5. 多态的优缺点 6. 避免在构造方法中调用重写的方法 1. 多态的概念 多态的概念&#xff1a;通俗来说&#xff0c;就是多种形态&#xff0c;具体点就是去完成某个行为&a…

Linux学习笔记

Linux学习笔记 目录 一&#xff0e; 操作系统的发展历史与linux二&#xff0e; 安装VMWare三&#xff0e; 安装和配置CentOS 7四&#xff0e; Linux操作系统目录结构五&#xff0e; Linux命令 一&#xff0e; 操作系统的发展历史与linux 概述   操作系统产生与发展经历了人工…