Controller实现ReplicaSetController的示例分析

发布时间:2021-12-15 18:55:46 作者:柒染
来源:亿速云 阅读:190

Controller实现ReplicaSetController的示例分析

引言

在Kubernetes中,Controller是核心组件之一,负责维护集群中各种资源的状态。ReplicaSetController是Kubernetes中一个重要的Controller,它负责确保指定数量的Pod副本始终在运行。本文将深入分析ReplicaSetController的实现细节,并通过代码示例展示其工作原理。

ReplicaSetController概述

ReplicaSetController的主要职责是确保在任何时候都有指定数量的Pod副本在运行。它通过监听ReplicaSet和Pod的变化,并根据这些变化调整Pod的数量,以达到期望的状态。

ReplicaSetController的核心功能

  1. 监听ReplicaSet的变化:ReplicaSetController会监听ReplicaSet资源的变化,当ReplicaSet被创建、更新或删除时,Controller会做出相应的反应。
  2. 监听Pod的变化:Controller还会监听Pod资源的变化,当Pod被创建、更新或删除时,Controller会根据ReplicaSet的配置调整Pod的数量。
  3. 调整Pod数量:Controller会根据ReplicaSet中定义的副本数,创建或删除Pod,以确保实际运行的Pod数量与期望的副本数一致。

ReplicaSetController的实现

ReplicaSetController的实现主要分为以下几个部分:

  1. 初始化Controller:在Kubernetes启动时,ReplicaSetController会被初始化,并注册到Controller Manager中。
  2. 监听资源变化:Controller会通过Informer监听ReplicaSet和Pod资源的变化。
  3. 处理资源变化事件:当ReplicaSet或Pod资源发生变化时,Controller会处理相应的事件,并根据需要调整Pod的数量。
  4. 同步ReplicaSet状态:Controller会定期同步ReplicaSet的状态,确保实际运行的Pod数量与期望的副本数一致。

初始化Controller

在Kubernetes中,Controller的初始化通常在cmd/kube-controller-manager/app/controllermanager.go文件中进行。ReplicaSetController的初始化代码如下:

func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
        return nil, false, nil
    }
    go replicaset.NewReplicaSetController(
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
    return nil, true, nil
}

在这段代码中,NewReplicaSetController函数用于创建一个新的ReplicaSetController实例。它接收三个参数:

监听资源变化

ReplicaSetController通过Informer监听ReplicaSet和Pod资源的变化。Informer是Kubernetes中用于监听资源变化的机制,它会在资源发生变化时触发相应的事件处理函数。

func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

    rsc := &ReplicaSetController{
        kubeClient: kubeClient,
        burstReplicas: burstReplicas,
        expectations: controller.NewControllerExpectations(),
        queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset"),
    }

    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: rsc.enqueueReplicaSet,
        UpdateFunc: func(old, cur interface{}) {
            oldRS := old.(*apps.ReplicaSet)
            curRS := cur.(*apps.ReplicaSet)
            if oldRS.Status.Replicas != curRS.Status.Replicas {
                klog.V(4).Infof("Observed updated replica count for ReplicaSet: %v, %d->%d", curRS.Name, oldRS.Status.Replicas, curRS.Status.Replicas)
            }
            rsc.enqueueReplicaSet(cur)
        },
        DeleteFunc: rsc.enqueueReplicaSet,
    })

    rsc.rsLister = rsInformer.Lister()
    rsc.rsListerSynced = rsInformer.Informer().HasSynced

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: rsc.addPod,
        UpdateFunc: rsc.updatePod,
        DeleteFunc: rsc.deletePod,
    })

    rsc.podLister = podInformer.Lister()
    rsc.podListerSynced = podInformer.Informer().HasSynced

    rsc.syncHandler = rsc.syncReplicaSet

    return rsc
}

在这段代码中,AddEventHandler方法用于注册事件处理函数。当ReplicaSet或Pod资源发生变化时,相应的事件处理函数会被调用。

处理资源变化事件

当ReplicaSet或Pod资源发生变化时,Controller会调用相应的事件处理函数。例如,当ReplicaSet被创建、更新或删除时,enqueueReplicaSet函数会被调用,将ReplicaSet加入到工作队列中。

func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
    key, err := controller.KeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
        return
    }
    rsc.queue.Add(key)
}

enqueueReplicaSet函数会将ReplicaSet的Key加入到工作队列中,等待后续处理。

同步ReplicaSet状态

ReplicaSetController的核心逻辑是同步ReplicaSet的状态。syncReplicaSet函数负责处理工作队列中的ReplicaSet,并根据需要调整Pod的数量。

func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
    startTime := time.Now()
    defer func() {
        klog.V(4).Infof("Finished syncing replica set %q (%v)", key, time.Since(startTime))
    }()

    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.V(4).Infof("ReplicaSet has been deleted %v", key)
        return nil
    }
    if err != nil {
        return err
    }

    // List all Pods owned by this ReplicaSet
    podList, err := rsc.podLister.Pods(rs.Namespace).List(labels.Set(rs.Spec.Selector.MatchLabels).AsSelector())
    if err != nil {
        return err
    }

    // Filter out inactive Pods
    filteredPods := controller.FilterActivePods(podList)

    // Manage the ReplicaSet's replicas
    manageReplicasErr := rsc.manageReplicas(filteredPods, rs)

    // Update the ReplicaSet's status
    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
    updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    if err != nil {
        return err
    }

    // Resync the ReplicaSet after MinReadySeconds
    if updatedRS.Spec.MinReadySeconds > 0 && updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) {
        rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    }

    return manageReplicasErr
}

syncReplicaSet函数的主要步骤如下:

  1. 获取ReplicaSet:根据Key从缓存中获取ReplicaSet对象。
  2. 获取Pod列表:获取与ReplicaSet关联的所有Pod。
  3. 过滤活跃的Pod:过滤掉不活跃的Pod(如已删除的Pod)。
  4. 管理Pod副本:根据ReplicaSet的配置,调整Pod的数量。
  5. 更新ReplicaSet状态:根据当前的Pod状态,更新ReplicaSet的状态。
  6. 重新同步ReplicaSet:如果ReplicaSet配置了MinReadySeconds,则在指定时间后重新同步ReplicaSet。

管理Pod副本

manageReplicas函数负责根据ReplicaSet的配置,调整Pod的数量。如果实际运行的Pod数量少于期望的副本数,则创建新的Pod;如果实际运行的Pod数量多于期望的副本数,则删除多余的Pod。

func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for ReplicaSet %#v: %v", rs, err))
        return nil
    }
    if diff < 0 {
        diff *= -1
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        rsc.expectations.ExpectCreations(rsKey, diff)
        klog.V(2).Infof("Too few replicas for %v %v/%v, need %d, creating %d", rs.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
        var errs []error
        for i := 0; i < diff; i++ {
            err := rsc.podControl.CreatePods(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
            if err != nil {
                rsc.expectations.CreationObserved(rsKey)
                errs = append(errs, err)
            }
        }
        return utilerrors.NewAggregate(errs)
    } else if diff > 0 {
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        klog.V(2).Infof("Too many replicas for %v %v/%v, need %d, deleting %d", rs.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
        podsToDelete := getPodsToDelete(filteredPods, diff)
        rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
        var errs []error
        for _, pod := range podsToDelete {
            err := rsc.podControl.DeletePod(rs.Namespace, pod.Name, rs)
            if err != nil {
                rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
                errs = append(errs, err)
            }
        }
        return utilerrors.NewAggregate(errs)
    }
    return nil
}

manageReplicas函数的主要步骤如下:

  1. 计算Pod数量差异:计算实际运行的Pod数量与期望的副本数之间的差异。
  2. 创建Pod:如果实际运行的Pod数量少于期望的副本数,则创建新的Pod。
  3. 删除Pod:如果实际运行的Pod数量多于期望的副本数,则删除多余的Pod。

总结

ReplicaSetController是Kubernetes中一个重要的Controller,它负责确保指定数量的Pod副本始终在运行。本文通过分析ReplicaSetController的实现细节,展示了其工作原理。ReplicaSetController通过监听ReplicaSet和Pod资源的变化,并根据这些变化调整Pod的数量,以达到期望的状态。理解ReplicaSetController的实现有助于更好地理解Kubernetes的工作机制,并为开发自定义Controller提供参考。

推荐阅读:
  1. Controller激活与URL路由的示例分析
  2. Spring mvc Controller和RestFul原理的示例分析

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

controller

上一篇:如何分析Controller Manager

下一篇:linux如何修改path环境变量

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》