Kubernetes Storage Architecture: Reading the PV Controller Source Code

Abstract

This article introduces the basics of the Kubernetes storage architecture and then studies the source code of the PV controller in depth.

Background

Let's start from a business scenario. Suppose your company runs a web server on a physical machine, and as the business grows the local disk fills up. The service team asks your infrastructure department for an extra 1 TB of block storage. You propose two solutions:

  • Solution 1: add a new disk to the physical machine

The steps are:

  1. Obtain a disk (buy one or borrow one)

  2. Insert the disk into a server slot

  3. Have the OS on the machine recognize the disk, format it, and mount it to a directory

  • Solution 2: if a Ceph storage cluster is already available, carve a volume out of Ceph and map it to the physical machine

The steps are as follows (see the shell sketch after this list):

  1. rbd create to create a volume
  2. rbd map to map the volume to the physical machine
  3. Have the OS on the machine recognize the disk, format it, and mount it to a directory
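In shell terms the Ceph flow looks roughly like this (illustrative only: the pool name, image name and mount point are made up, and option syntax varies across Ceph releases):

# Illustrative sketch; pool/image names and the mount point are assumptions.
rbd create rbd/web-data --size 1048576    # 1) create a 1 TiB volume (size in MB)
rbd map rbd/web-data                      # 2) map it to the host; a /dev/rbdX device appears
mkfs.ext4 /dev/rbd0                       # 3) format the new device ...
mount /dev/rbd0 /data                     #    ... and mount it where the service needs it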

Now go one step further: what if the consumer is a container Pod rather than a physical machine?

Solution 1: treat the Pod as if it were a physical machine. Install the Ceph RBD client inside the Pod, rbd map the volume to the Pod, then enter the Pod and mount the volume at the directory the user needs. Everything is done by hand: inefficient, and far from mainstream.

Solution 2: use Kubernetes together with the Ceph CSI driver, and the whole process of attaching persistent storage to a Pod is automated.
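To make the contrast concrete, here is a minimal illustrative manifest pair (the StorageClass name csi-virt-rbd is borrowed from the manifests shown later in this article; the other names are made up). The user only declares a PVC and mounts it in a Pod; Kubernetes and the CSI driver take care of provisioning, attaching and mounting:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: web-data          # illustrative name
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 1Ti
  storageClassName: csi-virt-rbd
---
apiVersion: v1
kind: Pod
metadata:
  name: web               # illustrative name
spec:
  containers:
  - name: web
    image: nginx
    volumeMounts:
    - name: data
      mountPath: /usr/share/nginx/html
  volumes:
  - name: data
    persistentVolumeClaim:
      claimName: web-data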

Kubernetes storage architecture

Storage basics

In a Kubernetes environment, storage volumes are abstracted through PVCs and PVs.

PV and PVC

  • PersistentVolumeClaim (PVC): a user's request for storage. It is analogous to a Pod: Pods consume node resources, PVCs consume PV resources. A Pod can request specific resource levels (CPU and memory); a PVC can request a PV of a specific size and access mode.

  • PersistentVolume (PV): storage provisioned by an administrator; it is part of the cluster. Just as a node is a cluster resource, a PV is a cluster resource too. It records the storage type, size and access mode, and its lifecycle is independent of any Pod: destroying the Pod that uses it has no effect on the PV.

How PVs are provisioned

  • Static PVs: a cluster administrator creates a number of PVs up front, each carrying the details of a real piece of storage available to cluster users (see the sketch after this list).
  • Dynamic PVs: when none of the administrator's static PVs matches a user's PersistentVolumeClaim, the cluster provisions a volume for the PVC dynamically. This is driven by StorageClasses: the PVC must request a StorageClass, and the administrator must have created and configured that class for dynamic provisioning to happen.
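A minimal sketch of the static flow, assuming a hypothetical NFS server (the address, export path and names are made up): the administrator creates the PV by hand, and any PVC requesting a compatible size and access mode can bind to it.

apiVersion: v1
kind: PersistentVolume
metadata:
  name: nfs-pv-demo       # illustrative name
spec:
  capacity:
    storage: 10Gi
  accessModes:
  - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  nfs:
    server: 10.0.0.10     # hypothetical NFS server
    path: /exports/demo   # hypothetical export path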

PV access modes

  • ReadWriteOnce - the volume is mounted read-write by a single node
  • ReadOnlyMany - the volume is mounted read-only by many nodes
  • ReadWriteMany - the volume is mounted read-write by many nodes

PV reclaim policies

  • Retain - manual reclamation. After the PVC is deleted, the PV becomes Released and is unusable; to reuse it, an administrator must delete the PV and recreate it. Deleting the PV only removes the PV object, not the underlying storage. Use Retain when you want to keep the data (see the kubectl example after this list).
  • Recycle - a basic scrub. When the PVC is deleted, the PV's data is wiped automatically, equivalent to running rm -rf /thevolume/. The PV's phase goes from Bound back to Available, and it can be claimed by a new PVC.
  • Delete - delete the backing storage resource. The associated storage asset (an AWS EBS, GCE PD, Azure Disk or OpenStack Cinder volume, etc.) is deleted. NFS does not support the Delete policy.
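For example, to keep the data behind a dynamically provisioned PV, its reclaim policy can be switched from Delete to Retain with a standard kubectl patch (the PV name here is the one from the manifests later in this article):

kubectl patch pv pvc-020270a5-52f4-11ef-b4be-e8ebd398881c \
  -p '{"spec":{"persistentVolumeReclaimPolicy":"Retain"}}'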

PV phases

  • Available - a free piece of storage not yet bound to any claim
  • Bound - the PV has been bound to a PVC
  • Released - the PVC was deleted and the PV is waiting to be reclaimed
  • Failed - the PV's automatic reclamation failed

(figure: PV phase transitions; image from the web, omitted here)

PVC phases

  • Pending - waiting to be bound to a PV
  • Bound - a PV has been bound to the PVC
  • Lost - the PVC has lost its PV; if the PV is bound again, the PVC returns to Bound

(figure: PVC phase transitions; image from the web, omitted here)

PVC YAML field reference

---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  annotations:
    # Marks whether the binding of this PVC to a PV completed. When the PV and
    # PVC are successfully bound, Kubernetes adds this annotation; the PV
    # controller checks it to determine the state of the binding process.
    pv.kubernetes.io/bind-completed: "yes"
    # Marks whether the PV was bound to this PVC automatically by the storage
    # controller. If the PV was created dynamically through a StorageClass, or
    # auto-bound by the controller, this annotation is set to "yes".
    pv.kubernetes.io/bound-by-controller: "yes"
    # Names the provisioner (from the StorageClass) that created the PV. With
    # dynamic provisioning, the provisioner creates and configures the PV from
    # the PVC's request, with no manual work by the administrator.
    volume.beta.kubernetes.io/storage-provisioner: rbd.csi.obs.com
  creationTimestamp: 2024-08-05T06:29:03Z
  finalizers:
  - kubernetes.io/pvc-protection
  # Name of the PVC
  name: basepvc-dg03dev8jxdh
  # Namespace the PVC lives in
  namespace: public
  resourceVersion: "11508437467"
  selfLink: /api/v1/namespaces/public/persistentvolumeclaims/basepvc-dg03dev80407964jxdh-swap
  # Uniquely identifies the PVC; also used to manage the binding between this PVC and its PV
  uid: 020270a5-52f4-11ef-b4be-e8ebd398881c
spec:
  # Access modes of the PVC
  accessModes:
  - ReadWriteOnce
  # For cloned volumes, identifies the snapshot the volume was created from
  dataSource: null
  resources:
    requests:
      # Requested volume size
      storage: 51Gi
  storageClassName: csi-virt-rbd
  # Block or Filesystem: whether the volume should be formatted with a filesystem
  volumeMode: Filesystem
  # Name of the bound PV; by convention "pvc-<PVC UID>"
  volumeName: pvc-020270a5-52f4-11ef-b4be-e8ebd398881c
status:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 51Gi
  # Current phase of the PVC
  phase: Bound

PV YAML field reference

---
apiVersion: v1
kind: PersistentVolume
metadata:
  annotations:
    # Names the provisioner that created this PV
    pv.kubernetes.io/provisioned-by: rbd.csi.obs.com
  creationTimestamp: 2024-08-05T06:29:03Z
  finalizers:
  - kubernetes.io/pv-protection
  labels:
    baymax.io/rbd-cluster-name: ceph10
  name: pvc-020270a5-52f4-11ef-b4be-e8ebd398881c
  resourceVersion: "10132283575"
  selfLink: /api/v1/persistentvolumes/pvc-020270a5-52f4-11ef-b4be-e8ebd398881c
  uid: 0220ec79-52f4-11ef-b4be-e8ebd398881c
spec:
  accessModes:
  - ReadWriteOnce
  capacity:
    storage: 51Gi
  # claimRef records the PVC this PV is bound to: its name, namespace and UID
  claimRef:
    apiVersion: v1
    kind: PersistentVolumeClaim
    # Name of the PVC
    name: basepvc-dg03dev80407964jxdh-swap
    namespace: public
    resourceVersion: "10132283544"
    # UID of the PVC
    uid: 020270a5-52f4-11ef-b4be-e8ebd398881c
  # The volume behind this PV has its lifecycle managed by a CSI driver
  csi:
    # Name of the storage driver
    driver: rbd.csi.obs.com
    # Filesystem type used when formatting the volume
    fsType: ext4
    volumeAttributes:
      adminKeyring: AQBdZFtkGyvfxxxxxxxxxxxxxxxxxxxxxxx (key redacted)
      baymax.io/rbd-pvc-name: basepvc-dg03dev80407964jxdh-swap
      baymax.io/rbd-pvc-namespace: public
      clustername: ceph10
      monitors: 10.x.x.x:6789,10.x.x.x:6789,10.x.x.x:6789 (IPs redacted)
      storage.kubernetes.io/csiProvisionerIdentity: 1678171935067-8081-rbd.csi.obs.com
      userKeyring: AQBKqlxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx (key redacted)
    # volumeHandle is a unique identifier generated by the CSI driver for this
    # particular storage volume. After the PV and PVC are bound, it lets
    # Kubernetes associate the PV with the underlying volume correctly.
    volumeHandle: csi-rbd-pvc-020270a5-52f4-11ef-b4be-e8ebd398881c
  # Reclaim policy
  persistentVolumeReclaimPolicy: Delete
  # Name of the StorageClass
  storageClassName: csi-virt-rbd
  volumeMode: Filesystem
status:
  phase: Bound

StorageClass YAML field reference

# kubectl get storageclasses.storage.k8s.io  ceph-nbd
NAME       PROVISIONER            AGE
ceph-nbd   ceph-nbd.csi.obs.com   53d
# kubectl get storageclasses.storage.k8s.io  ceph-nbd -oyaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"storage.k8s.io/v1","kind":"StorageClass","metadata":{"annotations":{},"name":"ceph-nbd"},"provisioner":"ceph-nbd.csi.obs.com","reclaimPolicy":"Delete"}
  creationTimestamp: 2024-10-25T02:03:28Z
  name: ceph-nbd
  resourceVersion: "10961600994"
  selfLink: /apis/storage.k8s.io/v1/storageclasses/ceph-nbd
  uid: 53c697ad-9275-11ef-88f0-e8ebd3986310
provisioner: ceph-nbd.csi.obs.com
# Volume reclaim policy
reclaimPolicy: Delete
# Binding mode: immediate or delayed
volumeBindingMode: Immediate
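
For comparison, a StorageClass that delays binding until the first Pod consumes the PVC sets volumeBindingMode to WaitForFirstConsumer. A sketch (the provisioner is reused from the output above; the class name is made up):

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: ceph-nbd-delayed              # illustrative name
provisioner: ceph-nbd.csi.obs.com
reclaimPolicy: Delete
volumeBindingMode: WaitForFirstConsumer

This delayed mode is what the delayBinding branch in syncUnboundClaim (analyzed later) reacts to.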

Components of the Kubernetes storage architecture

The storage-related components in Kubernetes are:

(figure: storage-related components in Kubernetes; image from the web, omitted here)

  • PV Controller: binds PVs and PVCs, manages their lifecycle, and performs Provision/Delete operations on data volumes as needed;

  • AD Controller: performs Attach/Detach operations, attaching storage devices to target nodes;

  • Volume Manager: handles Mount/Unmount operations, formats volume devices, and mounts them at the shared well-known directories;

  • Volume Plugins: the actual implementation of all the attach/mount operations above;

PV Controller, AD Controller and Volume Manager mostly orchestrate the calls; the concrete operations are implemented by the Volume Plugins.

The rest of this article analyzes the PV controller's source code to learn in depth how it is implemented.

How the PV controller starts

If you know the kube-controller-manager component, it bundles many controllers: the node controller, deployment controller, service controller, DaemonSet controller, PV controller, AD controller, and so on.

When the kube-controller-manager process starts, it starts these controllers one by one.

The PV controller startup call chain is:

main() —> NewHyperKubeCommand() —> kubecontrollermanager.NewControllerManagerCommand() —> Run() —> StartControllers() —> NewControllerInitializers() —> startPersistentVolumeBinderController(ctx) —> NewController() —> go volumeController.Run(ctx.Stop)

When kube-controller-manager's Run() executes, it calls startPersistentVolumeBinderController(ctx), which calls NewController() to instantiate a PV controller and then starts it in a goroutine: go volumeController.Run(ctx.Stop).
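For reference, the registration in NewControllerInitializers looks roughly like this in the v1.12 source (abridged to the one line that matters here):

// kubernetes/cmd/kube-controller-manager/app/controllermanager.go (abridged)
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController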

Source path: k8s.io/kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go

Note: the code in this article is from Kubernetes v1.12.

NewController

The PV controller struct contains these important fields: several listers, through which the controller reads resource information and state from the local cache; two local caches that hold the volumes and claims known to the cluster; and two work queues onto which create/update/delete events for claims and volumes are pushed, waiting for the PV controller to process them.

(screenshot of the PersistentVolumeController struct; image omitted here)
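In place of the screenshot, here is an abridged sketch of the struct, reconstructed from the v1.12 source with only the fields discussed above kept:

// pkg/controller/volume/persistentvolume/pv_controller.go (v1.12, abridged)
type PersistentVolumeController struct {
    kubeClient    clientset.Interface
    eventRecorder record.EventRecorder

    // listers, used to read resource state from the local informer cache
    volumeLister       corelisters.PersistentVolumeLister
    volumeListerSynced cache.InformerSynced
    claimLister        corelisters.PersistentVolumeClaimLister
    claimListerSynced  cache.InformerSynced
    classLister        storagelisters.StorageClassLister

    // two local caches holding the volumes and claims known to the controller
    volumes persistentVolumeOrderedIndex
    claims  cache.Store

    // two work queues fed by informer events and the periodic resync
    claimQueue  *workqueue.Type
    volumeQueue *workqueue.Type
}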

startPersistentVolumeBinderController(ctx) instantiates a PV controller and starts it in a goroutine:

// kubernetes/cmd/kube-controller-manager/app/core.go
func startPersistentVolumeBinderController(ctx ControllerContext) (http.Handler, bool, error) {
    params := persistentvolumecontroller.ControllerParameters{
        KubeClient:                ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"),
        SyncPeriod:                ctx.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration,
        VolumePlugins:             ProbeControllerVolumePlugins(ctx.Cloud, ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),
        Cloud:                     ctx.Cloud,
        ClusterName:               ctx.ComponentConfig.KubeCloudShared.ClusterName,
        VolumeInformer:            ctx.InformerFactory.Core().V1().PersistentVolumes(),
        ClaimInformer:             ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
        ClassInformer:             ctx.InformerFactory.Storage().V1().StorageClasses(),
        PodInformer:               ctx.InformerFactory.Core().V1().Pods(),
        NodeInformer:              ctx.InformerFactory.Core().V1().Nodes(),
        EnableDynamicProvisioning: ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning,
    }
    // Instantiate a PV controller
    volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params)
    if volumeControllerErr != nil {
        return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)
    }
    // Start the PV controller in a goroutine
    go volumeController.Run(ctx.Stop)
    return nil, true, nil
}

NewController() creates a new PV controller:

// kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go

// NewController creates a new PersistentVolume controller
func NewController(p ControllerParameters) (*PersistentVolumeController, error) {
    // Initialize an event recorder
    eventRecorder := p.EventRecorder
    if eventRecorder == nil {
        broadcaster := record.NewBroadcaster()
        broadcaster.StartLogging(glog.Infof)
        broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: p.KubeClient.CoreV1().Events("")})
        eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "persistentvolume-controller"})
    }

    // Instantiate a PV controller
    controller := &PersistentVolumeController{
        volumes:           newPersistentVolumeOrderedIndex(),
        claims:            cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
        kubeClient:        p.KubeClient,
        eventRecorder:     eventRecorder,
        runningOperations: goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
        cloud:             p.Cloud,
        enableDynamicProvisioning:     p.EnableDynamicProvisioning,
        clusterName:                   p.ClusterName,
        createProvisionedPVRetryCount: createProvisionedPVRetryCount,
        createProvisionedPVInterval:   createProvisionedPVInterval,
        claimQueue:                    workqueue.NewNamed("claims"),
        volumeQueue:                   workqueue.NewNamed("volumes"),
        resyncPeriod:                  p.SyncPeriod,
    }

    // Prober is nil because PV is not aware of Flexvolume.
    if err := controller.volumePluginMgr.InitPlugins(p.VolumePlugins, nil /* prober */, controller); err != nil {
        return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolume Controller: %v", err)
    }

    // Register volume event handlers: every add/update/delete event is pushed
    // onto the controller.volumeQueue work queue
    p.VolumeInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
            UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
            DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
        },
    )
    controller.volumeLister = p.VolumeInformer.Lister()
    controller.volumeListerSynced = p.VolumeInformer.Informer().HasSynced

    // Register claim event handlers: every add/update/delete event is pushed
    // onto the controller.claimQueue work queue
    p.ClaimInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
            UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
            DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
        },
    )
    controller.claimLister = p.ClaimInformer.Lister()
    controller.claimListerSynced = p.ClaimInformer.Informer().HasSynced

    controller.classLister = p.ClassInformer.Lister()
    controller.classListerSynced = p.ClassInformer.Informer().HasSynced
    controller.podLister = p.PodInformer.Lister()
    controller.podListerSynced = p.PodInformer.Informer().HasSynced
    controller.NodeLister = p.NodeInformer.Lister()
    controller.NodeListerSynced = p.NodeInformer.Informer().HasSynced
    return controller, nil
}

ctrl.Run()

Run() is the PV controller's entry point. It starts three worker goroutines:

ctrl.resync: periodically re-syncs the controller's state so it matches the actual state of the cluster. Concretely, it lists all PVs and PVCs on a timer and pushes them onto the volumeQueue and claimQueue queues for volumeWorker and claimWorker to consume.

ctrl.volumeWorker: consumes the volumeQueue and handles the operations related to PersistentVolumes (PVs): creating, binding, releasing and deleting PVs.

ctrl.claimWorker: consumes the claimQueue and handles the operations related to PersistentVolumeClaims (PVCs): creating, binding and deleting PVCs.

// kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer ctrl.claimQueue.ShutDown()
    defer ctrl.volumeQueue.ShutDown()

    klog.Infof("Starting persistent volume controller")
    defer klog.Infof("Shutting down persistent volume controller")

    if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
        return
    }

    ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)

    // Core code: start the three worker goroutines
    go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
    go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
    go wait.Until(ctrl.claimWorker, time.Second, stopCh)

    metrics.Register(ctrl.volumes.store, ctrl.claims)

    <-stopCh
}

A simple diagram:

(figure: the three goroutines started by Run(); image omitted here)

ctrl.resync

Its job: on a timer, fetch the PVC list from the cache and push it onto ctrl.claimQueue, where the ctrl.claimWorker loop consumes and processes it; likewise, fetch the PV list from the cache and push it onto ctrl.volumeQueue, where the ctrl.volumeWorker loop consumes and processes it.

// kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go
func (ctrl *PersistentVolumeController) resync() {
    klog.V(4).Infof("resyncing PV controller")

    // List all PVCs and enqueue them for processing
    pvcs, err := ctrl.claimLister.List(labels.NewSelector())
    if err != nil {
        klog.Warningf("cannot list claims: %s", err)
        return
    }
    // Put each PVC onto the ctrl.claimQueue work queue
    for _, pvc := range pvcs {
        ctrl.enqueueWork(ctrl.claimQueue, pvc)
    }

    // List all PVs and enqueue them for processing
    pvs, err := ctrl.volumeLister.List(labels.NewSelector())
    if err != nil {
        klog.Warningf("cannot list persistent volumes: %s", err)
        return
    }
    // Put each PV onto the ctrl.volumeQueue work queue
    for _, pv := range pvs {
        ctrl.enqueueWork(ctrl.volumeQueue, pv)
    }
}

A simple diagram:

(figure: resync feeding the claimQueue and volumeQueue; image omitted here)

This is a classic producer/consumer model: ctrl.resync puts data onto the two queues, and ctrl.volumeWorker and ctrl.claimWorker consume it, as the sketch below shows.
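The same pattern can be reproduced in a few lines with client-go's workqueue package (a standalone sketch, not the controller code itself; the key string is made up):

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // Producer side: resync and the informer event handlers call Add() with object keys.
    q := workqueue.NewNamed("volumes")
    q.Add("pv-demo") // a made-up PV name used as the queue key

    // Consumer side: a worker loop, like volumeWorker, pulls keys until shutdown.
    for {
        key, shutdown := q.Get()
        if shutdown {
            return
        }
        fmt.Println("processing", key)
        q.Done(key)
        q.ShutDown() // stop after one item; only for this sketch
    }
}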

ctrl.volumeWorker

volumeWorker keeps consuming items from the volumeQueue, fetching the corresponding PV and running updateVolume on it.

The volumeWorker() function is fairly long, so let's unpack it.


// volumeWorker processes items from volumeQueue. It must run only once,
// syncVolume is not assured to be reentrant.
func (ctrl *PersistentVolumeController) volumeWorker() {
    workFunc := func() bool {
        // Take one PV (volume) key from the ctrl.volumeQueue work queue
        keyObj, quit := ctrl.volumeQueue.Get()
        if quit {
            return true
        }
        defer ctrl.volumeQueue.Done(keyObj)
        key := keyObj.(string)
        glog.V(5).Infof("volumeWorker[%s]", key)

        _, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            glog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
            return false
        }
        // Look the volume object up in the local cache by name
        volume, err := ctrl.volumeLister.Get(name)
        if err == nil {
            // The volume still exists in informer cache, the event must have
            // been add/update/sync
            // The volume exists, so hand it to updateVolume()
            ctrl.updateVolume(volume)
            return false
        }
        if !errors.IsNotFound(err) {
            glog.V(2).Infof("error getting volume %q from informer: %v", key, err)
            return false
        }

        // The volume is not in informer cache, the event must have been
        // "delete"
        volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
        if err != nil {
            glog.V(2).Infof("error getting volume %q from cache: %v", key, err)
            return false
        }
        if !found {
            // The controller has already processed the delete event and
            // deleted the volume from its cache
            glog.V(2).Infof("deletion of volume %q was already processed", key)
            return false
        }
        volume, ok := volumeObj.(*v1.PersistentVolume)
        if !ok {
            glog.Errorf("expected volume, got %+v", volumeObj)
            return false
        }
        // The volume resource no longer exists, so it has been deleted;
        // ctrl.deleteVolume(volume) handles the deletion
        ctrl.deleteVolume(volume)
        return false
    }
    // Keep running workFunc() until the queue shuts down
    for {
        if quit := workFunc(); quit {
            glog.Infof("volume worker queue shutting down")
            return
        }
    }
}

(figure: volumeWorker flow; image omitted here)

The updateVolume() method performs a freshness check and then calls ctrl.syncVolume(volume) for further processing.

// updateVolume runs in worker thread and handles "volume added",
// "volume updated" and "periodic sync" events.
func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume) {
    // Store the new volume version in the cache and do not process it if this
    // is an old version.
    new, err := ctrl.storeVolumeUpdate(volume)
    if err != nil {
        glog.Errorf("%v", err)
    }
    if !new {
        return
    }

    // Bind or unbind the PV and PVC according to the current PV spec
    err = ctrl.syncVolume(volume)
    if err != nil {
        if errors.IsConflict(err) {
            // Version conflict error happens quite often and the controller
            // recovers from it easily.
            glog.V(3).Infof("could not sync volume %q: %+v", volume.Name, err)
        } else {
            glog.Errorf("could not sync volume %q: %+v", volume.Name, err)
        }
    }
}

ctrl.syncVolume(volume)

This is an important function and deserves a careful read.

Note: in the code, "volume" means PV and "claim" means PVC.

syncVolume is the core method; it reconciles and updates the PV's state:
(1) If spec.claimRef is unset, the PV has never been used; updateVolumePhase sets its phase to Available.
(2) If spec.claimRef is set, the PV has been bound to a PVC before; if that PVC no longer exists, the PV's phase is set to Released.
(3) If the PVC the PV belonged to has been deleted, ctrl.reclaimVolume acts according to the PV's reclaim policy: with Retain it does nothing; with Delete it calls the volume plugin to delete the underlying storage and the PV object (with a CSI plugin this takes the out-of-tree path: the PV controller does not delete the storage or the PV object itself, the external-provisioner component does).


// syncVolume is the main controller method to decide what to do with a volume.
// It's invoked by appropriate cache.Controller callbacks when a volume is
// created, updated or periodically synced. We do not differentiate between
// these events.
func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume) error {
    glog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))

    // [Unit test set 4]
    // If volume.Spec.ClaimRef is nil, the volume is unused, so call
    // ctrl.updateVolumePhase() to set its phase to Available
    if volume.Spec.ClaimRef == nil {
        // Volume is unused
        glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)
        if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
            // Nothing was saved; we will fall back into the same
            // condition in the next call to this method
            return err
        }
        return nil
    } else /* pv.Spec.ClaimRef != nil */ {
        // Volume is bound to a claim.
        // If volume.Spec.ClaimRef.UID is "", the volume is reserved but not yet
        // bound; set its phase to Available as well
        if volume.Spec.ClaimRef.UID == "" {
            // The PV is reserved for a PVC; that PVC has not yet been
            // bound to this PV; the PVC sync will handle it.
            glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
            if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
                // Nothing was saved; we will fall back into the same
                // condition in the next call to this method
                return err
            }
            return nil
        }
        glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))

        // Get the PVC by _name_
        // Look up the PVC referenced by volume.Spec.ClaimRef in the local cache
        var claim *v1.PersistentVolumeClaim
        claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
        obj, found, err := ctrl.claims.GetByKey(claimName)
        if err != nil {
            return err
        }
        // If the PVC was not found in the local cache and the PV carries the
        // "pv.kubernetes.io/bound-by-controller" annotation, double-check
        // against the informer cache and then the apiserver
        if !found && metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {
            // If PV is bound by external PV binder (e.g. kube-scheduler), it's
            // possible on heavy load that corresponding PVC is not synced to
            // controller local cache yet. So we need to double-check PVC in
            //   1) informer cache
            //   2) apiserver if not found in informer cache
            // to make sure we will not reclaim a PV wrongly.
            // Note that only non-released and non-failed volumes will be
            // updated to Released state when PVC does not exist.
            if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
                // Look the PVC up in the informer cache
                obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)
                if err != nil && !apierrs.IsNotFound(err) {
                    return err
                }
                found = !apierrs.IsNotFound(err)
                if !found {
                    // Not in the informer cache either; query the apiserver
                    obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name, metav1.GetOptions{})
                    if err != nil && !apierrs.IsNotFound(err) {
                        return err
                    }
                    found = !apierrs.IsNotFound(err)
                }
            }
        }
        // The PVC was not found anywhere
        if !found {
            glog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
            // Fall through with claim = nil
        } else {
            // The PVC was found; convert the cached object to a PVC
            var ok bool
            claim, ok = obj.(*v1.PersistentVolumeClaim)
            if !ok {
                return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
            }
            glog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))
        }
        // If the PVC's UID differs from the UID recorded in the PV, the PVC the
        // PV points to was deleted and another one with the same name created
        if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {
            // The claim that the PV was pointing to was deleted, and another
            // with the same name created.
            glog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has different UID, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
            // Treat the volume as bound to a missing claim.
            claim = nil
        }

        // If the PVC does not exist and the PV is not already Released or
        // Failed, update the PV to Released in both the apiserver and the
        // local cache
        if claim == nil {
            // If we get into this block, the claim must have been deleted;
            // NOTE: reclaimVolume may either release the PV back into the pool or
            // recycle it or do nothing (retain)

            // Do not overwrite previous Failed state - let the user see that
            // something went wrong, while we still re-try to reclaim the
            // volume.
            if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
                // Also, log this only once:
                glog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)
                if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
                    // Nothing was saved; we will fall back into the same condition
                    // in the next call to this method
                    return err
                }
            }

            if err = ctrl.reclaimVolume(volume); err != nil {
                // Release failed, we will fall back into the same condition
                // in the next call to this method
                return err
            }
            return nil
        } else if claim.Spec.VolumeName == "" {
            // The claim exists but is not bound yet; check whether the
            // volumeMode matches, and if not, record events and return
            if isMisMatch, err := checkVolumeModeMisMatches(&claim.Spec, &volume.Spec); err != nil || isMisMatch {
                // Binding for the volume won't be called in syncUnboundClaim,
                // because findBestMatchForClaim won't return the volume due to volumeMode mismatch.
                volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)
                ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg)
                claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name)
                ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg)
                // Skipping syncClaim
                return nil
            }

            // volumeMode matches; check whether the PV carries the annotation
            // "pv.kubernetes.io/bound-by-controller". If it does, the claim
            // sync will finish the binding; if not, wait for the user to fix
            // the dangling reference
            if metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {
                // The binding is not completed; let PVC sync handle it
                glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name)
            } else {
                // Dangling PV; try to re-establish the link in the PVC sync
                glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name)
            }
            // In both cases, the volume is Bound and the claim is Pending.
            // Next syncClaim will fix it. To speed it up, we enqueue the claim
            // into the controller, which results in syncClaim to be called
            // shortly (and in the right worker goroutine).
            // This speeds up binding of provisioned volumes - provisioner saves
            // only the new PV and it expects that next syncClaim will bind the
            // claim to it.
            // Put the PVC onto the claimQueue and let syncClaim() handle it
            ctrl.claimQueue.Add(claimToClaimKey(claim))
            return nil
        } else if claim.Spec.VolumeName == volume.Name {
            // The claim points back at this volume, so the binding is correct;
            // update the volume phase to Bound if necessary
            glog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)
            if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
                // Nothing was saved; we will fall back into the same
                // condition in the next call to this method
                return err
            }
            return nil
        } else {
            // Volume is bound to a claim, but the claim is bound elsewhere
            // If the PV carries the "pv.kubernetes.io/provisioned-by" annotation
            // and its reclaim policy is Delete, mark it Released (unless it is
            // already Released or Failed) and reclaim it
            if metav1.HasAnnotation(volume.ObjectMeta, annDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {
                // This volume was dynamically provisioned for this claim. The
                // claim got bound elsewhere, and thus this volume is not
                // needed. Delete it.
                // Mark the volume as Released for external deleters and to let
                // the user know. Don't overwrite existing Failed status!
                if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
                    // Also, log this only once:
                    glog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name)
                    // Update the PV phase
                    if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
                        // Nothing was saved; we will fall back into the same condition
                        // in the next call to this method
                        return err
                    }
                }
                // Then reclaim the PV through ctrl.reclaimVolume(volume)
                if err = ctrl.reclaimVolume(volume); err != nil {
                    // Deletion failed, we will fall back into the same condition
                    // in the next call to this method
                    return err
                }
                return nil
            } else {
                // Otherwise run unbindVolume()
                // Volume is bound to a claim, but the claim is bound elsewhere
                // and it's not dynamically provisioned.
                if metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {
                    // This is part of the normal operation of the controller; the
                    // controller tried to use this volume for a claim but the claim
                    // was fulfilled by another volume. We did this; fix it.
                    glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)
                    if err = ctrl.unbindVolume(volume); err != nil {
                        return err
                    }
                    return nil
                } else {
                    // The PV must have been created with this ptr; leave it alone.
                    glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)
                    // This just updates the volume phase and clears
                    // volume.Spec.ClaimRef.UID. It leaves the volume pre-bound
                    // to the claim.
                    if err = ctrl.unbindVolume(volume); err != nil {
                        return err
                    }
                    return nil
                }
            }
        }
    }
}

The code is long, so the main logic distills into roughly the following flow chart:

(figure: syncVolume logic flow chart; image omitted here)

updateVolumePhase()

  1. Update the volume's phase in etcd (via the apiserver)
  2. Update the phase in the local informer cache
// updateVolumePhase saves new volume phase to API server.
func (ctrl *PersistentVolumeController) updateVolumePhase(volume *v1.PersistentVolume, phase v1.PersistentVolumePhase, message string) (*v1.PersistentVolume, error) {
    glog.V(4).Infof("updating PersistentVolume[%s]: set phase %s", volume.Name, phase)
    // If the phase is already set, there is nothing to do
    if volume.Status.Phase == phase {
        // Nothing to do.
        glog.V(4).Infof("updating PersistentVolume[%s]: phase %s already set", volume.Name, phase)
        return volume, nil
    }

    // Core code: deep-copy the volume and set the new phase
    volumeClone := volume.DeepCopy()
    volumeClone.Status.Phase = phase
    volumeClone.Status.Message = message

    // Core code: ask the apiserver to update the volume's status
    newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().UpdateStatus(volumeClone)
    if err != nil {
        glog.V(4).Infof("updating PersistentVolume[%s]: set phase %s failed: %v", volume.Name, phase, err)
        return newVol, err
    }
    // Then update the data in the local informer cache
    _, err = ctrl.storeVolumeUpdate(newVol)
    if err != nil {
        glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
        return newVol, err
    }
    glog.V(2).Infof("volume %q entered phase %q", volume.Name, phase)
    return newVol, err
}

unbindVolume(volume)

Its job is to unbind a volume that is currently in the Bound state:

  1. Unbind the volume in the local cache: 1) set volumeClone.Spec.ClaimRef = nil; 2) remove the "pv.kubernetes.io/bound-by-controller" key from volumeClone.Annotations; 3) set the volume phase to Available
  2. Issue an HTTP request so the apiserver's copy of the volume is unbound in the same way.
// unbindVolume rolls back previous binding of the volume. This may be necessary
// when two controllers bound two volumes to single claim - when we detect this,
// only one binding succeeds and the second one must be rolled back.
// This method updates both Spec and Status.
// It returns on first error, it's up to the caller to implement some retry
// mechanism.
func (ctrl *PersistentVolumeController) unbindVolume(volume *v1.PersistentVolume) error {
    glog.V(4).Infof("updating PersistentVolume[%s]: rolling back binding from %q", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))

    // Save the PV only when any modification is necessary.
    volumeClone := volume.DeepCopy()
    if metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {
        // The volume was bound by the controller.
        // Clear the volume's ClaimRef
        volumeClone.Spec.ClaimRef = nil
        // Remove the bound-by-controller annotation from the volume
        delete(volumeClone.Annotations, annBoundByController)
        if len(volumeClone.Annotations) == 0 {
            // No annotations look better than empty annotation map (and it's easier
            // to test).
            volumeClone.Annotations = nil
        }
    } else {
        // The volume was pre-bound by user. Clear only the binding UID.
        // For a volume created manually by the user, only reset the UID to ""
        volumeClone.Spec.ClaimRef.UID = ""
    }

    newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Update(volumeClone)
    if err != nil {
        glog.V(4).Infof("updating PersistentVolume[%s]: rollback failed: %v", volume.Name, err)
        return err
    }
    _, err = ctrl.storeVolumeUpdate(newVol)
    if err != nil {
        glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
        return err
    }
    glog.V(4).Infof("updating PersistentVolume[%s]: rolled back", newVol.Name)

    // Update the status
    _, err = ctrl.updateVolumePhase(newVol, v1.VolumeAvailable, "")
    return err
}

ctrl.reclaimVolume

// reclaimVolume implements volume.Spec.PersistentVolumeReclaimPolicy and
// starts appropriate reclaim action.
func (ctrl *PersistentVolumeController) reclaimVolume(volume *v1.PersistentVolume) error {
    switch volume.Spec.PersistentVolumeReclaimPolicy {
    // Retain: do nothing
    case v1.PersistentVolumeReclaimRetain:
        glog.V(4).Infof("reclaimVolume[%s]: policy is Retain, nothing to do", volume.Name)

    // Recycle: have the plugin scrub the volume so the PV can be reused
    case v1.PersistentVolumeReclaimRecycle:
        glog.V(4).Infof("reclaimVolume[%s]: policy is Recycle", volume.Name)
        opName := fmt.Sprintf("recycle-%s[%s]", volume.Name, string(volume.UID))
        ctrl.scheduleOperation(opName, func() error {
            ctrl.recycleVolumeOperation(volume)
            return nil
        })

    // Delete: have the plugin delete the volume
    case v1.PersistentVolumeReclaimDelete:
        glog.V(4).Infof("reclaimVolume[%s]: policy is Delete", volume.Name)
        opName := fmt.Sprintf("delete-%s[%s]", volume.Name, string(volume.UID))
        startTime := time.Now()
        // Call the plugin to delete the volume
        ctrl.scheduleOperation(opName, func() error {
            pluginName, err := ctrl.deleteVolumeOperation(volume)
            timeTaken := time.Since(startTime).Seconds()
            metrics.RecordVolumeOperationMetric(pluginName, "delete", timeTaken, err)
            return err
        })

    default:
        // Unknown PersistentVolumeReclaimPolicy
        if _, err := ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, "VolumeUnknownReclaimPolicy", "Volume has unrecognized PersistentVolumeReclaimPolicy"); err != nil {
            return err
        }
    }
    return nil
}

ctrl.deleteVolumeOperation()


// deleteVolumeOperation deletes a volume. This method is running in standalone
// goroutine and already has all necessary locks.
func (ctrl *PersistentVolumeController) deleteVolumeOperation(volume *v1.PersistentVolume) (string, error) {
    glog.V(4).Infof("deleteVolumeOperation [%s] started", volume.Name)

    // This method may have been waiting for a volume lock for some time.
    // Previous deleteVolumeOperation might just have saved an updated version, so
    // read current volume state now.
    newVolume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
    if err != nil {
        glog.V(3).Infof("error reading persistent volume %q: %v", volume.Name, err)
        return "", nil
    }
    needsReclaim, err := ctrl.isVolumeReleased(newVolume)
    if err != nil {
        glog.V(3).Infof("error reading claim for volume %q: %v", volume.Name, err)
        return "", nil
    }
    if !needsReclaim {
        glog.V(3).Infof("volume %q no longer needs deletion, skipping", volume.Name)
        return "", nil
    }

    // Core code: perform the actual volume deletion
    pluginName, deleted, err := ctrl.doDeleteVolume(volume)
    if err != nil {
        // Delete failed, update the volume and emit an event.
        glog.V(3).Infof("deletion of volume %q failed: %v", volume.Name, err)
        if vol.IsDeletedVolumeInUse(err) {
            // The plugin needs more time, don't mark the volume as Failed
            // and send Normal event only
            ctrl.eventRecorder.Event(volume, v1.EventTypeNormal, events.VolumeDelete, err.Error())
        } else {
            // The plugin failed, mark the volume as Failed and send Warning
            // event
            if _, err := ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, events.VolumeFailedDelete, err.Error()); err != nil {
                glog.V(4).Infof("deleteVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)
                // Save failed, retry on the next deletion attempt
                return pluginName, err
            }
        }
        // Despite the volume being Failed, the controller will retry deleting
        // the volume in every syncVolume() call.
        return pluginName, err
    }
    if !deleted {
        // The volume waits for deletion by an external plugin. Do nothing.
        return pluginName, nil
    }

    glog.V(4).Infof("deleteVolumeOperation [%s]: success", volume.Name)
    // Delete the volume
    // Once the underlying volume is gone, delete the PV object from the apiserver
    if err = ctrl.kubeClient.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err != nil {
        // Oops, could not delete the volume and therefore the controller will
        // try to delete the volume again on next update. We _could_ maintain a
        // cache of "recently deleted volumes" and avoid unnecessary deletion,
        // this is left out as future optimization.
        glog.V(3).Infof("failed to delete volume %q from database: %v", volume.Name, err)
        return pluginName, nil
    }
    return pluginName, nil
}

// doDeleteVolume finds appropriate delete plugin and deletes given volume, returning
// the volume plugin name. Also, it returns 'true', when the volume was deleted and
// 'false' when the volume cannot be deleted because of the deleter is external. No
// error should be reported in this case.
func (ctrl *PersistentVolumeController) doDeleteVolume(volume *v1.PersistentVolume) (string, bool, error) {
    glog.V(4).Infof("doDeleteVolume [%s]", volume.Name)
    var err error

    // Find the volume plugin
    plugin, err := ctrl.findDeletablePlugin(volume)
    if err != nil {
        return "", false, err
    }
    if plugin == nil {
        // External deleter is requested, do nothing
        glog.V(3).Infof("external deleter for volume %q requested, ignoring", volume.Name)
        return "", false, nil
    }

    // Plugin found
    pluginName := plugin.GetPluginName()
    glog.V(5).Infof("found a deleter plugin %q for volume %q", pluginName, volume.Name)
    spec := vol.NewSpecFromPersistentVolume(volume, false)
    // Create a deleter from the plugin
    deleter, err := plugin.NewDeleter(spec)
    if err != nil {
        // Cannot create deleter
        return pluginName, false, fmt.Errorf("Failed to create deleter for volume %q: %v", volume.Name, err)
    }

    // Call Delete() to remove the volume at the storage layer
    opComplete := util.OperationCompleteHook(pluginName, "volume_delete")
    err = deleter.Delete()
    opComplete(&err)
    if err != nil {
        // Deleter failed
        return pluginName, false, err
    }

    glog.V(2).Infof("volume %q deleted", volume.Name)
    return pluginName, true, nil
}

// findDeletablePlugin finds a deleter plugin for a given volume. It returns
// either the deleter plugin or nil when an external deleter is requested.
func (ctrl *PersistentVolumeController) findDeletablePlugin(volume *v1.PersistentVolume) (vol.DeletableVolumePlugin, error) {
    // Find a plugin. Try to find the same plugin that provisioned the volume
    var plugin vol.DeletableVolumePlugin
    if metav1.HasAnnotation(volume.ObjectMeta, annDynamicallyProvisioned) {
        provisionPluginName := volume.Annotations[annDynamicallyProvisioned]
        if provisionPluginName != "" {
            // Look up the plugin named by the volume's
            // "pv.kubernetes.io/provisioned-by" annotation
            plugin, err := ctrl.volumePluginMgr.FindDeletablePluginByName(provisionPluginName)
            if err != nil {
                if !strings.HasPrefix(provisionPluginName, "kubernetes.io/") {
                    // External provisioner is requested, do not report error
                    return nil, nil
                }
                return nil, err
            }
            return plugin, nil
        }
    }

    // If that failed, fall back to looking the plugin up by the volume's Spec.
    // The plugin that provisioned the volume was not found or the volume
    // was not dynamically provisioned. Try to find a plugin by spec.
    spec := vol.NewSpecFromPersistentVolume(volume, false)
    plugin, err := ctrl.volumePluginMgr.FindDeletablePluginBySpec(spec)
    if err != nil {
        // No deleter found. Emit an event and mark the volume Failed.
        return nil, fmt.Errorf("Error getting deleter volume plugin for volume %q: %v", volume.Name, err)
    }
    return plugin, nil
}

ctrl.claimWorker

ctrl.claimWorker() runs in its own goroutine and manages the lifecycle of claims (PVCs): binding, creating and deleting them.

(1) First look the claim up in the local informer cache. If it is there, this is a new PVC event waiting to be processed, and it is handed to updateClaim();

(2) If it is not in the informer cache, the PVC has been deleted, and the delete handling runs.

// claimWorker processes items from claimQueue. It must run only once,
// syncClaim is not reentrant.
func (ctrl *PersistentVolumeController) claimWorker() {
    workFunc := func() bool {
        keyObj, quit := ctrl.claimQueue.Get()
        if quit {
            return true
        }
        defer ctrl.claimQueue.Done(keyObj)
        key := keyObj.(string)
        glog.V(5).Infof("claimWorker[%s]", key)

        namespace, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            glog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
            return false
        }
        claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
        if err == nil {
            // The claim still exists in informer cache, the event must have
            // been add/update/sync
            // Core code: the PVC exists in the informer cache, so this is a
            // new event waiting to be processed; hand it to updateClaim()
            ctrl.updateClaim(claim)
            return false
        }
        if !errors.IsNotFound(err) {
            glog.V(2).Infof("error getting claim %q from informer: %v", key, err)
            return false
        }

        // The claim is not in informer cache, the event must have been "delete"
        claimObj, found, err := ctrl.claims.GetByKey(key)
        if err != nil {
            glog.V(2).Infof("error getting claim %q from cache: %v", key, err)
            return false
        }
        if !found {
            // The controller has already processed the delete event and
            // deleted the claim from its cache
            glog.V(2).Infof("deletion of claim %q was already processed", key)
            return false
        }
        claim, ok := claimObj.(*v1.PersistentVolumeClaim)
        if !ok {
            glog.Errorf("expected claim, got %+v", claimObj)
            return false
        }
        // Core code: the PVC is gone from the informer cache, so it has been
        // deleted; run the delete handling
        ctrl.deleteClaim(claim)
        return false
    }
    // Core code: keep pulling PersistentVolumeClaims off the claimQueue and
    // processing them
    for {
        if quit := workFunc(); quit {
            glog.Infof("claim worker queue shutting down")
            return
        }
    }
}

The key functions here are ctrl.updateClaim(claim) and ctrl.deleteClaim(claim).

ctrl.updateClaim(claim)

ctrl.updateClaim(claim) handles claim "added", "updated" and periodic-sync events.

// updateClaim runs in worker thread and handles "claim added",
// "claim updated" and "periodic sync" events.
func (ctrl *PersistentVolumeController) updateClaim(claim *v1.PersistentVolumeClaim) {
    // Store the new claim version in the cache and do not process it if this is
    // an old version.
    // Check the claim's version first; if it is not a newer version, skip it
    new, err := ctrl.storeClaimUpdate(claim)
    if err != nil {
        glog.Errorf("%v", err)
    }
    if !new {
        return
    }
    // Core code: the claim passed the check, so hand it to ctrl.syncClaim(claim)
    err = ctrl.syncClaim(claim)
    if err != nil {
        if errors.IsConflict(err) {
            // Version conflict error happens quite often and the controller
            // recovers from it easily.
            glog.V(3).Infof("could not sync claim %q: %+v", claimToClaimKey(claim), err)
        } else {
            glog.Errorf("could not sync volume %q: %+v", claimToClaimKey(claim), err)
        }
    }
}

ctrl.syncClaim(claim)

syncClaim is the PV controller's main claim method; it decides how to handle a claim. Depending on whether the claim's annotations include "pv.kubernetes.io/bind-completed", it hands the claim to either ctrl.syncUnboundClaim(claim) or ctrl.syncBoundClaim(claim).

// syncClaim is the main controller method to decide what to do with a claim.
// It's invoked by appropriate cache.Controller callbacks when a claim is
// created, updated or periodically synced. We do not differentiate between
// these events.
// For easier readability, it was split into syncUnboundClaim and syncBoundClaim
// methods.
func (ctrl *PersistentVolumeController) syncClaim(claim *v1.PersistentVolumeClaim) error {
    glog.V(4).Infof("synchronizing PersistentVolumeClaim[%s]: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))

    if !metav1.HasAnnotation(claim.ObjectMeta, annBindCompleted) {
        // The "pv.kubernetes.io/bind-completed" annotation marks whether binding
        // has finished. It is absent here, so the claim has not been bound yet
        // (for example a Pending claim); hand it to ctrl.syncUnboundClaim(claim)
        return ctrl.syncUnboundClaim(claim)
    } else {
        // The annotation is present, so the claim has been bound before;
        // hand it to ctrl.syncBoundClaim(claim)
        return ctrl.syncBoundClaim(claim)
    }
}

ctrl.syncBoundClaim()

ctrl.syncBoundClaim() is one of the PV controller's most important methods and deserves careful analysis. Its logic:

  1. If claim.Spec.VolumeName is empty even though the claim was bound before, the reference has been lost: set the claim's phase to Lost, emit an event and return
  2. Look up claim.Spec.VolumeName in the local informer cache
    • If no matching volume is found, the claim is bound to a volume that does not exist: set the claim's phase to Lost, emit an event and return
    • If the volume is found:
      • If volume.Spec.ClaimRef == nil, the claim is bound to the volume but the volume is not bound back to the claim; call ctrl.bind(volume, claim) to bind the volume to the claim
      • If volume.Spec.ClaimRef.UID == claim.UID, everything is consistent. ctrl.bind() is still called once more, but in most cases it returns immediately because all the work is already done
      • Otherwise, i.e. volume.Spec.ClaimRef.UID != claim.UID, the binding went wrong: set the claim's phase to Lost, emit an event and return
// syncBoundClaim is the main controller method to decide what to do with a
// bound claim.
func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {
    // HasAnnotation(pvc, annBindCompleted)
    // This PVC has previously been bound
    // OBSERVATION: pvc is not "Pending"
    // [Unit test set 3]

    // Case 1: claim.Spec.VolumeName is empty even though the claim was bound
    // before; mark the claim Lost, emit an event and return
    if claim.Spec.VolumeName == "" {
        // Claim was bound before but not any more.
        if _, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!"); err != nil {
            return err
        }
        return nil
    }

    // Look up the corresponding volume
    obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
    if err != nil {
        return err
    }
    // Case 2: claim.Spec.VolumeName is set and the claim was bound before, but
    // the volume no longer exists (perhaps deleted); mark the claim Lost,
    // emit an event and return
    if !found {
        // Claim is bound to a non-existing volume.
        if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!"); err != nil {
            return err
        }
        return nil
    } else {
        volume, ok := obj.(*v1.PersistentVolume)
        if !ok {
            return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
        }
        glog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))

        // Case 3: claim.Spec.VolumeName is set and the volume exists, but
        // volume.Spec.ClaimRef is nil. Either the claim is bound while the
        // volume came unbound, or the controller has not received the updated
        // volume yet; we cannot distinguish, so run ctrl.bind() again in both cases
        if volume.Spec.ClaimRef == nil {
            // Claim is bound but volume has come unbound.
            // Or, a claim was bound and the controller has not received updated
            // volume yet. We can't distinguish these cases.
            // Bind the volume again and set all states to Bound.
            glog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim))
            if err = ctrl.bind(volume, claim); err != nil {
                // Objects not saved, next syncPV or syncClaim will try again
                return err
            }
            return nil
        } else if volume.Spec.ClaimRef.UID == claim.UID {
            // Case 4: the volume exists, volume.Spec.ClaimRef is set, and
            // volume.Spec.ClaimRef.UID == claim.UID: everything is consistent;
            // ctrl.bind() is still called, but will usually be a no-op
            // All is well
            // NOTE: syncPV can handle this so it can be left out.
            // NOTE: bind() call here will do nothing in most cases as
            // everything should be already set.
            glog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim))
            if err = ctrl.bind(volume, claim); err != nil {
                // Objects not saved, next syncPV or syncClaim will try again
                return err
            }
            return nil
        } else {
            // Otherwise volume.Spec.ClaimRef.UID != claim.UID: the binding went
            // wrong, so emit an event
            // Claim is bound but volume has a different claimant.
            // Set the claim phase to 'Lost', which is a terminal
            // phase.
            if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly"); err != nil {
                return err
            }
            return nil
        }
    }
}

ctrl.bind(volume, claim)


// bind saves binding information both to the volume and the claim and marks
// both objects as Bound. Volume is saved first.
// It returns on first error, it's up to the caller to implement some retry
// mechanism.
func (ctrl *PersistentVolumeController) bind(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) error {
    var err error
    var updatedClaim *v1.PersistentVolumeClaim
    var updatedVolume *v1.PersistentVolume

    // Bind the volume to the claim
    if updatedVolume, err = ctrl.bindVolumeToClaim(volume, claim); err != nil {
        return err
    }
    volume = updatedVolume

    // Update the volume's phase to Bound
    if updatedVolume, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
        return err
    }
    volume = updatedVolume

    // Bind the claim to the volume
    if updatedClaim, err = ctrl.bindClaimToVolume(claim, volume); err != nil {
        return err
    }
    claim = updatedClaim

    // Update the claim's phase to Bound
    if updatedClaim, err = ctrl.updateClaimStatus(claim, v1.ClaimBound, volume); err != nil {
        return err
    }
    claim = updatedClaim

    return nil
}

syncUnboundClaim

  1. If claim.Spec.VolumeName == "", the PVC is a new one that has not finished binding and is still Pending

    • Search the cluster's existing volumes for a match (large enough, compatible access mode, and in the Available phase). There are two cases, depending on whether a match is found:

    • If no matching volume is found:

      • Check the PVC's StorageClass: if the class's volumeBindingMode enables delayed binding, just return; the claim is only bound when a consumer of the PVC, such as a Pod, needs it.

      • If delayed binding is not enabled (i.e. the default Immediate mode is used), check whether the StorageClass is set to ""; if it is non-empty, use the claim's StorageClass and call ctrl.provisionClaim(claim) to create a new volume

    • If a matching volume is found, call ctrl.bind(volume, claim) to bind them; return normally on success, or return the error if binding fails.

  2. If claim.Spec.VolumeName != "", the claim asks to be bound to a specific volume by name. Look that volume up in the local cache first

    • If it is not in the local cache, the volume may not have been created yet: set the claim's phase to Pending and retry in the next sync period, by which time the volume may exist.

    • If the named volume is found in the local cache, check whether volume.Spec.ClaimRef is nil:

      • If it is nil, the volume is unused and Available: bind the volume and the claim; once binding completes, the claim is Bound and the PV is Bound too, and we return normally

      • If it is not nil, call isVolumeBoundToClaim(volume, claim) to see whether the volume is already bound to exactly this claim (the claim wants this volume and the volume wants this claim, a mutual match); if so, call ctrl.bind(volume, claim) again to finish the binding


// syncUnboundClaim is the main controller method to decide what to do with an
// unbound claim.
func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {
    // This is a new PVC that has not completed binding
    // OBSERVATION: pvc is "Pending"

    // claim.Spec.VolumeName == "" means this PVC is a new one that has not
    // finished binding and is still Pending
    if claim.Spec.VolumeName == "" {
        // User did not care which PV they get.
        // Check the PVC's StorageClass: does its volumeBindingMode enable
        // delayed binding? (the default is Immediate)
        delayBinding, err := ctrl.shouldDelayBinding(claim)
        if err != nil {
            return err
        }

        // [Unit test set 1]
        // Search the cluster's existing volumes for one that matches the claim
        volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
        if err != nil {
            glog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
            return fmt.Errorf("Error finding PV for claim %q: %v", claimToClaimKey(claim), err)
        }
        // volume == nil means no volume matching the claim was found
        if volume == nil {
            glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))
            // No PV could be found
            // OBSERVATION: pvc is "Pending", will retry
            switch {
            // If the StorageClass's volumeBindingMode enables delayed binding,
            // only emit an event here; no PV is created for the PVC until its
            // consumer is ready (e.g. the Pod that mounts it), at which point
            // the PV is created and bound
            case delayBinding:
                ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.WaitForFirstConsumer, "waiting for first consumer to be created before binding")
            // If the claim names a StorageClass, use it to create a new volume
            case v1helper.GetPersistentVolumeClaimClass(claim) != "":
                // ctrl.provisionClaim(claim) creates the new volume
                if err = ctrl.provisionClaim(claim); err != nil {
                    return err
                }
                return nil
            // No matching volume and no StorageClass set: emit an event
            default:
                ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")
            }

            // Mark the claim as Pending and try to find a match in the next
            // periodic syncClaim
            // Set the claim's phase to Pending; it will be processed again in
            // the next sync period
            if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                return err
            }
            return nil
        } else /* pv != nil */ {
            // volume != nil: a matching, Available volume was found, so call
            // ctrl.bind(volume, claim) to bind them
            // Found a PV for this claim
            // OBSERVATION: pvc is "Pending", pv is "Available"
            glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), volume.Name, getVolumeStatusForLogging(volume))
            if err = ctrl.bind(volume, claim); err != nil {
                // On any error saving the volume or the claim, subsequent
                // syncClaim will finish the binding.
                return err
            }
            // OBSERVATION: claim is "Bound", pv is "Bound"
            // After binding, the claim is Bound and so is the PV; return normally
            return nil
        }
    } else /* pvc.Spec.VolumeName != nil */ {
        // The claim asked to be bound to a specific volume by name; look that
        // volume up in the local cache first
        // [Unit test set 2]
        // User asked for a specific PV.
        glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)
        obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
        if err != nil {
            return err
        }
        // Not in the local cache: the volume may not have been created yet.
        // Mark the claim Pending and retry in the next sync period, by which
        // time the volume may exist
        if !found {
            // User asked for a PV that does not exist.
            // OBSERVATION: pvc is "Pending"
            // Retry later.
            glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)
            // Set the claim's phase to Pending
            if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                return err
            }
            return nil
        } else {
            // The named volume exists in the local cache; check whether
            // volume.Spec.ClaimRef is nil. If it is nil, the volume is unused
            // and Available, so bind it to the claim
            volume, ok := obj.(*v1.PersistentVolume)
            if !ok {
                return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)
            }
            glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
            // volume.Spec.ClaimRef == nil: bind the volume and the claim
            if volume.Spec.ClaimRef == nil {
                // User asked for a PV that is not claimed
                // OBSERVATION: pvc is "Pending", pv is "Available"
                glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))
                // Run some checks before binding
                if err = checkVolumeSatisfyClaim(volume, claim); err != nil {
                    glog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)
                    // send an event
                    msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)
                    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)
                    // volume does not satisfy the requirements of the claim
                    if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                        return err
                    }
                } else if err = ctrl.bind(volume, claim); err != nil {
                    // Perform the binding of claim and volume
                    // On any error saving the volume or the claim, subsequent
                    // syncClaim will finish the binding.
                    return err
                }
                // OBSERVATION: pvc is "Bound", pv is "Bound"
                // After binding, the claim is Bound and so is the PV; return normally
                return nil
            } else if isVolumeBoundToClaim(volume, claim) {
                // volume.Spec.ClaimRef != nil and isVolumeBoundToClaim(volume,
                // claim) says the volume is already bound to this claim, while
                // the PVC is still Pending, so call ctrl.bind(volume, claim)
                // to finish the binding
                // User asked for a PV that is claimed by this PVC
                // OBSERVATION: pvc is "Pending", pv is "Bound"
                glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))
                // Finish the volume binding by adding claim UID.
                if err = ctrl.bind(volume, claim); err != nil {
                    return err
                }
                // OBSERVATION: pvc is "Bound", pv is "Bound"
                return nil
            } else {
                // User asked for a PV that is claimed by someone else
                // OBSERVATION: pvc is "Pending", pv is "Bound"
                if !metav1.HasAnnotation(claim.ObjectMeta, annBoundByController) {
                    glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))
                    // User asked for a specific PV, retry later
                    if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
                        return err
                    }
                    return nil
                } else {
                    // This should never happen because someone had to remove
                    // annBindCompleted annotation on the claim.
                    glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef))
                    return fmt.Errorf("Invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef))
                }
            }
        }
    }
}

provisionClaim

Next, let's see how provisionClaim creates a volume for a claim. The call chain is:

provisionClaim(claim) —> ctrl.scheduleOperation(opName, ...) —> ctrl.provisionClaimOperation(claim)

provisionClaim(claim) first checks whether dynamic provisioning is enabled and returns if it is not; otherwise it calls ctrl.provisionClaimOperation(claim) to do the work.

// provisionClaim starts new asynchronous operation to provision a claim if
// provisioning is enabled.
func (ctrl *PersistentVolumeController) provisionClaim(claim *v1.PersistentVolumeClaim) error {
    if !ctrl.enableDynamicProvisioning {
        return nil
    }
    glog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim))
    opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
    startTime := time.Now()
    // ctrl.scheduleOperation() runs the provisioning; it guarantees that only
    // one goroutine runs this operation at a time
    ctrl.scheduleOperation(opName, func() error {
        // ctrl.provisionClaimOperation(claim) is the core code
        pluginName, err := ctrl.provisionClaimOperation(claim)
        timeTaken := time.Since(startTime).Seconds()
        metrics.RecordVolumeOperationMetric(pluginName, "provision", timeTaken, err)
        return err
    })
    return nil
}

provisionClaimOperation() is the core volume-creation method. Its logic:

(1) Get the StorageClass the claim was created with

(2) Add a provisioner annotation to the claim (volume.beta.kubernetes.io/storage-provisioner = class.Provisioner), naming the plugin that provisions this claim

(3) Find the provisioning plugin; if plugin == nil, no plugin was found, so emit an event and return.

(4) If a plugin was found:

- Build the PV name from the claim (the PV name is "pvc-" + claim.UID, as sketched below)
- Ask the Kubernetes apiserver whether a volume with that name already exists; if it does, there is nothing to provision, so return
- If the volume does not exist, prepare the parameters needed to create it, then create a provisioner object, which implements the concrete volume-provisioning method
- Call Provision(), which invokes the CSI plugin to create a new volume; what kind of volume depends on the plugin type, e.g. Ceph RBD or NFS
- Attach the PVC reference (ClaimRef) to the new volume and try (several times) to create a PV object for it
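The deterministic PV name mentioned in the first step comes from getProvisionedVolumeNameForClaim, which in the v1.12 source is essentially this one-liner:

// pkg/controller/volume/persistentvolume/pv_controller.go (v1.12)
func (ctrl *PersistentVolumeController) getProvisionedVolumeNameForClaim(claim *v1.PersistentVolumeClaim) string {
    // "pvc-" + the claim's UID, e.g. pvc-020270a5-52f4-11ef-b4be-e8ebd398881c
    return "pvc-" + string(claim.UID)
}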

```go
// provisionClaimOperation provisions a volume. This method is running in
// standalone goroutine and already has all necessary locks.
func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) (string, error) {
	// Get the storageclass of the claim
	claimClass := v1helper.GetPersistentVolumeClaimClass(claim)
	glog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)

	// Find the provisioning plugin
	plugin, storageClass, err := ctrl.findProvisionablePlugin(claim)
	if err != nil {
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, err.Error())
		glog.V(2).Infof("error finding provisioning plugin for claim %s: %v", claimToClaimKey(claim), err)
		// The controller will retry provisioning the volume in every
		// syncVolume() call.
		return "", err
	}

	var pluginName string
	if plugin != nil {
		pluginName = plugin.GetPluginName()
	}

	// Add provisioner annotation so external provisioners know when to start
	newClaim, err := ctrl.setClaimProvisioner(claim, storageClass)
	if err != nil {
		// Save failed, the controller will retry in the next sync
		glog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)
		return pluginName, err
	}
	claim = newClaim

	// If no plugin was found, emit an event and return
	if plugin == nil {
		// findProvisionablePlugin returned no error nor plugin.
		// This means that an unknown provisioner is requested. Report an event
		// and wait for the external provisioner
		msg := fmt.Sprintf("waiting for a volume to be created, either by external provisioner %q or manually created by system administrator", storageClass.Provisioner)
		ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ExternalProvisioning, msg)
		glog.V(3).Infof("provisioning claim %q: %s", claimToClaimKey(claim), msg)
		return pluginName, nil
	}

	// internal provisioning

	// A previous doProvisionClaim may just have finished while we were waiting
	// for the locks. Check that PV (with deterministic name) hasn't been
	// provisioned yet.
	// plugin != nil
	// Derive the PV name from the claim ("pvc-" + claim.UID)
	pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
	// Ask the apiserver whether a volume with this name already exists;
	// if so, there is nothing left to provision and we return early
	volume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
	if err == nil && volume != nil {
		// Volume has been already provisioned, nothing to do.
		glog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim))
		return pluginName, err
	}

	// Prepare a claimRef to the claim early (to fail before a volume is
	// provisioned)
	claimRef, err := ref.GetReference(scheme.Scheme, claim)
	if err != nil {
		glog.V(3).Infof("unexpected error getting claim reference: %v", err)
		return pluginName, err
	}

	// Gather provisioning options
	tags := make(map[string]string)
	tags[CloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace
	tags[CloudVolumeCreatedForClaimNameTag] = claim.Name
	tags[CloudVolumeCreatedForVolumeNameTag] = pvName

	// Assemble the parameters needed to create the volume
	options := vol.VolumeOptions{
		PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,
		MountOptions:                  storageClass.MountOptions,
		CloudTags:                     &tags,
		ClusterName:                   ctrl.clusterName,
		PVName:                        pvName,
		PVC:                           claim,
		Parameters:                    storageClass.Parameters,
	}

	// Refuse to provision if the plugin doesn't support mount options, creation
	// of PV would be rejected by validation anyway
	if !plugin.SupportsMountOption() && len(options.MountOptions) > 0 {
		strerr := fmt.Sprintf("Mount options are not supported by the provisioner but StorageClass %q has mount options %v", storageClass.Name, options.MountOptions)
		glog.V(2).Infof("Mount options are not supported by the provisioner but claim %q's StorageClass %q has mount options %v", claimToClaimKey(claim), storageClass.Name, options.MountOptions)
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
		return pluginName, fmt.Errorf("provisioner %q doesn't support mount options", plugin.GetPluginName())
	}

	// Provision the volume
	// Create a provisioner object; it implements the actual
	// volume-creation logic
	provisioner, err := plugin.NewProvisioner(options)
	if err != nil {
		strerr := fmt.Sprintf("Failed to create provisioner: %v", err)
		glog.V(2).Infof("failed to create provisioner for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
		return pluginName, err
	}

	var selectedNode *v1.Node = nil
	if nodeName, ok := claim.Annotations[annSelectedNode]; ok {
		selectedNode, err = ctrl.NodeLister.Get(nodeName)
		if err != nil {
			strerr := fmt.Sprintf("Failed to get target node: %v", err)
			glog.V(3).Infof("unexpected error getting target node %q for claim %q: %v", nodeName, claimToClaimKey(claim), err)
			ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
			return pluginName, err
		}
	}
	allowedTopologies := storageClass.AllowedTopologies

	opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
	// Call Provision(), which invokes the underlying plugin to create a new
	// volume. What kind of volume depends on the plugin type, e.g. ceph rbd or nfs
	volume, err = provisioner.Provision(selectedNode, allowedTopologies)
	opComplete(&err)
	if err != nil {
		// Other places of failure have nothing to do with VolumeScheduling,
		// so just let controller retry in the next sync. We'll only call func
		// rescheduleProvisioning here when the underlying provisioning actually failed.
		ctrl.rescheduleProvisioning(claim)

		strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)
		glog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
		return pluginName, err
	}

	glog.V(3).Infof("volume %q for claim %q created", volume.Name, claimToClaimKey(claim))

	// Create Kubernetes PV object for the volume.
	if volume.Name == "" {
		volume.Name = pvName
	}
	// Bind the volume to the claim
	volume.Spec.ClaimRef = claimRef
	volume.Status.Phase = v1.VolumeBound
	volume.Spec.StorageClassName = claimClass

	// Add annBoundByController (used in deleting the volume) and the other
	// annotations the controller needs later
	metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annBoundByController, "yes")
	metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annDynamicallyProvisioned, plugin.GetPluginName())

	// Try to create the PV object several times
	for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
		glog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)
		var newVol *v1.PersistentVolume
		if newVol, err = ctrl.kubeClient.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
			// Save succeeded.
			if err != nil {
				glog.V(3).Infof("volume %q for claim %q already exists, reusing", volume.Name, claimToClaimKey(claim))
				err = nil
			} else {
				glog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))

				_, updateErr := ctrl.storeVolumeUpdate(newVol)
				if updateErr != nil {
					// We will get an "volume added" event soon, this is not a big error
					glog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, updateErr)
				}
			}
			break
		}
		// Save failed, try again after a while.
		glog.V(3).Infof("failed to save volume %q for claim %q: %v", volume.Name, claimToClaimKey(claim), err)
		time.Sleep(ctrl.createProvisionedPVInterval)
	}

	// err != nil here means saving the PV object failed; the already-provisioned
	// volume is now orphaned and must be cleaned up
	if err != nil {
		// Save failed. Now we have a storage asset outside of Kubernetes,
		// but we don't have appropriate PV object for it.
		// Emit some event here and try to delete the storage asset several
		// times.
		strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
		glog.V(3).Info(strerr)
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)

		var deleteErr error
		var deleted bool
		for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
			// Delete the volume
			_, deleted, deleteErr = ctrl.doDeleteVolume(volume)
			if deleteErr == nil && deleted {
				// Delete succeeded
				glog.V(4).Infof("provisionClaimOperation [%s]: cleaning volume %s succeeded", claimToClaimKey(claim), volume.Name)
				break
			}
			if !deleted {
				// This is unreachable code, the volume was provisioned by an
				// internal plugin and therefore there MUST be an internal
				// plugin that deletes it.
				glog.Errorf("Error finding internal deleter for volume plugin %q", plugin.GetPluginName())
				break
			}
			// Delete failed, try again after a while.
			glog.V(3).Infof("failed to delete volume %q: %v", volume.Name, deleteErr)
			time.Sleep(ctrl.createProvisionedPVInterval)
		}

		if deleteErr != nil {
			// Delete failed several times. There is an orphaned volume and there
			// is nothing we can do about it.
			strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), deleteErr)
			glog.V(2).Info(strerr)
			ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningCleanupFailed, strerr)
		}
	} else {
		// err == nil: provisioning succeeded; emit an event and return
		glog.V(2).Infof("volume %q provisioned for claim %q", volume.Name, claimToClaimKey(claim))
		msg := fmt.Sprintf("Successfully provisioned volume %s using %s", volume.Name, plugin.GetPluginName())
		ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ProvisioningSucceeded, msg)
	}

	return pluginName, nil
}
```
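To watch this code path from the user side, the sketch below uses client-go to create a PVC that requests a StorageClass; once it is submitted, the PV controller (or an external provisioner) runs the provisionClaim path above and a PV named pvc-&lt;claim UID&gt; should eventually appear. This is only an illustration under stated assumptions: the StorageClass name and kubeconfig path are assumptions, and the context-taking Create signature is from client-go 0.18 or newer, whereas the controller code quoted above predates it.

```go
package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load the local kubeconfig (assumed to be at the default ~/.kube/config).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	sc := "csi-virt-rbd" // assumed StorageClass; it must already exist in the cluster
	pvc := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{Name: "demo-pvc", Namespace: "default"},
		Spec: v1.PersistentVolumeClaimSpec{
			AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
			StorageClassName: &sc,
			Resources: v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceStorage: resource.MustParse("1Gi"),
				},
			},
		},
	}

	created, err := clientset.CoreV1().PersistentVolumeClaims("default").
		Create(context.TODO(), pvc, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	// The PV controller should provision and bind a volume named "pvc-<claim UID>".
	fmt.Printf("created PVC %s/%s, expecting PV pvc-%s\n", created.Namespace, created.Name, created.UID)
}
```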

Summary

The PV controller manages the lifecycle of PV and PVC resources. First, it binds each PVC to a suitable PV, matching the PVC's requested size and access mode against the available PVs. It also tracks PV state, ensuring that a bound PV cannot be claimed by another PVC, and handles PV reclaiming and deletion. Second, when a PVC specifies a StorageClass, the PV controller can dynamically provision a new PV for it.

This article walked through the controller's source code to show how these mechanisms are implemented.

References

1. kube-controller-manager源码分析-PV controller分析
2. 从零开始入门 K8s | Kubernetes 存储架构及插件使用
3. kubernetes pv-controller 解析

