【ETCD】【源码阅读】深入解析 EtcdServer.applySnapshot方法

今天我们来一步步分析ETCD中applySnapshot函数

一、函数完整代码

函数的完整代码如下:

func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {if raft.IsEmptySnap(apply.snapshot) {return}applySnapshotInProgress.Inc()lg := s.Logger()lg.Info("applying snapshot",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)defer func() {lg.Info("applied snapshot",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)applySnapshotInProgress.Dec()}()if apply.snapshot.Metadata.Index <= ep.appliedi {lg.Panic("unexpected leader snapshot from outdated index",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)}// wait for raftNode to persist snapshot onto the disk<-apply.notifycnewbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)if err != nil {lg.Panic("failed to open snapshot backend", zap.Error(err))}// We need to set the backend to consistIndex before recovering the lessor,// because lessor.Recover will commit the boltDB transaction, accordingly it// will get the old consistent_index persisted into the db in OnPreCommitUnsafe.// Eventually the new consistent_index value coming from snapshot is overwritten// by the old value.s.consistIndex.SetBackend(newbe)// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.if s.lessor != nil {lg.Info("restoring lease store")s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })lg.Info("restored lease store")}lg.Info("restoring mvcc store")if err := s.kv.Restore(newbe); err != nil {lg.Panic("failed to restore mvcc store", zap.Error(err))}newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))// Closing old backend might block until all the txns// on the backend are finished.// We do not want to wait on closing the old backend.s.bemu.Lock()oldbe := s.bego func() {lg.Info("closing old backend file")defer func() {lg.Info("closed old backend file")}()if err := oldbe.Close(); err != nil {lg.Panic("failed to close old backend", zap.Error(err))}}()s.be = newbes.bemu.Unlock()lg.Info("restoring alarm store")if err := s.restoreAlarms(); err != nil {lg.Panic("failed to restore alarm store", zap.Error(err))}lg.Info("restored alarm store")if s.authStore != nil {lg.Info("restoring auth store")s.authStore.Recover(newbe)lg.Info("restored auth store")}lg.Info("restoring v2 store")if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {lg.Panic("failed to restore v2 store", zap.Error(err))}if err := assertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {lg.Panic("illegal v2store content", zap.Error(err))}lg.Info("restored v2 store")s.cluster.SetBackend(newbe)lg.Info("restoring cluster configuration")s.cluster.Recover(api.UpdateCapability)lg.Info("restored cluster configuration")lg.Info("removing old peers from network")// recover raft transports.r.transport.RemoveAllPeers()lg.Info("removed old peers from network")lg.Info("adding peers from new cluster configuration")for _, m := range s.cluster.Members() {if m.ID == s.ID() {continue}s.r.transport.AddPeer(m.ID, m.PeerURLs)}lg.Info("added peers from new cluster configuration")ep.appliedt = apply.snapshot.Metadata.Termep.appliedi = apply.snapshot.Metadata.Indexep.snapi = ep.appliediep.confState = apply.snapshot.Metadata.ConfState
}

二、函数功能概览

上述函数的核心功能是 应用一个来自 Raft 协议的快照,并在应用过程中恢复系统的各个关键组件,以确保系统的状态与最新的快照一致。具体来说,函数完成了以下核心任务:

  1. 检查快照有效性:判断快照是否为空或过时,如果无效则提前退出。

  2. 记录快照应用的开始和结束:通过日志记录快照应用的开始和结束,同时更新相关的监控指标。

  3. 等待并加载快照数据:等待 Raft 节点将快照持久化到磁盘,并打开快照后端。

  4. 恢复一致性索引:将新的快照后端设置为一致性索引的后端,确保一致性。

  5. 恢复存储组件

    • 租约存储(lease store):恢复与租约相关的数据。
    • MVCC 存储:恢复多版本并发控制(MVCC)存储。
    • 报警存储:恢复报警数据。
    • 认证存储:恢复认证相关数据(如果存在)。
    • V2 存储:恢复 V2 存储,并进行合法性检查。
  6. 恢复集群配置:更新集群配置,并确保集群的一致性。

  7. 更新 Raft 网络成员:移除旧的集群成员并添加新的集群成员到网络中。

  8. 更新应用进度:更新快照的任期、索引等应用进度信息。

三、函数详细分析

好的,接下来我将逐步解析这段代码,并用中文进行解释。

1. 检查快照是否为空

if raft.IsEmptySnap(apply.snapshot) {return
}
  • 这段代码判断传入的快照是否为空。如果快照为空(即没有数据需要恢复),则直接返回,结束该函数的执行。

2. 增加快照应用中的度量

applySnapshotInProgress.Inc()
  • 这行代码会将 applySnapshotInProgress 计数器增加 1,表示当前有一个快照正在被应用。这个计数器通常用于监控系统中,帮助跟踪正在进行的操作。

3. 日志记录快照应用的开始

lg := s.Logger()
lg.Info("applying snapshot",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
  • 获取日志记录器 lg,并记录一条信息级别的日志,说明快照正在被应用。
  • 日志中包括当前的快照索引、已应用的索引以及来自领导者的快照的索引和任期(term)。

4. 使用 defer 确保快照应用完成后记录日志

defer func() {lg.Info("applied snapshot",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)applySnapshotInProgress.Dec()
}()
  • 使用 defer 语句确保在函数结束时,记录一条日志表示快照应用已经完成。
  • 同时,减少 applySnapshotInProgress 计数器,表示快照应用过程结束。

5. 检查快照的索引是否过时

if apply.snapshot.Metadata.Index <= ep.appliedi {lg.Panic("unexpected leader snapshot from outdated index",zap.Uint64("current-snapshot-index", ep.snapi),zap.Uint64("current-applied-index", ep.appliedi),zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),)
}
  • 这里会检查传入的快照索引是否小于等于已应用的索引 ep.appliedi。如果是,说明接收到的快照来自一个过时的领导者,这会导致系统崩溃(通过 lg.Panic 打印错误日志并触发 panic)。

6. 等待 Raft 节点将快照持久化到磁盘

<-apply.notifyc
  • 等待一个信号,确保 Raft 节点已将快照持久化到磁盘。apply.notifyc 是一个通道,程序会在此处阻塞,直到 Raft 节点完成快照的持久化操作。

7. 打开新的快照后端

newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)
if err != nil {lg.Panic("failed to open snapshot backend", zap.Error(err))
}
  • 这行代码通过 openSnapshotBackend 函数打开新的快照后端(即读取快照的存储),如果打开失败,则记录错误日志并触发 panic。

8. 设置新的后端为一致性索引

s.consistIndex.SetBackend(newbe)
  • 将新的快照后端设置为一致性索引的后端。这是为了确保一致性索引能够正确地与新的快照数据同步。

9. 恢复租约存储(lease store)

if s.lessor != nil {lg.Info("restoring lease store")s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })lg.Info("restored lease store")
}
  • 如果系统中有 lessor(负责管理租约的组件),则会从新快照后端恢复租约存储。
  • 恢复过程中,会在事务回滚时写入 KV 存储。

10. 恢复 MVCC 存储

lg.Info("restoring mvcc store")
if err := s.kv.Restore(newbe); err != nil {lg.Panic("failed to restore mvcc store", zap.Error(err))
}
lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
  • 接下来,恢复 MVCC(多版本并发控制)存储,如果恢复失败,则触发 panic。
  • 恢复成功后,记录恢复的日志,包括一致性索引。

11. 关闭旧的后端

s.bemu.Lock()
oldbe := s.be
go func() {lg.Info("closing old backend file")defer func() {lg.Info("closed old backend file")}()if err := oldbe.Close(); err != nil {lg.Panic("failed to close old backend", zap.Error(err))}
}()
s.be = newbe
s.bemu.Unlock()
  • 使用锁 (bemu.Lock) 来确保线程安全地切换后端文件。
  • 在一个新的 goroutine 中关闭旧的快照后端,防止在关闭过程中阻塞主线程。
  • 更新 s.be 为新的快照后端,并解锁。

12. 恢复报警存储

lg.Info("restoring alarm store")
if err := s.restoreAlarms(); err != nil {lg.Panic("failed to restore alarm store", zap.Error(err))
}
lg.Info("restored alarm store")
  • 恢复报警存储,如果恢复失败,则触发 panic。

13. 恢复认证存储

if s.authStore != nil {lg.Info("restoring auth store")s.authStore.Recover(newbe)lg.Info("restored auth store")
}
  • 如果存在认证存储(authStore),则恢复认证存储。

14. 恢复 V2 存储

lg.Info("restoring v2 store")
if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {lg.Panic("failed to restore v2 store", zap.Error(err))
}
if err := assertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {lg.Panic("illegal v2store content", zap.Error(err))
}
lg.Info("restored v2 store")
  • 恢复 V2 存储,并进行检查,确保没有非法的 V2 存储内容。

15. 恢复集群配置

s.cluster.SetBackend(newbe)
lg.Info("restoring cluster configuration")
s.cluster.Recover(api.UpdateCapability)
lg.Info("restored cluster configuration")
  • 将集群配置恢复到新的快照后端,并恢复集群配置。

16. 移除旧的网络成员

lg.Info("removing old peers from network")
s.r.transport.RemoveAllPeers()
lg.Info("removed old peers from network")
  • 从网络中移除旧的集群成员。

17. 添加新的集群成员

lg.Info("adding peers from new cluster configuration")
for _, m := range s.cluster.Members() {if m.ID == s.ID() {continue}s.r.transport.AddPeer(m.ID, m.PeerURLs)
}
lg.Info("added peers from new cluster configuration")
  • 将新的集群成员添加到网络中。

18. 更新应用进度

ep.appliedt = apply.snapshot.Metadata.Term
ep.appliedi = apply.snapshot.Metadata.Index
ep.snapi = ep.appliedi
ep.confState = apply.snapshot.Metadata.ConfState
  • 更新应用进度(包括任期、索引等信息),确保与新的快照数据一致。

总结:这段代码的核心任务是应用一个来自 Raft 协议的快照。它通过多个步骤确保快照数据被正确地恢复到系统的各个存储组件中,同时进行了一系列的检查和恢复操作,确保系统的一致性和健康。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/492482.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

通过算法识别运行过程中产生的常见缺陷,及时处理,避免运行故障,影响正常作业的智慧快消开源了

智慧快消视频监控平台是一款功能强大且简单易用的实时算法视频监控系统。它的愿景是最底层打通各大芯片厂商相互间的壁垒&#xff0c;省去繁琐重复的适配流程&#xff0c;实现芯片、算法、应用的全流程组合&#xff0c;从而大大减少企业级应用约95%的开发成本。 基于多年的深度…

μC/OS-Ⅱ源码学习(7)---软件定时器

快速回顾 μC/OS-Ⅱ中的多任务 μC/OS-Ⅱ源码学习(1)---多任务系统的实现 μC/OS-Ⅱ源码学习(2)---多任务系统的实现(下) μC/OS-Ⅱ源码学习(3)---事件模型 μC/OS-Ⅱ源码学习(4)---信号量 μC/OS-Ⅱ源码学习(5)---消息队列 μC/OS-Ⅱ源码学习(6)---事件标志组 本文进一…

CRYPTO密码学

加解密算法/编码 编码base家族unicodeASCII哈希算法MD5 Message Digest AlgorithmnSM3SHA-3GBGB18030GB2312GBKutf家族恺撒二进制分区法DSADSSCRC32校验对称非对称gbk编码h264SEA初探smc动态代码保护四方密码曼彻斯特编码剖析基本概念什么是编码?什么是加密与解密寻找银弹-有没…

【前端】深入探讨 JavaScript 的 reduce() 方法

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: 前端 文章目录 &#x1f4af;前言&#x1f4af;什么是 reduce() 方法&#xff1f;定义与核心概念语法结构参数解析返回值 &#x1f4af;基础用法与示例示例 1&#xff1a;计算数组元素的和解析 示例 2&#xff1a;统计…

postman关联接口用于登录(验证码会变情况)

目录 一、介绍 二、操作步骤 (一)Fiddler抓取到登录信息 (二)postman发送请求 新建请求一&#xff1a;登录值请求 (三)易变值赋值固定住 新建请求二&#xff1a;易变值验证码(uuid)请求 切换到请求一里面进行赋值绑定 一、介绍 接口有两种形式&#xff0c;一种是单…

SSC338Q SigmaStar 摄像头主控芯片

SSC338Q 是一款由 SigmaStar&#xff08;星宸科技&#xff09;推出的高集成度多媒体系统级芯片&#xff08;SoC&#xff09;&#xff0c;广泛应用于高分辨率智能视频录制设备&#xff0c;如 IP 摄像机、车载摄像机和 USB 摄像机。 处理器&#xff1a; CPU&#xff1a;32 位双…

苹果将推出超薄和折叠款iPhone,2024年带来哪些变化?

苹果公司&#xff08;AAPL&#xff09;近日宣布&#xff0c;将对其iPhone系列进行重大升级&#xff0c;以应对当前市场中的销量压力。这一改变&#xff0c;或许会为苹果带来新的增长动力。那么&#xff0c;苹果的2024年新iPhone究竟有哪些亮点呢&#xff1f;下面我们来详细了解…

QML 粒子模拟

粒子模拟 粒子模拟 粒子模拟的核心是粒子系统&#xff08;ParticleSystem&#xff09;, 它控制共享时间线。一个粒子使用发射器元素&#xff08;Emitter&#xff09;发射&#xff0c; 使用粒子画笔&#xff08;ParticlePainter&#xff09;实现可视化&#xff0c; 它可以是一张…

Java中的Consumer接口应该如何使用(通俗易懂图解)

应用场景&#xff1a; 第一次程序员A写好了个基础的遍历方法&#xff1a; public class Demo1 {public static void main(String[] args) {//假设main方法为程序员B写的,此时需要去调用A写好的一个遍历方法//1.如果此时B突然发现想将字符串以小写的形式打印出来&#xff0c;则…

WPF+MVVM案例实战与特效(四十四)- WPF多语言支持全解析:轻松实现国际化应用

文章目录 1、引言2、案例效果3、准备工作1、创建项目结构2、代码实现1、语言资源2、资源引用3、页面功能4、实现效果3、总结1、引言 在当今全球化的背景下,开发一个多语言支持的应用程序变得越来越重要。WPF提供了强大的功能来实现应用程序的国际化和本地化。本文将详细介绍如…

Java爬虫大冒险:如何征服1688商品搜索之巅

在这个信息爆炸的时代&#xff0c;数据就是力量。对于电商平台而言&#xff0c;数据更是金矿。今天&#xff0c;我们要踏上一场Java爬虫的冒险之旅&#xff0c;目标是征服1688这个B2B电商巨头&#xff0c;获取按关键字搜索的商品信息。这不仅是技术的挑战&#xff0c;更是智慧的…

《Django 5 By Example》读后感

一、 为什么选择这本书&#xff1f; 本人的工作方向为Python Web方向&#xff0c;想了解下今年该方向有哪些新书出版&#xff0c;遂上packt出版社网站上看了看&#xff0c;发现这本书出版时间比较新(2024年9月)&#xff0c;那就它了。 从2024年11月11日至2024年12月18日期间&…

TouchGFX移植(3)增加SDRAM驱动

一&#xff09;SDRAM驱动增加到工程中 1&#xff09;加入驱动sdram.c文件&#xff0c;文件在上节课里有源代码。 2&#xff09;在fmc.c文件里指定位置增加代码 SDRAM_Init();另外需要包含文件&#xff1a;#include “sdram.h” /* USER CODE BEGIN 0 / #include “sdram.h” …

Apache Kylin最简单的解析、了解

官网&#xff1a;Overview | Apache Kylin 一、Apache Kylin是什么&#xff1f; 由中国团队研发具有浓厚的中国韵味&#xff0c;使用神兽麒麟&#xff08;kylin&#xff09;为名 的一个OLAP多维数据分析引擎:&#xff08;据官方给出的数据&#xff09; 亚秒级响应&#xff…

【Token】校验、会话技术、登录请求、拦截器【期末实训】实战项目学生和班级管理系统\Day15-后端Web实战(登录认证)\讲义

登录认证 在前面的课程中&#xff0c;我们已经实现了部门管理、员工管理的基本功能&#xff0c;但是大家会发现&#xff0c;我们并没有登录&#xff0c;就直接访问到了Tlias智能学习辅助系统的后台。 这是不安全的&#xff0c;所以我们今天的主题就是登录认证。 最终我们要实现…

前端(组件传参案例)

父组件(商品详情页) 子组件上边放大图 底下缩小轮播图 需求分析&#xff1a;父组件获取图片数据&#xff0c;传给底下子组件进行进行轮播&#xff0c;实现父组件给子组件传参。然后底下子组件轮播后&#xff0c;把当前图片下标给父组件&#xff0c;实现子组件给父组件传参。父组…

【Linux网络编程】第十二弹---构建与优化HTTP请求处理:从HttpRequest到HttpServer的实战

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】【C详解】【Linux系统编程】【Linux网络编程】 目录 1、HttpRequest类 1.1、基本结构 1.2、构造析构函数 1.3、反序列化函数 1.4、GetLine() 1.5、打印函数…

使用k6进行kafka负载测试

1.安装环境 kafka环境 参考Docker搭建kafka环境-CSDN博客 xk6-kafka环境 ./xk6 build --with github.com/mostafa/xk6-kafkalatest 查看安装情况 2.编写脚本 test_kafka.js // Either import the module object import * as kafka from "k6/x/kafka";// Or in…

Linux内存管理 --- 进程创建虚拟地址的过程

文章目录 前言一、进程虚拟地址空间二、进程号1的创建过程2.1 kernel_init2.2 kernel_execve2.2.1 alloc_bprm2.2.2 bprm_stack_limits2.2.3 copy_string_kernel2.2.4 bprm_execve 2.3 bprm_execve2.3.1 prepare_binprm2.3.2 load_binary2.3.3 interpreter 三、load_elf_binary…

uniapp blob格式转换为video .mp4文件使用ffmpeg工具

前言 介绍一下这三种对象使用场景 您前端一旦涉及到文件或图片上传Q到服务器&#xff0c;就势必离不了 Blob/File /base64 三种主流的类型它们之间 互转 也成了常态 Blob - FileBlob -Base64Base64 - BlobFile-Base64Base64 _ File uniapp 上传文件 现在已获取到了blob格式的…