zookeeper源码分析之事务请求处理

一.参考

zookeeper启动和选举的源码分析参考之前的帖子.

二.源码

1.职责链模式.

每次经过的processor都是异步处理,加入当前processor的队列,然后新的线程从队列里面取出数据处理.

PrepRequestProcessor 检查ACL权限,创建ChangeRecord.

SyncRequestProcessor 负责事务日志的持久化,写入snapshot和log文件.

FinalRequestProcessor 提交leader事务.向observer发送请求,提交事务.

1.1如果是leader启动的时候会倒序初始化下面6个processor,

LeaderRequestProcessor->PrepRequestProcessor->ProposalRequestProcessor(含有SyncRequestProcessor的成员变量,processRequest中先调用下一个,再调用SyncRequestProcessor)->CommitProcessor->ToBeAppliedRequestProcessor->FinalRequestProcessor.

 protected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());commitProcessor.start();ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);proposalProcessor.initialize();prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);prepRequestProcessor.start();firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);setupContainerManager();
}

1.2.如果是follower启动,会进入ZooKeeperServer#startupWithServerState倒序初始化下面三个processor,

FollowerRequestProcessor->CommitProcessor->FinalRequestProcessor.

另外两个,是在Follower的主循环中,接收到PROPOSAL请求时,会依次调用SyncRequestProcessor->SendAckRequestProcessor,在下面二.2中有详细分析.

  protected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());commitProcessor.start();firstProcessor = new FollowerRequestProcessor(this, commitProcessor);((FollowerRequestProcessor) firstProcessor).start();syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));syncProcessor.start();}

2.处理网络请求

如果是follower,栈如下图所示,进入NIOServerCnxnFactory.SelectorThread#run,提交到RequestThrottler#submittedRequests队列里面,开始职责链处理.

3.ceate命令栈

发起creat /test2命令时,如果是leader,栈如下图所示:

如果是follower角色,栈如下所示:

4.主循环处理create

如果是follower,进入QuorumPeer#run方法,这个是follower角色的死循环主方法,进入Follower#followLeader方法,代码如下:

void followLeader() throws InterruptedException {//xxxtry {self.setZabState(QuorumPeer.ZabState.DISCOVERY);//启动过程中,读取的选票中获取leader.QuorumServer leaderServer = findLeader();try {//连接leaderconnectToLeader(leaderServer.addr, leaderServer.hostname);connectionTime = System.currentTimeMillis();//向leader中注册自己follower的存在long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);if (self.isReconfigStateChange()) {throw new Exception("learned about role change");}//check to see if the leader zxid is lower than ours//this should never happen but is just a safety check//leader 代数比自己小,抛异常,正常不会这样.long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);if (newEpoch < self.getAcceptedEpoch()) {LOG.error("Proposed leader epoch "+ ZxidUtils.zxidToString(newEpochZxid)+ " is less than our accepted epoch "+ ZxidUtils.zxidToString(self.getAcceptedEpoch()));throw new IOException("Error: Epoch of leader is lower");}long startTime = Time.currentElapsedTime();try {self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);//向leader同步数据,主要是拉取leader比较新的事务日志syncWithLeader(newEpochZxid);self.setZabState(QuorumPeer.ZabState.BROADCAST);completedSync = true;} finally {long syncTime = Time.currentElapsedTime() - startTime;ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);}if (self.getObserverMasterPort() > 0) {LOG.info("Starting ObserverMaster");om = new ObserverMaster(self, fzk, self.getObserverMasterPort());om.start();} else {om = null;}// create a reusable packet to reduce gc impactQuorumPacket qp = new QuorumPacket();while (this.isRunning()) {readPacket(qp);//处理客户端请求,在后面2处分析processPacket(qp);}} catch (Exception e) {LOG.warn("Exception when following the leader", e);closeSocket();// clear pending revalidationspendingRevalidations.clear();}} finally {//xxx}}

5.进入职责链

依次进入Follower#processPacket(case Leader.PROPOSAL)->FollowerZooKeeperServer#logRequest(类中有SyncRequestProcessor成员引用)->SyncRequestProcessor#processRequest方法,加入到SyncRequestProcessor#queuedRequests队列里面.代码如下.

protected void processPacket(QuorumPacket qp) throws Exception {switch (qp.getType()) {case Leader.PING:ping(qp);break;case Leader.PROPOSAL://提案处理//xxxfzk.logRequest(hdr, txn, digest);//xxx//xxx}
}public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());request.setTxnDigest(digest);if ((request.zxid & 0xffffffffL) != 0) {pendingTxns.add(request);}syncProcessor.processRequest(request);
}

6.SyncRequestProcessor队列处理

进入新线程的方法SyncRequestProcessor#run.代码如下所示:

public void run() {try {// we do this in an attempt to ensure that not all of the servers// in the ensemble take a snapshot at the same timeresetSnapshotStats();lastFlushTime = Time.currentElapsedTime();while (true) {ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());//从队列取出请求Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);if (si == null) {/* We timed out looking for more writes to batch, go ahead and flush immediately */flush();si = queuedRequests.take();}if (si == REQUEST_OF_DEATH) {break;}long startProcessTime = Time.currentElapsedTime();ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);// track the number of records written to the log//写请求添加到DataTree数据库内,写日志文件,如下面4代码.if (zks.getZKDatabase().append(si)) {//判断是否需要重新建立快照if (shouldSnapshot()) {resetSnapshotStats();// roll the logzks.getZKDatabase().rollLog();// take a snapshotif (!snapThreadMutex.tryAcquire()) {LOG.warn("Too busy to snap, skipping");} else {new ZooKeeperThread("Snapshot Thread") {public void run() {try {//启动新线程,创建新的快照.见下面代码5zks.takeSnapshot();} catch (Exception e) {LOG.warn("Unexpected exception", e);} finally {snapThreadMutex.release();}}}.start();}}} else if (toFlush.isEmpty()) {// optimization for read heavy workloads// iff this is a read, and there are no pending// flushes (writes), then just pass this to the next// processorif (nextProcessor != null) {nextProcessor.processRequest(si);if (nextProcessor instanceof Flushable) {((Flushable) nextProcessor).flush();}}continue;}toFlush.add(si);if (shouldFlush()) {flush();}ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);}} catch (Throwable t) {handleException(this.getName(), t);}LOG.info("SyncRequestProcessor exited!");}

7.写事务日志

进入FileTxnLog#append()写事务日志到文件,如下代码.

  public synchronized boolean append(TxnHeader hdr, Record txn, TxnDigest digest) throws IOException {if (hdr == null) {return false;}if (hdr.getZxid() <= lastZxidSeen) {LOG.warn("Current zxid {} is <= {} for {}",hdr.getZxid(),lastZxidSeen,hdr.getType());} else {lastZxidSeen = hdr.getZxid();}if (logStream == null) {LOG.info("Creating new log file: {}", Util.makeLogName(hdr.getZxid()));//创建日志文件log-xxxlogFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));fos = new FileOutputStream(logFileWrite);logStream = new BufferedOutputStream(fos);oa = BinaryOutputArchive.getArchive(logStream);FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);fhdr.serialize(oa, "fileheader");// Make sure that the magic number is written before padding.logStream.flush();filePadding.setCurrentSize(fos.getChannel().position());streamsToFlush.add(fos);}filePadding.padFile(fos.getChannel());//事务写到buf内.byte[] buf = Util.marshallTxnEntry(hdr, txn, digest);if (buf == null || buf.length == 0) {throw new IOException("Faulty serialization for header " + "and txn");}Checksum crc = makeChecksumAlgorithm();//buf写到crc内,写到文件crc.update(buf, 0, buf.length);oa.writeLong(crc.getValue(), "txnEntryCRC");Util.writeTxnBytes(oa, buf);return true;}

8.写事务日志文件

建立事务日志的快照,从上面的3过来.依次进入ZooKeeperServer#takeSnapshot()->FileTxnSnapLog#save(创建snapshot文件)->FileSnap#serialize()->SerializeUtils#serializeSnapshot->DataTree#serialize->DataTree#serializeNode.主要是向文件流中写入各种数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/404574.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ArcGIS空间自相关Moran‘s I——探究人口空间格局的20年变迁

先了解什么是莫兰指数&#xff1f; 莫兰指数&#xff08;Morans I&#xff09;是一种用于衡量空间自相关性的统计量&#xff0c;即它可以帮助我们了解一个地理区域内的观测值是否彼此相关以及这种相关性的强度和方向。 莫兰指数分类&#xff1a; 全局莫兰指数 (Global Moran…

聊聊如何利用ingress-nginx实现应用层容灾

前言 容灾是一种主动的风险管理策略&#xff0c;旨在通过构建和维护异地的冗余系统&#xff0c;确保在面临灾难性事件时&#xff0c;关键业务能够持续运作&#xff0c;数据能够得到保护&#xff0c;从而最大限度地减少对组织运营的影响和潜在经济损失。因此容灾的重要性不言而…

zabbix实战-磁盘空间告警

1.创建监控项 选择&#xff1a;键值&#xff1a;vfs.fs.size[fs,<mode>] 。 直接写 vfs.fs.size[fs,<mode>]是不出数据的。我们要写具体的值 &#xff1a;vfs.fs.size[/,free] &#xff0c;这个表示查看根的剩余空间。 2.创建图形 为磁盘剩余空间监控项创建图形&am…

redis 遍渐进式历

1.scan cursor [match pattern] [coutn] [type]:以渐进式的方式进行建的遍历 cursor:是光标 指向当前遍历的位置 设置成0表示当前从0开始获取 math parttern &#xff1a;和keys命令一样的 keys * count: 限制一次遍历能够获取到多少个 元素默认是10 type :这次遍历只想获取…

[Python学习日记-10] Python中的流程控制(if...else...)

[Python学习日记-10] Python中的流程控制&#xff08;if...else...&#xff09; 简介 缩进 单分支 双分支 多分支 练习 简介 假如把写程序比做走路&#xff0c;那我们到现在为止&#xff0c;一直走的都是直路&#xff0c;还没遇到过分叉口&#xff0c;想象现实中&#x…

【python】Python实现XGBoost算法的详细理论讲解与应用实战

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

Python编码系列—Python数据可视化:Matplotlib与Seaborn的实战应用

&#x1f31f;&#x1f31f; 欢迎来到我的技术小筑&#xff0c;一个专为技术探索者打造的交流空间。在这里&#xff0c;我们不仅分享代码的智慧&#xff0c;还探讨技术的深度与广度。无论您是资深开发者还是技术新手&#xff0c;这里都有一片属于您的天空。让我们在知识的海洋中…

Ps:首选项 - 单位与标尺

Ps菜单&#xff1a;编辑/首选项 Edit/Preferences 快捷键&#xff1a;Ctrl K Photoshop 首选项中的“单位与标尺” Units & Rulers选项卡允许用户根据工作需求定制 Photoshop 的测量单位和标尺显示方式。这对于保持工作的一致性和精确性&#xff0c;尤其是在跨设备或跨平台…

专题--自底向上的计算机网络(物理层)

目录 计算机网络概述 物理层 数据链路层 网络层 运输层 应用层 网络安全 详细见http://t.csdnimg.cn/MY5aI http://t.csdnimg.cn/8Ipa4 http://t.csdnimg.cn/uvMxS 信道复用技术不仅在物理层有运用&#xff0c;在数据链路层也确实需要信道复用技术。‌ 数据链路层是…

第N8周:使用Word2vec实现文本分类

本文为365天深度学习训练营 中的学习记录博客原作者&#xff1a;K同学啊 一、数据预处理 任务说明: 本次将加入Word2vec使用PyTorch实现中文文本分类&#xff0c;Word2Vec 则是其中的一种词嵌入方法&#xff0c;是一种用于生成词向量的浅层神经网络模型&#xff0c;由Tomas M…

隐语隐私计算实训营「数据分析」第 5 课:隐语PSI介绍及开发实践

深入理解隐语(SecretFlow)中的PSI技术 隐私集合求交(Private Set Intersection, PSI)是隐私计算中的一个重要技术,它允许多方在不泄露自己数据的前提下找出共同的数据交集。在本文中,我们将深入探讨SecretFlow(隐语)中PSI的实现和应用。 PSI的基本概念 PSI是一种特殊的安全多…

鸿蒙内核源码分析(时钟任务篇)

时钟概念 时间是非常重要的概念&#xff0c;我们整个学生阶段有个东西很重要,就是校园铃声. 它控制着上课,下课,吃饭,睡觉的节奏.没有它学校的管理就乱套了,老师拖课想拖多久就多久,那可不行,下课铃声一响就是在告诉老师时间到了,该停止了让学生HAPPY去了. 操作系统也一样&…

php源码编译与初始化

1 php源码编译 解压 yum install -y bzip2 # 安装解压工具 tar -xf php-7.4.12.tar.bz2 # 解压文件./condigure ./configure --prefix/usr/local/php --with-config-file-path/usr/local/php/etc --enable-fpm --with-fpm-usernginx --with-fpm-groupnginx --with-curl --wi…

线程面试题

1.JDK自带的线程池有哪些&#xff1f; 2.线程池中核心线程数与最大线程数与缓冲任务队列的关系&#xff1f; 先使用核心线程执行任务。 当核心线程不足时&#xff0c;新任务入队列等待。 当队列满且线程数未达最大值时&#xff0c;增加非核心线程执行任务。 当队列满且线程…

Leetcode每日刷题之209.长度最小的子数组(C++)

1.题目解析 根据题目我们知道所给的数组均是正整数&#xff0c;我们需要找到的是该数组的子数组&#xff0c;使其子数组内所有元素之和大于或等于给出的目标数字target&#xff0c;然后返回其长度&#xff0c;最终找出所以满足条件的子数组&#xff0c;并且要返回长度最小的子数…

网络硬盘录像机NVR程序源码海思3520D NVR 安防监控智能升级运用方案

随着安防技术的不断发展&#xff0c;传统的监控系统正逐步向智能化方向转变。海思Hi3520D作为一款高性能的网络视频处理芯片&#xff0c;在NVR&#xff08;网络视频录像机&#xff09;领域有着广泛的应用。本方案旨在探讨如何利用海思Hi3520D芯片的强大功能对现有的NVR系统进行…

vue2使用天地图

需求&#xff1a;用vue2使用天地图展示对应点位数据以及开发中出现的问题等&#xff0c;其实天地图的写法和百度地图差不多 注意&#xff01;&#xff01;&#xff01;天地图的接口不稳定&#xff0c;时常报错418&#xff0c;官网也是一样的情况&#xff0c;推荐还是使用百度或…

C++:C/C++的内存管理

目录 C/C内存分布 C语言中动态内存管理方式 C内存管理方式 new/delete操作内置类型 new/delete操作自定义类型 operator new与operator delete函数 new和delete的实现原理 定位new表达式 常见问题 malloc/free和new/delete的区别 内存泄漏 C/C内存分布 我们先来看以…

【机器学习】(基础篇七) —— 神经网络

神经网络 神经网络是一种模仿人脑神经元结构的计算模型&#xff0c;用于处理复杂的数据模式识别和预测问题。它由大量简单的处理单元&#xff08;称为“神经元”或“节点”&#xff09;组成&#xff0c;这些单元通过连接权重相互连接。神经网络可以学习从输入数据到输出结果之…

GitLab Merge Request流水线

GitLab Merge Request 流程文档 为了提升代码质量&#xff0c;让开发人员参与代码review&#xff0c;现在输出Merge Request的流程文档&#xff1a; 1.项目创建各自开发者的分支&#xff0c;命名规则是dev_名字首字母&#xff0c;比如我是dev_cwq.然后把本地分支推到远端orgin…