kubernetes存储之PV controller源码解读
摘要
本文介绍kubernetes存储架构的基础,并重点对PV controller的源码进行了学习
引入
从一个业务场景出发,假如你所在的公司,有个物理机,上面部署了web服务器,随着业务增长,本地磁盘空间不够使用了。业务使用方想你所在的基础设施部门申请加1T的块设备存储空间。于是你给出了解决方案:
- 方案一:给物理机添加一块新磁盘
实施步骤如下:
-
获取一块磁盘(买、借都可以)
-
将磁盘插入服务器插槽
-
物理机OS层面识别磁盘,并格式化后再mount到目录
- 方案二: 假如已经一个ceph存储集群了,也可以从ceph存储划一个卷,映射给物理机使用
实施步骤如下:
- rbd create 创建一个卷
- rbd map将卷映射给物理机
- 物理机OS层面识别磁盘,并格式化后再mount到目录
再进一步设想,假如业务使用的容器POD,而不是物理机呢?
方案一:我们把pod当成物理机,与物理机一样,在pod内部安装ceph rbd客户端,再把卷rbd map给pod使用,最后进入pod 把卷mount到用户需要的目录。一切都手动完成,效率低、也是非主流方式。
方案二:使用kubernetes+ceph csi,实现为pod添加持久化存储的自动化
kubernetes存储架构
存储基础
在kubernetes环境中,通过PVC与PV对存储卷进行抽象。
PV与PVC
-
PersistentVolumeClaim (简称PVC): 是用户存储的请求。它和Pod类似。Pod消耗Node资源,PVC消耗PV资源。Pod可以请求特定级别的资源(CPU和MEM)。PVC可以请求特定大小和访问模式的PV。
-
PersistentVolume (简称PV): 由管理员设置的存储,它是集群的一部分。就像节点(Node)是集群中的资源一样,PV也是集群中的资源。它包含存储类型,存储大小和访问模式。它的生命周期独立于Pod,例如当使用它的Pod销毁时对PV没有影响。
PV提供方式
- 静态PV:集群管理员创建许多PV,它们包含可供集群用户使用的实际存储的详细信息。
- 动态PV:当管理员创建的静态PV都不匹配用户创建的PersistentVolumeClaim时,集群会为PVC动态的配置卷。此配置基于StorageClasses:PVC必须请求存储类(storageclasses),并且管理员必须已创建并配置该类,以便进行动态创建。
PV 的访问模式
- ReadWriteOnce - 卷以读写方式挂载到单个节点
- ReadOnlyMany - 卷以只读方式挂载到多个节点
- ReadWriteMany - 卷以读写方式挂载到多个节点
PVC回收策略
- Retain - 手动回收。在删除pvc后PV变为Released不可用状态, 若想重新被使用,需要管理员删除pv,重新创建pv,删除pv并不会删除存储的资源,只是删除pv对象而已;若想保留数据,请使用该Retain。
- Recycle - 基本擦洗(rm -rf /thevolume/)。 删除pvc自动清除PV中的数据,效果相当于执行 rm -rf /thevolume/。删除pvc时,pv的状态由Bound变为Available。此时可重新被pvc申请绑定。
- Delete - 删除存储上的对应存储资源。关联的存储资产(如AWS EBS,GCE PD,Azure磁盘或OpenStack Cinder卷)将被删除。NFS不支持delete策略。
PV的状态
- Available(可用状态) - 一块空闲资源还没有被任何声明绑定
- Bound(绑定状态) - 声明分配到PVC进行绑定,PV进入绑定状态
- Released(释放状态) - PVC被删除,PV进入释放状态,等待回收处理
- Failed(失败状态) - PV执行自动清理回收策略失败
(图片来自于网络)
PVC的状态
- Pending(等待状态) - 等待绑定PV
- Bound(绑定状态) - PV已绑定PVC
- Lost(绑定丢失) - 再次绑定PV后进入Bound状态
(图片来自于网络)
PVC yaml 字段描述
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:annotations:# 这个注解用于标记 PV 是否已经成功绑定到一个 PVC。当 PV 和 PVC 成功绑定时,Kubernetes 会在 PV 上添加这个注解# Kubernetes pv controller 控制器会检查这个注解来确定绑定过程的状态,从而进行相应的管理操作。pv.kubernetes.io/bind-completed: "yes"# 这个注解用于标记 PV 是否是由 Kubernetes 的存储控制器自动绑定到 PVC 的。如果 PV 是通过 StorageClass 动态创建的,或者是由控制器自动绑定的,这个注解会被设置为 "yes"。pv.kubernetes.io/bound-by-controller: "yes"# 这个注解用于指定创建 PV 的存储预配置器(storageClass)的名称。在动态存储管理中,存储预配置器会根据 PVC 的请求自动创建和配置 PV,而不需要管理员手动创建 PVvolume.beta.kubernetes.io/storage-provisioner: rbd.csi.obs.comcreationTimestamp: 2024-08-05T06:29:03Zfinalizers:- kubernetes.io/pvc-protection# PVC 的名称name: basepvc-dg03dev8jxdh# PVC所在的命名空间namespace: publicresourceVersion: "11508437467"selfLink: /api/v1/namespaces/public/persistentvolumeclaims/basepvc-dg03dev80407964jxdh-swap# 用于唯一表示PVC,同时用于管理 PVC 与 PersistentVolume (PV) 之间的绑定关系。uid: 020270a5-52f4-11ef-b4be-e8ebd398881c
spec:# PVC 的访问模式accessModes:- ReadWriteOnce# 如果是克隆卷,这里用于标识由哪个快照创建而来的dataSource: nullresources:requests:# 请求卷的空间大小storage: 51GistorageClassName: csi-virt-rbd# block 或 filesystem,是否需要文件系统格式化volumeMode: Filesystem# 绑定PV的名称,规范是"pvc-<pvc的uid>"volumeName: pvc-020270a5-52f4-11ef-b4be-e8ebd398881c
status:accessModes:- ReadWriteOncecapacity:storage: 51Gi# PVC当前的状态 phase: Bound
PV yaml 字段描述
---
apiVersion: v1
kind: PersistentVolume
metadata:annotations:# 这个注解用于指定创建 PV 的存储预配置器的名称。pv.kubernetes.io/provisioned-by: rbd.csi.obs.comcreationTimestamp: 2024-08-05T06:29:03Zfinalizers:- kubernetes.io/pv-protectionlabels:baymax.io/rbd-cluster-name: ceph10name: pvc-020270a5-52f4-11ef-b4be-e8ebd398881cresourceVersion: "10132283575"selfLink: /api/v1/persistentvolumes/pvc-020270a5-52f4-11ef-b4be-e8ebd398881cuid: 0220ec79-52f4-11ef-b4be-e8ebd398881c
spec:accessModes:- ReadWriteOncecapacity:storage: 51Gi# claimRef 用于记录 PV 绑定到的 PVC 的信息,包括 PVC 的名称、命名空间和 UID。claimRef:apiVersion: v1kind: PersistentVolumeClaim# pvc的名称name: basepvc-dg03dev80407964jxdh-swapnamespace: publicresourceVersion: "10132283544"# pvc的uiduid: 020270a5-52f4-11ef-b4be-e8ebd398881c# PV 对应的 volume卷由csi进行生命周期管理 csi:# 存储驱动的名称driver: rbd.csi.obs.com# 卷格式化时指定的文件系统类型fsType: ext4volumeAttributes:adminKeyring: AQBdZFtkGyvfxxxxxxxxxxxxxxxxxxxxxxx (密码隐藏)baymax.io/rbd-pvc-name: basepvc-dg03dev80407964jxdh-swapbaymax.io/rbd-pvc-namespace: publicclustername: ceph10monitors: 10.x.x.x:6789,10.x.x.x:6789,10.x.x.x:6789 (IP隐藏)storage.kubernetes.io/csiProvisionerIdentity: 1678171935067-8081-rbd.csi.obs.comuserKeyring: AQBKqlxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx (密码隐藏)# volumeHandle 是由 CSI 存储驱动程序生成的唯一标识符,用于标识唯一标识一个特定的存储卷。 # 在 PV 和 PVC 成功绑定后,volumeHandle 确保 Kubernetes 能够正确地将 PV 与 PVC 关联起来。volumeHandle: csi-rbd-pvc-020270a5-52f4-11ef-b4be-e8ebd398881c# 回收策略 persistentVolumeReclaimPolicy: Delete# storageClass的名称storageClassName: csi-virt-rbdvolumeMode: Filesystem
status:phase: Bound
** storageClass yaml 字段描述**
# kubectl get storageclasses.storage.k8s.io ceph-nbd
NAME PROVISIONER AGE
ceph-nbd ceph-nbd.csi.obs.com 53d
# kubectl get storageclasses.storage.k8s.io ceph-nbd -oyaml
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:annotations:kubectl.kubernetes.io/last-applied-configuration: |{"apiVersion":"storage.k8s.io/v1","kind":"StorageClass","metadata":{"annotations":{},"name":"ceph-nbd"},"provisioner":"ceph-nbd.csi.obs.com","reclaimPolicy":"Delete"}creationTimestamp: 2024-10-25T02:03:28Zname: ceph-nbdresourceVersion: "10961600994"selfLink: /apis/storage.k8s.io/v1/storageclasses/ceph-nbduid: 53c697ad-9275-11ef-88f0-e8ebd3986310
provisioner: ceph-nbd.csi.obs.com
# volume回收模式
reclaimPolicy: Delete
# 绑定策略,立即还是延迟
volumeBindingMode: Immediate
kubernetes存储结构
在kubernetes中,与存储相关的组件如下:
(图片来自于网络)
-
PV Controller: 负责
PV/PVC
的绑定、生命周期管理,并根据需求进行数据卷的 Provision/Delete 操作; -
AD Controller:负责存储设备的
Attach/Detach
操作,将设备挂载到目标节点; -
Volume Manager:管理卷的
Mount/Unmount
操作、卷设备的格式化以及挂载到一些公用目录上的操作; -
Volume Plugins:它主要是对上面所有挂载功能的实现;
PV Controller、AD Controller、Volume Manager 主要是进行操作的调用,而具体操作则是由 Volume Plugins 实现的。
接下来,本文将对PV controller的源码进行分析,深入学习PV controller是如何实现的。
PV controller 的启动
如果对 kube-controller-manager 组件有一定了解,kube-controller-manager 包括了多种controller,包括node contrller、deployment controller、service controller、Daemonset controller、PV controller和AD controller等。
在kube-controller-manager进程启动时,会依次启动这些controller.
pv controller 启动流程如下:
main()—> NewHyperKubeCommand()—>kubecontrollermanager.NewControllerManagerCommand()—>Run() —> StartControllers() —> NewControllerInitializers() —> startPersistentVolumeBinderController(ctx) —>NewController()—> go volumeController.Run(ctx.Stop)
kube-controller-manager 在启动Run()时,会调用startPersistentVolumeBinderController(ctx)函数,在函数内部会调用函数NewController(),先实例化一个PV controller,再通过协程启动PV controller: go volumeController.Run(ctx.Stop)
源码路径: k8s.io/kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go
备注: 本文代码是v1.12版本
NewController
PV controller 结构体包括的如下字段,重要的有:各种lister,pv controller通过lister 可以从本地cache中查询资源的信息和状态;包括2个本地cache缓存空间,用来分别保存k8s已有的volume和claims的信息与状态。另外定义了2个消息队列,当有新的claim和volume 对应的创建、删除、更新等事件时,就将对应事件放到队列,等待PV controller进一步处理。
startPersistentVolumeBinderController(ctx) 源码中使用这个函数实例化一个PV controller并通过协程方式启动。
// kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.gofunc startPersistentVolumeBinderController(ctx ControllerContext) (http.Handler, bool, error) {params := persistentvolumecontroller.ControllerParameters{KubeClient: ctx.ClientBuilder.ClientOrDie("persistent-volume-binder"),SyncPeriod: ctx.ComponentConfig.PersistentVolumeBinderController.PVClaimBinderSyncPeriod.Duration,VolumePlugins: ProbeControllerVolumePlugins(ctx.Cloud, ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration),Cloud: ctx.Cloud,ClusterName: ctx.ComponentConfig.KubeCloudShared.ClusterName,VolumeInformer: ctx.InformerFactory.Core().V1().PersistentVolumes(),ClaimInformer: ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),ClassInformer: ctx.InformerFactory.Storage().V1().StorageClasses(),PodInformer: ctx.InformerFactory.Core().V1().Pods(),NodeInformer: ctx.InformerFactory.Core().V1().Nodes(),EnableDynamicProvisioning: ctx.ComponentConfig.PersistentVolumeBinderController.VolumeConfiguration.EnableDynamicProvisioning,}// 实例化一个pv controllervolumeController, volumeControllerErr := persistentvolumecontroller.NewController(params)if volumeControllerErr != nil {return nil, true, fmt.Errorf("failed to construct persistentvolume controller: %v", volumeControllerErr)}// 协程方式启动 PV controllergo volumeController.Run(ctx.Stop)return nil, true, nil
}
NewController() 创建一个新的pv controller
// kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go// NewController creates a new PersistentVolume controller
func NewController(p ControllerParameters) (*PersistentVolumeController, error) {// 初始化一个 事件记录器eventRecorder := p.EventRecorderif eventRecorder == nil {broadcaster := record.NewBroadcaster()broadcaster.StartLogging(glog.Infof)broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: p.KubeClient.CoreV1().Events("")})eventRecorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "persistentvolume-controller"})}// 实例化一个PV controllercontroller := &PersistentVolumeController{volumes: newPersistentVolumeOrderedIndex(),claims: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),kubeClient: p.KubeClient,eventRecorder: eventRecorder,runningOperations: goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),cloud: p.Cloud,enableDynamicProvisioning: p.EnableDynamicProvisioning,clusterName: p.ClusterName,createProvisionedPVRetryCount: createProvisionedPVRetryCount,createProvisionedPVInterval: createProvisionedPVInterval,claimQueue: workqueue.NewNamed("claims"),volumeQueue: workqueue.NewNamed("volumes"),resyncPeriod: p.SyncPeriod,}// Prober is nil because PV is not aware of Flexvolume.if err := controller.volumePluginMgr.InitPlugins(p.VolumePlugins, nil /* prober */, controller); err != nil {return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolume Controller: %v", err)}/// 定义volume事件处理函数,当有新的volume事件时会放到消息队列 controller.volumeQueue 中去p.VolumeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },},)controller.volumeLister = p.VolumeInformer.Lister()controller.volumeListerSynced = p.VolumeInformer.Informer().HasSynced// 定义claim事件处理函数,当有新的volume事件时会放到消息队列 controller.volumeQueue 中去p.ClaimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },},)controller.claimLister = p.ClaimInformer.Lister()controller.claimListerSynced = p.ClaimInformer.Informer().HasSyncedcontroller.classLister = p.ClassInformer.Lister()controller.classListerSynced = p.ClassInformer.Informer().HasSyncedcontroller.podLister = p.PodInformer.Lister()controller.podListerSynced = p.PodInformer.Informer().HasSyncedcontroller.NodeLister = p.NodeInformer.Lister()controller.NodeListerSynced = p.NodeInformer.Informer().HasSyncedreturn controller, nil
}
ctrl.Run()
Run()
是pv controller启动入口, run() 的作用是启动了三个工作协程:
ctrl.resync: 定期重新同步控制器的状态,确保控制器的状态与集群中的实际状态一致,具体的实现是:定时循环查询出pv和pvc列表,然后放入到队列volumeQueue和claimQueue中,让volumeWorker和claimWorker进行消费。
ctrl.volumeWorker:通过消费队列volumeQueue,来处理与 PersistentVolume (PV) 相关的操作,包括创建、绑定、释放和删除 PV
ctrl.claimWorker:通过消费队列claimQueue,来处理与 PersistentVolumeClaim (PVC) 相关的操作,包括创建、绑定和删除 PVC。
任务:
// kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.gofunc (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()defer ctrl.claimQueue.ShutDown()defer ctrl.volumeQueue.ShutDown()klog.Infof("Starting persistent volume controller")defer klog.Infof("Shutting down persistent volume controller")if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {return}ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)// 核心代码! 启动三个协程go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)go wait.Until(ctrl.volumeWorker, time.Second, stopCh)go wait.Until(ctrl.claimWorker, time.Second, stopCh)metrics.Register(ctrl.volumes.store, ctrl.claims)<-stopCh
}
简单图示
ctrl.resync
作用是定时从cache中获取 pvc列表数据并放入到消费队列 ctrl.claimQueue,之后ctrl.claimWorker 循环任务来消费处理内部数据;同时定时从cache中获取 PV列表数据,并放入消费队列ctrl.volumeQueue,之后ctrl.volumeWorker 循环任务来消费处理内部数据
//kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.gofunc (ctrl *PersistentVolumeController) resync() {klog.V(4).Infof("resyncing PV controller")// 遍历pvc ,将pvc 放入 ctrl.claimQueue,等待处理pvcs, err := ctrl.claimLister.List(labels.NewSelector())if err != nil {klog.Warningf("cannot list claims: %s", err)return}// 将 pvc 放入消费队列 ctrl.claimQueuefor _, pvc := range pvcs {ctrl.enqueueWork(ctrl.claimQueue, pvc)}// 遍历pv ,将 pv 放入 ctrl.volumeQueue,等待处理pvs, err := ctrl.volumeLister.List(labels.NewSelector())if err != nil {klog.Warningf("cannot list persistent volumes: %s", err)return}// 将 pv 放入消费队列 ctrl.volumeQueuefor _, pv := range pvs {ctrl.enqueueWork(ctrl.volumeQueue, pv)}
}
简单图示:
这是一个典型的”生产者与消费者“模型,ctrl.resync负责将数据放入2个队列,之后ctrl.volumeWorker与ctrl.claimWorker消费队列里面的数据。
ctrl.volumeWorker
volumeWorker会不断循环消费volumeQueue队列里面的数据,然后获取到相应的PV执行updateVolume操作。
volumeWorker()函数的代码比较长,我们展开分析一下。
// volumeWorker processes items from volumeQueue. It must run only once,
// syncVolume is not assured to be reentrant.
func (ctrl *PersistentVolumeController) volumeWorker() {workFunc := func() bool {// 从消费队列 ctrl.volumeQueue 取一个 PV(或称为volume)keyObj, quit := ctrl.volumeQueue.Get()if quit {return true}defer ctrl.volumeQueue.Done(keyObj)key := keyObj.(string)glog.V(5).Infof("volumeWorker[%s]", key)_, name, err := cache.SplitMetaNamespaceKey(key)if err != nil {glog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)return false}// 通过 volume name 查询本地cache,从cache中获取到 volume 对象volume, err := ctrl.volumeLister.Get(name)if err == nil {// The volume still exists in informer cache, the event must have// been add/update/sync// 如果 volume 存在,则用updateVolume() 函数处理这个volumectrl.updateVolume(volume)return false}if !errors.IsNotFound(err) {glog.V(2).Infof("error getting volume %q from informer: %v", key, err)return false}// The volume is not in informer cache, the event must have been// "delete"volumeObj, found, err := ctrl.volumes.store.GetByKey(key)if err != nil {glog.V(2).Infof("error getting volume %q from cache: %v", key, err)return false}if !found {// The controller has already processed the delete event and// deleted the volume from its cacheglog.V(2).Infof("deletion of volume %q was already processed", key)return false}volume, ok := volumeObj.(*v1.PersistentVolume)if !ok {glog.Errorf("expected volume, got %+v", volumeObj)return false}// 如果volume资源不存在,说明需要删除volume,则调用方法ctrl.deleteVolume(volume),让底层删除卷 ctrl.deleteVolume(volume)return false}// 不断执行 workFunc() 函数for {if quit := workFunc(); quit {glog.Infof("volume worker queue shutting down")return}}
}
updateVolume() 方法,新了一个check后调用ctrl.syncVolume(volume)进一步处理。
// updateVolume runs in worker thread and handles "volume added",
// "volume updated" and "periodic sync" events.
func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume) {// Store the new volume version in the cache and do not process it if this// is an old version.new, err := ctrl.storeVolumeUpdate(volume)if err != nil {glog.Errorf("%v", err)}if !new {return}// 根据当前 PV 对象的规格对 PV 和 PVC 进行绑定或者解绑err = ctrl.syncVolume(volume)if err != nil {if errors.IsConflict(err) {// Version conflict error happens quite often and the controller// recovers from it easily.glog.V(3).Infof("could not sync volume %q: %+v", volume.Name, err)} else {glog.Errorf("could not sync volume %q: %+v", volume.Name, err)}}
}
ctrl.syncVolume(volume)
是一个重要的函数,需要再仔细解读
说明: 代码中的 “volume” 等价于 PV, “claim” 等价于 “PVC”
syncVolume方法为核心方法,主要调谐更新pv的状态:
(1)如果spec.claimRef未设置,则是未使用过的pv,则调用updateVolumePhase函数更新状态设置 phase 为 available;
(2)如果spec.claimRef不为空,则该pv已经与pvc bound过了,此时若对应的pvc不存在,则更新pv状态为released;
(3)如果pv对应的pvc被删除了,调用ctrl.reclaimVolume根据pv的回收策略进行相应操作,如果是retain,则不做操作,如果是delete,则调用volume plugin来删除底层存储,并删除pv对象(当volume plugin为csi时,将走out-tree逻辑,pv controller不做删除存储与pv对象的操作,由external provisioner组件来完成该操作)。
// syncVolume is the main controller method to decide what to do with a volume.
// It's invoked by appropriate cache.Controller callbacks when a volume is
// created, updated or periodically synced. We do not differentiate between
// these events.
func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume) error {glog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))// [Unit test set 4]// 如果 volume.Spec.ClaimRef 为 nil,说明 volume 是未使用的,那么就调用ctrl.updateVolumePhase()函数// 将 volume 的状态更新为 Availableif volume.Spec.ClaimRef == nil {// Volume is unusedglog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {// Nothing was saved; we will fall back into the same// condition in the next call to this methodreturn err}return nil} else /* pv.Spec.ClaimRef != nil */ {// Volume is bound to a claim.// 如果 volume.Spec.ClaimRef.UID 为 ”“,说明 volume 是未使用的,那么就调用ctrl.updateVolumePhase()函数// 将 volume 的状态更新为 Availableif volume.Spec.ClaimRef.UID == "" {// The PV is reserved for a PVC; that PVC has not yet been// bound to this PV; the PVC sync will handle it.glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {// Nothing was saved; we will fall back into the same// condition in the next call to this methodreturn err}return nil}glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))// Get the PVC by _name_// 通过 volume.Spec.ClaimRef 从本地 cache 中查询到对应的 PVC 对象var claim *v1.PersistentVolumeClaimclaimName := claimrefToClaimKey(volume.Spec.ClaimRef)obj, found, err := ctrl.claims.GetByKey(claimName)if err != nil {return err}// 如果没有在本地 cache 中查找到 PVC,同时 PV 的 annotation // 又是包括"pv.kubernetes.io/bound-by-controller"的// 需要再去 apiserver 做double-checkif !found && metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {// If PV is bound by external PV binder (e.g. kube-scheduler), it's// possible on heavy load that corresponding PVC is not synced to// controller local cache yet. 
So we need to double-check PVC in// 1) informer cache// 2) apiserver if not found in informer cache// to make sure we will not reclaim a PV wrongly.// Note that only non-released and non-failed volumes will be// updated to Released state when PVC does not eixst.if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {// 从 cache 中查找 pvcobj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)if err != nil && !apierrs.IsNotFound(err) {return err}found = !apierrs.IsNotFound(err)if !found {// 如果 cache 中不存在则再去 apiserver 查询是否存在obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name, metav1.GetOptions{})if err != nil && !apierrs.IsNotFound(err) {return err}found = !apierrs.IsNotFound(err)}}}// 如果都没找到 PVC,则抛出错误if !found {glog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))// Fall through with claim = nil// 如果 PVC 找到了, 将 cache 中的 PVC 转换为 PVC 对象} else {var ok boolclaim, ok = obj.(*v1.PersistentVolumeClaim)if !ok {return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)}glog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))}// 如果 PVC 的 UID 与 PV 中的 UID 不同,说明绑定错误了if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {// The claim that the PV was pointing to was deleted, and another// with the same name created.glog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has different UID, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))// Treat the volume as bound to a missing claim.claim = nil}// 如果 PVC 不存在,而且 PV 的状态是 ”Released“ 或 "Failed", 则将 apiserver 和 本地 cache中的PV 状态都更新为 "Released"if claim == nil {// If we get into this block, the claim must have been deleted;// NOTE: reclaimVolume may either release the PV back into the pool or// recycle it or do nothing (retain)// Do not overwrite previous Failed state - let the user see that// something went wrong, while we still re-try to reclaim the// volume.if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {// Also, log this only once:glog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {// Nothing was saved; we will fall back into the same condition// in the next call to this methodreturn err}}if err = ctrl.reclaimVolume(volume); err != nil {// Release failed, we will fall back into the same condition// in the next call to this methodreturn err}return nil// 如果 claim 不为 nil 而且 claim.Spec.VolumeName == "",则再检查 volumeMode 是否匹配// 如果不匹配,则写 evenRecorder 后返回} else if claim.Spec.VolumeName == "" {if isMisMatch, err := checkVolumeModeMisMatches(&claim.Spec, &volume.Spec); err != nil || isMisMatch {// Binding for the volume won't be called in syncUnboundClaim,// because findBestMatchForClaim won't return the volume due to volumeMode mismatch.volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg)claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested 
PersistentVolumeClaim due to incompatible volumeMode.", volume.Name)ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg)// Skipping syncClaimreturn nil}// 如果 volumeMode 是匹配的,则继续往下走: 判断 PV 是否有 annotation: "pv.kubernetes.io/bound-by-controller" // 如果有这个annotation, 后续会交给 PVC sync 来处理// 如果没有这个annotation, 等待用户自行修复if metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {// The binding is not completed; let PVC sync handle itglog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name)} else {// Dangling PV; try to re-establish the link in the PVC syncglog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name)}// In both cases, the volume is Bound and the claim is Pending.// Next syncClaim will fix it. To speed it up, we enqueue the claim// into the controller, which results in syncClaim to be called// shortly (and in the right worker goroutine).// This speeds up binding of provisioned volumes - provisioner saves// only the new PV and it expects that next syncClaim will bind the// claim to it.// 把 PVC 加入 claimQueue 队列,等待 syncClaim() 任务来处理ctrl.claimQueue.Add(claimToClaimKey(claim))return nil// 如果 claim 不为空,而且 claim.Spec.VolumeName == volume.Name,说明 binding 是正确的。// 这是将volume 的状态更新为 Bound} else if claim.Spec.VolumeName == volume.Name {// Volume is bound to a claim properly, update status if necessaryglog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {// Nothing was saved; we will fall back into the same// condition in the next call to this methodreturn err}return nil} else {// Volume is bound to a claim, but the claim is bound elsewhere// 如果PV 的 annotation 包括 "pv.kubernetes.io/provisioned-by" 并且回收策略 volume.Spec.PersistentVolumeReclaimPolicy 是 Delete时,// 再判断 PV 的phase 如果不是 Released 和 Failed,则更新 PV 的Phase为 Releasedif metav1.HasAnnotation(volume.ObjectMeta, annDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {// This volume was dynamically provisioned for this claim. The// claim got bound elsewhere, and thus this volume is not// needed. Delete it.// Mark the volume as Released for external deleters and to let// the user know. Don't overwrite existing Failed status!if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {// Also, log this only once:glog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name)// 更新 PV 的 phaseif volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {// Nothing was saved; we will fall back into the same condition// in the next call to this methodreturn err}}// 之后再 调用 ctrl.reclaimVolume(volume) 对PV 进行回收处理if err = ctrl.reclaimVolume(volume); err != nil {// Deletion failed, we will fall back into the same condition// in the next call to this methodreturn err}return nil// 执行 unbindVolume()操作} else {// Volume is bound to a claim, but the claim is bound elsewhere// and it's not dynamically provisioned.if metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {// This is part of the normal operation of the controller; the// controller tried to use this volume for a claim but the claim// was fulfilled by another volume. 
We did this; fix it.glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)if err = ctrl.unbindVolume(volume); err != nil {return err}return nil} else {// The PV must have been created with this ptr; leave it alone.glog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)// This just updates the volume phase and clears// volume.Spec.ClaimRef.UID. It leaves the volume pre-bound// to the claim.if err = ctrl.unbindVolume(volume); err != nil {return err}return nil}}}}
}
由于代码较长,梳理出主要逻辑的流程图大致如下:
updateVolumePhase()
- 更新etcd中 volume 的phase状态
- 更新本地informer cache中的phase状态
// updateVolumePhase saves new volume phase to API server.
func (ctrl *PersistentVolumeController) updateVolumePhase(volume *v1.PersistentVolume, phase v1.PersistentVolumePhase, message string) (*v1.PersistentVolume, error) {glog.V(4).Infof("updating PersistentVolume[%s]: set phase %s", volume.Name, phase)// 如果 Phase 已经满足,就什么也不做if volume.Status.Phase == phase {// Nothing to do.glog.V(4).Infof("updating PersistentVolume[%s]: phase %s already set", volume.Name, phase)return volume, nil}// 核心代码!volumeClone := volume.DeepCopy()volumeClone.Status.Phase = phasevolumeClone.Status.Message = message// 核心代码!请求 api server 对 volume 的状态进行修改newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().UpdateStatus(volumeClone)if err != nil {glog.V(4).Infof("updating PersistentVolume[%s]: set phase %s failed: %v", volume.Name, phase, err)return newVol, err}// 之后再修改本地 informer 中 cache 中的数据_, err = ctrl.storeVolumeUpdate(newVol)if err != nil {glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)return newVol, err}glog.V(2).Infof("volume %q entered phase %q", volume.Name, phase)return newVol, err
}
unbindVolume(volume)
作用是将 一个 Bound 状态的 volume 执行 unbound 操作
- 将本地cache中的 volume 进行unbond操作:1) volumeClone.Spec.ClaimRef = nil 2) volumeClone.Annotations 中删除 “pv.kubernetes.io/bound-by-controller” 字段; 3) volume phase 修改为 “Available”
- 发起http请求,对apiserver中的 volume 进行unbound操作(同上)。
// unbindVolume rolls back previous binding of the volume. This may be necessary
// when two controllers bound two volumes to single claim - when we detect this,
// only one binding succeeds and the second one must be rolled back.
// This method updates both Spec and Status.
// It returns on first error, it's up to the caller to implement some retry
// mechanism.
func (ctrl *PersistentVolumeController) unbindVolume(volume *v1.PersistentVolume) error {glog.V(4).Infof("updating PersistentVolume[%s]: rolling back binding from %q", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))// Save the PV only when any modification is necessary.volumeClone := volume.DeepCopy()if metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {// The volume was bound by the controller.// 将volume中的 ClaimRef 置为 nilvolumeClone.Spec.ClaimRef = nil// 从删除volume的annotation对应字段delete(volumeClone.Annotations, annBoundByController)if len(volumeClone.Annotations) == 0 {// No annotations look better than empty annotation map (and it's easier// to test).volumeClone.Annotations = nil}} else {// The volume was pre-bound by user. Clear only the binging UID.// 如果是用户手动创建的volume 则把 UID 设置为""volumeClone.Spec.ClaimRef.UID = ""}newVol, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Update(volumeClone)if err != nil {glog.V(4).Infof("updating PersistentVolume[%s]: rollback failed: %v", volume.Name, err)return err}_, err = ctrl.storeVolumeUpdate(newVol)if err != nil {glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)return err}glog.V(4).Infof("updating PersistentVolume[%s]: rolled back", newVol.Name)// Update the status_, err = ctrl.updateVolumePhase(newVol, v1.VolumeAvailable, "")return err
}
ctrl.reclaimVolume
// reclaimVolume implements volume.Spec.PersistentVolumeReclaimPolicy and
// starts appropriate reclaim action.
func (ctrl *PersistentVolumeController) reclaimVolume(volume *v1.PersistentVolume) error {switch volume.Spec.PersistentVolumeReclaimPolicy {// 如果回收策略是 Retain ,则什么也不做case v1.PersistentVolumeReclaimRetain:glog.V(4).Infof("reclaimVolume[%s]: policy is Retain, nothing to do", volume.Name)// 如果回收策略是 recycle, 调用plugin 执行 volume 的清理操作(清理后,该PV后续可以被重复使用)case v1.PersistentVolumeReclaimRecycle:glog.V(4).Infof("reclaimVolume[%s]: policy is Recycle", volume.Name)opName := fmt.Sprintf("recycle-%s[%s]", volume.Name, string(volume.UID))ctrl.scheduleOperation(opName, func() error {ctrl.recycleVolumeOperation(volume)return nil})// 如果回收策略是 Delete,调用plugin 执行 volume 的 delete操作case v1.PersistentVolumeReclaimDelete:glog.V(4).Infof("reclaimVolume[%s]: policy is Delete", volume.Name)opName := fmt.Sprintf("delete-%s[%s]", volume.Name, string(volume.UID))startTime := time.Now()// 调用plugin 执行 volume 的 delete操作ctrl.scheduleOperation(opName, func() error {pluginName, err := ctrl.deleteVolumeOperation(volume)timeTaken := time.Since(startTime).Seconds()metrics.RecordVolumeOperationMetric(pluginName, "delete", timeTaken, err)return err})default:// Unknown PersistentVolumeReclaimPolicyif _, err := ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, "VolumeUnknownReclaimPolicy", "Volume has unrecognized PersistentVolumeReclaimPolicy"); err != nil {return err}}return nil
}
ctrl.deleteVolumeOperation()
// deleteVolumeOperation deletes a volume. This method is running in standalone
// goroutine and already has all necessary locks.
func (ctrl *PersistentVolumeController) deleteVolumeOperation(volume *v1.PersistentVolume) (string, error) {glog.V(4).Infof("deleteVolumeOperation [%s] started", volume.Name)// This method may have been waiting for a volume lock for some time.// Previous deleteVolumeOperation might just have saved an updated version, so// read current volume state now.newVolume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})if err != nil {glog.V(3).Infof("error reading persistent volume %q: %v", volume.Name, err)return "", nil}needsReclaim, err := ctrl.isVolumeReleased(newVolume)if err != nil {glog.V(3).Infof("error reading claim for volume %q: %v", volume.Name, err)return "", nil}if !needsReclaim {glog.V(3).Infof("volume %q no longer needs deletion, skipping", volume.Name)return "", nil}// 核心! 执行 delete volume 操作pluginName, deleted, err := ctrl.doDeleteVolume(volume)if err != nil {// Delete failed, update the volume and emit an event.glog.V(3).Infof("deletion of volume %q failed: %v", volume.Name, err)if vol.IsDeletedVolumeInUse(err) {// The plugin needs more time, don't mark the volume as Failed// and send Normal event onlyctrl.eventRecorder.Event(volume, v1.EventTypeNormal, events.VolumeDelete, err.Error())} else {// The plugin failed, mark the volume as Failed and send Warning// eventif _, err := ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, events.VolumeFailedDelete, err.Error()); err != nil {glog.V(4).Infof("deleteVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)// Save failed, retry on the next deletion attemptreturn pluginName, err}}// Despite the volume being Failed, the controller will retry deleting// the volume in every syncVolume() call.return pluginName, err}if !deleted {// The volume waits for deletion by an external plugin. Do nothing.return pluginName, nil}glog.V(4).Infof("deleteVolumeOperation [%s]: success", volume.Name)// Delete the volume// 当底层 volume 被删除后,则将apiserver中的 volume if err = ctrl.kubeClient.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err != nil {// Oops, could not delete the volume and therefore the controller will// try to delete the volume again on next update. We _could_ maintain a// cache of "recently deleted volumes" and avoid unnecessary deletion,// this is left out as future optimization.glog.V(3).Infof("failed to delete volume %q from database: %v", volume.Name, err)return pluginName, nil}return pluginName, nil
}// doDeleteVolume finds appropriate delete plugin and deletes given volume, returning
// the volume plugin name. Also, it returns 'true', when the volume was deleted and
// 'false' when the volume cannot be deleted because of the deleter is external. No
// error should be reported in this case.
func (ctrl *PersistentVolumeController) doDeleteVolume(volume *v1.PersistentVolume) (string, bool, error) {glog.V(4).Infof("doDeleteVolume [%s]", volume.Name)var err error// 查到插件 volume pluginplugin, err := ctrl.findDeletablePlugin(volume)if err != nil {return "", false, err}if plugin == nil {// External deleter is requested, do nothingglog.V(3).Infof("external deleter for volume %q requested, ignoring", volume.Name)return "", false, nil}// Plugin foundpluginName := plugin.GetPluginName()glog.V(5).Infof("found a deleter plugin %q for volume %q", pluginName, volume.Name)spec := vol.NewSpecFromPersistentVolume(volume, false)// 创建一个plugin 的 deleterdeleter, err := plugin.NewDeleter(spec)if err != nil {// Cannot create deleterreturn pluginName, false, fmt.Errorf("Failed to create deleter for volume %q: %v", volume.Name, err)}//调用 Delete()方法执行 存储层面volume的删除opComplete := util.OperationCompleteHook(pluginName, "volume_delete")err = deleter.Delete()opComplete(&err)if err != nil {// Deleter failedreturn pluginName, false, err}glog.V(2).Infof("volume %q deleted", volume.Name)return pluginName, true, nil
}// findDeletablePlugin finds a deleter plugin for a given volume. It returns
// either the deleter plugin or nil when an external deleter is requested.
func (ctrl *PersistentVolumeController) findDeletablePlugin(volume *v1.PersistentVolume) (vol.DeletableVolumePlugin, error) {// Find a plugin. Try to find the same plugin that provisioned the volumevar plugin vol.DeletableVolumePluginif metav1.HasAnnotation(volume.ObjectMeta, annDynamicallyProvisioned) {provisionPluginName := volume.Annotations[annDynamicallyProvisioned]if provisionPluginName != "" {// 通过 volume 中 annotation的 "pv.kubernetes.io/provisioned-by" 查找对应的 plugin 名称plugin, err := ctrl.volumePluginMgr.FindDeletablePluginByName(provisionPluginName)if err != nil {if !strings.HasPrefix(provisionPluginName, "kubernetes.io/") {// External provisioner is requested, do not report errorreturn nil, nil}return nil, err}return plugin, nil}}// 如果上面没找到再去 volume 的 Spec中查找// The plugin that provisioned the volume was not found or the volume// was not dynamically provisioned. Try to find a plugin by spec.spec := vol.NewSpecFromPersistentVolume(volume, false)plugin, err := ctrl.volumePluginMgr.FindDeletablePluginBySpec(spec)if err != nil {// No deleter found. Emit an event and mark the volume Failed.return nil, fmt.Errorf("Error getting deleter volume plugin for volume %q: %v", volume.Name, err)}return plugin, nil
}
ctrl.claimWorker
ctrl.claimWorker() 方法是通过协程方式启动,对claim(PVC) 进行生命周期管理,即负责处理 PVC 的绑定、创建和删除等操作。
(1) 先从本地informer cache中查询claim, 如果 informer cache中存在, 表示PVC是新来的事件待处理。 交给updateClaim()进行处理;
(2) 如果 informer cache中没有, 表示PVC是已经被删除了的。需要做删除处理
// claimWorker processes items from claimQueue. It must run only once,
// syncClaim is not reentrant.
func (ctrl *PersistentVolumeController) claimWorker() {workFunc := func() bool {keyObj, quit := ctrl.claimQueue.Get()if quit {return true}defer ctrl.claimQueue.Done(keyObj)key := keyObj.(string)glog.V(5).Infof("claimWorker[%s]", key)namespace, name, err := cache.SplitMetaNamespaceKey(key)if err != nil {glog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)return false}claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)if err == nil {// The claim still exists in informer cache, the event must have// been add/update/sync// 核心代码! 如果 informer cache中存在, 表示PVC是新来的事件待处理。 交给updateClaim()进行处理ctrl.updateClaim(claim)return false}if !errors.IsNotFound(err) {glog.V(2).Infof("error getting claim %q from informer: %v", key, err)return false}// The claim is not in informer cache, the event must have been "delete"claimObj, found, err := ctrl.claims.GetByKey(key)if err != nil {glog.V(2).Infof("error getting claim %q from cache: %v", key, err)return false}if !found {// The controller has already processed the delete event and// deleted the claim from its cacheglog.V(2).Infof("deletion of claim %q was already processed", key)return false}claim, ok := claimObj.(*v1.PersistentVolumeClaim)if !ok {glog.Errorf("expected claim, got %+v", claimObj)return false}// 核心代码! 如果 informer cache中没有, 表示PVC是已经被删除了的。需要做删除处理ctrl.deleteClaim(claim)return false}// 核心代码! 持续循环地从 claimQueue 里面获取到的 PersistentVolumeClaim,并进行相应处理for {if quit := workFunc(); quit {glog.Infof("claim worker queue shutting down")return}}
}
重点函数包括了ctrl.updateClaim(claim)
和 ctrl.deleteClaim(claim)
ctrl.updateClaim(claim)
ctrl.updateClaim(claim) 用于处理 claim 的 ”创建“、”更新“以及周期性更新的事件
// updateClaim runs in worker thread and handles "claim added",
// "claim updated" and "periodic sync" events.
func (ctrl *PersistentVolumeController) updateClaim(claim *v1.PersistentVolumeClaim) {// Store the new claim version in the cache and do not process it if this is// an old version.// 先检查 claim 的版本号,如果不是"新的",就不处理new, err := ctrl.storeClaimUpdate(claim)if err != nil {glog.Errorf("%v", err)}if !new {return}// 核心代码!如果通过上面检查,则使用函数ctrl.syncClaim(claim)进一步处理err = ctrl.syncClaim(claim)if err != nil {if errors.IsConflict(err) {// Version conflict error happens quite often and the controller// recovers from it easily.glog.V(3).Infof("could not sync claim %q: %+v", claimToClaimKey(claim), err)} else {glog.Errorf("could not sync volume %q: %+v", claimToClaimKey(claim), err)}}
}
ctrl.syncClaim(claim)
syncClaim是 pv controler的主要方法,它决定了具体如何处理claim。该方法又根据annotation中是否包括"pv.kubernetes.io/bind-completed",再决定将claim进一步交个ctrl.syncUnboundClaim(claim) 或 ctrl.syncBoundClaim(claim) 处理
// syncClaim is the main controller method to decide what to do with a claim.
// It's invoked by appropriate cache.Controller callbacks when a claim is
// created, updated or periodically synced. We do not differentiate between
// these events.
// For easier readability, it was split into syncUnboundClaim and syncBoundClaim
// methods.
func (ctrl *PersistentVolumeController) syncClaim(claim *v1.PersistentVolumeClaim) error {glog.V(4).Infof("synchronizing PersistentVolumeClaim[%s]: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))if !metav1.HasAnnotation(claim.ObjectMeta, annBindCompleted) {// 如果不包括annotation "pv.kubernetes.io/bind-completed",这个annotation用于标记 bind 是否完成。// 此时没有这个annotation,说明是还没有Bind操作的claim,比如处理pengding状态的claim,接下来交给 ctrl.syncUnboundClaim(claim) 处理return ctrl.syncUnboundClaim(claim)} else {// 如果包括annotation "pv.kubernetes.io/bind-completed",说明已经 bind 过的claim,接下来交给 ctrl.syncBoundClaim(claim) 处理return ctrl.syncBoundClaim(claim)}
}
ctrl.syncBoundClaim()
ctrl.syncBoundClaim() 作为 pv controller 最重要的方法,必须仔细分析。代码的逻辑如下:
- 如果 claim.Spec.VolumeName 的值为空,说明 claim 之前 Bound过的但是这个值确为空了,这种情况就将claim状态改为"lost"并抛出event并返回
- 从本地informer cache中查找对应的 claim.Spec.VolumeName
- 如果没找对应的volume,说明claim bind了一个不存在的volume。这种情况就将claim状态改为"lost"并抛出event并返回
- 如果找到对应的volume
- 如果volume.Spec.ClaimRef == nil,说明claim 已经bind 了volume, 但volume确没有bind对应的claim,这种情况就调用 ctrl.bind(volume, claim)方法让volume 绑定 claim
- 如果volume.Spec.ClaimRef.UID == claim.UID,说明逻辑都是正常的。接下来任然做一次ctrl.bind(),但是大多数情况会直接返回(因为所有的操作都已经做完了)
- 其他情况,比如volume.Spec.ClaimRef.UID 不等于 claim.UID,说明bind()错了。这种情况就将claim状态改为"lost"并抛出event并返回
// syncBoundClaim is the main controller method to decide what to do with a
// bound claim.
func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolumeClaim) error {// HasAnnotation(pvc, annBindCompleted)// This PVC has previously been bound// OBSERVATION: pvc is not "Pending"// [Unit test set 3]// 第1种情况: 如果 claim.Spec.VolumeName 的值为空,说明 claim 之前 Bound过的但是这个值异常了,抛出event并返回 if claim.Spec.VolumeName == "" {// Claim was bound before but not any more.if _, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!"); err != nil {return err}return nil}// 找到对应的 volume obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)if err != nil {return err}// 第2种情况: 如果claim.Spec.VolumeName 有值,claim之前Bound过的,但 volume 不存在了或许被删除了,抛出event并返回 if !found {// Claim is bound to a non-existing volume.if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!"); err != nil {return err}return nil} else {volume, ok := obj.(*v1.PersistentVolume)if !ok {return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)}glog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))// 第3种情况:如果claim.Spec.VolumeName 有值,而且volume也存在,但是volume对应的值volume.Spec.ClaimRef为nil// 可能有2种场景:1)claim bound了但是volume 确是unbound的。2)claim bound了但是controller 没有来得及updated volume// 不论具体是哪种场景,接下来都再次执行一次ctrl.bind()操作if volume.Spec.ClaimRef == nil {// Claim is bound but volume has come unbound.// Or, a claim was bound and the controller has not received updated// volume yet. We can't distinguish these cases.// Bind the volume again and set all states to Bound.glog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: volume is unbound, fixing", claimToClaimKey(claim))if err = ctrl.bind(volume, claim); err != nil {// Objects not saved, next syncPV or syncClaim will try againreturn err}return nil// 第4种情况: 如果claim.Spec.VolumeName 有值,而且volume也存在,而且volume对应的值volume.Spec.ClaimRef 不为nil // 而且 volume.Spec.ClaimRef.UID == claim.UID;说明逻辑都是正常的。接下来任然做一次ctrl.bind()} else if volume.Spec.ClaimRef.UID == claim.UID {// All is well// NOTE: syncPV can handle this so it can be left out.// NOTE: bind() call here will do nothing in most cases as// everything should be already set.glog.V(4).Infof("synchronizing bound PersistentVolumeClaim[%s]: claim is already correctly bound", claimToClaimKey(claim))if err = ctrl.bind(volume, claim); err != nil {// Objects not saved, next syncPV or syncClaim will try againreturn err}return nil// 其他情况: volume.Spec.ClaimRef.UID 不等于 claim.UID,说明bind()错了。这时抛出event} else {// Claim is bound but volume has a different claimant.// Set the claim phase to 'Lost', which is a terminal// phase.if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimMisbound", "Two claims are bound to the same volume, this one is bound incorrectly"); err != nil {return err}return nil}}
}
ctrl.bind(volume, claim)
// bind saves binding information both to the volume and the claim and marks
// both objects as Bound. Volume is saved first.
// It returns on first error, it's up to the caller to implement some retry
// mechanism.
func (ctrl *PersistentVolumeController) bind(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) error {var err errorvar updatedClaim *v1.PersistentVolumeClaimvar updatedVolume *v1.PersistentVolume// 将 volume bind 到 claimif updatedVolume, err = ctrl.bindVolumeToClaim(volume, claim); err != nil {return err}volume = updatedVolume// 更新 volume 的phase 为Bound 状态if updatedVolume, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {return err}volume = updatedVolume// 将 volume claim 到 volumeif updatedClaim, err = ctrl.bindClaimToVolume(claim, volume); err != nil {return err}claim = updatedClaim// 更新 claim 的phase 为Bound 状态if updatedClaim, err = ctrl.updateClaimStatus(claim, v1.ClaimBound, volume); err != nil {return err}claim = updatedClaimreturn nil
}
syncUnboundClaim
-
如果claim.Spec.VolumeName == " ",说明该PVC 是一个还没有完成bingding的新的PVC,此时PVC处于"Pengding"状态
-
这时就从集群中存在的 volume 中查询是否匹配的 claim(空间大小满足同时access模式满足而且处于”Available“ 状态的), 根据是否没找到了匹配的volume,分2中情况分配处理:
-
如果没找到匹配的volume:
-
再检查 PVC 对象 中的storageClass,看storageClass的volumeBindingMode 是否启用了延迟bingding,如果启用了,就返回,等该PVC消费者例如pod,需要时才会对这个claim进行bind处理。
-
如果没有启用延迟bingding,也就是用了默认的Immediate模式,就判断 storageClass 是否设置为"",如果不为空就用claim中设置的 storageClass ,调用ctrl.provisionClaim(claim)方法创建一个新的 volume
-
-
如果找到匹配的volume,则调用ctrl.bind(volume, claim)将volume与claim进行绑定,如果绑定正常就正常退出,如果bind错误就返回错误。
-
-
如果claim.Spec.VolumeName != " ",说明 claim
指定了
绑定到一个特定的 VolumeName。那接下来先去本地cache中查找对应名称的 volume-
如果本地cache中没有找到,说明这时可能 volume 还没创建,那就将claim 状态设置为 “Pengding”,等下一个循环周期再处理,有可能下一个循环周期有volume 就创建好了。
-
如果本地cache中找到了指定名称的 volume,那就检查volume 的字段volume.Spec.ClaimRef是不是为nil:
-
如果为nil,说明 volume 是没有被使用的、处于"Availabel"的 volume ,接下来就将volume 与 claim 绑定,完成绑定后,claim 是 “Bound” 状态, pv 也是 "Bound"状态,这时就可以正常退出
-
如果不为nil,调用方法 isVolumeBoundToClaim(volume, claim) 判断volume 是否已经绑定给这个 claim 了,“也就是说claim 指定要的是 volume,volume也要的是该claim,双向奔赴”,如果已经绑定了,则在次调用ctrl.bind(volume, claim),完成绑定
-
-
// syncUnboundClaim is the main controller method to decide what to do with an
// unbound claim.
func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {// This is a new PVC that has not completed binding// OBSERVATION: pvc is "Pending"// claim.Spec.VolumeName == "",说明该PVC 是一个还没有完成bingding的新的PVC,此时PVC处于"Pengding"状态if claim.Spec.VolumeName == "" {// User did not care which PV they get.// 检查 PVC 对象 中的storageClass,看storageClass的volumeBindingMode 是否启用了延迟bingding(默认值为Immediate)delayBinding, err := ctrl.shouldDelayBinding(claim)if err != nil {return err}// [Unit test set 1]// 从集群中存在的 volume 中查询是否匹配的 claimvolume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)if err != nil {glog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)return fmt.Errorf("Error finding PV for claim %q: %v", claimToClaimKey(claim), err)}// volume == nil,说明没有找到匹配claim的volumeif volume == nil {glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))// No PV could be found// OBSERVATION: pvc is "Pending", will retryswitch {// 检查 storageClass 中的volumeBindingMode字段是否设置了delayBinding,如果为真,这里只是抛出event,不会再继续为PVC 创建对应的 PV// 需要等到消费者完成准备(比如需要挂载该PVC的pod已经OK了),才会去创建PV,并做绑定case delayBinding:ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.WaitForFirstConsumer, "waiting for first consumer to be created before binding")// 判断 storageClass 是否设置为"",如果不为空就用claim中设置的 storageClass 来创建一个新的 volume case v1helper.GetPersistentVolumeClaimClass(claim) != "":// ctrl.provisionClaim(claim)方法 用于创建新的 volumeif err = ctrl.provisionClaim(claim); err != nil {return err}return nil// 如果没有找到,而且也没设置 storageClass ,则抛出eventdefault:ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")}// Mark the claim as Pending and try to find a match in the next// periodic syncClaim// 将 claim 状态设置为 "Pending",等待下一个循环周期到来时会再次处理if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {return err}return nil// 如果 volume != nil,说明找到匹配claim的处于”Available“ 状态的 volume,这种情况就调用ctrl.bind(volume, claim),将volume 与 claim 绑定} else /* pv != nil */ {// Found a PV for this claim// OBSERVATION: pvc is "Pending", pv is "Available"glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), volume.Name, getVolumeStatusForLogging(volume))if err = ctrl.bind(volume, claim); err != nil {// On any error saving the volume or the claim, subsequent// syncClaim will finish the binding.return err}// OBSERVATION: claim is "Bound", pv is "Bound"// 完成绑定后,claim 是 "Bound" 状态, pv 也是 "Bound"状态,这时就可以正常退出return nil}// 如果 pvc.Spec.VolumeName != nil ,说明 claim 指定了绑定到对应的 VolumeName;那边接下来先去本地cache中查找对应名称的 volume} else /* pvc.Spec.VolumeName != nil */ {// [Unit test set 2]// User asked for a specific PV.glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)if err != nil {return err}// 如果本地cache中没有找到,说明这是可能 volume 还没创建,那就将claim 状态设置为 "Pengding",等下一个循环周期再处理,有可能下一个循环周期 // 有可能下一个循环周期 volume 就创建好了if !found {// User asked for a PV that does not exist.// OBSERVATION: pvc is "Pending"// Retry later.glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)// 更新 claim 的状态为 'Pengding'if _, err = 
ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {return err}return nil// 如果本地cache中找到了指定名称的 volume,那就检查volume 的字段volume.Spec.ClaimRef不是为nil//如果为nil,说明 volume 是没有被使用的,处于"Availabel"的 volume ,接下来就将volume 与 claim 绑定} else {volume, ok := obj.(*v1.PersistentVolume)if !ok {return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)}glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))// volume.Spec.ClaimRef == nil, 则将volume 与 claim 绑定if volume.Spec.ClaimRef == nil {// User asked for a PV that is not claimed// OBSERVATION: pvc is "Pending", pv is "Available"glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))// 绑定前做一些检查工作if err = checkVolumeSatisfyClaim(volume, claim); err != nil {glog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)//send an eventmsg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)//volume does not satisfy the requirements of the claimif _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {return err}// 执行 claim 与 volume的绑定} else if err = ctrl.bind(volume, claim); err != nil {// On any error saving the volume or the claim, subsequent// syncClaim will finish the binding.return err}// OBSERVATION: pvc is "Bound", pv is "Bound"// 如果完成绑定后,claim 是 "Bound" 状态, pv 也是 "Bound"状态,这时就可以正常退出return nil// 接下来看看,如果volume.Spec.ClaimRef != nil时,// 而且isVolumeBoundToClaim(volume, claim)判断volume 如果已经绑定给 claim 了,而此时应为PVC 是 "Pengding"的,所以调用ctrl.bind(volume, claim),完成绑定} else if isVolumeBoundToClaim(volume, claim) {// User asked for a PV that is claimed by this PVC// OBSERVATION: pvc is "Pending", pv is "Bound"glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))// Finish the volume binding by adding claim UID.if err = ctrl.bind(volume, claim); err != nil {return err}// OBSERVATION: pvc is "Bound", pv is "Bound"// 如果完成绑定后,claim 是 "Bound" 状态, pv 也是 "Bound"状态,这时就可以正常退出return nil} else {// User asked for a PV that is claimed by someone else// OBSERVATION: pvc is "Pending", pv is "Bound"if !metav1.HasAnnotation(claim.ObjectMeta, annBoundByController) {glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))// User asked for a specific PV, retry laterif _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {return err}return nil} else {// This should never happen because someone had to remove// annBindCompleted annotation on the claim.glog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef))return fmt.Errorf("Invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef))}}}}
}
provisionClaim
接下来分析一下provisionClaim是如何为claim,创建volume的。方法调用流程如下:
provisionClaim(claim) —> ctrl.provisionClaimOperation(claim) —> provisionClaimOperation(claim)
provisionClaim(claim)的逻辑是先判断是否启用了动态dynamicProvisoning,如果没启动则退出。如果启用了调用ctrl.provisionClaimOperation(claim)方法继续处理。
// provisionClaim starts new asynchronous operation to provision a claim if
// provisioning is enabled.
func (ctrl *PersistentVolumeController) provisionClaim(claim *v1.PersistentVolumeClaim) error {if !ctrl.enableDynamicProvisioning {return nil}glog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim))opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))startTime := time.Now()// 调用ctrl.scheduleOperation()方法,执行provisionClaim; 这里ctrl.scheduleOperation()方法是保证只有一个gorouting运行ctrl.scheduleOperation(opName, func() error {// ctrl.provisionClaimOperation(claim) 为核心代码pluginName, err := ctrl.provisionClaimOperation(claim)timeTaken := time.Since(startTime).Seconds()metrics.RecordVolumeOperationMetric(pluginName, "provision", timeTaken, err)return err})return nil
}
provisionClaimOperation() 是创建卷的核心方法,它的逻辑如下
(1) 获取创建claim的storageclass
(2) 为 claim 添加一个 provisioner 的 annotation标识(volume.beta.kubernetes.io/storage-provisioner= class.Provisioner),这个标识表示这个claim 是由哪个plugin实现provision的
(3) 获取 Provisiong 的插件plugin,如果没有plugin==nil表示没有找到插件,则抛出event后退出。
(4) 如果找到plugin:
- 通过claim 的名称组装pv的名称(pv的名称就是"pvc"-claim.UID)
- 通过pvName 去k8s apisever查询是否存在volume, 如果已存在,则不需要再provision volume了,则退出
- 如果volume 不存在,准备开始创建volume,先准备创建volume需要的参数,之后创建一个provisioner 接口,provisioner实现了provision volume 的具体方法
- 执行 provision() 方法,这个方法会调用csi plugin,创建一个新的volume。具体什么volume,则要看plugin类型,比如ceph rbd或nfs等等
- 为创建出来的 volume 关联 pvc 对象(ClaimRef),尝试为volume 创建 pv 对象 (重复多次)
```go
// provisionClaimOperation provisions a volume. This method is running in
// standalone goroutine and already has all necessary locks.
func (ctrl *PersistentVolumeController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) (string, error) {
	// Get the StorageClass used to create the claim
	claimClass := v1helper.GetPersistentVolumeClaimClass(claim)
	glog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)

	// Find the provisioning plugin
	plugin, storageClass, err := ctrl.findProvisionablePlugin(claim)
	if err != nil {
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, err.Error())
		glog.V(2).Infof("error finding provisioning plugin for claim %s: %v", claimToClaimKey(claim), err)
		// The controller will retry provisioning the volume in every
		// syncVolume() call.
		return "", err
	}

	var pluginName string
	if plugin != nil {
		pluginName = plugin.GetPluginName()
	}

	// Add provisioner annotation so external provisioners know when to start
	newClaim, err := ctrl.setClaimProvisioner(claim, storageClass)
	if err != nil {
		// Save failed, the controller will retry in the next sync
		glog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)
		return pluginName, err
	}
	claim = newClaim

	// If no plugin was found, emit an event and return.
	if plugin == nil {
		// findProvisionablePlugin returned no error nor plugin.
		// This means that an unknown provisioner is requested. Report an event
		// and wait for the external provisioner
		msg := fmt.Sprintf("waiting for a volume to be created, either by external provisioner %q or manually created by system administrator", storageClass.Provisioner)
		ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ExternalProvisioning, msg)
		glog.V(3).Infof("provisioning claim %q: %s", claimToClaimKey(claim), msg)
		return pluginName, nil
	}

	// internal provisioning

	// A previous doProvisionClaim may just have finished while we were waiting for
	// the locks. Check that PV (with deterministic name) hasn't been provisioned
	// yet.
	// plugin != nil
	// Build the PV name from the claim (the PV name is "pvc-" + claim.UID)
	pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
	// Query the API server for a PV with this name; if it already exists, the
	// volume has already been provisioned and the function returns.
	volume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
	if err == nil && volume != nil {
		// Volume has been already provisioned, nothing to do.
		glog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim))
		return pluginName, err
	}

	// Prepare a claimRef to the claim early (to fail before a volume is
	// provisioned)
	claimRef, err := ref.GetReference(scheme.Scheme, claim)
	if err != nil {
		glog.V(3).Infof("unexpected error getting claim reference: %v", err)
		return pluginName, err
	}

	// Gather provisioning options
	tags := make(map[string]string)
	tags[CloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace
	tags[CloudVolumeCreatedForClaimNameTag] = claim.Name
	tags[CloudVolumeCreatedForVolumeNameTag] = pvName

	// Prepare the options needed to create the volume
	options := vol.VolumeOptions{
		PersistentVolumeReclaimPolicy: *storageClass.ReclaimPolicy,
		MountOptions:                  storageClass.MountOptions,
		CloudTags:                     &tags,
		ClusterName:                   ctrl.clusterName,
		PVName:                        pvName,
		PVC:                           claim,
		Parameters:                    storageClass.Parameters,
	}

	// Refuse to provision if the plugin doesn't support mount options, creation
	// of PV would be rejected by validation anyway
	if !plugin.SupportsMountOption() && len(options.MountOptions) > 0 {
		strerr := fmt.Sprintf("Mount options are not supported by the provisioner but StorageClass %q has mount options %v", storageClass.Name, options.MountOptions)
		glog.V(2).Infof("Mount options are not supported by the provisioner but claim %q's StorageClass %q has mount options %v", claimToClaimKey(claim), storageClass.Name, options.MountOptions)
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
		return pluginName, fmt.Errorf("provisioner %q doesn't support mount options", plugin.GetPluginName())
	}

	// Provision the volume
	// Create a provisioner object; it implements the concrete volume-creation logic
	provisioner, err := plugin.NewProvisioner(options)
	if err != nil {
		strerr := fmt.Sprintf("Failed to create provisioner: %v", err)
		glog.V(2).Infof("failed to create provisioner for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
		return pluginName, err
	}

	var selectedNode *v1.Node = nil
	if nodeName, ok := claim.Annotations[annSelectedNode]; ok {
		selectedNode, err = ctrl.NodeLister.Get(nodeName)
		if err != nil {
			strerr := fmt.Sprintf("Failed to get target node: %v", err)
			glog.V(3).Infof("unexpected error getting target node %q for claim %q: %v", nodeName, claimToClaimKey(claim), err)
			ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
			return pluginName, err
		}
	}
	allowedTopologies := storageClass.AllowedTopologies

	opComplete := util.OperationCompleteHook(plugin.GetPluginName(), "volume_provision")
	// Call Provision(), which asks the volume plugin to create a new volume.
	// What kind of volume is created depends on the plugin type, e.g. ceph rbd or nfs.
	volume, err = provisioner.Provision(selectedNode, allowedTopologies)
	opComplete(&err)
	if err != nil {
		// Other places of failure have nothing to do with VolumeScheduling,
		// so just let controller retry in the next sync. We'll only call func
		// rescheduleProvisioning here when the underlying provisioning actually failed.
		ctrl.rescheduleProvisioning(claim)

		strerr := fmt.Sprintf("Failed to provision volume with StorageClass %q: %v", storageClass.Name, err)
		glog.V(2).Infof("failed to provision volume for claim %q with StorageClass %q: %v", claimToClaimKey(claim), storageClass.Name, err)
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)
		return pluginName, err
	}

	glog.V(3).Infof("volume %q for claim %q created", volume.Name, claimToClaimKey(claim))

	// Create Kubernetes PV object for the volume.
	if volume.Name == "" {
		volume.Name = pvName
	}
	// Bind it to the claim
	volume.Spec.ClaimRef = claimRef
	volume.Status.Phase = v1.VolumeBound
	volume.Spec.StorageClassName = claimClass

	// Add annBoundByController (used in deleting the volume)
	// Add the necessary annotations to the volume
	metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annBoundByController, "yes")
	metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annDynamicallyProvisioned, plugin.GetPluginName())

	// Try to create the PV object several times
	for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
		glog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)
		var newVol *v1.PersistentVolume
		if newVol, err = ctrl.kubeClient.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
			// Save succeeded.
			if err != nil {
				glog.V(3).Infof("volume %q for claim %q already exists, reusing", volume.Name, claimToClaimKey(claim))
				err = nil
			} else {
				glog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))

				_, updateErr := ctrl.storeVolumeUpdate(newVol)
				if updateErr != nil {
					// We will get an "volume added" event soon, this is not a big error
					glog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, updateErr)
				}
			}
			break
		}
		// Save failed, try again after a while.
		glog.V(3).Infof("failed to save volume %q for claim %q: %v", volume.Name, claimToClaimKey(claim), err)
		time.Sleep(ctrl.createProvisionedPVInterval)
	}

	// err != nil here means the PV object could not be saved; the leftover volume
	// must be deleted and cleaned up.
	if err != nil {
		// Save failed. Now we have a storage asset outside of Kubernetes,
		// but we don't have appropriate PV object for it.
		// Emit some event here and try to delete the storage asset several
		// times.
		strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
		glog.V(3).Info(strerr)
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, strerr)

		var deleteErr error
		var deleted bool
		for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
			// Delete the volume
			_, deleted, deleteErr = ctrl.doDeleteVolume(volume)
			if deleteErr == nil && deleted {
				// Delete succeeded
				glog.V(4).Infof("provisionClaimOperation [%s]: cleaning volume %s succeeded", claimToClaimKey(claim), volume.Name)
				break
			}
			if !deleted {
				// This is unreachable code, the volume was provisioned by an
				// internal plugin and therefore there MUST be an internal
				// plugin that deletes it.
				glog.Errorf("Error finding internal deleter for volume plugin %q", plugin.GetPluginName())
				break
			}
			// Delete failed, try again after a while.
			glog.V(3).Infof("failed to delete volume %q: %v", volume.Name, deleteErr)
			time.Sleep(ctrl.createProvisionedPVInterval)
		}

		if deleteErr != nil {
			// Delete failed several times. There is an orphaned volume and there
			// is nothing we can do about it.
			strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), deleteErr)
			glog.V(2).Info(strerr)
			ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningCleanupFailed, strerr)
		}
	} else {
		// err == nil: provisioning succeeded; emit an event and return
		glog.V(2).Infof("volume %q provisioned for claim %q", volume.Name, claimToClaimKey(claim))
		msg := fmt.Sprintf("Successfully provisioned volume %s using %s", volume.Name, plugin.GetPluginName())
		ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.ProvisioningSucceeded, msg)
	}

	return pluginName, nil
}
```
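One detail worth calling out in the listing above is the failure handling at the end: if the PV object cannot be saved to the API server after several retries, the controller deletes the storage asset it just created so that nothing is left orphaned. The dependency-free sketch below isolates that retry-then-clean-up pattern; the helper names are hypothetical and stand in for the controller's API-server and plugin calls.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// createWithCleanup tries to persist a PV object up to retryCount times; if every
// attempt fails, it tries (also with retries) to delete the backing volume so that
// no storage asset is left behind without a corresponding PV object.
func createWithCleanup(
	createPV func() error,
	deleteVolume func() error,
	retryCount int,
	retryInterval time.Duration,
) error {
	var err error
	for i := 0; i < retryCount; i++ {
		if err = createPV(); err == nil {
			return nil // PV object saved successfully
		}
		time.Sleep(retryInterval)
	}

	// Saving the PV failed: clean up the freshly provisioned volume.
	var deleteErr error
	for i := 0; i < retryCount; i++ {
		if deleteErr = deleteVolume(); deleteErr == nil {
			return fmt.Errorf("saving PV failed (%v); backing volume cleaned up", err)
		}
		time.Sleep(retryInterval)
	}
	return fmt.Errorf("saving PV failed (%v) and cleanup failed (%v): volume is orphaned, delete it manually", err, deleteErr)
}

func main() {
	err := createWithCleanup(
		func() error { return errors.New("apiserver unavailable") }, // createPV always fails in this demo
		func() error { return nil },                                 // deleteVolume succeeds
		3, 10*time.Millisecond,
	)
	fmt.Println(err)
}
```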
Summary
The PV controller manages the lifecycle of PV and PVC resources. First, it binds PVCs to suitable PVs, matching storage resources against the PVC's request and the PVs that are available; it also tracks PV state so that a bound PV cannot be claimed by another PVC, and handles PV reclaim and deletion. Second, when a PVC specifies a StorageClass, the PV controller can dynamically provision a new PV for it.
This article walked through the controller's source code to see how these mechanisms are implemented.
References
- kube-controller-manager源码分析-PV controller分析
- 从零开始入门 K8s | Kubernetes 存储架构及插件使用
- kubernetes pv-controller 解析