Kubernetes Replication Controller的结构定义是什么

发布时间:2021-12-20 10:14:45 作者:iii
来源:亿速云 阅读:149

本篇内容主要讲解“Kubernetes Replication Controller的结构定义是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Kubernetes Replication Controller的结构定义是什么”吧!

ReplicationManager

ReplicationManager就是ReplicationController控制器对象,方便在代码中和ReplicationController Resource API Object进行区分。下面代码是ReplicationManager的结构定义。

pkg/controller/replication/replication_controller.go:75

// ReplicationManager is responsible for synchronizing ReplicationController objects stored in the system with actual running pods.
type ReplicationManager struct {
	kubeClient clientset.Interface
	podControl controller.PodControlInterface

	// internalPodInformer is used to hold a personal informer.  If we're using
	// a normal shared informer, then the informer will be started for us.  If
	// we have a personal informer, we must start it ourselves.   If you start
	// the controller using NewReplicationManager(passing SharedInformer), this
	// will be null
	internalPodInformer cache.SharedIndexInformer

	// An rc is temporarily suspended after creating/deleting these many replicas.
	// It resumes normal action after observing the watch events for them.
	burstReplicas int
	// To allow injection of syncReplicationController for testing.
	syncHandler func(rcKey string) error

	// A TTLCache of pod creates/deletes each rc expects to see.
	expectations *controller.UIDTrackingControllerExpectations

	// A store of replication controllers, populated by the rcController
	rcStore cache.StoreToReplicationControllerLister
	// Watches changes to all replication controllers
	rcController *cache.Controller
	// A store of pods, populated by the podController
	podStore cache.StoreToPodLister
	// Watches changes to all pods
	podController cache.ControllerInterface
	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced func() bool

	lookupCache *controller.MatchingCache

	// Controllers that need to be synced
	queue workqueue.RateLimitingInterface

	// garbageCollectorEnabled denotes if the garbage collector is enabled. RC
	// manager behaves differently if GC is enabled.
	garbageCollectorEnabled bool
}

重点对下面个几个对象介绍说明:

ReplicationController在何处启动的

看过我我的博文: Kubernetes ResourceQuota Controller内部实现原理及源码分析的可能有印象,里面也提到了controller manager是如何启动ResourceQuotaController的,ReplicationController也是一样的。在kube-controller-manager调用newControllerInitializers进行控制器初始化的时候,将startReplicationController注册进去了,用来启动ReplicationController控制器。

cmd/kube-controller-manager/app/controllermanager.go:224

func newControllerInitializers() map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefuleset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["certificatesigningrequests"] = startCSRController

	return controllers
}

代码继续跟到startReplicationController,很简单,启动一个goroutine,调用replicationcontroller.NewReplicationManager创建一个ReplicationManager并执行其中Run方法开始工作。

cmd/kube-controller-manager/app/core.go:55

func startReplicationController(ctx ControllerContext) (bool, error) {
	go replicationcontroller.NewReplicationManager(
		ctx.InformerFactory.Pods().Informer(),
		ctx.ClientBuilder.ClientOrDie("replication-controller"),
		ResyncPeriod(&ctx.Options),
		replicationcontroller.BurstReplicas,
		int(ctx.Options.LookupCacheSizeForRC),
		ctx.Options.EnableGarbageCollector,
	).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop)
	return true, nil
}

创建ReplicationManager

上面分析到,controller-manager通过NewReplicationManager创建一个ReplicationManager对象,其实就是ReplicationController控制器。

pkg/controller/replication/replication_controller.go:122

// NewReplicationManager creates a replication manager
func NewReplicationManager(podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
	return newReplicationManager(
		eventBroadcaster.NewRecorder(v1.EventSource{Component: "replication-controller"}),
		podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
}



pkg/controller/replication/replication_controller.go:132
// newReplicationManager configures a replication manager with the specified event recorder
func newReplicationManager(eventRecorder record.EventRecorder, podInformer cache.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
	if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().RESTClient().GetRateLimiter())
	}

	rm := &ReplicationManager{
		kubeClient: kubeClient,
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventRecorder,
		},
		burstReplicas: burstReplicas,
		expectations:  controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicationmanager"),
		garbageCollectorEnabled: garbageCollectorEnabled,
	}

	rm.rcStore.Indexer, rm.rcController = cache.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).List(options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				return rm.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).Watch(options)
			},
		},
		&v1.ReplicationController{},
		// TODO: Can we have much longer period here?
		FullControllerResyncPeriod,
		cache.ResourceEventHandlerFuncs{
			AddFunc:    rm.enqueueController,
			UpdateFunc: rm.updateRC,
			// This will enter the sync loop and no-op, because the controller has been deleted from the store.
			// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
			// way of achieving this is by performing a `stop` operation on the controller.
			DeleteFunc: rm.enqueueController,
		},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)

	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: rm.addPod,
		// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
		// the most frequent pod update is status, and the associated rc will only list from local storage, so
		// it should be ok.
		UpdateFunc: rm.updatePod,
		DeleteFunc: rm.deletePod,
	})
	rm.podStore.Indexer = podInformer.GetIndexer()
	rm.podController = podInformer.GetController()

	rm.syncHandler = rm.syncReplicationController
	rm.podStoreSynced = rm.podController.HasSynced
	rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
	return rm
}

newReplicationManager中主要配置ReplicationManager,比如:

执行ReplicationManger.Run开始工作

ReplicationManager创建好了,接下来得干活啦。Run方法就是干活的起步点,开始进行watching and syncing

pkg/controller/replication/replication_controller.go:217

// Run begins watching and syncing.
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	glog.Infof("Starting RC Manager")
	go rm.rcController.Run(stopCh)
	go rm.podController.Run(stopCh)
	for i := 0; i < workers; i++ {
		go wait.Until(rm.worker, time.Second, stopCh)
	}

	if rm.internalPodInformer != nil {
		go rm.internalPodInformer.Run(stopCh)
	}

	<-stopCh
	glog.Infof("Shutting down RC Manager")
	rm.queue.ShutDown()
}

下面是rcController和podController的Run方法实现,功能就是完成rc / pod的watch。

pkg/client/cache/controller.go:84

// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	r.RunUntil(stopCh)

	wait.Until(c.processLoop, time.Second, stopCh)
}

sync的关键实现,就在ReplicationManager的worker方法中,代码如下。

pkg/controller/replication/replication_controller.go:488

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rm *ReplicationManager) worker() {
	workFunc := func() bool {
		key, quit := rm.queue.Get()
		if quit {
			return true
		}
		defer rm.queue.Done(key)

		err := rm.syncHandler(key.(string))
		if err == nil {
			rm.queue.Forget(key)
			return false
		}

		rm.queue.AddRateLimited(key)
		utilruntime.HandleError(err)
		return false
	}
	for {
		if quit := workFunc(); quit {
			glog.Infof("replication controller worker shutting down")
			return
		}
	}
}

worker中的主要逻辑为:

在newReplicationManager时,通过rm.syncHandler = rm.syncReplicationController注册syncHandler为syncReplicationController了。因此sync rc的逻辑就在syncReplicationController中了。

pkg/controller/replication/replication_controller.go:639

// syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked concurrently with the same key.

func (rm *ReplicationManager) syncReplicationController(key string) error {
	trace := util.NewTrace("syncReplicationController: " + key)
	defer trace.LogIfLong(250 * time.Millisecond)

	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
	}()

	if !rm.podStoreSynced() {
		// Sleep so we give the pod reflector goroutine a chance to run.
		time.Sleep(PodStoreSyncedPollPeriod)
		glog.Infof("Waiting for pods controller to sync, requeuing rc %v", key)
		rm.queue.Add(key)
		return nil
	}

	obj, exists, err := rm.rcStore.Indexer.GetByKey(key)
	if !exists {
		glog.Infof("Replication Controller has been deleted %v", key)
		rm.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}
	rc := *obj.(*v1.ReplicationController)

	trace.Step("ReplicationController restored")
	rcNeedsSync := rm.expectations.SatisfiedExpectations(key)
	trace.Step("Expectations restored")

	// NOTE: filteredPods are pointing to objects from cache - if you need to
	// modify them, you need to copy it first.
	// TODO: Do the List and Filter in a single pass, or use an index.
	var filteredPods []*v1.Pod
	if rm.garbageCollectorEnabled {
		// list all pods to include the pods that don't match the rc's selector
		// anymore but has the stale controller ref.
		pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything())
		if err != nil {
			glog.Errorf("Error getting pods for rc %q: %v", key, err)
			rm.queue.Add(key)
			return err
		}
		cm := controller.NewPodControllerRefManager(rm.podControl, rc.ObjectMeta, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind())
		matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods)
		// Adopt pods only if this replication controller is not going to be deleted.
		if rc.DeletionTimestamp == nil {
			for _, pod := range matchesNeedsController {
				err := cm.AdoptPod(pod)
				// continue to next pod if adoption fails.
				if err != nil {
					// If the pod no longer exists, don't even log the error.
					if !errors.IsNotFound(err) {
						utilruntime.HandleError(err)
					}
				} else {
					matchesAndControlled = append(matchesAndControlled, pod)
				}
			}
		}
		filteredPods = matchesAndControlled
		// remove the controllerRef for the pods that no longer have matching labels
		var errlist []error
		for _, pod := range controlledDoesNotMatch {
			err := cm.ReleasePod(pod)
			if err != nil {
				errlist = append(errlist, err)
			}
		}
		if len(errlist) != 0 {
			aggregate := utilerrors.NewAggregate(errlist)
			// push the RC into work queue again. We need to try to free the
			// pods again otherwise they will stuck with the stale
			// controllerRef.
			rm.queue.Add(key)
			return aggregate
		}
	} else {
		pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
		if err != nil {
			glog.Errorf("Error getting pods for rc %q: %v", key, err)
			rm.queue.Add(key)
			return err
		}
		filteredPods = controller.FilterActivePods(pods)
	}

	var manageReplicasErr error
	if rcNeedsSync && rc.DeletionTimestamp == nil {
		manageReplicasErr = rm.manageReplicas(filteredPods, &rc)
	}
	trace.Step("manageReplicas done")

	newStatus := calculateStatus(rc, filteredPods, manageReplicasErr)

	// Always updates status as pods come up or die.
	if err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, newStatus); err != nil {
		// Multiple things could lead to this update failing.  Returning an error causes a requeue without forcing a hotloop
		return err
	}

	return manageReplicasErr
}

syncReplicationController的主要逻辑为:

  1. 如果podStore还没有被同步过一次,则将该rc的key重新加入到queue中,以等待podStore同步,流程结束,否则继续后面的流程。

  2. 根据该rc的key值,从rcStore中获取对应的rc object,如果不存在该rc object,则说明该rc已经被删除了,然后根据key从epectations中删除该rc并返回,流程结束。如果存在该rc object,则继续后面的流程。

  3. 检测expectations中的add和del以及距离上一个时间戳是否超时5min,来判断该rc是否需要sync。

  4. 如果启动了GC,则获取podStore中整个namespace下的pods,然后将matchesAndControlled和matchesNeedsController的pods作为过滤后待同步的filteredPods。如果没有启动GC,则直接获取podStore中该namespace下匹配rc.Spec.Selector的Active状态的pods作为过滤后待同步的filteredPods。(关于matchesAndControlled和matchesNeedsController的理解,请参考pkg/controller/controller_ref_manager.go:57中定义的PodControllerRefManager.Classify函数)

  5. 如果第3步中检测到该rc需要sync,并且DeletionTimestamp这个时间戳为nil,则调用manageReplicas方法,使得该rc管理的active状态的pods数量和期望值一样。

  6. 执行完manageReplicas后,需要马上重新计算一下rc的status,更新status中的Conditions,Replicas,FullyLabeledReplicas,ReadyReplicas,AvailableReplicas信息。

  7. 通过updateReplicationControllerStatus方法调用kube-api-server的接口更新该rc的status为上一步重新计算后的新status,流程结束。

上面描述的syncReplicationController流程中,一个很关键的步骤是step 5中调用的manageReplicas方法,它负责rc对应replicas的修复工作(add or delete)。

pkg/controller/replication/replication_controller.go:516

// manageReplicas checks and updates replicas for the given replication controller.
// Does NOT modify <filteredPods>.
func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.ReplicationController) error {
	diff := len(filteredPods) - int(*(rc.Spec.Replicas))
	rcKey, err := controller.KeyFunc(rc)
	if err != nil {
		return err
	}
	if diff == 0 {
		return nil
	}

	if diff < 0 {
		diff *= -1
		if diff > rm.burstReplicas {
			diff = rm.burstReplicas
		}
		// TODO: Track UIDs of creates just like deletes. The problem currently
		// is we'd need to wait on the result of a create to record the pod's
		// UID, which would require locking *across* the create, which will turn
		// into a performance bottleneck. We should generate a UID for the pod
		// beforehand and store it via ExpectCreations.
		errCh := make(chan error, diff)
		rm.expectations.ExpectCreations(rcKey, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
		glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
		for i := 0; i < diff; i++ {
			go func() {
				defer wg.Done()
				var err error
				if rm.garbageCollectorEnabled {
					var trueVar = true
					controllerRef := &metav1.OwnerReference{
						APIVersion: getRCKind().GroupVersion().String(),
						Kind:       getRCKind().Kind,
						Name:       rc.Name,
						UID:        rc.UID,
						Controller: &trueVar,
					}
					err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
				} else {
					err = rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc)
				}
				if err != nil {
					// Decrement the expected number of creates because the informer won't observe this pod
					glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
					rm.expectations.CreationObserved(rcKey)
					errCh <- err
					utilruntime.HandleError(err)
				}
			}()
		}
		wg.Wait()

		select {
		case err := <-errCh:
			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
			if err != nil {
				return err
			}
		default:
		}

		return nil
	}

	if diff > rm.burstReplicas {
		diff = rm.burstReplicas
	}
	glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, *(rc.Spec.Replicas), diff)
	// No need to sort pods if we are about to delete all of them
	if *(rc.Spec.Replicas) != 0 {
		// Sort the pods in the order such that not-ready < ready, unscheduled
		// < scheduled, and pending < running. This ensures that we delete pods
		// in the earlier stages whenever possible.
		sort.Sort(controller.ActivePods(filteredPods))
	}
	// Snapshot the UIDs (ns/name) of the pods we're expecting to see
	// deleted, so we know to record their expectations exactly once either
	// when we see it as an update of the deletion timestamp, or as a delete.
	// Note that if the labels on a pod/rc change in a way that the pod gets
	// orphaned, the rs will only wake up after the expectations have
	// expired even if other pods are deleted.
	deletedPodKeys := []string{}
	for i := 0; i < diff; i++ {
		deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
	}
	// We use pod namespace/name as a UID to wait for deletions, so if the
	// labels on a pod/rc change in a way that the pod gets orphaned, the
	// rc will only wake up after the expectation has expired.
	errCh := make(chan error, diff)
	rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
	var wg sync.WaitGroup
	wg.Add(diff)
	for i := 0; i < diff; i++ {
		go func(ix int) {
			defer wg.Done()
			if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
				// Decrement the expected number of deletes because the informer won't observe this deletion
				podKey := controller.PodKey(filteredPods[ix])
				glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
				rm.expectations.DeletionObserved(rcKey, podKey)
				errCh <- err
				utilruntime.HandleError(err)
			}
		}(i)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
		if err != nil {
			return err
		}
	default:
	}

	return nil

}

上面manageReplicas代码的主要逻辑为:

到此,相信大家对“Kubernetes Replication Controller的结构定义是什么”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

推荐阅读:
  1. Kubernetes之标签与Pod控制器详解
  2. Kubernetes基本概念

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

kubernetes replication controller

上一篇:phyml是怎样基于最大似然法构建进化树的

下一篇:Kubernetes Cluster HA如何配置

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》