您好,登录后才能下订单哦!
在Kubernetes中,Controller是核心组件之一,负责维护集群中各种资源的状态。ReplicaSetController是Kubernetes中一个重要的Controller,它负责确保指定数量的Pod副本始终在运行。本文将深入分析ReplicaSetController的实现细节,并通过代码示例展示其工作原理。
ReplicaSetController的主要职责是确保在任何时候都有指定数量的Pod副本在运行。它通过监听ReplicaSet和Pod的变化,并根据这些变化调整Pod的数量,以达到期望的状态。
ReplicaSetController的实现主要分为以下几个部分:
在Kubernetes中,Controller的初始化通常在cmd/kube-controller-manager/app/controllermanager.go
文件中进行。ReplicaSetController的初始化代码如下:
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
return nil, false, nil
}
go replicaset.NewReplicaSetController(
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
replicaset.BurstReplicas,
).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
return nil, true, nil
}
在这段代码中,NewReplicaSetController
函数用于创建一个新的ReplicaSetController实例。它接收三个参数:
ReplicaSets
Informer:用于监听ReplicaSet资源的变化。Pods
Informer:用于监听Pod资源的变化。Client
:用于与Kubernetes API Server进行交互。ReplicaSetController通过Informer监听ReplicaSet和Pod资源的变化。Informer是Kubernetes中用于监听资源变化的机制,它会在资源发生变化时触发相应的事件处理函数。
func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
rsc := &ReplicaSetController{
kubeClient: kubeClient,
burstReplicas: burstReplicas,
expectations: controller.NewControllerExpectations(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset"),
}
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rsc.enqueueReplicaSet,
UpdateFunc: func(old, cur interface{}) {
oldRS := old.(*apps.ReplicaSet)
curRS := cur.(*apps.ReplicaSet)
if oldRS.Status.Replicas != curRS.Status.Replicas {
klog.V(4).Infof("Observed updated replica count for ReplicaSet: %v, %d->%d", curRS.Name, oldRS.Status.Replicas, curRS.Status.Replicas)
}
rsc.enqueueReplicaSet(cur)
},
DeleteFunc: rsc.enqueueReplicaSet,
})
rsc.rsLister = rsInformer.Lister()
rsc.rsListerSynced = rsInformer.Informer().HasSynced
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rsc.addPod,
UpdateFunc: rsc.updatePod,
DeleteFunc: rsc.deletePod,
})
rsc.podLister = podInformer.Lister()
rsc.podListerSynced = podInformer.Informer().HasSynced
rsc.syncHandler = rsc.syncReplicaSet
return rsc
}
在这段代码中,AddEventHandler
方法用于注册事件处理函数。当ReplicaSet或Pod资源发生变化时,相应的事件处理函数会被调用。
当ReplicaSet或Pod资源发生变化时,Controller会调用相应的事件处理函数。例如,当ReplicaSet被创建、更新或删除时,enqueueReplicaSet
函数会被调用,将ReplicaSet加入到工作队列中。
func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
return
}
rsc.queue.Add(key)
}
enqueueReplicaSet
函数会将ReplicaSet的Key加入到工作队列中,等待后续处理。
ReplicaSetController的核心逻辑是同步ReplicaSet的状态。syncReplicaSet
函数负责处理工作队列中的ReplicaSet,并根据需要调整Pod的数量。
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing replica set %q (%v)", key, time.Since(startTime))
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(4).Infof("ReplicaSet has been deleted %v", key)
return nil
}
if err != nil {
return err
}
// List all Pods owned by this ReplicaSet
podList, err := rsc.podLister.Pods(rs.Namespace).List(labels.Set(rs.Spec.Selector.MatchLabels).AsSelector())
if err != nil {
return err
}
// Filter out inactive Pods
filteredPods := controller.FilterActivePods(podList)
// Manage the ReplicaSet's replicas
manageReplicasErr := rsc.manageReplicas(filteredPods, rs)
// Update the ReplicaSet's status
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
if err != nil {
return err
}
// Resync the ReplicaSet after MinReadySeconds
if updatedRS.Spec.MinReadySeconds > 0 && updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) {
rsc.enqueueReplicaSetAfter(updatedRS, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
}
return manageReplicasErr
}
syncReplicaSet
函数的主要步骤如下:
MinReadySeconds
,则在指定时间后重新同步ReplicaSet。manageReplicas
函数负责根据ReplicaSet的配置,调整Pod的数量。如果实际运行的Pod数量少于期望的副本数,则创建新的Pod;如果实际运行的Pod数量多于期望的副本数,则删除多余的Pod。
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
rsKey, err := controller.KeyFunc(rs)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for ReplicaSet %#v: %v", rs, err))
return nil
}
if diff < 0 {
diff *= -1
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}
rsc.expectations.ExpectCreations(rsKey, diff)
klog.V(2).Infof("Too few replicas for %v %v/%v, need %d, creating %d", rs.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
var errs []error
for i := 0; i < diff; i++ {
err := rsc.podControl.CreatePods(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
if err != nil {
rsc.expectations.CreationObserved(rsKey)
errs = append(errs, err)
}
}
return utilerrors.NewAggregate(errs)
} else if diff > 0 {
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}
klog.V(2).Infof("Too many replicas for %v %v/%v, need %d, deleting %d", rs.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
podsToDelete := getPodsToDelete(filteredPods, diff)
rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
var errs []error
for _, pod := range podsToDelete {
err := rsc.podControl.DeletePod(rs.Namespace, pod.Name, rs)
if err != nil {
rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
errs = append(errs, err)
}
}
return utilerrors.NewAggregate(errs)
}
return nil
}
manageReplicas
函数的主要步骤如下:
ReplicaSetController是Kubernetes中一个重要的Controller,它负责确保指定数量的Pod副本始终在运行。本文通过分析ReplicaSetController的实现细节,展示了其工作原理。ReplicaSetController通过监听ReplicaSet和Pod资源的变化,并根据这些变化调整Pod的数量,以达到期望的状态。理解ReplicaSetController的实现有助于更好地理解Kubernetes的工作机制,并为开发自定义Controller提供参考。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。