【kubernetes】kubernetes中的Controller

1 什么是Controller?

kubernetes采用了声明式API,与声明式API相对应的是命令式API:

  • 声明式API:用户只需要告诉期望达到的结果,系统自动去完成用户的期望
  • 命令式API:用户需要关注过程,通过命令一步一步完成用户的需求

因此,用户向k8s提交的yaml文件中最重要的部分就是spec,相当于就是用户期望的结果,而使用-o yaml选项查看时,还有一个很重要的部分就是status,它表示的就是当前状态,因此,k8s主要任务就是完成status->spec的转变。这项工作就是Controller(控制器)完成的。

对于不同的资源,控制逻辑是不一样的,因此,就有很多Controller,例如,DeploymentController负责将Deployment的status向spec进行转变,ReplicaSetController负责将ReplicaSet的status向spec进行转变。

从上面可以看出,Controller的工作方式如下:

  • 监听资源变化
  • 得到资源的当前状态status和期望状态spec
  • 执行逻辑使得status->spec

下面以Deployment的创建操作为例说明整个流程:
请添加图片描述

通过上图重新复习下各组件的工作方式:

  • apiserver:为其他组件提供接口,并且所有的组件都通过apiserver进行交互
  • etcd:存储集群的资源对象
  • Controller Manager:管理控制器,Watch -> Analyze -> Act,监听资源的变化,分析出spec和status的差别,执行操作使得status向spec转变
  • Scheduler:监听资源的变化,如果发现未调度的Pod,通过一定的策略选择出Node,设置Pod的Node字段
  • Kubelet:监听调度给当前Node的Pod,并执行对应的操作

可以发现,除了apiserver和etcd,其他组件都可以称为Controller。

2 Controller的实现

知道了Controller的工作方式,如果是我们自己实现Controller,可以会这样来实现:

请添加图片描述

Controller直接通过Apiserver的接口监控对应资源的变化,当资源发生变化时,直接执行对应的业务逻辑,也就是调协循环。

这样会有啥问题呢?

当集群中Node很多时,就会有很多kubelet监控Pod的状态变化,而所有的监听操作都需要通过apiserver,那么apiserver的压力就会很大,就会造成集群的不稳定。

当然,其他资源(例如,Pod或者服务)很多时,同样会造成集群不稳定。

因此,k8s的client-go(client-go)库采用了另外的设计:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

client-go components:

  • Reflector:对特定类型的资源执行ListAndWatch,当监听到资源变更时,通过API获取最新的资源对象,然后将它放到Delta Fifo queue队列中
  • Informer:从Delta Fifo queue队列中弹出对象,然后调用Indexer放到Store里面,同时调用用户提交的回调函数(ResourceEventHandler)
  • Indexer:用于操作Store中的对象

Custom Controller components:

  • Informer Reference和Indexer Reference都是对client-go中对象的引用,用户控制器可以通过cache库直接创建或者使用Factory工厂函数创建
  • ResourceEventHandler:用户控制器接收对象的回调函数,一般来说,里面的逻辑就是,获取对象的key,然后将key写入WorkQueue
  • WorkQueue:用户控制器创建的队列,负责存储用户控制器需要处理的对象的key
  • Process Item:从WorkQueue中读取key,通过key获取对应的对象

上图是通常会给出的关于Controller的实际实现的逻辑,初看还是挺复杂的,大致的模块和功能如下:

请添加图片描述

于是,Controller实现的步骤如下:

  • 获取Informer和Indexer的引用,指定要监控变更的资源类型,注册ResourceEventHandler,并创建WorkQueue,用上述的3个对象初始化我们自己的Controller
  • 编写Process Item Loop,从WorkQueue中读取key,然后执行我们自己的业务逻辑

因此,整个Controller我们需要注入的逻辑只有2个部分,其他都是相对固定的:

  • ResourceEventHandler
  • Process Item

3 Controller的使用

上面介绍了k8s中的Controller的实现,而要使用

下面对client-go中的workqueue的例子进行分析:

workqueue example by client-go

type Controller struct {indexer  cache.Indexer // Indexer,缓存的索引queue    workqueue.RateLimitingInterface // 带限速功能的WorkQueueinformer cache.Controller // Informer
}// 创建控制器
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {return &Controller{informer: informer,indexer:  indexer,queue:    queue,}
}// worker的具体执行逻辑
func (c *Controller) processNextItem() bool {// 从workqueue中获取keykey, quit := c.queue.Get()if quit {return false}// 告诉队列已经处理完毕defer c.queue.Done(key)err := c.syncToStdout(key.(string))// 错误处理c.handleErr(err, key)return true
}// 控制器的业务逻辑,这里就执行status->spec的转变
func (c *Controller) syncToStdout(key string) error {obj, exists, err := c.indexer.GetByKey(key)if err != nil {klog.Errorf("Fetching object with key %s from store failed with %v", key, err)return err}if !exists {// Pod已经不存在fmt.Printf("Pod %s does not exist anymore\n", key)} else {// 这里执行status->spec的转变逻辑fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())}return nil
}// 错误处理,包含重试处理
func (c *Controller) handleErr(err error, key interface{}) {if err == nil {// 处理完毕c.queue.Forget(key)return}// 如果出现问题,会进行重试,也就是重新入workqueue// 但是,入workqueue不超过5次if c.queue.NumRequeues(key) < 5 {klog.Infof("Error syncing pod %v: %v", key, err)// 重新入workqueuec.queue.AddRateLimited(key)return}c.queue.Forget(key)runtime.HandleError(err)klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}// 启动我们自己的控制器
func (c *Controller) Run(workers int, stopCh chan struct{}) {defer runtime.HandleCrash()defer c.queue.ShutDown()// 启动Informer开始监听资源变化go c.informer.Run(stopCh)// 等待cache同步if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))return}// 运行若干个worker,// wait.Until(),每隔1秒执行runWorker()函数,直到stopCh收到结束信号for i := 0; i < workers; i++ {go wait.Until(c.runWorker, time.Second, stopCh)}// 读取结束信号,结束控制器<-stopChklog.Info("Stopping Pod controller")
}func (c *Controller) runWorker() {for c.processNextItem() {}
}func main() {var kubeconfig stringvar master stringflag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")flag.StringVar(&master, "master", "", "master url")flag.Parse()// 通过master和kubeconfig生成配置对象config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)if err != nil {klog.Fatal(err)}// 根据配置对象生成clientset,用于连接k8sclientset, err := kubernetes.NewForConfig(config)if err != nil {klog.Fatal(err)}// 创建Pod的watcherpodListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())// 创建workqueuequeue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())// 创建Indexer和Informer,其中重要的是两个参数,Pod的watcher和回调函数// 告知Informer,我们只监听Pod的资源变化,并且,给Infomer注册回调函数indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {key, err := cache.MetaNamespaceKeyFunc(obj)if err == nil {queue.Add(key)}},UpdateFunc: func(old interface{}, new interface{}) {key, err := cache.MetaNamespaceKeyFunc(new)if err == nil {queue.Add(key)}},DeleteFunc: func(obj interface{}) {key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)if err == nil {queue.Add(key)}},}, cache.Indexers{})// 创建我们自己的控制器controller := NewController(queue, indexer, informer)// 启动控制器stop := make(chan struct{})defer close(stop)go controller.Run(1, stop)// Wait foreverselect {}
}

参考资料:

1 client-go under the hood

2 client-go Examples

3 k8s-client-go demo

4 writing controllers

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/148001.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Scapy样例三则

1. 演示ls()/lsc()用法: ##Exec1.pyfrom scapy.all import *## 列出scapy支持的命令 def ListScapyCmd():lsc()## 列出指定协议的各个字段, 用于构成packet def ListProtocolField(protoclName):ls(protoclName)if __name__ "__main__":print("\nexample of …

SNAP与Sen2Cor下载与安装

SNAP软件下载与安装 一、下载地址 首先进入网站 找到DOWNLOAD下载页&#xff0c; 安装完成后&#xff0c;界面如下 还需要再装一个Sen2cor下载好之后&#xff0c;解压到用户文件夹下 然后打开L2A_Process.bat文件 打开CMD&#xff0c;输入 cd C:\Users\lenovo\AppData\L…

ubuntu 安装 flowiseai

flowiseai 可以快速的搭建AI应用 安装docker 安装docker https://docs.docker.com/desktop/install/linux-install/ 安装docker-compose 安装docker-compose https://blog.csdn.net/sunyuhua_keyboard/article/details/133070011?csdn_share_tail%7B%22type%22%3A%22blo…

安装matplotlib_

安装pip 安装matplotlib 安装完毕 导入出现bug......

C++人事管理系统

一、设计目的 企业员工管理系统主要是针对企业员工的基本信息进行增、删、改、查的相关操作&#xff0c;以便用户使用本管理系统时可以快速对企业员工的信息进行管理。 二、设计内容 1.用户首次使用本系统时进行密码设置和初始化操作。 2.实现添加功能&#xff0c;即添加员工…

httpserver 下载服务器demo 以及libevent版本的 httpserver

实现效果如下&#xff1a; 图片可以直接显示 cpp h 这些可以直接显示 其他的 则是提示是否要下载 单线程 还有bug 代码如下 先放上来 #include "httpserver.h" #include "stdio.h" #include <stdlib.h> #include <arpa/inet.h> #include…

为什么Spring不建议使用基于字段的依赖注入

在我们通过IDEA编写Spring的代码的时候&#xff0c;假如我们编写了如下代码&#xff1a; IDEA会给我们一个warning警告&#xff1a; 翻阅官方文档&#xff1b;我们会发现&#xff1a; 大意就是强制依赖使用构造器注入&#xff0c;可选依赖使用setter注入那么这是为什么呢&am…

一键智能视频语音转文本——基于PaddlePaddle语音识别与Python轻松提取视频语音并生成文案

前言 如今进行入自媒体行业的人越来越多&#xff0c;短视频也逐渐成为了主流&#xff0c;但好多时候是想如何把视频里面的语音转成文字&#xff0c;比如&#xff0c;录制会议视频后&#xff0c;做会议纪要&#xff1b;比如&#xff0c;网课教程视频&#xff0c;想要做笔记&…

【算法|动态规划No.10】leetcode LCR 089. 打家劫舍 LCR 090. 打家劫舍 II

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【手撕算法系列专栏】【LeetCode】 &#x1f354;本专栏旨在提高自己算法能力的同时&#xff0c;记录一下自己的学习过程&#xff0c;希望…

fcpx插件:82种复古电影胶卷框架和效果mFilm Matte

无论您是在制作音乐剪辑、私人假期视频还是大型广告活动&#xff0c;这个专业的插件都将帮助您为您的镜头赋予真正的电影角色。 复古效果在任何视频中都能立即识别出来&#xff0c;增添了感伤的复古氛围&#xff0c;并使镜头更具说服力。使用 mFilm Matte 轻松实现这些特征&…

C++算法 —— 动态规划(10)二维费用背包

文章目录 1、动规思路简介2、一和零3、盈利计划 背包问题需要读者先明白动态规划是什么&#xff0c;理解动规的思路&#xff0c;并不能给刚接触动规的人学习。所以最好是看了之前的动规博客&#xff0c;以及两个背包博客&#xff0c;或者你本人就已经懂得动规了。 1、动规思路简…

九、2023.10.3.Linux(end).9

文章目录 33、简述mmap的原理和使用场景&#xff1f;34、互斥量能不能在进程中使用&#xff1f;35、协程是轻量级线程&#xff0c;轻量级表现在哪里&#xff1f;36、说说常见信号有哪些&#xff0c;表示什么含义&#xff1f;37、说说线程间通信的方式有哪些&#xff1f;38、说说…

Access注入---Cookie注入

Access注入----Cookie注入Access数据库&#xff08;微软&#xff09; 逐渐淘汰 &#xff08;没有库的概念&#xff0c;是表的集合&#xff09;Access没有系统自带库Cookie注入&#xff08;头注入HEAD注入的&#xff09;php中产生Cookie注入的可能性小&#xff0c;但ASP产生Cook…

RabbitMQ的基本介绍

什么是MQ 本质是一个队列&#xff0c;只不过队列中存放的信息是message罢了&#xff0c;还是一种跨进程的通信机制&#xff0c;用于上下游传递信息。在互联网架构中&#xff0c;MQ是一种非常常见的上下游“逻辑解耦物理解耦”的消息通信服务。使用了MQ之后&#xff0c;信息发送…

postgresql16-新特性

postgresql16-新特性 any_value数组抽样数组排序 any_value any_value 返回任意一个值 select e.department_id ,count(*), any_value(e.last_name) from cps.public.employees e group by e.department_id ;数组抽样 -- 从数组中随机抽取一个元素 array_sample(数组&#…

以太网基础学习(四)——IP协议

一 、IP协议概述 IP&#xff08;Internet Protocol&#xff0c;互联网协议&#xff09;是互联网通信的基础协议&#xff0c;它负责将数据包从源地址传输到目的地址。IP协议定义了如何封装数据包&#xff0c;如何寻址数据包以及如何路由数据包&#xff0c;它是随着互联网的出现而…

弧度、圆弧上的点、圆的半径(r)、弧长(s)之间的关系

要计算弧度和圆弧上的点&#xff0c;需要知道以下几个要素&#xff1a; 圆的半径&#xff08;r&#xff09;&#xff1a;即圆的中心到圆周上任意一点的距离。 弧长&#xff08;s&#xff09;&#xff1a;从圆周上的一个点到另一个点所经过的弧长。 弧度&#xff08;θ&#x…

【CAD二次开发】给CAD添加TRUSTEDPATHS避免dll插件信任弹窗

找到配置文件目录,遍历下面的每个配置文件; 找到 Variables 下的TRUSTEDPATHS项目;在后面添加新的目录即可,多个目录使用分号分隔; public static void AddPath(string trusedPath){// 指定注册表键的路径

指针笔试题(带解析版)

题目2&#xff1a; struct MyStruct {int num;char* pcname;short sdate;char cha[2];short sba[4]; }*p; //结构体大小为32字节 //p0x100000 int main() {p 0x100000;printf("%p\n", p 0x1);//p&#xff1a;结构体指针&#xff0c;1下一个结构体指针&#xff0c;…

axb_2019_brop64

axb_2019_brop64 Arch: amd64-64-little RELRO: Partial RELRO Stack: No canary found NX: NX enabled PIE: No PIE (0x400000)64位&#xff0c;只开了NX __int64 repeater() {size_t v1; // raxchar s[208]; // [rsp0h] [rbp-D0h] BYREFprintf("…