client-go中ListAndWatch机制,informer源码详解

文章首发地址: 学一下 (suxueit.com)icon-default.png?t=N7T8https://suxueit.com/article_detail/s9UMb44BWZdDRfKqFv22

先上一张,不知道是那个大佬画的图

图片

简单描述一下流程

client-go封装部分

以pod为例

  1. 、先List所有的Pod资源,然后通过已经获取的pod资源的最大版本去发起watch请求,watch持续接收api-server的事件推送,

  2. 将所有的pod写入到queue

  3. 从队列中取出pod

  4. 4和5将 取出的pod缓存到本地

  5. 调用用户自定义的资源处理函数【AddEventHandler】

用户自定义部分

  1. 将事件写入,自定义的工作队列

  2. 遍历队列,取出资源key

  3. 用key从缓存取出对应资源,进行逻辑处理

阅读完成后续部分,你会发现上面的流程是有一点问题的

list后会立刻写入队列,然后再发起watch,并将监控的事件入队

informer入口分析

通常我们写controller都会初始化一个informer,然后lister对应资源,或者给资源添加的hook点

// 开始运行informer
kubeInformerFactory.Start(stopCh)
//
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {f.lock.Lock()defer f.lock.Unlock()
​// 这里为什么是一个 数组?for informerType, informer := range f.informers {if !f.startedInformers[informerType] {// informer入口go informer.Run(stopCh)f.startedInformers[informerType] = true}}
}

面试问题

一个informer可以监听多个资源吗?

不能: 我们在使用时是看似是通过定义的一个informer客户端去监听多个资源【该informer不是实际意义上的informer,而是一个工厂函数】,实际上,该informer每监听一个资源会生成一个informer并存入工厂informer数组中,启动时再分别调用【goruntine】

因为 一个informers是可以listAndWatch多种资源的 当你调用 kubeInformerFactory.Core().V1().Pods().Lister() kubeInformerFactory.Core().V1().ConfigMaps().Lister() 会分别给 pods和configmap的资源类型生成一个informer

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {f.lock.Lock()defer f.lock.Unlock()
​informerType := reflect.TypeOf(obj)informer, exists := f.informers[informerType]if exists {return informer}
​resyncPeriod, exists := f.customResync[informerType]if !exists {resyncPeriod = f.defaultResync}
​informer = newFunc(f.client, resyncPeriod)// 通过类型将 资源的informer进行存储f.informers[informerType] = informer
​return informer
}

sharedIndexInformer分析

主要结构
type sharedIndexInformer struct {indexer    Indexercontroller Controller// 封装多个事件消费者的处理逻辑,client端通过AddEventHandler接口加入到事件消费Listener列表中processor             *sharedProcessorcacheMutationDetector MutationDetectorlisterWatcher ListerWatcherobjectType runtime.Objectstarted, stopped bool....
}
  • indexer: 本地缓存,底层的实现是threadSafeMap

  • controller: 内部调用Reflector进行ListAndWatch, 然后将事件发送给自定义事件消费者【往上获取apiserver事件,往下发送事件给定义的消费者】

  • processor: 封装多个事件消费者的处理逻辑,client端通过AddEventHandler接口加入到事件消费Listener列表中

    kubeLabelInformer.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: controller.enqueuePodFn,UpdateFunc: func(old, new interface{}) {newPod := new.(*covev1.Pod)oldPod := old.(*covev1.Pod)
    ​if newPod.ResourceVersion == oldPod.ResourceVersion {return}controller.enqueuePodFn(new)},DeleteFunc: controller.enqueuePodFn,})
    //  AddEventHandler 主要内容
    // handler 就是注册的函数listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)// 不能重复加入,所有判断是否已经开始了if !s.started {s.processor.addListener(listener)return}
  • listerWatcher:实现从apiserver进行ListAndWatch的对象,发起watch请求,将server推送的事件传入本地channel,等待消费

  • objectType: 该informer监听的资源类型,例如 Pods

informer.run都干了什么
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()
​// 定义了 DeltaFIFO 队列fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KnownObjects:          s.indexer,EmitDeltaTypeReplaced: true,})
​cfg := &Config{Queue: fifo,// listand watch 的接入口ListerWatcher:    s.listerWatcher,ObjectType:       s.objectType,FullResyncPeriod: s.resyncCheckPeriod,RetryOnError:     false,ShouldResync:     s.processor.shouldResync,
​// Process 是将事件发送给本地注册的事件处理函数的入口Process:           s.HandleDeltas,WatchErrorHandler: s.watchErrorHandler,}
​func() {// 这里为什么要加锁呢?// 猜测: 可能是防止有人不规范使用 informer,在多个goruntine中启动Start,导致多次初始化s.startedLock.Lock()defer s.startedLock.Unlock()
​s.controller = New(cfg)s.controller.(*controller).clock = s.clocks.started = true}()
​// Separate stop channel because Processor should be stopped strictly after controllerprocessorStopCh := make(chan struct{})var wg wait.Groupdefer wg.Wait()              // Wait for Processor to stopdefer close(processorStopCh) // Tell Processor to stop
​// 这里如果使用的是 kubebuild和代码生成,默认使用的是 defaultCacheMutationDetectorwg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)// 运行 sharedProcessorwg.StartWithChannel(processorStopCh, s.processor.run)
​defer func() {s.startedLock.Lock()defer s.startedLock.Unlock()s.stopped = true // Don't want any new listeners}()s.controller.Run(stopCh)
}

这里主要讲解一下

wg.StartWithChannel(processorStopCh, s.processor.run)

记得我们上面分析了,processor: 封装多个事件消费者的处理逻辑,client端通过AddEventHandler接口加入到事件消费Listener列表中,这里就开始运行Listeners

运行两个函数:

for _, listener := range p.listeners {// 内部会定时运行 1 秒运行一次去获取p.wg.Start(listener.run)p.wg.Start(listener.pop)
}
  • listener.run 从channel【nextCh】中读取数据,然后去触发注册的函数

    图片

  • 将数据从channel【addch】发送到 nextCh 【后面还会有将事件发送到channel【addCh】的操作】

    图片

controller分析

func (c *controller) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()r := NewReflector(c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,)....// 运行 Reflector ,进行listAndWatch// 先进行list,将list的数据存入 队列,并存入队列自带的缓存item【map结构】// 然后watch// 对服务端推送的事件进行解码后,会将事件存入 queue【包括queue和item】// 详细见: 后面的reflector分析wg.StartWithChannel(stopCh, r.Run)
​// 循环执行processLoop// 内部调用 //err := process(item) process就是 HandleDeltas// 执行 HandleDeltas// HandleDeltas这里做两件事// 1,将数据存到 本地缓存,也就是  ThreadSafeStore【实际开发中就可以通过: lister直接获取】// 2、只是将事件通过distribute 函数发送到了一个channel【Addch】wait.Until(c.processLoop, time.Second, stopCh)wg.Wait()
}

processLoop——》c.config.Queue.Pop——》HandleDeltas

Pop函数

id := f.queue[0]f.queue = f.queue[1:]if f.initialPopulationCount > 0 {f.initialPopulationCount--}// 获取对象item, ok := f.items[id]if !ok {// This should never happenklog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)continue}// 删除items中的缓存delete(f.items, id)
​// 调用process,前面的 sharedIndexInformer.HandleDeltas,// 将事件发送到本地注册的处理函数err := process(item)// 如果处理失败 从新加入队列if e, ok := err.(ErrRequeue); ok {f.addIfNotPresent(id, item)err = e.Err}

HandleDeltas函数

加载失败

面试题

什么情况下资源对象会在DeltaFIFO存在,但是缓存中不存在【ThreadSafeStore】

答: 写入队列和写入缓存是有先后顺序的,事件到达后会先写入队列,再通过队列的Pop方法进行处理,写入缓存,

扩展: 但是在实际使用中并不影响,因为自定的代码中的队列是在写入缓存后才会有事件通知到我们注册的handler,这时才会添加事件进入我们定义的队列,并开始运行代码更新资源

因为:HandleDeltas函数和 Watch写入队列是异步的,而且肯定是等Watch写入队列后,才会调度HandleDeltas进行缓存写入所有这个中间会有延迟

会不会出现缓存中有,而队列中没有的情况?

答: 是的,确实会有这种情况

1、队列中的事件处理后就会被清理,所有总是会出现这种情况的

Reflector分析

reflector做三件事

  1. 启动的时候向apiserver发起List请求,获取所有监听的资源,放入 DeltaFIFO

  2. 进行resync,定期将item中的资源,重新同步到queue中

  3. watch资源,通过rest接口发起watch请求,并等待apiserver推送的数据,

type DeltaFIFO struct {// 缓存资源对象items map[string]Deltas// 采用slice作为队列queue []string// 基于下面两个参数可以判断资源是否同步完成// 只要添加数据就会设置为 truepopulated bool// 第一次镜像replace时会设置 为资源数量【List阶段同步数据到队列调用的是 DeltaFIFO的replace】// 调用Pop时会initialPopulationCount--,Pod时会调用HandleDeltas,将数据同步到自定义的队列中,第一批插入的数据都Pop完成后,initialPopulationCount==0.说明同步完成initialPopulationCount int
}

func (r *Reflector) Run(stopCh <-chan struct{}) {wait.BackoffUntil(func() {// 开始进行ListAndWatchif err := r.ListAndWatch(stopCh); err != nil {r.watchErrorHandler(r, err)}}, r.backoffManager, true, stopCh)
​
}

List阶段

这里没啥可说的,就是请求数据写入队列

// 发起List,这里采用了分页获取【如果设置了chunk】
list, paginatedResult, err = pager.List(context.Background(), options)
// 将数据写入队列
if err := r.syncWith(items, resourceVersion); err != nil {return fmt.Errorf("unable to sync list result: %v", err)}
// 设置资源版本,防止后续网络断开需要重试的情况,可以从已经获取的版本开始获取
r.setLastSyncResourceVersion(resourceVersion)

Watch过程

  1. 指定资源版本通过rest请求apiserver进行Watch

  2. apiserver推送的数据会被Watch对象写入channel【result】

  3. 从Result这个channel中不断接收原生,将事件通过 switch 不同的类型调用不同的函数

第一阶段
options = metav1.ListOptions{// 该值会持续更新,如果网络异常导致 连续中断,则会从接收到的版本再次进行watchResourceVersion: resourceVersion,....
}
​
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()
// 开始对资源进行watch, apiserver推送事件后,会将事件推送到一个 result的channel中,然后由后续的watchHandler进行处理
w, err := r.listerWatcher.Watch(options)
Watch对象的实现
retry := r.retryFn(r.maxRetries)
url := r.URL().String()
for {if err := retry.Before(ctx, r); err != nil {return nil, retry.WrapPreviousError(err)}// 构造请求req, err := r.newHTTPRequest(ctx)if err != nil {return nil, err}
​resp, err := client.Do(req)updateURLMetrics(ctx, r, resp, err)retry.After(ctx, r, resp, err)if err == nil && resp.StatusCode == http.StatusOK {// 返回流对象return r.newStreamWatcher(resp)}
}
流对象
// NewStreamWatcher creates a StreamWatcher from the given decoder.
func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {sw := &StreamWatcher{source:   d,reporter: r,// It's easy for a consumer to add buffering via an extra// goroutine/channel, but impossible for them to remove it,// so nonbuffered is better.result: make(chan Event),// If the watcher is externally stopped there is no receiver anymore// and the send operations on the result channel, especially the// error reporting might block forever.// Therefore a dedicated stop channel is used to resolve this blocking.done: make(chan struct{}),}go sw.receive()return sw
}
sw.receive()

图片

从result这个channel获取数据,并调用对应的事件

会在这里循环读取数据,ResultChan()返回的就是 result 这个channel

图片

通过不同的事件类型,调用不同的队列方法 【store是前面定义的 DeltaFIFO】

同时还会将已经获取的 资源版本进行更新【这里传进来的是指针,所有更改后 外面会生效】

图片

reSync过程
// 这里进行重新 同步数据到队列中, 同步主要是为了 能够周期性的去触发我们自己写的代码更新资源状态go func() {resyncCh, cleanup := r.resyncChan()defer func() {cleanup() // Call the last one written into cleanup}()for {select {case <-resyncCh:case <-stopCh:returncase <-cancelCh:return}// 重新同步资源对象到队列中if r.ShouldResync == nil || r.ShouldResync() {klog.V(4).Infof("%s: forcing resync", r.name)if err := r.store.Resync(); err != nil {resyncerrc <- errreturn}}cleanup()// 是否可以进行同步: 这里是去重试时间开启了一个定时通知resyncCh, cleanup = r.resyncChan()}}()
​
// 进行重新同步
func (f *DeltaFIFO) Resync() error {f.lock.Lock()defer f.lock.Unlock()
​if f.knownObjects == nil {return nil}
​// fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{//      KnownObjects:          s.indexer,//      EmitDeltaTypeReplaced: true,//  })// 这里同步是去取 indexer里面的数据: indexer就是 threadSafeMapkeys := f.knownObjects.ListKeys()for _, k := range keys {if err := f.syncKeyLocked(k); err != nil {return err}}return nil
}

面试题

网络异常,会不会导致事件丢失?

不会,网络异常只会影响到Watch,这中间发送的事件无法接收到,但是一旦网络恢复,重新开始Watch,客户端会维护一个已经接收事件的版本号,当网络恢复,会从这个版本号开始进行watch资源,

如果客户端重启呢,会不会丢失事件?

是的,客户端重启肯定是会丢失事件的,但是并不影响controller的运行,重启会重新获取全量的资源列表,这时能够获取到最新的版本,controller的目标就是将用户期望spec,实际进行应用,所有只需要应用最新版本即可

存的资源版本号,是每个资源对象都有存吗,例如pod资源,pod1的版本号和pod2的版本号?

不用,只用存储一个版本号即可,因为资源的的版本号是递增的,只用记录最后一个同步的版本即可,

ListAndWatch中的watch过程,是每一个资源都有一个watch吗?

:不是,client-go采用的是采用的区间watch【同时watch满足条件的一批资源】,所以只需要一个watch请求,

扩展:是一类对象一个watch,例如pod,configmap,这是一类对象

k8s采用了多路复用,可以将一个controller发起的watch请求,通过一个连接进行发送,可以降低api-server的连接数量

为什么要进行重新同步?

1、重新同步是为了controller能够定期的去更新对应的资源

2、controller在处理事件时,如果需要等待或者处理错误,通过重新同步可以再次触发更新

listAndWatch的流程?

1、发起List请求,获取所有需要Watch的资源 2、将这些资源写入队列

3、定期的中缓存中取出 资源对象,写入队列

4、对资源进行Watch,如果有事件会同步到队列

5、从队列中取出资源,写入到缓存

6、调用用户定义的handler处理事件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/287863.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里云 -- 连接云服务器ECS、管理云服务器ECS、WordPress 页面配置

连接云服务器ECS 1. 远程连接云服务器ECS&#xff0c;点击实例最右侧操作列的远程连接按钮&#xff0c;并在弹出的对话框中点击立即登录 2. 登录云服务器ECS&#xff0c;通过密码认证方式&#xff0c;输入用户名和密码 提示&#xff1a;新创建的ECS实例状态即使为运行中&#…

微信投票小程序源码系统:创建新活动+多模板一键切换 带完整的安装代码包以及搭建教程

随着社交媒体的普及&#xff0c;投票活动已经成为人们表达意见、参与决策的重要方式。微信小程序作为一种新兴的应用形态&#xff0c;具有用户基数大、使用门槛低的特点&#xff0c;非常适合举办投票活动。然而&#xff0c;对于普通用户来说&#xff0c;开发一个功能完善的投票…

xilinx的高速接口构成原理和连接结构

本文来源&#xff1a; V3学院 尤老师的培训班笔记【高速收发器】xilinx高速收发器学习记录Xilinx-7Series-FPGA高速收发器使用学习—概述与参考时钟GT Transceiver的总体架构梳理 文章目录 一、概述&#xff1a;二、高速收发器结构&#xff1a;2.1 QUAD2.1.1 时钟2.1.2 CHANNEL…

前端html常用标签 笔记

一、基础 开始标签 结束标签 大部分是成对出现的标签&#xff0c;这个是空标签(/放在最后&#xff0c;/可以省掉) 缩进 向后Tab 、前向ShiftTab 红的就是元素属性 标签可以使内容有一些特殊的表现: 给body颜色后&#xff1a; h1 span(连起来) 标签&#xff1a; 标题h1 h2 h3 …

介绍部署esxi8.0产品的方式

什么是esxi esxi的中文叫裸机虚拟机管理器 ESXi是由VMware公司开发的一种裸机虚拟机管理器&#xff0c;全称为VMware ESXi。 ESXi是一种虚拟化技术&#xff0c;专门设计用于在物理服务器上运行虚拟机&#xff0c;它的主要特点是能够最大限度地降低硬件配置要求并简化部署过程…

Android 性能优化实例分享-内存优化 兼顾效率与性能

背景 项目上线一段时间后,回顾重要页面 保证更好用户体验及生产效率&#xff0c;做了内存优化和下载导出优化&#xff0c;具体效果如最后的一节的表格所示。 下面针对拍摄流程的两个页面 预览页 导出页优化实例进行介绍&#xff1a; 一.拍摄前预览页面优化 预览效果问题 存在…

tcp 协议详解

什么是 TCP 协议 TCP全称为 “传输控制协议(Transmission Control Protocol”). 人如其名, 要对数据的传输进行一个详细的控制。TCP 是一个传输层的协议。 如下图&#xff1a; 我们接下来在讲解 TCP/IP 协议栈的下三层时都会先解决这两个问题&#xff1a; 报头与有效载荷如何…

EtherCAT转RS232网关在风电领域的应用

开疆智能EtherCAT转RS232网关在风电领域的应用主要体现在以下几个方面&#xff1a; 1.数据采集与传输&#xff1a;在风力发电设备中&#xff0c;传感器和执行器的数据采集和传输至关重要。EtherCAT转RS232网关可以将风力发电设备中的RS232通信协议转换为EtherCAT协议&#xff0…

何恺明重提十年之争——模型表现好是源于能力提升还是捕获数据集偏置?

想象一下&#xff0c;如果把世界上所有的图片都找来&#xff0c;给它们放到一块巨大的空地上&#xff0c;其中内容相似的图片放得近一些&#xff0c;内容不相似的图片放得远一些&#xff08;类比向量嵌入&#xff09;。然后&#xff0c;我随机地向这片空地撒一把豆子&#xff0…

【Linux实践室】Linux用户管理实战指南:用户密码管理操作详解

&#x1f308;个人主页&#xff1a;聆风吟_ &#x1f525;系列专栏&#xff1a;Linux实践室、网络奇遇记 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 一. ⛳️任务描述二. ⛳️相关知识2.1 &#x1f514;用户密码存放地及方式2.2 &#x1f514;使用…

yolov5+pyside6+登录+用户管理目标检测可视化源码

一、软件简介 这是基于yolov5目标检测实现的源码&#xff0c;提供了用户登录功能界面&#xff1b; 用户需要输入正确的用户名和密码才可以登录。如果是超级管理员&#xff0c;可以修改普通用户的信息&#xff0c;并且在检测界面的右上角显示【管理用户】按钮。 支持图片、视频、…

DNS协议 是什么?说说DNS 完整的查询过程?

一、是什么 DNS&#xff08;Domain Names System&#xff09;&#xff0c;域名系统&#xff0c;是互联网一项服务&#xff0c;是进行域名和与之相对应的 IP 地址进行转换的服务器 简单来讲&#xff0c;DNS相当于一个翻译官&#xff0c;负责将域名翻译成ip地址 IP 地址&#…

Web API —— DOM 学习(四)(完结)

目录 一、日期对象 &#xff08;一&#xff09;实例化 &#xff08;二&#xff09;日期对象方法 1.时间戳介绍 2.获得时间戳的方式 getTime()方法 new Date()方法 Date.now()方法 二、节点操作 &#xff08;一&#xff09;DOM 节点 1.节点类型 元素节点 &#xff08…

SpringBoot 邮件服务集成配置全面解析

前言 本文以网易邮箱&#xff08;及 163 邮箱&#xff09;为例&#xff0c;展示如何为 SpringBoot 项目集成邮件服务&#xff0c;其他邮箱配置类似&#xff0c;可以自行查看 Spring Email 指南 或是其他官方文档 授权码 首先我们需要获取授权码&#xff0c;用于后续配置&…

【线段树二分】第十三届蓝桥杯省赛C++ A组/研究生组 Python 研究生组《扫描游戏》(C++)

【题目描述】 有一根围绕原点 O 顺时针旋转的棒 OA&#xff0c;初始时指向正上方&#xff08;Y 轴正向&#xff09;。 在平面中有若干物件&#xff0c;第 i 个物件的坐标为&#xff08;,)&#xff0c;价值为 。 当棒扫到某个物件时&#xff0c;棒的长度会瞬间增长 &#xff…

R语言ggplot2 | 热图+随机森林重要性!升级版~

&#x1f4cb;文章目录 原图复现定义ggrf_ggcor_plot()函数加载数据集一键出图函数优点 今天推出一个升级版&#xff1a; ggrf_ggcor_plot的函数。只需要输入 响应变量的矩阵和 解释变量的矩阵&#xff0c;就能轻松一键生成随机森林重要性相关性热图。 原图 所需复现的随机森…

发车,易安联签约某新能源汽车领军品牌,为科技创新保驾护航

近日&#xff0c;易安联成功签约某新能源汽车领军品牌&#xff0c;为其 数十万终端用户 建立一个全新的 安全、便捷、高效一体化的零信任终端安全办公平台。 随着新能源汽车行业的高速发展&#xff0c;战略布局的不断扩大&#xff0c;技术创新不断引领其市场价值走向高点&am…

如何在数字化转型中确保数据安全

随着科技的飞速发展&#xff0c;数字化转型已成为企业发展的必然趋势。数字化转型是指企业利用数字技术对业务流程、组织结构和商业模式进行全面创新和变革&#xff0c;以提高企业的竞争力和创新能力。然而&#xff0c;在数字化转型过程中&#xff0c;数据安全问题日益凸显&…

新能源汽车充电桩主板各模块成本占比解析

汽车充电桩主板是汽车充电桩的重要组件&#xff0c;主要由微处理器模块、通信模块、控制模块、安全保护模块、传感器模块等多个模块构成。深入探究各模块在总成本中的比重&#xff0c;我们可以更好地优化成本结构、提高生产效率&#xff0c;并为未来的技术创新和市场需求变化做…

R语言学习——Rstudio软件

R语言免费但有点难上手&#xff0c;是数据挖掘的入门级别语言&#xff0c;拥有顶级的可视化功能。 优点&#xff1a; 1统计分析&#xff08;可以实现各种分析方法&#xff09;和计算&#xff08;有很多函数&#xff09; 2强大的绘图功能 3扩展包多&#xff0c;适合领域多 …