您好,登录后才能下订单哦!
在Kubernetes中,Controller是核心组件之一,负责维护集群中各种资源的状态。ReplicaSetController是Kubernetes中一个重要的Controller,它负责确保指定数量的Pod副本始终在运行。本文将深入分析ReplicaSetController的实现细节,并通过代码示例展示其工作原理。
ReplicaSetController的主要职责是确保在任何时候都有指定数量的Pod副本在运行。它通过监听ReplicaSet和Pod的变化,并根据这些变化调整Pod的数量,以达到期望的状态。
ReplicaSetController的实现主要分为以下几个部分:
在Kubernetes中,Controller的初始化通常在cmd/kube-controller-manager/app/controllermanager.go文件中进行。ReplicaSetController的初始化代码如下:
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
        return nil, false, nil
    }
    go replicaset.NewReplicaSetController(
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
    return nil, true, nil
}
在这段代码中,NewReplicaSetController函数用于创建一个新的ReplicaSetController实例。它接收三个参数:
ReplicaSets Informer:用于监听ReplicaSet资源的变化。Pods Informer:用于监听Pod资源的变化。Client:用于与Kubernetes API Server进行交互。ReplicaSetController通过Informer监听ReplicaSet和Pod资源的变化。Informer是Kubernetes中用于监听资源变化的机制,它会在资源发生变化时触发相应的事件处理函数。
func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
    rsc := &ReplicaSetController{
        kubeClient: kubeClient,
        burstReplicas: burstReplicas,
        expectations: controller.NewControllerExpectations(),
        queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset"),
    }
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: rsc.enqueueReplicaSet,
        UpdateFunc: func(old, cur interface{}) {
            oldRS := old.(*apps.ReplicaSet)
            curRS := cur.(*apps.ReplicaSet)
            if oldRS.Status.Replicas != curRS.Status.Replicas {
                klog.V(4).Infof("Observed updated replica count for ReplicaSet: %v, %d->%d", curRS.Name, oldRS.Status.Replicas, curRS.Status.Replicas)
            }
            rsc.enqueueReplicaSet(cur)
        },
        DeleteFunc: rsc.enqueueReplicaSet,
    })
    rsc.rsLister = rsInformer.Lister()
    rsc.rsListerSynced = rsInformer.Informer().HasSynced
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: rsc.addPod,
        UpdateFunc: rsc.updatePod,
        DeleteFunc: rsc.deletePod,
    })
    rsc.podLister = podInformer.Lister()
    rsc.podListerSynced = podInformer.Informer().HasSynced
    rsc.syncHandler = rsc.syncReplicaSet
    return rsc
}
在这段代码中,AddEventHandler方法用于注册事件处理函数。当ReplicaSet或Pod资源发生变化时,相应的事件处理函数会被调用。
当ReplicaSet或Pod资源发生变化时,Controller会调用相应的事件处理函数。例如,当ReplicaSet被创建、更新或删除时,enqueueReplicaSet函数会被调用,将ReplicaSet加入到工作队列中。
func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
    key, err := controller.KeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
        return
    }
    rsc.queue.Add(key)
}
enqueueReplicaSet函数会将ReplicaSet的Key加入到工作队列中,等待后续处理。
ReplicaSetController的核心逻辑是同步ReplicaSet的状态。syncReplicaSet函数负责处理工作队列中的ReplicaSet,并根据需要调整Pod的数量。
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing replica set %q (%v)", key, time.Since(startTime))
    }()
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.V(4).Infof("ReplicaSet has been deleted %v", key)
        return nil
    }
    if err != nil {
        return err
    }
    // List all Pods owned by this ReplicaSet
    podList, err := rsc.podLister.Pods(rs.Namespace).List(labels.Set(rs.Spec.Selector.MatchLabels).AsSelector())
    if err != nil {
        return err
    }
    // Filter out inactive Pods
    filteredPods := controller.FilterActivePods(podList)
    // Manage the ReplicaSet's replicas
    manageReplicasErr := rsc.manageReplicas(filteredPods, rs)
    // Update the ReplicaSet's status
    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
    updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    if err != nil {
        return err
    }
    // Resync the ReplicaSet after MinReadySeconds
    if updatedRS.Spec.MinReadySeconds > 0 && updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) {
        rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    }
    return manageReplicasErr
}
syncReplicaSet函数的主要步骤如下:
MinReadySeconds,则在指定时间后重新同步ReplicaSet。manageReplicas函数负责根据ReplicaSet的配置,调整Pod的数量。如果实际运行的Pod数量少于期望的副本数,则创建新的Pod;如果实际运行的Pod数量多于期望的副本数,则删除多余的Pod。
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for ReplicaSet %#v: %v", rs, err))
        return nil
    }
    if diff < 0 {
        diff *= -1
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        rsc.expectations.ExpectCreations(rsKey, diff)
        klog.V(2).Infof("Too few replicas for %v %v/%v, need %d, creating %d", rs.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
        var errs []error
        for i := 0; i < diff; i++ {
            err := rsc.podControl.CreatePods(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
            if err != nil {
                rsc.expectations.CreationObserved(rsKey)
                errs = append(errs, err)
            }
        }
        return utilerrors.NewAggregate(errs)
    } else if diff > 0 {
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        klog.V(2).Infof("Too many replicas for %v %v/%v, need %d, deleting %d", rs.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
        podsToDelete := getPodsToDelete(filteredPods, diff)
        rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
        var errs []error
        for _, pod := range podsToDelete {
            err := rsc.podControl.DeletePod(rs.Namespace, pod.Name, rs)
            if err != nil {
                rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
                errs = append(errs, err)
            }
        }
        return utilerrors.NewAggregate(errs)
    }
    return nil
}
manageReplicas函数的主要步骤如下:
ReplicaSetController是Kubernetes中一个重要的Controller,它负责确保指定数量的Pod副本始终在运行。本文通过分析ReplicaSetController的实现细节,展示了其工作原理。ReplicaSetController通过监听ReplicaSet和Pod资源的变化,并根据这些变化调整Pod的数量,以达到期望的状态。理解ReplicaSetController的实现有助于更好地理解Kubernetes的工作机制,并为开发自定义Controller提供参考。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。