深入源码分析kubernetes informer机制(二)Reflector


[阅读指南]
这是该系列第二篇
基于kubernetes 1.27 stage版本
为了方便阅读,后续所有代码均省略了错误处理及与关注逻辑无关的部分。


文章目录

  • Reflector是什么
  • 整体结构
  • 工作流程
    • list拉取数据
    • 缓存resync操作
    • watch监听操作
  • 总结

Reflector是什么

reflector在informer中就像是一个对外的窗口,它与api-server建立连接,监听和获取来自api-server的资源变化信息,并把这些信息放进deltaFIFO中,交给下一个环节处理。

整体结构

与api-server进行交互,通过list获取指定的全量资源,watch监听指定的资源变化事件,并将这些事件放入delta FIFO队列中。
结构与交互如下图

// 省略了部分字段,只留下我们关注的
type Reflector struct {// name identifies this reflector. By default it will be a file:line if possible.name string// reflector对象需要监控的资源类型,比如上一节workqueue中的&v1.Pod{}expectedType reflect.Type// deltaFIFO 队列存储对象store Store// 实现list/watchlisterWatcher ListerWatcher// 上次更新的资源版本号,用来判断当前的node的资源状况lastSyncResourceVersion string......
}

工作流程

reflecter主函数比较简单,循环同步运行ListAndWatch直到收到stop信号。

func (r *Reflector) Run(stopCh <-chan struct{}) {wait.BackoffUntil(func() {if err := r.ListAndWatch(stopCh); err != nil {r.watchErrorHandler(r, err)}}, r.backoffManager, true, stopCh)
}

ListAndWatch主要做了这几件事:

  1. 通过stream或者chunk方式拉取全量list数据
  2. 开启一个协程进行缓存resync操作。
  3. 循环执行watch监听操作
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {...fallbackToList := !r.UseWatchList// stream式同步if r.UseWatchList {w, err = r.watchList(stopCh)...if err != nil {...fallbackToList = truew = nil}}// chunk式同步if fallbackToList {err = r.list(stopCh)if err != nil {return err}}...go r.startResync(stopCh, cancelCh, resyncerrc)return r.watch(w, stopCh, resyncerrc)
}

接下来咱一步步来看。

list拉取数据

ListAndWatch拉取全量数据时,出现了两种数据拉取的方式,list /watchstream list /watch

stream list是 kubernetes 1.27 引入的新方案,通过 ENABLE_CLIENT_GO_WATCH_LIST_ALPHA 变量可以启用stream list,默认会使用原有的list/watch。后续会单独开一篇介绍stream list方案,详情可以通过KEP-3157了解

前者在初始化时list拉取全量数据,通过watch更新增量变化。
后者可以通过watch 请求的方式获取list数据,从而减轻大规模集群初始化list数据时的资源消耗。
在建立watch连接时,携带如下两个参数即可告知服务器使用streaming list进行一致性读取。
sendInitialEvents=true
resourceVersionMatch=NotOlderThan

常规的list流程借用这个博主画的时序图来看下。
在这里插入图片描述

缓存resync操作

resync负责定期将本地的缓存重新加入deltaFIFO队列,确保本地缓存与controller的数据一致性。

国内太多博客没了解清楚就介绍这一部分是与api-server交互,进行relist。实际上resync完全没有涉及到服务端的部分,他就是一个本地缓存的同步机制。与服务端的交互使用list/watch已经完全可以确保资源一致性了,基本不怎么需要进行relist操作,并且对于节点非常多的大集群来说,list非常消耗资源,何况是定期relist呢。

关于resync机制的介绍,不在这里展开,详细看下一篇笔记。

watch监听操作

watch的实现非常巧妙,它利用了http的chunk编码传输机制建立长连接,来实现动态的数据监听,可以了解分块传输编码。
同样借用一张时序图来看下watch的流程
在这里插入图片描述

reflector通过Watcher监听api-server端的数据delta事件,并将这些事件放入deltaFIFO中统一处理。

// 在这里向服务端发起watch请求,并接收和处理资源变更事件
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {...for {...// w == nil表示使用常规的list/watch方式,streaming 方式会创建特殊的watcherif w == nil {timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))options := metav1.ListOptions{// 上次同步的资源版本,也就是本地的资源版本。以此来获取增量的数据ResourceVersion: r.LastSyncResourceVersion(),// watch 超时时间,长时间没有接受任务事件的watcher会被关掉,避免长时间挂起。TimeoutSeconds: &timeoutSeconds,// watch书签,避免watch重启时请求api-server导致的消耗。AllowWatchBookmarks: true,}// 创建一个watch对象,监听api-server的资源变更事件,将接收到的事件丢进resultChan中w, err = r.listerWatcher.Watch(options)...}// 将resultChan中的取出放入FIFO 队列err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)// 失败重试逻辑...}
}

建立连接的逻辑在这一行
w, err = r.listerWatcher.Watch(options)

还是用上一篇workqueue来看看这个Watch实例的实现。
从Watch函数一路往上追溯,可以看到先是与server建立了http连接,再通过watch标记建立了watch连接,创建stream watcher对象,并拉起一个协程去处理监听到的事件信息。

  • 此后所有监听的delta事件都会经过receive协程进入到resultChan中。
// reflector调用的watch函数
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {return lw.WatchFunc(options)
}// watchFunc函数的定义
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {...watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {options.Watch = true // 向服务端请求chunk连接optionsModifier(&options)return c.Get().Namespace(namespace).Resource(resource).VersionedParams(&options, metav1.ParameterCodec).Watch(context.TODO()) // 这里调用了getter的watch函数// getter是controller初始化时建立的http客户端: clientset.CoreV1().RESTClient()}return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {...	url := r.URL().String()for {req, err := r.newHTTPRequest(ctx)resp, err := client.Do(req)if err == nil && resp.StatusCode == http.StatusOK {return r.newStreamWatcher(resp)}...}
}func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {sw := &StreamWatcher{source:   d,reporter: r,result: make(chan Event),done: make(chan struct{}),}go sw.receive() // 处理消息事件的协程return sw
}// 解析接收到的事件,并放到resultChan中等待后续处理。
func (sw *StreamWatcher) receive() {for {// 解析数据action, obj, err := sw.source.Decode()select {case <-sw.done:return// 将事件发送到resultChancase sw.result <- Event{Type:   action,Object: obj,}:}}
}
  • 进入resultChan的事件,由watchHandler取出再分类添加到FIFO队列中。
func watchHandler(start time.Time,w watch.Interface,	// watch实例store Store, // 存储对象 比如delta FIFO queue...
) error {...
loop:for {select {case <-stopCh:return errorStopRequestedcase err := <-errc:return err// 从ResultChan中取出变更事件,并放进队列中,比如delta FIFO队列中case event, ok := <-w.ResultChan():// 省略了一些资源过滤和错误处理...// 解析监听到的事件数据meta, err := meta.Accessor(event.Object)if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))continue}// 解析资源事件的版本resourceVersion := meta.GetResourceVersion()switch event.Type {case watch.Added: err := store.Add(event.Object) // 往队列添加add delta事件... // err handlecase watch.Modified: err := store.Update(event.Object) // 往队列添加update delta事件... // err handlecase watch.Deleted:	err := store.Delete(event.Object) // 往队列添加delete delta事件,在此之前会判断事件对应的资源对象是否存在... // err handlecase watch.Bookmark:...default:... // err handle}// 更新resourceVersion版本号,下一轮watch就不会再收到重复的更新事件setLastSyncResourceVersion(resourceVersion)if rvu, ok := store.(ResourceVersionUpdater); ok {rvu.UpdateResourceVersion(resourceVersion)}...}}...return nil
}

总结

用一个图来回顾下reflector各个模块的关系~
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/92350.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何实现安全上网

l 场景描述 政府、军工、科研等涉密单位或企业往往要比其他组织更早接触高精尖的技术与产品&#xff0c;相对应的数据保密性要求更高。常规的内外网物理隔离手段&#xff0c;已经满足不了这些涉密单位的保密需求&#xff0c;发展到现在&#xff0c;需求已经演变成既要保证网络…

(十六)大数据实战——安装使用mysql版的hive服务

前言 hive默认使用的是内嵌据库derby&#xff0c;Derby 是一个嵌入式数据库&#xff0c;可以轻松地以库的形式集成到应用程序中。它不需要独立的服务器进程&#xff0c;所有的数据存储在应用程序所在的文件系统中。为了支持hive服务更方便的使用&#xff0c;我们使用mysql数据…

【数据结构】“栈”的模拟实现

&#x1f490; &#x1f338; &#x1f337; &#x1f340; &#x1f339; &#x1f33b; &#x1f33a; &#x1f341; &#x1f343; &#x1f342; &#x1f33f; &#x1f344;&#x1f35d; &#x1f35b; &#x1f364; &#x1f4c3;个人主页 &#xff1a;阿然成长日记 …

MySQL 索引 详解(保姆级教程)

一、索引概述 索引是帮助 MySQL 高效获取数据的数据结构&#xff08;有序&#xff09;。在数据之外&#xff0c;数据库系统还维护着满足特定查找算法的数据结构&#xff0c;这些数据结构以某种方式引用&#xff08;指向&#xff09;数据&#xff0c;这样就可以在这些数据结构上…

本地oracle登录账号锁定处理,the account is locked

1.打开cmd命令窗口 2.打开sqlplus: sqlplus /nolog(加/nolog是不登录服务器的意思&#xff0c;不加就需要输账号密码) 3.切换到管理员&#xff1a;conn / as sysdba; 第2步第3步可以合并&#xff0c;直接使用sysdba登录&#xff1a;sqlplus / as sysdba; 4.解锁账号&#x…

Python web实战之Django 的 WebSocket 支持详解

关键词&#xff1a;Python, Django, WebSocket, Web 如何使用 Django 实现 WebSocket 功能&#xff1f;本文将详细介绍 WebSocket 的概念、Django 的 WebSocket 支持以及如何利用它来创建动态、响应式的 Web 应用。 1. WebSocket 简介 1.1 什么是 WebSocket&#xff1f; 在 W…

阿里云FRP内网穿透挂载多台服务器

1. FRP介绍 FRP (Fast Reverse Proxy) 是比较流行的一款。FRP 是一个免费开源的用于内网穿透的反向代理应用&#xff0c;它支持 TCP、UDP 协议&#xff0c; 也为 http 和 https 协议提供了额外的支持。你可以粗略理解它是一个中转站&#xff0c; 帮你实现 公网 ←→ FRP(服务器…

OpenCV importerror:dll load failed

从预编译的二进制文件安装OpenCV&#xff0c;从github下载opencv-4.8.0-windows.exe 编译好的文件。按照官方文档拖入cv2.pyd文件。 https://docs.opencv.org/4.8.0/d5/de5/tutorial_py_setup_in_windows.html 使用pycharm运行时&#xff0c;出现报错&#xff0c;importerror…

ITK-SNAP医学影像处理软件无法打开问题

安装ITK-SNAP后成功打开了一次之后再次打开就一直显示无法打开的提示, 在检查全路径为英文和其他版本的ITK-SNAP仍然无法打开&#xff1b; 解决办法&#xff1a; 根据报错的提示的路径&#xff0c;找到UserPreferences.xml 文件&#xff0c;将xml文件删掉&#xff0c;然后就可以…

好用的networkx绘图包

1. NetworkX简介 NetworkX 是一个用于创建、操作和研究复杂网络的 Python 库。它可以创建、分析和可视化各种类型的网络(包括有向图和无向图)&#xff0c;例如社交网络、Web图、生物网络等。 NetworkX 提供了许多图的算法和分析工具&#xff0c;比如节点的度、网络的直径、最短…

【linux】2 软件管理器yum和编辑器vim

目录 1. linux软件包管理器yum 1.1 什么是软件包 1.2 关于rzsz 1.3 注意事项 1.4 查看软件包 1.5 如何安装、卸载软件 1.6 centos 7设置成国内yum源 2. linux开发工具-Linux编辑器-vim使用 2.1 vim的基本概念 2.2 vim的基本操作 2.3 vim正常模式命令集 2.4 vim末行…

GT Code - 图译算法编辑器(集成QT、C++、C、Linux、Git、java、web、go、高并发、服务器、分布式、网络编程、云计算、大数据项目)

目录 项目概述 发文意义 项目介绍 功能分析 设计概要 功能展示 项目文档 项目概述 “GT Code 图译算法编辑器”是一款跨平台、轻量级的代码编辑器&#xff0c;主要面向软件开发人员&#xff0c;它实现了编辑、编译、绘制代码流程图、生成调试演示动画等功能&#xff0c;以…

透过源码理解Flutter中widget、state和element的关系

1、framework源码组成 Flutter中widget、state、element的源码位于framework.dart中&#xff0c;整个文件6693行(版本Flutter 3.12.0-14.0.pre.28)。整个代码可划分为若干部分&#xff0c;主要包括key、widget、state、element四部分。 1.1 key 关于key的代码65行到272行&am…

如何让CSDN学习成就个人能力六边形全是100分:解析个人能力雷达图的窍门

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

142. 环形链表 II

题目描述 给定一个链表的头节点 head &#xff0c;返回链表开始入环的第一个节点。 如果链表无环&#xff0c;则返回 null。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的环&#xff0c;评测系统内…

运维监控学习笔记8

在服务器端&#xff0c;我们添加了nginx-server的主机&#xff1a; 在解决Error问题的过程中&#xff0c;我还通过zabbix_get这个命令进行了测试&#xff0c;发现是没有的&#xff0c;后来确认是在web页面配置的过程中&#xff0c;我输错了密码。 yum install zabbix-getzabbi…

应急响应-Webshell

文章目录 一、Webshell概述什么是WebshellWebshell分类基于编程语言基于文件大小/提供的功能多少 Webshell 检测方法 二、常规处置方法三、技术指南1、初步预判2、 Webshell排查3、Web日志分析&#xff08;查找攻击路径及失陷原因&#xff09;4、系统排查4.1 Windows4.2 Linux …

淘宝API接口的实时数据和缓存数据区别

电商API接口实时数据是指通过API接口获取到的与电商相关的实时数据。这些数据可以包括商品库存、订单状态、销售额、用户活跃度等信息。 通过电商API接口&#xff0c;可以实时获取到电商平台上的各种数据&#xff0c;这些数据可以帮助企业或开发者做出及时的决策和分析。例如&…

SAP MM学习笔记23-购买发注的账户分配类型(勘定Category)

SAP中控制财务凭证过账科目的是 账号分配类型&#xff08;勘定Category&#xff09;栏目。 ・账号分配类型&#xff08;勘定Category&#xff09;有&#xff1a; 1&#xff0c;K 原价Center&#xff08;成本中心。用于消耗物料采购 的过账&#xff09; 2&#xff0c;E 得意先…

医疗PACS源码,支持三维多平面重建、三维容积重建、三维表面重建、三维虚拟内窥镜

C/S架构的PACS系统源码&#xff0c;PACS主要进行病人信息和影像的获取、处理、存储、调阅、检索、管理&#xff0c;并通过网络向全院提供病人检查影像及诊断报告&#xff1b;各影像科室之间共享不同设备的病人检查影像及诊断报告;在诊断工作站上&#xff0c;调阅HIS中病人的其它…