kubernetes Volume有什么作用

发布时间:2021-12-20 10:08:46 作者:iii
来源:亿速云 阅读:126

本篇内容主要讲解“kubernetes Volume有什么作用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“kubernetes Volume有什么作用”吧!

VolumeBinder in Scheduler

VolumeBinder是Kubernetes default scheduler中的一个模块。

pkg/scheduler/volumebinder/volume_binder.go:33

// VolumeBinder sets up the volume binding library and manages
// the volume binding operations with a queue.
type VolumeBinder struct {
	Binder    persistentvolume.SchedulerVolumeBinder
	BindQueue *workqueue.Type
}

SchedulerVolumeBinder

SchedulerVolumeBinder用于调度时Volume Bind的考虑,以保证调度后的Node也满足Pod所需的PV NodeAffinity需求,而不仅是Resource Request等其他Predicate Policies得到满足。它实际上是基于StorageClass的VolumeBindingMode为WaitForFirstConsumer来决定要延迟Bind PV,然后schduler predicate时等待并确保Pod的all PVCs均成功Bind到满足条件的PVs时,才会最终触发Bind API完成Pod和Node的Bind。

pkg/controller/volume/persistentvolume/scheduler_binder.go:58

// SchedulerVolumeBinder is used by the scheduler to handle PVC/PV binding
// and dynamic provisioning.  The binding decisions are integrated into the pod scheduling
// workflow so that the PV NodeAffinity is also considered along with the pod's other
// scheduling requirements.
//
// This integrates into the existing default scheduler workflow as follows:
// 1. The scheduler takes a Pod off the scheduler queue and processes it serially:
//    a. Invokes all predicate functions, parallelized across nodes.  FindPodVolumes() is invoked here.
//    b. Invokes all priority functions.  Future/TBD
//    c. Selects the best node for the Pod.
//    d. Cache the node selection for the Pod. (Assume phase)
//       i.  If PVC binding is required, cache in-memory only:
//           * Updated PV objects for prebinding to the corresponding PVCs.
//           * For the pod, which PVs need API updates.
//           AssumePodVolumes() is invoked here.  Then BindPodVolumes() is called asynchronously by the
//           scheduler.  After BindPodVolumes() is complete, the Pod is added back to the scheduler queue
//           to be processed again until all PVCs are bound.
//       ii. If PVC binding is not required, cache the Pod->Node binding in the scheduler's pod cache,
//           and asynchronously bind the Pod to the Node.  This is handled in the scheduler and not here.
// 2. Once the assume operation is done, the scheduler processes the next Pod in the scheduler queue
//    while the actual binding operation occurs in the background.
type SchedulerVolumeBinder interface {
	// FindPodVolumes checks if all of a Pod's PVCs can be satisfied by the node.
	//
	// If a PVC is bound, it checks if the PV's NodeAffinity matches the Node.
	// Otherwise, it tries to find an available PV to bind to the PVC.
	//
	// It returns true if all of the Pod's PVCs have matching PVs or can be dynamic provisioned,
	// and returns true if bound volumes satisfy the PV NodeAffinity.
	//
	// This function is called by the volume binding scheduler predicate and can be called in parallel
	FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisified, boundVolumesSatisfied bool, err error)

	// AssumePodVolumes will:
	// 1. Take the PV matches for unbound PVCs and update the PV cache assuming
	// that the PV is prebound to the PVC.
	// 2. Take the PVCs that need provisioning and update the PVC cache with related
	// annotations set.
	//
	// It returns true if all volumes are fully bound, and returns true if any volume binding/provisioning
	// API operation needs to be done afterwards.
	//
	// This function will modify assumedPod with the node name.
	// This function is called serially.
	AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound bool, bindingRequired bool, err error)

	// BindPodVolumes will:
	// 1. Initiate the volume binding by making the API call to prebind the PV
	// to its matching PVC.
	// 2. Trigger the volume provisioning by making the API call to set related
	// annotations on the PVC
	//
	// This function can be called in parallel.
	BindPodVolumes(assumedPod *v1.Pod) error

	// GetBindingsCache returns the cache used (if any) to store volume binding decisions.
	GetBindingsCache() PodBindingCache
}

SchedulerVolumeBinder Interface包含如下三个方法:

Scheduler中VolumeBinder的初始化由volumebinder.NewVolumeBinder完成。

pkg/scheduler/volumebinder/volume_binder.go:39

// NewVolumeBinder sets up the volume binding library and binding queue
func NewVolumeBinder(
	client clientset.Interface,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	storageClassInformer storageinformers.StorageClassInformer) *VolumeBinder {

	return &VolumeBinder{
		Binder:    persistentvolume.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer),
		BindQueue: workqueue.NewNamed("podsToBind"),
	}
}

scheduler volumebinder.NewVolumeBinder负责:

在Scheduler NewConfigFactory中调用volumebinder.NewVolumeBinder完成其初始化,其中很重要的部分是完成pvcInformer, pvInformer, storageClassInformer的初始化,然后传递给persistentvolume.NewVolumeBinder完成Binder的创建。

pkg/scheduler/factory/factory.go:145

// NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(
	schedulerName string,
	client clientset.Interface,
	nodeInformer coreinformers.NodeInformer,
	podInformer coreinformers.PodInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	replicationControllerInformer coreinformers.ReplicationControllerInformer,
	replicaSetInformer extensionsinformers.ReplicaSetInformer,
	statefulSetInformer appsinformers.StatefulSetInformer,
	serviceInformer coreinformers.ServiceInformer,
	pdbInformer policyinformers.PodDisruptionBudgetInformer,
	storageClassInformer storageinformers.StorageClassInformer,
	hardPodAffinitySymmetricWeight int32,
	enableEquivalenceClassCache bool,
	disablePreemption bool,
) scheduler.Configurator {
	stopEverything := make(chan struct{})
	schedulerCache := schedulercache.New(30*time.Second, stopEverything)

	// storageClassInformer is only enabled through VolumeScheduling feature gate
	var storageClassLister storagelisters.StorageClassLister
	if storageClassInformer != nil {
		storageClassLister = storageClassInformer.Lister()
	}

	...
	
	// On add and delete of PVs, it will affect equivalence cache items
	// related to persistent volume
	pvInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
			AddFunc:    c.onPvAdd,
			UpdateFunc: c.onPvUpdate,
			DeleteFunc: c.onPvDelete,
		},
	)
	c.pVLister = pvInformer.Lister()

	// This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
	pvcInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    c.onPvcAdd,
			UpdateFunc: c.onPvcUpdate,
			DeleteFunc: c.onPvcDelete,
		},
	)
	c.pVCLister = pvcInformer.Lister()

	...

	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		// Setup volume binder
		c.volumeBinder = volumebinder.NewVolumeBinder(client, pvcInformer, pvInformer, storageClassInformer)

		storageClassInformer.Informer().AddEventHandler(
			cache.ResourceEventHandlerFuncs{
				AddFunc:    c.onStorageClassAdd,
				DeleteFunc: c.onStorageClassDelete,
			},
		)
	}

	...

	return c
}

scheduler volumebinder.NewVolumeBinder的调用前提是Enable VolumeScheduling Feature Gate。

volumeBinder in PV Controller

前面提到,scheduler volumebinder.NewVolumeBinder在初始化Binder时是通过persistentvolume.NewVolumeBinder完成的,因此这里我们将对persistentvolume.volumeBinder进行分析。

PV Contorller中的volumeBinder就是前面提到的SchedulerVolumeBinder Interface的实现,实现了其中的FindPodVolumes、AssumePodVolumes、BindPodVolumes、GetBindingsCache这些接口。

pkg/controller/volume/persistentvolume/scheduler_binder.go:96

type volumeBinder struct {
	ctrl *PersistentVolumeController

	pvcCache PVCAssumeCache
	pvCache  PVAssumeCache

	// Stores binding decisions that were made in FindPodVolumes for use in AssumePodVolumes.
	// AssumePodVolumes modifies the bindings again for use in BindPodVolumes.
	podBindingCache PodBindingCache
}

pkg/controller/volume/persistentvolume/scheduler_binder.go:108
// NewVolumeBinder sets up all the caches needed for the scheduler to make volume binding decisions.
func NewVolumeBinder(
	kubeClient clientset.Interface,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	storageClassInformer storageinformers.StorageClassInformer) SchedulerVolumeBinder {

	// TODO: find better way...
	ctrl := &PersistentVolumeController{
		kubeClient:  kubeClient,
		classLister: storageClassInformer.Lister(),
	}

	b := &volumeBinder{
		ctrl:            ctrl,
		pvcCache:        NewPVCAssumeCache(pvcInformer.Informer()),
		pvCache:         NewPVAssumeCache(pvInformer.Informer()),
		podBindingCache: NewPodBindingCache(),
	}

	return b
}

volumeBinder struct主要包含pvController实例、pvCache、pvcCache、podBindingCache。

podBindingCache结构体是我们需要关注的:

pkg/controller/volume/persistentvolume/scheduler_binder_cache.go:48

type podBindingCache struct {
	mutex sync.Mutex

	// Key = pod name
	// Value = nodeDecisions
	bindingDecisions map[string]nodeDecisions
}

// Key = nodeName
// Value = bindings & provisioned PVCs of the node
type nodeDecisions map[string]nodeDecision

// A decision includes bindingInfo and provisioned PVCs of the node
type nodeDecision struct {
	bindings      []*bindingInfo
	provisionings []*v1.PersistentVolumeClaim
}

type bindingInfo struct {
	// Claim that needs to be bound
	pvc *v1.PersistentVolumeClaim

	// Proposed PV to bind to this claim
	pv *v1.PersistentVolume
}

kubernetes Volume有什么作用

VolumeBindingChecker Predicate

在Scheduler NewConfigFactory时完成VolumeBinder的创建,然后CheckVolumeBinding Predicate Policy注册到default scheduler。注意默认的所有predicate policies的执行是有先后顺序的:

predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,
		GeneralPred, HostNamePred, PodFitsHostPortsPred,
		MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
		PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
		CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred,
		MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
		CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}

VolumeBindingChecker.predicate就是对应的predicate实现。

pkg/scheduler/algorithm/predicates/predicates.go:1680

func (c *VolumeBindingChecker) predicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
	if !utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		return true, nil, nil
	}

	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}

	unboundSatisfied, boundSatisfied, err := c.binder.Binder.FindPodVolumes(pod, node)
	if err != nil {
		return false, nil, err
	}

	failReasons := []algorithm.PredicateFailureReason{}
	if !boundSatisfied {
		glog.V(5).Infof("Bound PVs not satisfied for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
		failReasons = append(failReasons, ErrVolumeNodeConflict)
	}

	if !unboundSatisfied {
		glog.V(5).Infof("Couldn't find matching PVs for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
		failReasons = append(failReasons, ErrVolumeBindConflict)
	}

	if len(failReasons) > 0 {
		return false, failReasons, nil
	}

	// All volumes bound or matching PVs found for all unbound PVCs
	glog.V(5).Infof("All PVCs found matches for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
	return true, nil, nil
}

调度时VolumeBindingChecker失败会怎么样

如果VolumeBindingChecker.predicate失败,会怎么样?熟悉scheduler逻辑的同学应该知道,调度失败,会触发MakeDefaultErrorFunc。

func (c *configFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue core.SchedulingQueue) func(pod *v1.Pod, err error) {
	return func(pod *v1.Pod, err error) {
		
		...

		backoff.Gc()
		// Retry asynchronously.
		// Note that this is extremely rudimentary and we need a more real error handling path.
		go func() {
			defer runtime.HandleCrash()
			podID := types.NamespacedName{
				Namespace: pod.Namespace,
				Name:      pod.Name,
			}
			origPod := pod

			// When pod priority is enabled, we would like to place an unschedulable
			// pod in the unschedulable queue. This ensures that if the pod is nominated
			// to run on a node, scheduler takes the pod into account when running
			// predicates for the node.
			if !util.PodPriorityEnabled() {
				entry := backoff.GetEntry(podID)
				if !entry.TryWait(backoff.MaxDuration()) {
					glog.Warningf("Request for pod %v already in flight, abandoning", podID)
					return
				}
			}
			// Get the pod again; it may have changed/been scheduled already.
			getBackoff := initialGetBackoff
			for {
				pod, err := c.client.CoreV1().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{})
				if err == nil {
					if len(pod.Spec.NodeName) == 0 {
						podQueue.AddUnschedulableIfNotPresent(pod)
					} else {
						if c.volumeBinder != nil {
							// Volume binder only wants to keep unassigned pods
							c.volumeBinder.DeletePodBindings(pod)
						}
					}
					break
				}
				if errors.IsNotFound(err) {
					glog.Warningf("A pod %v no longer exists", podID)

					if c.volumeBinder != nil {
						// Volume binder only wants to keep unassigned pods
						c.volumeBinder.DeletePodBindings(origPod)
					}
					return
			
				glog.Errorf("Error getting pod %v for retry: %v; retrying...", podID, err)
				if getBackoff = getBackoff * 2; getBackoff > maximalGetBackoff {
					getBackoff = maximalGetBackoff
				}
				time.Sleep(getBackoff)
			}
		}()
	}
}

MakeDefaultErrorFunc会对调度失败的Pod进行异步重试:

调度时VolumeBindingChecker成功会如何?

NewConfigFactory中注册了从unscheduled pod queue中删除pod(意味着调度成功)的Event handler:deletePodFromSchedulingQueue。

pkg/scheduler/factory/factory.go:745

func (c *configFactory) deletePodFromSchedulingQueue(obj interface{}) {
	var pod *v1.Pod
	...
	if err := c.podQueue.Delete(pod); err != nil {
		runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
	}
	if c.volumeBinder != nil {
		// Volume binder only wants to keep unassigned pods
		c.volumeBinder.DeletePodBindings(pod)
	}
}

deletePodFromSchedulingQueue的处理逻辑,除了将pod从podQueue中删除外,如果volumeBinder不为空(意味着Enable VolumeScheduling Feature Gate),还需要同MakeDefaultErrorFunc一样,调用podBindingCache.DeleteBindings将该pod对应的bindingDecisions从podBindingCache中删除,因为volumeBinder仅处理unassigned pods。

volumeBinder

接下来,我们看看volumeBinder的各个接口的实现,及何时被调用。

FindPodVolumes

前面分析VolumeBindingChecker Predicate的时看到,其中调用了volumeBinder.FindPodVolumes。

FindPodVolumes用于检查Pod的PVCs是否都能被该Node满足。

pkg/controller/volume/persistentvolume/scheduler_binder.go:135

// FindPodVolumes caches the matching PVs and PVCs to provision per node in podBindingCache
func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolumesSatisfied, boundVolumesSatisfied bool, err error) {
	podName := getPodName(pod)

	// Warning: Below log needs high verbosity as it can be printed several times (#60933).
	glog.V(5).Infof("FindPodVolumes for pod %q, node %q", podName, node.Name)

	// Initialize to true for pods that don't have volumes
	unboundVolumesSatisfied = true
	boundVolumesSatisfied = true

	// The pod's volumes need to be processed in one call to avoid the race condition where
	// volumes can get bound/provisioned in between calls.
	boundClaims, claimsToBind, unboundClaimsImmediate, err := b.getPodVolumes(pod)
	if err != nil {
		return false, false, err
	}

	// Immediate claims should be bound
	if len(unboundClaimsImmediate) > 0 {
		return false, false, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims")
	}

	// Check PV node affinity on bound volumes
	if len(boundClaims) > 0 {
		boundVolumesSatisfied, err = b.checkBoundClaims(boundClaims, node, podName)
		if err != nil {
			return false, false, err
		}
	}

	if len(claimsToBind) > 0 {
		var claimsToProvision []*v1.PersistentVolumeClaim
		unboundVolumesSatisfied, claimsToProvision, err = b.findMatchingVolumes(pod, claimsToBind, node)
		if err != nil {
			return false, false, err
		}

		if utilfeature.DefaultFeatureGate.Enabled(features.DynamicProvisioningScheduling) {
			// Try to provision for unbound volumes
			if !unboundVolumesSatisfied {
				unboundVolumesSatisfied, err = b.checkVolumeProvisions(pod, claimsToProvision, node)
				if err != nil {
					return false, false, err
				}
			}
		}
	}

	return unboundVolumesSatisfied, boundVolumesSatisfied, nil
}

FindPodVolumes中调用了三个重要的方法:

下面我们重点看getPodVolumes、findMatchingVolumes和checkVolumeProvisions。

getPodVolumes
pkg/controller/volume/persistentvolume/scheduler_binder.go:359

// getPodVolumes returns a pod's PVCs separated into bound (including prebound), unbound with delayed binding,
// and unbound with immediate binding
func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentVolumeClaim, unboundClaims []*bindingInfo, unboundClaimsImmediate []*v1.PersistentVolumeClaim, err error) {
	boundClaims = []*v1.PersistentVolumeClaim{}
	unboundClaimsImmediate = []*v1.PersistentVolumeClaim{}
	unboundClaims = []*bindingInfo{}

	for _, vol := range pod.Spec.Volumes {
		volumeBound, pvc, err := b.isVolumeBound(pod.Namespace, &vol, false)
		if err != nil {
			return nil, nil, nil, err
		}
		if pvc == nil {
			continue
		}
		if volumeBound {
			boundClaims = append(boundClaims, pvc)
		} else {
			delayBinding, err := b.ctrl.shouldDelayBinding(pvc)
			if err != nil {
				return nil, nil, nil, err
			}
			if delayBinding {
				// Scheduler path
				unboundClaims = append(unboundClaims, &bindingInfo{pvc: pvc})
			} else {
				// Immediate binding should have already been bound
				unboundClaimsImmediate = append(unboundClaimsImmediate, pvc)
			}
		}
	}
	return boundClaims, unboundClaims, unboundClaimsImmediate, nil
}

getPodVolumes将pod的PVCs分成三类:

那么什么样的PVCs是delay binding的呢?我们看看shouldDelayBinding的逻辑:

func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentVolumeClaim) (bool, error) {
	if !utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		return false, nil
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicProvisioningScheduling) {
		// When feature DynamicProvisioningScheduling enabled,
		// Scheduler signal to the PV controller to start dynamic
		// provisioning by setting the "annSelectedNode" annotation
		// in the PVC
		if _, ok := claim.Annotations[annSelectedNode]; ok {
			return false, nil
		}
	}

	className := v1helper.GetPersistentVolumeClaimClass(claim)
	if className == "" {
		return false, nil
	}

	class, err := ctrl.classLister.Get(className)
	if err != nil {
		return false, nil
	}

	if class.VolumeBindingMode == nil {
		return false, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", className)
	}

	return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil
}
findMatchingVolumes

如果getPodVolumes返回的claimsToBind不为空,则调用findMatchingVolumes从pvcache中选择匹配条件的size smallestPV,如果没有匹配成功的,则调用checkVolumeProvisions检查是否dynamic provision。

pkg/controller/volume/persistentvolume/scheduler_binder.go:413

// findMatchingVolumes tries to find matching volumes for given claims,
// and return unbound claims for further provision.
func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingInfo, node *v1.Node) (foundMatches bool, unboundClaims []*v1.PersistentVolumeClaim, err error) {
	podName := getPodName(pod)
	// Sort all the claims by increasing size request to get the smallest fits
	sort.Sort(byPVCSize(claimsToBind))

	chosenPVs := map[string]*v1.PersistentVolume{}

	foundMatches = true
	matchedClaims := []*bindingInfo{}

	for _, bindingInfo := range claimsToBind {
		// Get storage class name from each PVC
		storageClassName := ""
		storageClass := bindingInfo.pvc.Spec.StorageClassName
		if storageClass != nil {
			storageClassName = *storageClass
		}
		allPVs := b.pvCache.ListPVs(storageClassName)

		// Find a matching PV
		bindingInfo.pv, err = findMatchingVolume(bindingInfo.pvc, allPVs, node, chosenPVs, true)
		if err != nil {
			return false, nil, err
		}
		if bindingInfo.pv == nil {
			glog.V(4).Infof("No matching volumes for Pod %q, PVC %q on node %q", podName, getPVCName(bindingInfo.pvc), node.Name)
			unboundClaims = append(unboundClaims, bindingInfo.pvc)
			foundMatches = false
			continue
		}

		// matching PV needs to be excluded so we don't select it again
		chosenPVs[bindingInfo.pv.Name] = bindingInfo.pv
		matchedClaims = append(matchedClaims, bindingInfo)
		glog.V(5).Infof("Found matching PV %q for PVC %q on node %q for pod %q", bindingInfo.pv.Name, getPVCName(bindingInfo.pvc), node.Name, podName)
	}

	// Mark cache with all the matches for each PVC for this node
	if len(matchedClaims) > 0 {
		b.podBindingCache.UpdateBindings(pod, node.Name, matchedClaims)
	}

	if foundMatches {
		glog.V(4).Infof("Found matching volumes for pod %q on node %q", podName, node.Name)
	}

	return
}
checkVolumeProvisions
pkg/controller/volume/persistentvolume/scheduler_binder.go:465

// checkVolumeProvisions checks given unbound claims (the claims have gone through func
// findMatchingVolumes, and do not have matching volumes for binding), and return true
// if all of the claims are eligible for dynamic provision.
func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v1.PersistentVolumeClaim, node *v1.Node) (provisionSatisfied bool, err error) {
	podName := getPodName(pod)
	provisionedClaims := []*v1.PersistentVolumeClaim{}

	for _, claim := range claimsToProvision {
		className := v1helper.GetPersistentVolumeClaimClass(claim)
		if className == "" {
			return false, fmt.Errorf("no class for claim %q", getPVCName(claim))
		}

		class, err := b.ctrl.classLister.Get(className)
		if err != nil {
			return false, fmt.Errorf("failed to find storage class %q", className)
		}
		provisioner := class.Provisioner
		if provisioner == "" || provisioner == notSupportedProvisioner {
			glog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, getPVCName(claim))
			return false, nil
		}

		// Check if the node can satisfy the topology requirement in the class
		if !v1helper.MatchTopologySelectorTerms(class.AllowedTopologies, labels.Set(node.Labels)) {
			glog.V(4).Infof("Node %q cannot satisfy provisioning topology requirements of claim %q", node.Name, getPVCName(claim))
			return false, nil
		}

		// TODO: Check if capacity of the node domain in the storage class
		// can satisfy resource requirement of given claim

		provisionedClaims = append(provisionedClaims, claim)

	}
	glog.V(4).Infof("Provisioning for claims of pod %q that has no matching volumes on node %q ...", podName, node.Name)

	// Mark cache with all the PVCs that need provisioning for this node
	b.podBindingCache.UpdateProvisionedPVCs(pod, node.Name, provisionedClaims)

	return true, nil
}

AssumePodVolumes

volumeBinder的AssumePodVolumes啥时候被调用呢?我们看看scheduleOne的相关代码:

scheduleOne invoke assumeAndBindVolumes
pkg/scheduler/scheduler.go:439

// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
	pod := sched.config.NextPod()
	
	...
	
	suggestedHost, err := sched.schedule(pod)
	
	...
	
	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
	// This allows us to keep scheduling without waiting on binding to occur.
	assumedPod := pod.DeepCopy()

	// Assume volumes first before assuming the pod.
	//
	// If no volumes need binding, then nil is returned, and continue to assume the pod.
	//
	// Otherwise, error is returned and volume binding is started asynchronously for all of the pod's volumes.
	// scheduleOne() returns immediately on error, so that it doesn't continue to assume the pod.
	//
	// After the asynchronous volume binding updates are made, it will send the pod back through the scheduler for
	// subsequent passes until all volumes are fully bound.
	//
	// This function modifies 'assumedPod' if volume binding is required.
	err = sched.assumeAndBindVolumes(assumedPod, suggestedHost)
	if err != nil {
		return
	}

	// assume modifies `assumedPod` by setting NodeName=suggestedHost
	err = sched.assume(assumedPod, suggestedHost)
	...
	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	go func() {
		err := sched.bind(assumedPod, &v1.Binding{
			...
		}
	}()
}

在sched.schedule(pod)完成pod的predicate,priority后,先调用sched.assumeAndBindVolumes,然后再调用sched.assume进行pod assume,最后调用sched.bind进行Bind操作。

assumeAndBindVolumes add assume pod to BindQueue
pkg/scheduler/scheduler.go:268

// assumeAndBindVolumes will update the volume cache and then asynchronously bind volumes if required.
//
// If volume binding is required, then the bind volumes routine will update the pod to send it back through
// the scheduler.
//
// Otherwise, return nil error and continue to assume the pod.
//
// This function modifies assumed if volume binding is required.
func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error {
	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		allBound, bindingRequired, err := sched.config.VolumeBinder.Binder.AssumePodVolumes(assumed, host)
		if err != nil {
			sched.config.Error(assumed, err)
			sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePodVolumes failed: %v", err)
			sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
				Type:    v1.PodScheduled,
				Status:  v1.ConditionFalse,
				Reason:  "SchedulerError",
				Message: err.Error(),
			})
			return err
		}
		if !allBound {
			err = fmt.Errorf("Volume binding started, waiting for completion")
			if bindingRequired {
				if sched.config.Ecache != nil {
					invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred)
					sched.config.Ecache.InvalidatePredicates(invalidPredicates)
				}

				// bindVolumesWorker() will update the Pod object to put it back in the scheduler queue
				sched.config.VolumeBinder.BindQueue.Add(assumed)
			} else {
				// We are just waiting for PV controller to finish binding, put it back in the
				// scheduler queue
				sched.config.Error(assumed, err)
				sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "FailedScheduling", "%v", err)
				sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
					Type:   v1.PodScheduled,
					Status: v1.ConditionFalse,
					Reason: "VolumeBindingWaiting",
				})
			}
			return err
		}
	}
	return nil
}

assumeAndBindVolumes调用volumeBinder.AssumePodVolumes。

pkg/controller/volume/persistentvolume/scheduler_binder.go:191

// AssumePodVolumes will take the cached matching PVs and PVCs to provision
// in podBindingCache for the chosen node, and:
// 1. Update the pvCache with the new prebound PV.
// 2. Update the pvcCache with the new PVCs with annotations set
// It will update podBindingCache again with the PVs and PVCs that need an API update.
func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (allFullyBound, bindingRequired bool, err error) {
	podName := getPodName(assumedPod)

	glog.V(4).Infof("AssumePodVolumes for pod %q, node %q", podName, nodeName)

	if allBound := b.arePodVolumesBound(assumedPod); allBound {
		glog.V(4).Infof("AssumePodVolumes for pod %q, node %q: all PVCs bound and nothing to do", podName, nodeName)
		return true, false, nil
	}

	assumedPod.Spec.NodeName = nodeName
	// Assume PV
	claimsToBind := b.podBindingCache.GetBindings(assumedPod, nodeName)
	newBindings := []*bindingInfo{}

	for _, binding := range claimsToBind {
		newPV, dirty, err := b.ctrl.getBindVolumeToClaim(binding.pv, binding.pvc)
		glog.V(5).Infof("AssumePodVolumes: getBindVolumeToClaim for pod %q, PV %q, PVC %q.  newPV %p, dirty %v, err: %v",
			podName,
			binding.pv.Name,
			binding.pvc.Name,
			newPV,
			dirty,
			err)
		if err != nil {
			b.revertAssumedPVs(newBindings)
			return false, true, err
		}
		if dirty {
			err = b.pvCache.Assume(newPV)
			if err != nil {
				b.revertAssumedPVs(newBindings)
				return false, true, err
			}

			newBindings = append(newBindings, &bindingInfo{pv: newPV, pvc: binding.pvc})
		}
	}

	// Don't update cached bindings if no API updates are needed.  This can happen if we
	// previously updated the PV object and are waiting for the PV controller to finish binding.
	if len(newBindings) != 0 {
		bindingRequired = true
		b.podBindingCache.UpdateBindings(assumedPod, nodeName, newBindings)
	}

	// Assume PVCs
	claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, nodeName)

	newProvisionedPVCs := []*v1.PersistentVolumeClaim{}
	for _, claim := range claimsToProvision {
		// The claims from method args can be pointing to watcher cache. We must not
		// modify these, therefore create a copy.
		claimClone := claim.DeepCopy()
		metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, annSelectedNode, nodeName)
		err = b.pvcCache.Assume(claimClone)
		if err != nil {
			b.revertAssumedPVs(newBindings)
			b.revertAssumedPVCs(newProvisionedPVCs)
			return
		}

		newProvisionedPVCs = append(newProvisionedPVCs, claimClone)
	}

	if len(newProvisionedPVCs) != 0 {
		bindingRequired = true
		b.podBindingCache.UpdateProvisionedPVCs(assumedPod, nodeName, newProvisionedPVCs)
	}

	return
}

volumeBinder.AssumePodVolumes主要逻辑:

BindQueue中的Pods由bindVolumesWorker进行逐个处理,其中会调用volumeBinder.BindPodVolumes完成volume binding operation,下面我们看看bindVolumesWorker干了啥。

bindVolumesWorker

bindVolumesWorker负责循环处理volumeBinder中的BindQueue内的Pods,完成volume bind。我们得先知道bindVolumesWorker在哪里启动的。

pkg/scheduler/scheduler.go:174

// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
	if !sched.config.WaitForCacheSync() {
		return
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		go sched.config.VolumeBinder.Run(sched.bindVolumesWorker, sched.config.StopEverything)
	}

	go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

在default scheduler启动时,如果VolumeScheduling Feature Gate Enable,则会启动bindVolumesWorker goroutine。

pkg/scheduler/scheduler.go:312

// bindVolumesWorker() processes pods queued in assumeAndBindVolumes() and tries to
// make the API update for volume binding.
// This function runs forever until the volume BindQueue is closed.
func (sched *Scheduler) bindVolumesWorker() {
	workFunc := func() bool {
		keyObj, quit := sched.config.VolumeBinder.BindQueue.Get()
		if quit {
			return true
		}
		defer sched.config.VolumeBinder.BindQueue.Done(keyObj)

		assumed, ok := keyObj.(*v1.Pod)
		if !ok {
			glog.V(4).Infof("Object is not a *v1.Pod")
			return false
		}

		// TODO: add metrics
		var reason string
		var eventType string

		glog.V(5).Infof("Trying to bind volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)

		// The Pod is always sent back to the scheduler afterwards.
		err := sched.config.VolumeBinder.Binder.BindPodVolumes(assumed)
		if err != nil {
			glog.V(1).Infof("Failed to bind volumes for pod \"%v/%v\": %v", assumed.Namespace, assumed.Name, err)
			reason = "VolumeBindingFailed"
			eventType = v1.EventTypeWarning
		} else {
			glog.V(4).Infof("Successfully bound volumes for pod \"%v/%v\"", assumed.Namespace, assumed.Name)
			reason = "VolumeBindingWaiting"
			eventType = v1.EventTypeNormal
			err = fmt.Errorf("Volume binding started, waiting for completion")
		}

		// Always fail scheduling regardless of binding success.
		// The Pod needs to be sent back through the scheduler to:
		// * Retry volume binding if it fails.
		// * Retry volume binding if dynamic provisioning fails.
		// * Bind the Pod to the Node once all volumes are bound.
		sched.config.Error(assumed, err)
		sched.config.Recorder.Eventf(assumed, eventType, "FailedScheduling", "%v", err)
		sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
			Type:   v1.PodScheduled,
			Status: v1.ConditionFalse,
			Reason: reason,
		})
		return false
	}

	for {
		if quit := workFunc(); quit {
			glog.V(4).Infof("bindVolumesWorker shutting down")
			break
		}
	}
}

bindVolumesWorker会调用volumeBinder.BindPodVolumes进行podBindingCache中的volume binding operation。

BindPodVolumes

pkg/controller/volume/persistentvolume/scheduler_binder.go:266

// BindPodVolumes gets the cached bindings and PVCs to provision in podBindingCache
// and makes the API update for those PVs/PVCs.
func (b *volumeBinder) BindPodVolumes(assumedPod *v1.Pod) error {
	podName := getPodName(assumedPod)
	glog.V(4).Infof("BindPodVolumes for pod %q", podName)

	bindings := b.podBindingCache.GetBindings(assumedPod, assumedPod.Spec.NodeName)
	claimsToProvision := b.podBindingCache.GetProvisionedPVCs(assumedPod, assumedPod.Spec.NodeName)

	// Do the actual prebinding. Let the PV controller take care of the rest
	// There is no API rollback if the actual binding fails
	for i, bindingInfo := range bindings {
		glog.V(5).Infof("BindPodVolumes: Pod %q, binding PV %q to PVC %q", podName, bindingInfo.pv.Name, bindingInfo.pvc.Name)
		_, err := b.ctrl.updateBindVolumeToClaim(bindingInfo.pv, bindingInfo.pvc, false)
		if err != nil {
			// only revert assumed cached updates for volumes we haven't successfully bound
			b.revertAssumedPVs(bindings[i:])
			// Revert all of the assumed cached updates for claims,
			// since no actual API update will be done
			b.revertAssumedPVCs(claimsToProvision)
			return err
		}
	}

	// Update claims objects to trigger volume provisioning. Let the PV controller take care of the rest
	// PV controller is expect to signal back by removing related annotations if actual provisioning fails
	for i, claim := range claimsToProvision {
		if _, err := b.ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(claim); err != nil {
			glog.V(4).Infof("updating PersistentVolumeClaim[%s] failed: %v", getPVCName(claim), err)
			// only revert assumed cached updates for claims we haven't successfully updated
			b.revertAssumedPVCs(claimsToProvision[i:])
			return err
		}
	}

	return nil
}

关键流程

kubernetes Volume有什么作用

到此,相信大家对“kubernetes Volume有什么作用”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

推荐阅读:
  1. Kubernetes中volume存储的类型介绍
  2. Kubernetes volume数据卷

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

volume kubernetes

上一篇:cgroup怎么使用

下一篇:kubernetes提升Scheduler吞吐量的工作机制是什么

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》