Zookeeper的监听机制及原理解析

系列文章目录

手把手教你安装Zookeeper 及可视化插件ZooInspector、ZKUI
Zookeeper入门篇,了解ZK存储特点

使用Zookeeper的监听及原理解析

  • 系列文章目录
  • 前言
  • 一、监听机制的基本概念
  • 二、Zookeeper监听原理
    • 1. 事件类型
    • 2. 监听模式与监听器类型
      • (1)监听模式
      • (2)监听器类型
    • 3. 监听原理
      • (1)基础概念
      • (2)监听触发处理
  • 三、Zookeeper监听的使用Demo


前言

在这里插入图片描述

ZK在现在之所以能非常好用,它便捷的监听功能是很重要的,本次我们就以监听为题,分析一下ZK的监听是怎么设计和管理的,并在文末写了个demo验证我们的所学

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 Zookeeper 专栏,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙 mysql Redis dubbo docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待


一、监听机制的基本概念

其实对于监听,我们并不陌生,我们曽在 Spring专栏 中提到过 《Spring监听器用法与原理详解》,其主要是基于观察者模式,如下图就是一个经典的观察者模型

在这里插入图片描述

Zookeeper的监听机制其实也是基于观察者模式,这种模式允许客户端在数据节点发生变化时得到通知。

而且最通俗的解释就是,当我想监听某个主题的变动时,就会向该主题登记一个观察者。最后当主题真的触发时,就遍历观察者列表,向每个观察者通知该事件。

二、Zookeeper监听原理

1. 事件类型

不管什么监听器,肯定都有自己想监听的内容,也即监听事件。只有当我想看的事件被触发时,才会让我的监听器有所反应。而ZK则提供了以下几种事件类型

  • NodeCreated:节点创建
  • NodeDeleted:节点被删除
  • NodeDataChanged:节点数据变更
  • NodeChildrenChanged:子节点变更
  • DataWatchRemoved:数据监听器被移除
  • ChildWatchRemoved:子节点监听器被移除
  • PersistentWatchRemoved 永久化监听器被移除

需要注意的是,事件操作并不是一回事。比如我们新增一个节点。它其实会触发当前节点的节点创建 和其父节点的子节点变更 两个事件。

同样,我们也不难发现,前4个事件是针对节点进行变更的事件,也是我们最常用的。而后面3种其实是监听器移除事件

2. 监听模式与监听器类型

(1)监听模式

明白了事件,我们再来看一下针对这些事件,我们能用怎样的方式来监听,也即监听模式

  • STANDARD 标准监听
  • PERSISTENT 永久监听
  • PERSISTENT_RECURSIVE 永久递归监听

所谓标准监听,其实就是某个节点的监听器一旦被触发了,这个监听器就会被删除,也就是所谓“一次性”监听。
永久监听就是永久存在,不会被删,可以一直触发。而永久递归监听则代表这个监听器不仅可以监听这个节点的事件,还能监听到该节点的所有子节点的事件,而且可以永久存在。

需要注意的是,监听模式可以叠加出不同的监听状态,比如说一个永久递归的监听器,可以再给他加一个标准监听,此时如果再删除永久递归监听器,那么还能够剩下一个标准监听器在工作,具体原理在源码的 WatchStats 部分

public final class WatchStats {private static final WatchStats[] WATCH_STATS = new WatchStats[] {new WatchStats(0), // NONEnew WatchStats(1), // STANDARDnew WatchStats(2), // PERSISTENTnew WatchStats(3), // STANDARD + PERSISTENTnew WatchStats(4), // PERSISTENT_RECURSIVEnew WatchStats(5), // STANDARD + PERSISTENT_RECURSIVEnew WatchStats(6), // PERSISTENT + PERSISTENT_RECURSIVEnew WatchStats(7), // STANDARD + PERSISTENT + PERSISTENT_RECURSIVE};/*** Stats that have no watchers attached.** <p>This could be used as start point to compute new stats using {@link #addMode(WatcherMode)}.*/public static final WatchStats NONE = WATCH_STATS[0];private final int flags;private WatchStats(int flags) {this.flags = flags;}private static int modeToFlag(WatcherMode mode) {return 1 << mode.ordinal();}/*** Compute stats after given mode attached to node.** @param mode watcher mode* @return a new stats if given mode is not attached to this node before, otherwise old stats*/public WatchStats addMode(WatcherMode mode) {int flags = this.flags | modeToFlag(mode);return WATCH_STATS[flags];}public WatchStats removeMode(WatcherMode mode) {int mask = ~modeToFlag(mode);int flags = this.flags & mask;if (flags == 0) {return NONE;}return WATCH_STATS[flags];}/*** Check whether given mode is attached to this node.** @param mode watcher mode* @return true if given mode is attached to this node.*/public boolean hasMode(WatcherMode mode) {int flags = modeToFlag(mode);return (this.flags & flags) != 0;}
}

(2)监听器类型

需要注意的是,知道了所有的事件类型,以及能选择的监听的模式。其实监听器怎么弄,完全取决于你,理论上能做出 m * n 种类型,但ZK在源码中其实做了归纳,只提供了五种类型

  • Children: 子节点监听器
  • Data: 数据监听器
  • Persistent: 永久监听器
  • PersistentRecursive:永久递归监听器
  • Any 所有监听器

其中 ChildrenDataAny 是最开始提供的,也是非常容易理解,因为对事件我们也能归纳为 子节点事件数据事件,所以监听器归纳成 子节点监听器数据监听器 很合理。而PersistentPersistentRecursive 监听器则是在在后续加上的。主要是因为只归纳成这几类的话,如果想要单独删除永久化的监听器就没法做了。加入了这样的枚举后,就能指定更具体的监听器类型进行删除了。

在这里插入图片描述

3. 监听原理

(1)基础概念

知晓了事件类型 与 监听器类型 后,我们再来讲讲监听原理,其实监听整理起来主要就是两个结构和三个步骤。

因为节点和监听器是多对多的关系,一个节点能被多个监听器监听,一个监听器也能监听多个节点。所以两个结构就分别从节点、监听器的角度来对监听关系进行归纳,在源码中就是两个 HashMap:watchTablewatch2Paths

// key 为某个节点的具体路径, value 为该节点的所有监听器集合
private final Map<String, Set<Watcher>> watchTable = new HashMap<>();// key 为某个监听器,value 值为该监听器在不同路径下的监听状态
private final Map<Watcher, Map<String, WatchStats>> watch2Paths = new HashMap<>();

而三个步骤其实也很简单:

  1. 客户端注册监听器: 客户端通过创建一个监听器(Watcher)并将其注册到Zookeeper服务器上的指定节点上。
    在这里插入图片描述

  2. 节点变更通知: 当节点发生变化时(如节点数据被修改、节点被创建或删除等),Zookeeper服务器会将变更通知发送给所有对该节点注册了监听器的客户端。同时处理该监听器,如下图,标准监听器watch 1被触发后会在该节点上被删除,而永久监听器watch还能继续留存。

在这里插入图片描述

  1. 客户端处理节点变更: 客户端在收到节点变更通知后,会调用注册的监听器进行处理。客户端可以根据具体的业务需求,对节点变更进行相应的处理逻辑,如重新读取节点数据、重新注册监听器等。Watch 接口如下:

在这里插入图片描述

不难看出,Zookeeper监听机制的核心是Watcher接口通知机制。从整个流程来说,我们可以细分为3个步骤:

Watcher接口:Watcher接口是Zookeeper提供的一个回调接口,在客户端注册监听器时需要实现该接口。该接口中只有一个process方法,当节点发生变化时,Zookeeper会调用该方法通知客户端。

通知机制:Zookeeper的通知机制是基于事件触发的。当注册了Watcher的节点发生变化时,Zookeeper会生成一个事件,并将该事件放入事件队列中。客户端线程会从事件队列中获取事件并进行处理。

(2)监听触发处理

我们看一下,当某个节点发生变动后,它是怎么找到该节点的监听器,并触发它的。我们直接看源码并配上注释

    // WatchManager.java
/*** 触发watch事件** @param path    节点路径* @param type    事件类型* @param zxid    事务ID* @param acl     节点的ACL列表* @param supress 指定不触发的Watcher* @return 返回触发事件的Watcher或者BitSet*/
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, List<ACL> acl, WatcherOrBitSet supress) {// 创建WatchedEvent对象WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);// 创建Watcher集合Set<Watcher> watchers = new HashSet<>();synchronized (this) {// 遍历节点路径的父路径,因为父路径上可能有递归的监听器,也需要监听到此事件PathParentIterator pathParentIterator = getPathParentIterator(path);for (String localPath : pathParentIterator.asIterable()) {// 获取对应路径的Watcher集合Set<Watcher> thisWatchers = watchTable.get(localPath);// 如果Watcher集合为空,直接跳过if (thisWatchers == null || thisWatchers.isEmpty()) {continue;}// 遍历Watcher集合Iterator<Watcher> iterator = thisWatchers.iterator();while (iterator.hasNext()) {Watcher watcher = iterator.next();// 获取Watcher对应的路径映射Map<String, WatchStats> paths = watch2Paths.getOrDefault(watcher, Collections.emptyMap());// 获取Watcher对应路径的状态WatchStats stats = paths.get(localPath);// 如果状态为空,输出警告日志并跳过if (stats == null) {LOG.warn("inconsistent watch table for watcher {}, {} not in path list", watcher, localPath);continue;}// 如果不是在父路径上,则添加Watcherif (!pathParentIterator.atParentPath()) {watchers.add(watcher);// 【【【重要】】】:移除STANDARD模式的状态,而不是触发就删除,可见 2(1)的监听模式WatchStats newStats = stats.removeMode(WatcherMode.STANDARD);// 如果新的状态为空,则移除Watcher和路径映射if (newStats == WatchStats.NONE) {iterator.remove();paths.remove(localPath);} else if (newStats != stats) {paths.put(localPath, newStats);}} else if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {// 父路径当前只会有永久递归监听器能响应子节点事件,且响应完不会删除监听器,所以直接把该监听器触发,不用做其他操作watchers.add(watcher);}}// 如果Watcher集合为空,从watchTable中移除该路径if (thisWatchers.isEmpty()) {watchTable.remove(localPath);}}}// 如果Watcher集合为空,返回nullif (watchers.isEmpty()) {if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path);}return null;}// 触发Watcher的事件处理方法for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;}if (w instanceof ServerWatcher) {((ServerWatcher) w).process(e, acl);} else {w.process(e);}}// 根据事件类型更新服务器指标switch (type) {case NodeCreated:ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(watchers.size());break;case NodeDeleted:ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(watchers.size());break;case NodeDataChanged:ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(watchers.size());break;case NodeChildrenChanged:ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(watchers.size());break;default:// Other types not logged.break;}// 返回Watcher或BitSetreturn new WatcherOrBitSet(watchers);
}

概括一下上述代码的逻辑:

    1. 首先,根据提供的节点路径,遍历该节点的所有父路径。
    1. 对于本路径及其每个父路径,获取与之关联的Watcher集合。如果Watcher集合为空,直接跳过。
    1. 逐个遍历Watcher集合中的Watcher对象,并根据其关联的路径状态来判断是否应该触发监听事件。
      – 如果当前不在父路径上,则添加Watcher到触发集合中,并更新该路径的状态。
      – 如果是父路径上的递归Persistent_Watcher模式,则添加Watcher到触发集合中。
    1. 如果触发集合中的Watcher为空,则结束。
    1. 逐个触发Watcher集合中的Watcher对象的事件处理方法,传入相应的参数。
    1. 根据事件的类型,更新服务器指标(例如,节点被创建的监听器数量)。
    1. 返回触发事件的Watcher集合或者BitSet对象

这里还有一点细节是比较有趣的,就是监听器虽然依附于节点,但并不意味着如果我们删除了节点,该节点的监听器就一定会消失。比如你在某节点设置了一个永久监听器,即使这个结点被删除了,它在 watchTablewatch2Paths 中也不会被删除,即节点与监听器的关联还在,所以当节点后续又被创建的时候,这个监听器仍然可以使用

三、Zookeeper监听的使用Demo

现在我们使用一个Demo来使用一下ZK的监听功能,我们打算使用一个永久监听器监听多个节点

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.io.IOException;
import java.util.List;public class ZooKeeperListenerDemo implements Watcher {private static final String ZK_SERVER = "localhost:2181";private static final int SESSION_TIMEOUT = 3000;private static final String[] PATHS = {"/zhanfu", "/zhanfu2"}; // 需要监听的路径private ZooKeeper zooKeeper;public static void main(String[] args) {ZooKeeperListenerDemo listenerDemo = new ZooKeeperListenerDemo();listenerDemo.connectZooKeeper();listenerDemo.registerWatchers();// 测试监听器,程序会一直运行,直到被中断try {Thread.sleep(Long.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}listenerDemo.close();}// 连接ZooKeeper服务器public void connectZooKeeper() {try {zooKeeper = new ZooKeeper(ZK_SERVER, SESSION_TIMEOUT, this);System.out.println("Connected to ZooKeeper server: " + ZK_SERVER);} catch (IOException e) {e.printStackTrace();}}// 注册监听器public void registerWatchers() {for (String path : PATHS) {try {// AddWatchMode.PERSISTENT 即为永久监听器zooKeeper.addWatch(path, this, AddWatchMode.PERSISTENT);System.out.println("Registered watcher for path: " + path);} catch (KeeperException | InterruptedException e) {e.printStackTrace();}}}// 处理ZooKeeper事件public void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {if (event.getType() == Event.EventType.None && event.getPath() == null) {System.out.println("Connected to ZooKeeper server");} else {// 一般监听器只会处理部分事件,这里就不做限制,只打印日志System.out.println("Event received: " + event.getType() + ", path: " + event.getPath());// 在此处理收到的事件}}}// 关闭ZooKeeper连接public void close() {try {zooKeeper.close();System.out.println("ZooKeeper connection closed.");} catch (InterruptedException e) {e.printStackTrace();}}
}

最后我们运行一下,并修改节点内容、删除再重新添加节点,可以看到该监听器为永久的,一直在生效

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/394480.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

健康管理系统

目录 第1章 系统概述 第2章 可行性研究 2.1 项目背景及意义 2.2 可行性研究 第3章 需求分析 3.1 功能性需求 3.2 非功能性需求 3.2.1 性能需求 第4章 总体设计 4.1 技术架构 4.2功能模块设计 第5章 详细设计 5.1 主页 5.2 写剧本杀 5.3 剧本杀分类管理 5.4 个人…

数组下标越界异常(ArrayIndexOutOfBoundsException)以及解决方案

在Java学习的初期&#xff0c;我们往往可能会遇到一些程序的错误提示&#xff0c;告诉我们&#xff0c;程序出现了某些不正常的情况&#xff0c;在这种情况发生时&#xff0c;我们一般称之为出现了异常。 我们目前有两类常见的错误&#xff1a; 一个是编译时异常 &#xff0c…

数据分析与应用:微信-情人节红包流向探索分析

目录 0 需求描述 1 红包发送方用户的基本信息缺失率有多高?(即有多少红包发送方用户无法在用户基本信息表中匹配? 2 哪一组红包金额的拒收率最高? 3、最受二线城市欢迎的红包金额为?(即发出次数最多) 4 北上广深 4 大城市中,哪座城市的男性用户发出的 520 红包比例…

三大口诀不一样的代码,小小的制表符和换行符玩的溜呀

# 小案例&#xff0c;打印输出加法口诀 for i in range(1,10):for j in range(1,10):if j>i:breakprint(f"{j}{i}{ji}".strip(),end\t)print() print(\n) for i in range(1,10):for j in range(1,10):if j>i:breakprint(f"{j}x{i}{j*i}",end\t)print…

计算机毕业设计选题推荐-房屋租赁系统-Java/Python项目实战

✨作者主页&#xff1a;IT研究室✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Python…

app逆向抓包技巧:SSL Pinning检测绕过

本篇博客旨在记录学习过程&#xff0c;不可用于商用等其它途径 场景 在charles抓包下&#xff0c;某斑马app在注册时发现点击登录毫无反应&#xff0c;看抓包结果提示SSL handshake with client failed&#xff0c;确定是触发了SSL/TLS Pinning&#xff08;证书锁定&#xff…

【SpringBoot 属性加载机制】

SpringBoot 属性加载 一个 SpringBoot 应用的配置属性可以有多种不同的来源, 比如可以来自操作系统的环境变量, 比如可以来自 application.yaml 文件; 每一种不同的属性来源, 都会被 SpringBoot 封装成一个PropertySource对象, 保存在 Environment 对象的 PropertySources 类型…

数据采集工具之Canal

本文主要介绍canal采集mysql数据的tcp、datahub(kafka)模式如何实现 1、下载canal https://aliyun-datahub.oss-cn-hangzhou.aliyuncs.com/tools/canal.deployer-1.1.5-SNAPSHOT.tar.gz canal的原理类似于mysql的主从复制&#xff0c;canal模拟的是从节点拉取主节点的binlog数…

LeetCode 热题 HOT 100 (015/100)【宇宙最简单版】

【栈】No. 0155 最小栈【中等】&#x1f449;力扣对应题目指路 希望对你有帮助呀&#xff01;&#xff01;&#x1f49c;&#x1f49c; 如有更好理解的思路&#xff0c;欢迎大家留言补充 ~ 一起加油叭 &#x1f4a6; 欢迎关注、订阅专栏 【力扣详解】谢谢你的支持&#xff01; …

深入了解核函数:连接机器学习与统计学的桥梁

引言 在机器学习中&#xff0c;支持向量机&#xff08;SVM&#xff09;是一种强大的监督学习模型&#xff0c;特别适合处理分类问题。然而&#xff0c;SVM最初被设计用于线性可分的数据集&#xff0c;现实中的数据往往不是线性可分的。为了解决这一问题&#xff0c;我们引入了…

共享之道——享元模式(Python实现)

共享之道——享元模式&#xff08;Python实现&#xff09; 大家好&#xff0c;今天我们继续来讲结构型设计模式&#xff0c;上一期我们介绍了外观模式&#xff0c;这一期我们来讲享元模式&#xff08;Flyweight Pattern&#xff09;。 享元模式&#xff08;Flyweight Pattern…

Bitwise 首席投资官:忽略短期的市场波动,关注加密货币的发展前景

原文标题&#xff1a;《The Crypto Market Sell-Off: What Happened and Where We Go From Here》撰文&#xff1a;Matt Hougan&#xff0c;Bitwise 首席投资官编译&#xff1a;Chris&#xff0c;Techub News 加密货币市场在周末经历了大幅下跌。从上周五下午 4 点到周一早上 7…

优质电器/机械岗位推荐:经验不限大厂直招,薪资最高30K!

本周优质电器/机械岗位推荐&#xff0c;涵盖C、自动化、开发、安卓开发、项目管理等岗位&#xff0c;经验不限&#xff0c;更有大厂直招岗位&#xff0c;薪资最高30K&#xff01;&#xff01; 抓紧投递&#xff0c;早投早入职&#xff01; &#x1f447;点击职位名称查看详情…

PHP + Laravel + RabbitMQ + Redis 实现消息队列 (三) 消费队列在RabbitMQ和redis中的发布和订阅

发布订阅&#xff08;Pub/Sub&#xff09; 对于消息队列传统的模式来说&#xff0c;一个消费者消费一条消息&#xff0c;这条消息被消费之后就不会再次被其它的消费者消费。但是在发布订阅模式中&#xff0c;一条消息是可以被多个消费者消费的&#xff0c;这些消费者其实相当于…

前端构建工具|vite快速入门

认识vite vite组成部分 Vite是一种新型前端构建工具&#xff0c;能够显著提升前端开发体验。它主要由两部分组成&#xff1a; 一个开发服务器&#xff0c;它基于 原生 ES 模块 提供了 丰富的内建功能&#xff0c;如速度快到惊人的 模块热更新&#xff08;HMR&#xff09;。一…

C++——类模板经典案例——自定义通用数组类

案例&#xff1a;自定义数组类 需求&#xff1a; 1&#xff0c;对内置数据及自定义数据类型的数据存储 2&#xff0c;将数组中的数据存储到堆区 3&#xff0c;构造函数中可以存入数组的容量 4&#xff0c;提供对应的拷贝构造函数和运算符重载防止浅拷贝问题的发生 5&#xff0c…

基于Springboot + Vue的宿舍管理系统

前言 文末获取源码数据库 感兴趣的可以先收藏起来&#xff0c;需要学编程的可以给我留言咨询&#xff0c;希望帮助更多的人 精彩专栏推荐订阅 不然下次找不到哟 Java精品毕设原创实战项目 作者的B站地址&#xff1a;程序员云翼的个人空间-程序员云翼个人主页-哔哩哔哩视频 csd…

vue3+axios请求导出excel文件

在Vue 3中使用axios请求导出Excel文件&#xff0c;可以发送一个GET或POST请求&#xff0c;并设置响应类型为blob或arraybuffer&#xff0c;然后使用new Blob()构造函数创建一个二进制文件&#xff0c;最后使用URL.createObjectURL()生成一个可以下载的链接。 先看代码 import…

Stable Diffusion绘画 | 必备插件安装推荐

新手必备安装的插件推荐如下&#xff1a; 汉化语言包&#xff1a;汉化插件GitHub地址&#xff1b;双语对照插件GitHub地址无边图库&#xff1a;无边图库插件GitHub地址ControlNet&#xff1a;已默认安装 插件安装 最推荐的安装方式&#xff1a;通过「可下载」、「从网址安装…

Qt Modbus 寄存器读写实例

一.线圈状态寄存器读写 项目效果如下 1. 写单个寄存器 MODBUS_API int modbus_write_bit(modbus_t *ctx, int coil_addr, int status); int addrui->spinBoxwirte_addr->value();int dataui->spinBoxwirte_data->value();int ret modbus_write_bit(mb,addr,d…