zookeeper watch

目录

    • 回顾回调&观察者模式&发布订阅模式
    • Zookeeper 客户端/ 服务端 watch
      • getChildren 为例
      • 最后归纳

在这里插入图片描述

回顾回调&观察者模式&发布订阅模式

  • 回调的思想
  1. 类A的a()方法调用类B的b()方法
  2. 类B的b()方法执行完毕主动调用类A的callback()方法

回调分为同步回调异步回调, 假如以买彩票的场景来模拟, 我买彩票, 调用彩票网,给我返回的结果确定是否中奖,同步回调就是,我买了彩票之后, 需要等待彩票网给我返回的结果, 这个时候我不能做其他事情, 我必须等待这个结果, 这就叫同步回调, 同步, 就意味着等待, 我不能去做其他事情, 必须等待, 异步回调就是, 我买了彩票之后, 可以去做其他事情, 然后当彩票网有了结果和消息, 再给我返回消息。

  • 观察者模式
    在这里插入图片描述
  • 发布订阅,对比 观察者模式
    在这里插入图片描述

Zookeeper 客户端/ 服务端 watch

  • 客户端维持的 socket 连接 ClientCnxn
/*** This class manages the socket i/o for the client. ClientCnxn maintains a list* of available servers to connect to and "transparently" switches servers it is* connected to as needed.**/
public class ClientCnxn {
/*** Manage watchers & handle events generated by the ClientCnxn object.** We are implementing this as a nested class of ZooKeeper so that* the public methods will not be exposed as part of the ZooKeeper client* API.*/static class ZKWatchManager implements ClientWatchManager {
  • 服务端 DataTree
/*** This class maintains the tree data structure. It doesn't have any networking* or client connection code in it so that it can be tested in a stand alone* way.* <p>* The tree maintains two parallel data structures: a hashtable that maps from* full paths to DataNodes and a tree of DataNodes. All accesses to a path is* through the hashtable. The tree is traversed only when serializing to disk.*/
public class DataTree {

getChildren 为例

/*** Return the list of the children of the node of the given path.* <p>* If the watch is non-null and the call is successful (no exception is thrown),* a watch will be left on the node with the given path. The watch willbe* triggered by a successful operation that deletes the node of the given* path or creates/delete a child under the node.* <p>* The list of children returned is not sorted and no guarantee is provided* as to its natural or lexical order.* <p>* A KeeperException with error code KeeperException.NoNode will be thrown* if no node with the given path exists.** @param path* @param watcher explicit watcher* @return an unordered array of children of the node with the given path* @throws InterruptedException If the server transaction is interrupted.* @throws KeeperException If the server signals an error with a non-zero error code.* @throws IllegalArgumentException if an invalid path is specified*/public List<String> getChildren(final String path, Watcher watcher)throws KeeperException, InterruptedException{final String clientPath = path;PathUtils.validatePath(clientPath);// the watch contains the un-chroot pathWatchRegistration wcb = null;if (watcher != null) {wcb = new ChildWatchRegistration(watcher, clientPath);}final String serverPath = prependChroot(clientPath);RequestHeader h = new RequestHeader();h.setType(ZooDefs.OpCode.getChildren);GetChildrenRequest request = new GetChildrenRequest();request.setPath(serverPath);request.setWatch(watcher != null);GetChildrenResponse response = new GetChildrenResponse();ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);if (r.getErr() != 0) {throw KeeperException.create(KeeperException.Code.get(r.getErr()),clientPath);}return response.getChildren();}

ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 发送请求给服务端

public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException {ReplyHeader r = new ReplyHeader();// 客户端与服务端的网络传输ClientCnxn.Packet packet = this.queuePacket(h, r, request, response, (AsyncCallback)null, (String)null, (String)null, (Object)null, watchRegistration);synchronized(packet) {while(!packet.finished) {packet.wait();}return r;}
}ClientCnxn.Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) {ClientCnxn.Packet packet = null;LinkedList var11 = this.outgoingQueue;synchronized(this.outgoingQueue) {// 传输的对象都包装成Packet对象packet = new ClientCnxn.Packet(h, r, request, response, watchRegistration);packet.cb = cb;packet.ctx = ctx;packet.clientPath = clientPath;packet.serverPath = serverPath;if (this.state.isAlive() && !this.closing) {if (h.getType() == -11) {this.closing = true;}// 放入发送队列中,等待发送this.outgoingQueue.add(packet);} else {this.conLossPacket(packet);}}this.sendThread.getClientCnxnSocket().wakeupCnxn();return packet;
}

outgoingQueue的处理
在这里插入图片描述
在这里插入图片描述
服务端org.apache.zookeeper.server.FinalRequestProcessor#processRequest处理

 case OpCode.getChildren: {lastOp = "GETC";GetChildrenRequest getChildrenRequest = new GetChildrenRequest();ByteBufferInputStream.byteBuffer2Record(request.request,getChildrenRequest);DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());if (n == null) {throw new KeeperException.NoNodeException();}PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),ZooDefs.Perms.READ,request.authInfo);// 返回children,// 这里根据客户端设置的是否有watch变量来传入watcher对象// 如果true则将当前的ServerCnxn传入(ServerCnxn代表客户端和服务端的连接)      List<String> children = zks.getZKDatabase().getChildren(getChildrenRequest.getPath(), null, getChildrenRequest.getWatch() ? cnxn : null);rsp = new GetChildrenResponse(children);break;}

将数据节点路径和ServerCnxn对象存储在WatcherManager的watchTablewatch2Paths

 public List<String> getChildren(String path, Stat stat, Watcher watcher)throws KeeperException.NoNodeException {DataNode n = nodes.get(path);if (n == null) {throw new KeeperException.NoNodeException();}synchronized (n) {if (stat != null) {n.copyStat(stat);}List<String> children=new ArrayList<String>(n.getChildren());if (watcher != null) {childWatches.addWatch(path, watcher);}return children;}}
  • 当服务端处理完毕之后,客户端的SendThread线程负责接收服务端的响应,finishPacket方法会从packet中取出WatchRegistration并注册到ZKWatchManager中

/*** This class services the outgoing request queue and generates the heart* beats. It also spawns the ReadThread.*/class SendThread extends ZooKeeperThread {private long lastPingSentNs;private final ClientCnxnSocket clientCnxnSocket;private Random r = new Random(System.nanoTime());        private boolean isFirstConnect = true;void readResponse(ByteBuffer incomingBuffer) throws IOException {ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr = new ReplyHeader();replyHdr.deserialize(bbia, "header");if (replyHdr.getXid() == -2) {// -2 is the xid for pingsif (LOG.isDebugEnabled()) {LOG.debug("Got ping response for sessionid: 0x"+ Long.toHexString(sessionId)+ " after "+ ((System.nanoTime() - lastPingSentNs) / 1000000)+ "ms");}return;}if (replyHdr.getXid() == -4) {// -4 is the xid for AuthPacket               if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {state = States.AUTH_FAILED;                    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) );            		            		}if (LOG.isDebugEnabled()) {LOG.debug("Got auth sessionid:0x"+ Long.toHexString(sessionId));}return;}if (replyHdr.getXid() == -1) {// -1 means notificationif (LOG.isDebugEnabled()) {LOG.debug("Got notification sessionid:0x"+ Long.toHexString(sessionId));}WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");// convert from a server path to a client pathif (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");else if (serverPath.length() > chrootPath.length())event.setPath(serverPath.substring(chrootPath.length()));else {LOG.warn("Got server path " + event.getPath()+ " which is too short for chroot path "+ chrootPath);}}WatchedEvent we = new WatchedEvent(event);if (LOG.isDebugEnabled()) {LOG.debug("Got " + we + " for sessionid 0x"+ Long.toHexString(sessionId));}eventThread.queueEvent( we );return;}// If SASL authentication is currently in progress, construct and// send a response packet immediately, rather than queuing a// response as with other packets.if (tunnelAuthInProgress()) {GetSASLRequest request = new GetSASLRequest();request.deserialize(bbia,"token");zooKeeperSaslClient.respondToServer(request.getToken(),ClientCnxn.this);return;}Packet packet;synchronized (pendingQueue) {if (pendingQueue.size() == 0) {throw new IOException("Nothing in the queue, but got "+ replyHdr.getXid());}packet = pendingQueue.remove();}/** Since requests are processed in order, we better get a response* to the first request!*/try {if (packet.requestHeader.getXid() != replyHdr.getXid()) {packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());throw new IOException("Xid out of order. Got Xid "+ replyHdr.getXid() + " with err " ++ replyHdr.getErr() +" expected Xid "+ packet.requestHeader.getXid()+ " for a packet with details: "+ packet );}packet.replyHeader.setXid(replyHdr.getXid());packet.replyHeader.setErr(replyHdr.getErr());packet.replyHeader.setZxid(replyHdr.getZxid());if (replyHdr.getZxid() > 0) {lastZxid = replyHdr.getZxid();}if (packet.response != null && replyHdr.getErr() == 0) {packet.response.deserialize(bbia, "response");}if (LOG.isDebugEnabled()) {LOG.debug("Reading reply sessionid:0x"+ Long.toHexString(sessionId) + ", packet:: " + packet);}} finally {finishPacket(packet);}}private void finishPacket(Packet p) {int err = p.replyHeader.getErr();if (p.watchRegistration != null) {p.watchRegistration.register(err);}// Add all the removed watch events to the event queue, so that the// clients will be notified with 'Data/Child WatchRemoved' event type.if (p.watchDeregistration != null) {Map<EventType, Set<Watcher>> materializedWatchers = null;try {materializedWatchers = p.watchDeregistration.unregister(err);for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {Set<Watcher> watchers = entry.getValue();if (watchers.size() > 0) {queueEvent(p.watchDeregistration.getClientPath(), err,watchers, entry.getKey());// ignore connectionloss when removing from local// sessionp.replyHeader.setErr(Code.OK.intValue());}}} catch (KeeperException.NoWatcherException nwe) {LOG.error("Failed to find watcher!", nwe);p.replyHeader.setErr(nwe.code().intValue());} catch (KeeperException ke) {LOG.error("Exception when removing watcher", ke);p.replyHeader.setErr(ke.code().intValue());}}if (p.cb == null) {synchronized (p) {p.finished = true;p.notifyAll();}} else {p.finished = true;eventThread.queuePacket(p);}}

触发watcher org.apache.zookeeper.server.WatchManager#triggerWatch

 Set<Watcher> triggerWatch(String path, EventType type) {return triggerWatch(path, type, null);}Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {WatchedEvent e = new WatchedEvent(type,KeeperState.SyncConnected, path);HashSet<Watcher> watchers;// 主要做的就是从watchTable和watch2Paths中移除该路径的watcher,Watcher机制是一次性的synchronized (this) {watchers = watchTable.remove(path);if (watchers == null || watchers.isEmpty()) {if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG,ZooTrace.EVENT_DELIVERY_TRACE_MASK,"No watchers for " + path);}return null;}for (Watcher w : watchers) {HashSet<String> paths = watch2Paths.get(w);if (paths != null) {paths.remove(path);}}}for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;}// 真正的回调和业务逻辑执行都在客户端org.apache.zookeeper.server.NIOServerCnxn#processw.process(e);}return watchers;}

最后归纳

流程

  1. 客户端把注册的Watcher传到服务端,处理请求加入处理队列
  2. 服务端从处理队列取出事件,并处理请求返回给客户端
  3. 回调Watcher处理在客户端处理,并会被删除

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/19796.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PAT乙组(1016 部分A+B 1017 A除以B)C语言超详细

文章目录 1016 部分AB1017 A除以B 1016 部分AB 输入样例 1&#xff1a; 3862767 6 13530293 3输出样例 1&#xff1a; 399输入样例 2&#xff1a; 3862767 1 13530293 8输出样例 2&#xff1a; 0代码长度限制 16 KB 时间限制 150 ms 内存限制 64 MB 栈限制 8192 KB 思路解析…

论文笔记:Multi-Head Mixture-of-Experts

2024 neurips 1 背景 稀疏混合专家&#xff08;SMoE&#xff09;可在不显著增加训练和推理成本的前提下提升模型的能力【比如Mixtral 8*7B&#xff0c;表现可以媲美LLaMA-2 70B】 但它也有两个问题 专家激活率低&#xff08;下图左&#xff09; 在优化时只有一小部分专家会被…

【Azure 架构师学习笔记】- Azure Databricks (11) -- UC搭建

本文属于【Azure 架构师学习笔记】系列。 本文属于【Azure Databricks】系列。 接上文 【Azure 架构师学习笔记】- Azure Databricks (10) – UC 使用 前言 由于ADB 的更新速度很快&#xff0c;在几个月之后重新搭建ADB 时发现UC 已经更新了很多&#xff0c;为了后续做ADB 的功…

解锁养生秘籍,拥抱健康生活

在这个快节奏的时代&#xff0c;人们行色匆匆&#xff0c;常常在忙碌中忽略了健康。其实&#xff0c;养生并非遥不可及&#xff0c;它就藏在生活的细微之处&#xff0c;等待我们去发现和实践。 规律作息是健康的基础。日出而作&#xff0c;日落而息&#xff0c;顺应自然规律&am…

动手学Agent——Day2

文章目录 一、用 Llama-index 创建 Agent1. 测试模型2. 自定义一个接口类3. 使用 ReActAgent & FunctionTool 构建 Agent 二、数据库对话 Agent1. SQLite 数据库1.1 创建数据库 & 连接1.2 创建、插入、查询、更新、删除数据1.3 关闭连接建立数据库 2. ollama3. 配置对话…

最新国内 ChatGPT Plus/Pro 获取教程

最后更新版本&#xff1a;20250202 教程介绍&#xff1a; 本文将详细介绍如何快速获取一张虚拟信用卡&#xff0c;并通过该卡来获取ChatGPT Plus和ChatGPT Pro。 # 教程全程约15分钟开通ChatGPT Plus会员帐号前准备工作 一个尚未升级的ChatGPT帐号&#xff01;一张虚拟信用卡…

Redis哈希槽机制的实现

Redis哈希槽机制的实现 Redis集群使用哈希槽&#xff08;Hash Slot&#xff09;来管理数据分布&#xff0c;整个集群被划分为固定的16384个哈希槽。当我们在集群中存储一个键时&#xff0c;Redis会先对键进行哈希运算&#xff0c;得到一个哈希值。然后&#xff0c;Redis将该哈…

下载安装运行测试开源vision-language-action(VLA)模型OpenVLA

1. 安装 项目官网OpenVLA 首先按照官网提示的以下代码&#xff0c;执行创建环境->安装最小依赖->git克隆项目等 # Create and activate conda environment conda create -n openvla python3.10 -y conda activate openvla# Install PyTorch. Below is a sample comma…

外贸跨境订货系统流程设计、功能列表及源码输出

在全球化的商业环境下&#xff0c;外贸跨境订货系统对于企业拓展国际市场、提升运营效率至关重要。该系统旨在为外贸企业提供一个便捷、高效、安全的订货平台&#xff0c;实现商品展示、订单管理、物流跟踪等功能&#xff0c;满足跨境业务的多样化需求。以下将详细阐述外贸订货…

排序算法复习——包括插入排序、希尔排序、冒泡排序、快排(包括霍尔法、挖坑法、快慢指针法)、堆排、选择排序、归并排序等 (代码采用c/c++混编)

1.插入排序 插入排序就像我们打斗地主的时候&#xff0c;有一大把牌我们来不及理&#xff0c;就会一张一张的拿然后把拿到的牌放到合适的位置。 对于插入排序我们可以将待排序的数组理解为那一堆没有整理的牌&#xff0c;将排序好的部分理解为手上的牌&#xff0c;对于第i张牌我…

RocketMQ 5.0安装部署

0.前言 在微服务架构逐渐成为主流的今天&#xff0c;消息队列如同数字世界的快递员&#xff0c;承担着系统间高效通信的重要使命。 Apache RocketMQ 自诞生以来&#xff0c;因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余…

Jetson Agx Orin平台preferred_stride调试记录--1924x720图像异常

1.问题描述 硬件: AGX Orin 在Jetpack 5.0.1和Jetpack 5.0.2上测试验证 图像分辨率在1920x720和1024x1920下图像采集正常 但是当采集图像分辨率为1924x720视频时,图像输出异常 像素格式:yuv_uyvy16 gstreamer命令如下 gst-launch-1.0 v4l2src device=/dev/video0 ! …

【玩转全栈】----Django模板语法、请求与响应

目录 一、引言 二、模板语法 三、传参 1、视图函数到模板文件 2、模板文件到视图函数 四、引入静态文件 五、请求与响应 ?1、请求 2、响应 六、综合小案例 1、源码展示 2、注意事项以及部分解释 3、展示 一、引言 像之前那个页面&#xff0c;太过简陋&#xff0c;而且一个完整…

#渗透测试#批量漏洞挖掘#CyberPanel面板远程命令执行漏洞(CVE-2024-51567)

免责声明 本教程仅为合法的教学目的而准备,严禁用于任何形式的违法犯罪活动及其他商业行为,在使用本教程前,您应确保该行为符合当地的法律法规,继续阅读即表示您需自行承担所有操作的后果,如有异议,请立即停止本文章读。 目录 一、漏洞特征与影响 二、修复方案与技术细…

C++多态

目录 多态的概念多态的定义及实现协变析构函数的重写通过一段代码理解多态C11 final 和 override重载、覆盖(重写)、隐藏(重定义)的对比多态调用原理单继承中的虚函数表抽象类多继承中的虚函数表 多态的概念 概念&#xff1a;通俗来说&#xff0c;就是多种形态&#xff0c;具体…

PosgreSQL比MySQL更优秀吗?

一日&#xff0c;一群开发者对PosgreSQL是不是比MySQL更优秀进行了激烈的辩论&#xff0c;双方吵的都要打起来了 正方有以下理由&#xff1a; PostgreSQL严格遵循SQL标准规范&#xff0c;相较MySQL在语法兼容性和功能完整性方面展现出更强的体系化设计&#xff0c;尤其在事务处…

『大模型笔记』Jason Wei: 大语言模型的扩展范式!

Jason Wei: 大语言模型的扩展范式! 文章目录 一. What is scaling and why do it?1. 什么是Scaling?2. 为什么要Scaling?二. Paradigm 1: Scaling next-word prediction1. 下一个词预测2. 极限多任务学习3. Why does scaling work?三. The challenge with next-word predi…

TCP协议(Transmission Control Protocol)

TCP协议&#xff0c;即传输控制协议&#xff0c;其最大的特征就是对传输的数据进行可靠、高效的控制&#xff0c;其段格式如下&#xff1a; 源端口和目的端口号表示数据从哪个进程来&#xff0c;到哪个进程去&#xff0c;四位报头长度表示的是TCP头部有多少个4字节&#xff0c;…

瑞萨RA-T系列芯片ADCGPT功能模块的配合使用

在马达或电源工程中&#xff0c;往往需要采集多路AD信号&#xff0c;且这些信号的优先级和采样时机不相同。本篇介绍在使用RA-T系列芯片建立马达或电源工程时&#xff0c;如何根据需求来设置主要功能模块ADC&GPT&#xff0c;包括采样通道打包和分组&#xff0c;GPT触发启动…

TraeAi上手体验

一、Trae介绍 由于MarsCode 在国内由于规定限制&#xff0c;无法使用 Claude 3.5 Sonnet 模型&#xff0c;字节跳动选择在海外推出 Trae&#xff0c;官网&#xff1a;https://www.trae.ai/。 二、安装 1.下载安装Trae-Setup-x64.exe 2.注册登录 安装完成后&#xff0c;点击登…