kubernetes提升Scheduler吞吐量的工作机制是什么

发布时间:2021-12-20 10:09:22 作者:iii
来源:亿速云 阅读:130

本篇内容主要讲解“kubernetes提升Scheduler吞吐量的工作机制是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“kubernetes提升Scheduler吞吐量的工作机制是什么”吧!


Equivalence Class概念及其意义

2015年,google发表的关于Borg的论文“Large-scale cluster management at Google with Borg”中对Equivalence Class的描述如下:

Equivalence classes: Tasks in a Borg job usually have identical requirements and constraints, so rather than determining feasibility for every pending task on every machine, and scoring all the feasible machines, Borg only does feasibility and scoring for one task per equivalence class – a group of tasks with identical requirements.

Equivalence Class目前是用来在Kubernetes Scheduler加速Predicate,提升Scheduler的吞吐性能。Kubernetes scheduler及时维护着Equivalence Cache的数据,当某些情况发生时(比如delete node、bind pod等事件),需要立刻invalid相关的Equivalence Cache中的缓存数据。

一个Equivalence Class是用来定义一组具有相同Requirements和Constraints的Pods的相关信息的集合,在Scheduler进行Predicate阶段时可以只需对Equivalence Class中一个Pod进行Predicate,并把Predicate的结果放到Equivalence Cache中以供该Equivalence Class中其他Pods(成为Equivalent Pods)重用该结果。只有当Equivalence Cache中没有可以重用的Predicate Result才会进行正常的Predicate流程。

什么样的Pods会被归类到同一Equivalence Class呢?按照其定义,其实只要Pods有某些相同的field,比如resources requirement、label、affinity等,它们都被认为是Equivalent Pods,属于同一Equivalence Class。但是考虑到用户可能随时修改Pods的fields,会导致Scheduler需要及时更新该Pod所属的Equivalence Class变动,从而导致可能正在进行的Predicate需要感知这一变化并作出变更,这使得问题变得异常复杂。因此,目前Scheduler只把那些属于同一OwnerReference(包括RC,RS,Job, StatefulSet)的Pods归类到同一Equivalence Class,比如某个RS定义了N个副本,那么这N个副本Pods就对应一个Equivalence Class。Scheduler会为每个Equivalence Class中的Equivalent Pods计算出一个uint64 EquivalenceHash值。

注意,截止Kubernetes 1.10,即使有两个一样Pod Template的RS,也会对应两个Equivalence Class。

Equivalence Class工作原理

要想使用Equivalence Class需要启用EnableEquivalenceClassCache Feature Gate,截止Kubernetes 1.10,该Feature还是Alpha阶段。

前期我的几遍关于scheduler的博客中对Predicate的分析中提到,所有注册成功的Predicate Policy都会在scheduler.findNodesThatFit(pod, nodes, predicateFuncs ...)过程中按照一定的并行数对每个node调用scheduler.podFitsOnNode(pod, node, predicateFuncs ...)进行注册的Predicate Policys检查。

podFitsOnNode的输入是一个pod,一个node和一系列注册成功的predicateFuncs,用来检查node是否满足该pod的预选条件。加入了Equivalence Class之后,预选阶段会发生了如下变化:

Equivalence Cache会存储每个node的Predicates Results,是一个3层Map对象:

比如,algorithmCache[$nodeName].predicatesCache.Get($predicateKey)[$equivalenceHash]表示$equivalenceHash对应的Pods在$nodeName节点上进行$predicateKey进行预选是否成功。

截止Kubernetes 1.10,predicateKey支持列表如下(20个):

注意,即使该Pod找到对应的Equivalence Class,Equivalence Cache中也有可能没有可用的Predicate Result,或者对应的Predicate Result已经失效。这时就会触发正常的Predicate,并把Result写到Equivalence Cache中。

如何维护和更新Equivalence Cache呢?如果频繁的更新整个node对应的Equivalence Cache,这违背了Equivalence Cache设计的初衷,并不能提升Predicate的效率。

前面提到过Equivalence Cache的三层Map结构设计,第二层Key是predicateKey,因此Scheduler能做到只invalid单个Predicate Result,而不是盲目的invalid整个node的algorithmCache。

Scheduler会Watch相关API Objects Add/Update/Delete Event,并根据相关策略invalid对应的Equivalence Cache数据,具体的逻辑请看下面的源码分析部分。

Equivalence Class源码分析

Equivalence Cache数据结构

Equivalence Cache结构定义如下:

// EquivalenceCache holds:
// 1. a map of AlgorithmCache with node name as key
// 2. function to get equivalence pod
type EquivalenceCache struct {
	sync.RWMutex
	getEquivalencePod algorithm.GetEquivalencePodFunc
	algorithmCache    map[string]AlgorithmCache
}

// The AlgorithmCache stores PredicateMap with predicate name as key
type AlgorithmCache struct {
	// Only consider predicates for now
	predicatesCache *lru.Cache
}

kubernetes提升Scheduler吞吐量的工作机制是什么

Equivalence Cache的核心操作

func (ec *EquivalenceCache) InvalidateCachedPredicateItem(nodeName string, predicateKeys sets.String) {
	...
	if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
		for predicateKey := range predicateKeys {
			algorithmCache.predicatesCache.Remove(predicateKey)
		}
	}
	...
}
func (ec *EquivalenceCache) InvalidateCachedPredicateItemOfAllNodes(predicateKeys sets.String) {
	...
	// algorithmCache uses nodeName as key, so we just iterate it and invalid given predicates
	for _, algorithmCache := range ec.algorithmCache {
		for predicateKey := range predicateKeys {
			// just use keys is enough
			algorithmCache.predicatesCache.Remove(predicateKey)
		}
	}
	...
}
// PredicateWithECache returns:
// 1. if fit
// 2. reasons if not fit
// 3. if this cache is invalid
// based on cached predicate results
func (ec *EquivalenceCache) PredicateWithECache(
	podName, nodeName, predicateKey string,
	equivalenceHash uint64, needLock bool,
) (bool, []algorithm.PredicateFailureReason, bool) {
	...
	if algorithmCache, exist := ec.algorithmCache[nodeName]; exist {
		if cachePredicate, exist := algorithmCache.predicatesCache.Get(predicateKey); exist {
			predicateMap := cachePredicate.(PredicateMap)
			// TODO(resouer) Is it possible a race that cache failed to update immediately?
			if hostPredicate, ok := predicateMap[equivalenceHash]; ok {
				if hostPredicate.Fit {
					return true, []algorithm.PredicateFailureReason{}, false
				}
				return false, hostPredicate.FailReasons, false
			}
			// is invalid
			return false, []algorithm.PredicateFailureReason{}, true
		}
	}
	return false, []algorithm.PredicateFailureReason{}, true
}
// UpdateCachedPredicateItem updates pod predicate for equivalence class
func (ec *EquivalenceCache) UpdateCachedPredicateItem(
	podName, nodeName, predicateKey string,
	fit bool,
	reasons []algorithm.PredicateFailureReason,
	equivalenceHash uint64,
	needLock bool,
) {
	...
	if _, exist := ec.algorithmCache[nodeName]; !exist {
		ec.algorithmCache[nodeName] = newAlgorithmCache()
	}
	predicateItem := HostPredicate{
		Fit:         fit,
		FailReasons: reasons,
	}
	// if cached predicate map already exists, just update the predicate by key
	if v, ok := ec.algorithmCache[nodeName].predicatesCache.Get(predicateKey); ok {
		predicateMap := v.(PredicateMap)
		// maps in golang are references, no need to add them back
		predicateMap[equivalenceHash] = predicateItem
	} else {
		ec.algorithmCache[nodeName].predicatesCache.Add(predicateKey,
			PredicateMap{
				equivalenceHash: predicateItem,
			})
	}
}

Equivalence Cache的初始化

Kubernetes在注册predicates、priorities、scheduler extenders时,同时也会进行Equivalence Cache的初始化,并将其传入scheduler config中。

// Creates a scheduler from a set of registered fit predicate keys and priority keys.
func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
	...
	// Init equivalence class cache
	if c.enableEquivalenceClassCache && getEquivalencePodFuncFactory != nil {
		pluginArgs, err := c.getPluginArgs()
		if err != nil {
			return nil, err
		}
		c.equivalencePodCache = core.NewEquivalenceCache(
			getEquivalencePodFuncFactory(*pluginArgs),
		)
		glog.Info("Created equivalence class cache")
	}
	...
}

// NewEquivalenceCache creates a EquivalenceCache object.
func NewEquivalenceCache(getEquivalencePodFunc algorithm.GetEquivalencePodFunc) *EquivalenceCache {
	return &EquivalenceCache{
		getEquivalencePod: getEquivalencePodFunc,
		algorithmCache:    make(map[string]AlgorithmCache),
	}
}

NewEquivalenceCache负责Equivalence Cache的初始化工作,那么getEquivalencePod又是在哪完成注册的呢?defualt algorithm provider初始化时完成注册GetEquivalencePodFunc(只能使用defualt provider?通过configfile就不行吗?),注意这里factory.PluginFactoryArgs只传入了PVCInfo。

GetEquivalencePodFunc is a function that gets a EquivalencePod from a pod.

pkg/scheduler/algorithmprovider/defaults/defaults.go:38

func init() {
	...
	// Use equivalence class to speed up heavy predicates phase.
	factory.RegisterGetEquivalencePodFunction(
		func(args factory.PluginFactoryArgs) algorithm.GetEquivalencePodFunc {
			return predicates.NewEquivalencePodGenerator(args.PVCInfo)
		},
	)
	...
}	

为什么只传入PVCInfo呢?或者为什么需要PVCInfo呢?要回答这个问题,我们先来看看EquivalencePod和getEquivalencePod的定义。

// EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods.
type EquivalencePod struct {
	ControllerRef metav1.OwnerReference
	PVCSet        sets.String
}

EquivalencePod定义了具备哪些相同属性的Pods属于Equivalent Pods,Equivalence Hash就是根据Pod的EquivalencePod中指定的两个属性来计算的,这两个属性分别是:

因此,只有两个Pod属于同一个Controller并且引用可同样的PVCs对象才被认为是EquivalentPod,对应同一个Equivalence Hash。

getEquivalencePod根据Pod Object中的OwnerReference和PVC信息获取它所属的EquivalencePod对象。

func (e *EquivalencePodGenerator) getEquivalencePod(pod *v1.Pod) interface{} {
	for _, ref := range pod.OwnerReferences {
		if ref.Controller != nil && *ref.Controller {
			pvcSet, err := e.getPVCSet(pod)
			if err == nil {
				// A pod can only belongs to one controller, so let's return.
				return &EquivalencePod{
					ControllerRef: ref,
					PVCSet:        pvcSet,
				}
			}
			return nil
		}
	}
	return nil
}

何时生成Pod对应的Equivalence Hash

预选的入口是findNodesThatFit,也就是在findNodesThatFit中调用了getEquivalenceClassInfo计算Pod的EquivalenceHash,然后把该hash值传入podFitsOnNode中进行后续的Equivalence Class功能。

func findNodesThatFit(
	pod *v1.Pod,
	nodeNameToInfo map[string]*schedulercache.NodeInfo,
	nodes []*v1.Node,
	predicateFuncs map[string]algorithm.FitPredicate,
	extenders []algorithm.SchedulerExtender,
	metadataProducer algorithm.PredicateMetadataProducer,
	ecache *EquivalenceCache,
	schedulingQueue SchedulingQueue,
	alwaysCheckAllPredicates bool,
) ([]*v1.Node, FailedPredicateMap, error) {
	...

		var equivCacheInfo *equivalenceClassInfo
		if ecache != nil {
			// getEquivalenceClassInfo will return immediately if no equivalence pod found
			equivCacheInfo = ecache.getEquivalenceClassInfo(pod)
		}

		checkNode := func(i int) {
			nodeName := nodes[i].Name
			fits, failedPredicates, err := podFitsOnNode(
				pod,
				meta,
				nodeNameToInfo[nodeName],
				predicateFuncs,
				ecache,
				schedulingQueue,
				alwaysCheckAllPredicates,
				equivCacheInfo,
			)
			...
		}
		...
	}

getEquivalenceClassInfo计算pod的EquivalenceHash的原理如下:

// getEquivalenceClassInfo returns the equivalence class of given pod.
func (ec *EquivalenceCache) getEquivalenceClassInfo(pod *v1.Pod) *equivalenceClassInfo {
	equivalencePod := ec.getEquivalencePod(pod)
	if equivalencePod != nil {
		hash := fnv.New32a()
		hashutil.DeepHashObject(hash, equivalencePod)
		return &equivalenceClassInfo{
			hash: uint64(hash.Sum32()),
		}
	}
	return nil
}

可见,EquivalenceHash就是对getEquivalencePod利用FNV算法进行哈希的。

Equivalent Pod的Predicate Result何时加到PredicateCache中

我们先看看podFitsOnNode的相关实现:

func podFitsOnNode(
	pod *v1.Pod,
	meta algorithm.PredicateMetadata,
	info *schedulercache.NodeInfo,
	predicateFuncs map[string]algorithm.FitPredicate,
	ecache *EquivalenceCache,
	queue SchedulingQueue,
	alwaysCheckAllPredicates bool,
	equivCacheInfo *equivalenceClassInfo,
) (bool, []algorithm.PredicateFailureReason, error) {
	...
			if predicate, exist := predicateFuncs[predicateKey]; exist {
				// Use an in-line function to guarantee invocation of ecache.Unlock()
				// when the in-line function returns.
				func() {
					var invalid bool
					if eCacheAvailable {
						// Lock ecache here to avoid a race condition against cache invalidation invoked
						// in event handlers. This race has existed despite locks in equivClassCacheimplementation.
						ecache.Lock()
						defer ecache.Unlock()
						// PredicateWithECache will return its cached predicate results.
						fit, reasons, invalid = ecache.PredicateWithECache(
							pod.GetName(), info.Node().GetName(),
							predicateKey, equivCacheInfo.hash, false)
					}

					if !eCacheAvailable || invalid {
						// we need to execute predicate functions since equivalence cache does not work
						fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
						if err != nil {
							return
						}

						if eCacheAvailable {
							// Store data to update equivClassCacheafter this loop.
							if res, exists := predicateResults[predicateKey]; exists {
								res.Fit = res.Fit && fit
								res.FailReasons = append(res.FailReasons, reasons...)
								predicateResults[predicateKey] = res
							} else {
								predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
							}
							result := predicateResults[predicateKey]
							ecache.UpdateCachedPredicateItem(
								pod.GetName(), info.Node().GetName(),
								predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false)
						}
					}
				}()

				...
}

podFitsOnNode时会先通过PredicateWithECache检查是否Equivalence Cache中有该缓存命中:

维护Equivalence Cache

我们回到Scheduler Config Factory,看看Scheduler中podInformer、nodeInformer、serviceInformer、pvcInformer等注册的EventHandler中对Equivalence Cache的操作。

Assume Pod

当完成pod的调度后,在Bind Node之前,会先进行Pod Assume,在Assume过程中,会对Equivalence Cache有操作。

// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
	...

	// Optimistically assume that the binding will succeed, so we need to invalidate affected
	// predicates in equivalence cache.
	// If the binding fails, these invalidated item will not break anything.
	if sched.config.Ecache != nil {
		sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(assumed, host)
	}
	return nil
}

Assume Pod时调用InvalidateCachedPredicateItemForPodAdd对Equivalence Cache进行操作。

func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod, nodeName string) {
	// GeneralPredicates: will always be affected by adding a new pod
	invalidPredicates := sets.NewString("GeneralPredicates")

	// MaxPDVolumeCountPredicate: we check the volumes of pod to make decision.
	for _, vol := range pod.Spec.Volumes {
		if vol.PersistentVolumeClaim != nil {
			invalidPredicates.Insert("MaxEBSVolumeCount", "MaxGCEPDVolumeCount", "MaxAzureDiskVolumeCount")
		} else {
			if vol.AWSElasticBlockStore != nil {
				invalidPredicates.Insert("MaxEBSVolumeCount")
			}
			if vol.GCEPersistentDisk != nil {
				invalidPredicates.Insert("MaxGCEPDVolumeCount")
			}
			if vol.AzureDisk != nil {
				invalidPredicates.Insert("MaxAzureDiskVolumeCount")
			}
		}
	}
	ec.InvalidateCachedPredicateItem(nodeName, invalidPredicates)
}

InvalidateCachedPredicateItemForPodAdd中可以看出,Assume Pod会删除该node上以下predicateKey对应的predicateCache:

Update Pod in Scheduled Pod Cache

在scheduler进行NewConfigFactory时,注册Update assignedNonTerminatedPod Event Handler为updatePodInCache。

func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) {
	...
	c.invalidateCachedPredicatesOnUpdatePod(newPod, oldPod)
	c.podQueue.AssignedPodUpdated(newPod)
}

func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) {
	if c.enableEquivalenceClassCache {
		// if the pod does not have bound node, updating equivalence cache is meaningless;
		// if pod's bound node has been changed, that case should be handled by pod add & delete.
		if len(newPod.Spec.NodeName) != 0 && newPod.Spec.NodeName == oldPod.Spec.NodeName {
			if !reflect.DeepEqual(oldPod.GetLabels(), newPod.GetLabels()) {
				// MatchInterPodAffinity need to be reconsidered for this node,
				// as well as all nodes in its same failure domain.
				c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
					matchInterPodAffinitySet)
			}
			// if requested container resource changed, invalidate GeneralPredicates of this node
			if !reflect.DeepEqual(predicates.GetResourceRequest(newPod),
				predicates.GetResourceRequest(oldPod)) {
				c.equivalencePodCache.InvalidateCachedPredicateItem(
					newPod.Spec.NodeName, generalPredicatesSets)
			}
		}
	}
}

updatePodInCache调用invalidateCachedPredicatesOnUpdatePod对Equivalence Cache做了如下处理:

Delete Pod in Scheduled Pod Cache

同样的,当发生删除assignedNonTerminatedPod时,对应会调用invalidateCachedPredicatesOnDeletePod更新Equivalence Cache。

func (c *configFactory) invalidateCachedPredicatesOnDeletePod(pod *v1.Pod) {
	if c.enableEquivalenceClassCache {
		// part of this case is the same as pod add.
		c.equivalencePodCache.InvalidateCachedPredicateItemForPodAdd(pod, pod.Spec.NodeName)
		// MatchInterPodAffinity need to be reconsidered for this node,
		// as well as all nodes in its same failure domain.
		// TODO(resouer) can we just do this for nodes in the same failure domain
		c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(
			matchInterPodAffinitySet)

		// if this pod have these PV, cached result of disk conflict will become invalid.
		for _, volume := range pod.Spec.Volumes {
			if volume.GCEPersistentDisk != nil || volume.AWSElasticBlockStore != nil ||
				volume.RBD != nil || volume.ISCSI != nil {
				c.equivalencePodCache.InvalidateCachedPredicateItem(
					pod.Spec.NodeName, noDiskConflictSet)
			}
		}
	}
}

invalidateCachedPredicatesOnDeletePod更新Equivalence Cache的处理总结为:

Update Node

当发生node update event时,对应会调用invalidateCachedPredicatesOnNodeUpdate更新Equivalence Cache。

func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) {
	if c.enableEquivalenceClassCache {
		// Begin to update equivalence cache based on node update
		// TODO(resouer): think about lazily initialize this set
		invalidPredicates := sets.NewString()

		if !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) {
			invalidPredicates.Insert(predicates.GeneralPred) // "PodFitsResources"
		}
		if !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) {
			invalidPredicates.Insert(predicates.GeneralPred, predicates.CheckServiceAffinityPred) // "PodSelectorMatches"
			for k, v := range oldNode.GetLabels() {
				// any label can be topology key of pod, we have to invalidate in all cases
				if v != newNode.GetLabels()[k] {
					invalidPredicates.Insert(predicates.MatchInterPodAffinityPred)
				}
				// NoVolumeZoneConflict will only be affected by zone related label change
				if isZoneRegionLabel(k) {
					if v != newNode.GetLabels()[k] {
						invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
					}
				}
			}
		}

		oldTaints, oldErr := helper.GetTaintsFromNodeAnnotations(oldNode.GetAnnotations())
		if oldErr != nil {
			glog.Errorf("Failed to get taints from old node annotation for equivalence cache")
		}
		newTaints, newErr := helper.GetTaintsFromNodeAnnotations(newNode.GetAnnotations())
		if newErr != nil {
			glog.Errorf("Failed to get taints from new node annotation for equivalence cache")
		}
		if !reflect.DeepEqual(oldTaints, newTaints) ||
			!reflect.DeepEqual(oldNode.Spec.Taints, newNode.Spec.Taints) {
			invalidPredicates.Insert(predicates.PodToleratesNodeTaintsPred)
		}

		if !reflect.DeepEqual(oldNode.Status.Conditions, newNode.Status.Conditions) {
			oldConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
			newConditions := make(map[v1.NodeConditionType]v1.ConditionStatus)
			for _, cond := range oldNode.Status.Conditions {
				oldConditions[cond.Type] = cond.Status
			}
			for _, cond := range newNode.Status.Conditions {
				newConditions[cond.Type] = cond.Status
			}
			if oldConditions[v1.NodeMemoryPressure] != newConditions[v1.NodeMemoryPressure] {
				invalidPredicates.Insert(predicates.CheckNodeMemoryPressurePred)
			}
			if oldConditions[v1.NodeDiskPressure] != newConditions[v1.NodeDiskPressure] {
				invalidPredicates.Insert(predicates.CheckNodeDiskPressurePred)
			}
			if oldConditions[v1.NodeReady] != newConditions[v1.NodeReady] ||
				oldConditions[v1.NodeOutOfDisk] != newConditions[v1.NodeOutOfDisk] ||
				oldConditions[v1.NodeNetworkUnavailable] != newConditions[v1.NodeNetworkUnavailable] {
				invalidPredicates.Insert(predicates.CheckNodeConditionPred)
			}
		}
		if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable {
			invalidPredicates.Insert(predicates.CheckNodeConditionPred)
		}
		c.equivalencePodCache.InvalidateCachedPredicateItem(newNode.GetName(), invalidPredicates)
	}
}

因此,node update时,会删除该node对应的Equivalence Cache中如下PredicateKey的PredicateCache:

Delete Node

当发生node delete event时,对应会调用InvalidateAllCachedPredicateItemOfNode更新Equivalence Cache。

// InvalidateAllCachedPredicateItemOfNode marks all cached items on given node as invalid
func (ec *EquivalenceCache) InvalidateAllCachedPredicateItemOfNode(nodeName string) {
	ec.Lock()
	defer ec.Unlock()
	delete(ec.algorithmCache, nodeName)
	glog.V(5).Infof("Done invalidating all cached predicates on node: %s", nodeName)
}

因此,node delete时,则会从Equivalence Cache中删除整个node对应的algorthmCache。

Add or Delete PV

当发生pv add或者delete event时,对应会调用invalidatePredicatesForPv更新Equivalence Cache。

func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
	// You could have a PVC that points to a PV, but the PV object doesn't exist.
	// So when the PV object gets added, we can recount.
	invalidPredicates := sets.NewString()

	// PV types which impact MaxPDVolumeCountPredicate
	if pv.Spec.AWSElasticBlockStore != nil {
		invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
	}
	if pv.Spec.GCEPersistentDisk != nil {
		invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
	}
	if pv.Spec.AzureDisk != nil {
		invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
	}

	// If PV contains zone related label, it may impact cached NoVolumeZoneConflict
	for k := range pv.Labels {
		if isZoneRegionLabel(k) {
			invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
			break
		}
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		// Add/delete impacts the available PVs to choose from
		invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
	}

	c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}

因此,当add或者delete PV时,会从Equivalence Cache中删除所有nodes的以下predicateKey对应的PredicateCache:

Update PV

当发生pv update event时,对应会调用invalidatePredicatesForPvUpdate更新Equivalence Cache。

func (c *configFactory) invalidatePredicatesForPvUpdate(oldPV, newPV *v1.PersistentVolume) {
	invalidPredicates := sets.NewString()
	for k, v := range newPV.Labels {
		// If PV update modifies the zone/region labels.
		if isZoneRegionLabel(k) && !reflect.DeepEqual(v, oldPV.Labels[k]) {
			invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)
			break
		}
	}
	c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}

因此,当update PV时,会从Equivalence Cache中删除所有nodes的以下predicateKey对应的PredicateCache:

Add or Delete PVC
func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim) {
	// We need to do this here because the ecache uses PVC uid as part of equivalence hash of pod

	// The bound volume type may change
	invalidPredicates := sets.NewString(maxPDVolumeCountPredicateKeys...)

	// The bound volume's label may change
	invalidPredicates.Insert(predicates.NoVolumeZoneConflictPred)

	if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
		// Add/delete impacts the available PVs to choose from
		invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
	}
	c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}

当发生pvc add或者delete event时,会从Equivalence Cache中删除所有nodes的以下predicateKey对应的PredicateCache:

Update PVC
func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.PersistentVolumeClaim) {
	invalidPredicates := sets.NewString()

	if old.Spec.VolumeName != new.Spec.VolumeName {
		if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
			// PVC volume binding has changed
			invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
		}
		// The bound volume type may change
		invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...)
	}

	c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}

当发生pvc update event时,会从Equivalence Cache中删除所有nodes的以下predicateKey对应的PredicateCache:

Add or Delete Service
func (c *configFactory) onServiceAdd(obj interface{}) {
	if c.enableEquivalenceClassCache {
		c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
	}
	c.podQueue.MoveAllToActiveQueue()
}

func (c *configFactory) onServiceDelete(obj interface{}) {
	if c.enableEquivalenceClassCache {
		c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
	}
	c.podQueue.MoveAllToActiveQueue()
}

当发生Service Add或Delete event时,会从Equivalence Cache中删除所有nodes的以下predicateKey对应的PredicateCache:

Update Service
func (c *configFactory) onServiceUpdate(oldObj interface{}, newObj interface{}) {
	if c.enableEquivalenceClassCache {
		// TODO(resouer) We may need to invalidate this for specified group of pods only
		oldService := oldObj.(*v1.Service)
		newService := newObj.(*v1.Service)
		if !reflect.DeepEqual(oldService.Spec.Selector, newService.Spec.Selector) {
			c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(serviceAffinitySet)
		}
	}
	c.podQueue.MoveAllToActiveQueue()
}

当发生Service Update event时,会从Equivalence Cache中删除所有nodes的以下predicateKey对应的PredicateCache:

Equivalence Class的不足

到此,相信大家对“kubernetes提升Scheduler吞吐量的工作机制是什么”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

推荐阅读:
  1. redis如何通过pipeline提升吞吐量
  2. kubernetes中的Scheduler原理是什么

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

kubernetes scheduler

上一篇:kubernetes Volume有什么作用

下一篇:thinkphp中如何使用s方法

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》