Kubernetes Endpoints Controller的源码解析

发布时间:2021-08-30 16:12:24 作者:chen
来源:亿速云 阅读:123

Kubernetes Endpoints Controller的源码解析

目录

  1. 引言
  2. Kubernetes Endpoints概述
  3. Endpoints Controller的作用
  4. 源码结构
  5. 核心逻辑解析
    1. Controller的初始化
    2. 事件监听
    3. Endpoints的创建与更新
    4. Endpoints的删除
  6. 关键数据结构
  7. 并发控制与性能优化
  8. 常见问题与解决方案
  9. 总结

引言

Kubernetes是一个开源的容器编排平台,广泛应用于云原生应用的部署和管理。在Kubernetes中,Endpoints是一个非常重要的资源对象,它用于表示Service后端的Pod集合。Endpoints Controller是Kubernetes控制平面中的一个核心组件,负责维护Service与Pod之间的映射关系。

本文将深入解析Kubernetes Endpoints Controller的源码,探讨其工作原理、核心逻辑以及实现细节。通过对源码的分析,读者可以更好地理解Endpoints Controller的工作机制,并为Kubernetes的二次开发和优化提供参考。

Kubernetes Endpoints概述

在Kubernetes中,Service是一种抽象,用于定义一组Pod的访问策略。Service通过Selector选择一组Pod,并将流量分发到这些Pod上。Endpoints是Service的后端,它记录了Service所选择的Pod的IP地址和端口信息。

Endpoints资源对象的结构如下:

apiVersion: v1
kind: Endpoints
metadata:
  name: my-service
subsets:
  - addresses:
      - ip: 10.244.1.2
      - ip: 10.244.1.3
    ports:
      - port: 80
        protocol: TCP

在上面的例子中,my-service Service对应的Endpoints记录了两个Pod的IP地址和端口信息。

Endpoints Controller的作用

Endpoints Controller的主要作用是维护Service与Pod之间的映射关系。具体来说,Endpoints Controller会监听Service和Pod的变化,并根据这些变化动态更新Endpoints资源对象。

Endpoints Controller的工作流程如下:

  1. 监听Service和Pod的变化。
  2. 当Service或Pod发生变化时,重新计算Service对应的Endpoints。
  3. 更新Endpoints资源对象,确保其与当前的Pod状态一致。

通过这种方式,Endpoints Controller确保了Service能够正确地路由流量到后端的Pod。

源码结构

Endpoints Controller的源码位于Kubernetes项目的pkg/controller/endpoint目录下。主要的源码文件包括:

endpoints_controller.go文件中,定义了EndpointsController结构体,它是Endpoints Controller的核心实现。

type EndpointsController struct {
    client clientset.Interface
    eventBroadcaster record.EventBroadcaster
    eventRecorder record.EventRecorder
    serviceLister corelisters.ServiceLister
    serviceListerSynced cache.InformerSynced
    podLister corelisters.PodLister
    podListerSynced cache.InformerSynced
    endpointsLister corelisters.EndpointsLister
    endpointsListerSynced cache.InformerSynced
    queue workqueue.RateLimitingInterface
    workerLoopPeriod time.Duration
}

EndpointsController结构体包含了Endpoints Controller所需的各种依赖,如Kubernetes客户端、事件广播器、Service和Pod的Lister等。

核心逻辑解析

Controller的初始化

Endpoints Controller的初始化过程主要包括以下几个步骤:

  1. 创建Kubernetes客户端。
  2. 初始化事件广播器和事件记录器。
  3. 注册Service和Pod的Informer。
  4. 启动Controller的主循环。

NewEndpointController函数中,完成了Endpoints Controller的初始化工作。

func NewEndpointController(serviceInformer coreinformers.ServiceInformer, podInformer coreinformers.PodInformer, endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface) *EndpointsController {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoints-controller"})

    ec := &EndpointsController{
        client:           client,
        eventBroadcaster: eventBroadcaster,
        eventRecorder:    recorder,
        queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoints"),
        workerLoopPeriod: time.Second,
    }

    serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: ec.enqueueService,
        UpdateFunc: func(old, cur interface{}) {
            ec.enqueueService(cur)
        },
        DeleteFunc: ec.enqueueService,
    })
    ec.serviceLister = serviceInformer.Lister()
    ec.serviceListerSynced = serviceInformer.Informer().HasSynced

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: ec.addPod,
        UpdateFunc: ec.updatePod,
        DeleteFunc: ec.deletePod,
    })
    ec.podLister = podInformer.Lister()
    ec.podListerSynced = podInformer.Informer().HasSynced

    endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: ec.handleObject,
        UpdateFunc: func(old, cur interface{}) {
            ec.handleObject(cur)
        },
        DeleteFunc: ec.handleObject,
    })
    ec.endpointsLister = endpointsInformer.Lister()
    ec.endpointsListerSynced = endpointsInformer.Informer().HasSynced

    return ec
}

在初始化过程中,Endpoints Controller注册了Service、Pod和Endpoints的Informer,并为其添加了事件处理函数。这些事件处理函数会在相应的资源对象发生变化时被调用,从而触发Endpoints Controller的更新逻辑。

事件监听

Endpoints Controller通过Informer监听Service、Pod和Endpoints的变化。当这些资源对象发生变化时,相应的事件处理函数会被调用。

Service事件处理

当Service发生变化时,enqueueService函数会被调用。该函数将Service的Key(Namespace/Name)加入到工作队列中,等待后续处理。

func (ec *EndpointsController) enqueueService(obj interface{}) {
    key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
        return
    }
    ec.queue.Add(key)
}

Pod事件处理

当Pod发生变化时,addPodupdatePoddeletePod函数会被调用。这些函数会根据Pod所属的Service,将Service的Key加入到工作队列中。

func (ec *EndpointsController) addPod(obj interface{}) {
    pod := obj.(*v1.Pod)
    services, err := ec.getPodServiceMemberships(pod)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
        return
    }
    for key := range services {
        ec.queue.Add(key)
    }
}

func (ec *EndpointsController) updatePod(old, cur interface{}) {
    oldPod := old.(*v1.Pod)
    curPod := cur.(*v1.Pod)
    if oldPod.ResourceVersion == curPod.ResourceVersion {
        return
    }
    services, err := ec.getPodServiceMemberships(curPod)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to get pod %s/%s's service memberships: %v", curPod.Namespace, curPod.Name, err))
        return
    }
    for key := range services {
        ec.queue.Add(key)
    }
}

func (ec *EndpointsController) deletePod(obj interface{}) {
    pod, ok := obj.(*v1.Pod)
    if !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
            return
        }
        pod, ok = tombstone.Obj.(*v1.Pod)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))
            return
        }
    }
    services, err := ec.getPodServiceMemberships(pod)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
        return
    }
    for key := range services {
        ec.queue.Add(key)
    }
}

Endpoints事件处理

当Endpoints发生变化时,handleObject函数会被调用。该函数会根据Endpoints所属的Service,将Service的Key加入到工作队列中。

func (ec *EndpointsController) handleObject(obj interface{}) {
    if object, ok := obj.(metav1.Object); ok {
        if key, err := cache.MetaNamespaceKeyFunc(object); err == nil {
            ec.queue.Add(key)
        }
    }
}

Endpoints的创建与更新

Endpoints Controller的核心逻辑是维护Service与Pod之间的映射关系。当Service或Pod发生变化时,Endpoints Controller会重新计算Service对应的Endpoints,并更新Endpoints资源对象。

计算Endpoints

syncService函数中,Endpoints Controller会根据Service的Selector选择符合条件的Pod,并计算Endpoints的Subsets。

func (ec *EndpointsController) syncService(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    service, err := ec.serviceLister.Services(namespace).Get(name)
    if err != nil {
        if apierrors.IsNotFound(err) {
            return nil
        }
        return err
    }

    selector := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
    pods, err := ec.podLister.Pods(namespace).List(selector)
    if err != nil {
        return err
    }

    subsets := []v1.EndpointSubset{}
    for _, pod := range pods {
        if len(pod.Status.PodIP) == 0 {
            continue
        }
        for _, container := range pod.Spec.Containers {
            for _, port := range container.Ports {
                if port.Name == "" || port.Protocol == "" {
                    continue
                }
                subsets = append(subsets, v1.EndpointSubset{
                    Addresses: []v1.EndpointAddress{
                        {
                            IP: pod.Status.PodIP,
                            TargetRef: &v1.ObjectReference{
                                Kind:      "Pod",
                                Namespace: pod.Namespace,
                                Name:      pod.Name,
                                UID:       pod.UID,
                            },
                        },
                    },
                    Ports: []v1.EndpointPort{
                        {
                            Name:     port.Name,
                            Port:     port.ContainerPort,
                            Protocol: port.Protocol,
                        },
                    },
                })
            }
        }
    }

    endpoints := &v1.Endpoints{
        ObjectMeta: metav1.ObjectMeta{
            Name:      service.Name,
            Namespace: service.Namespace,
            Labels:    service.Labels,
        },
        Subsets: subsets,
    }

    existingEndpoints, err := ec.endpointsLister.Endpoints(namespace).Get(name)
    if err != nil {
        if !apierrors.IsNotFound(err) {
            return err
        }
        _, err = ec.client.CoreV1().Endpoints(namespace).Create(endpoints)
        return err
    }

    if reflect.DeepEqual(existingEndpoints.Subsets, endpoints.Subsets) {
        return nil
    }

    existingEndpoints.Subsets = endpoints.Subsets
    _, err = ec.client.CoreV1().Endpoints(namespace).Update(existingEndpoints)
    return err
}

syncService函数中,Endpoints Controller首先根据Service的Selector选择符合条件的Pod,然后遍历这些Pod的容器和端口信息,生成Endpoints的Subsets。最后,Endpoints Controller会检查是否存在对应的Endpoints资源对象,如果不存在则创建,如果存在则更新。

Endpoints的删除

当Service被删除时,Endpoints Controller会删除对应的Endpoints资源对象。这一逻辑在syncService函数中实现。

func (ec *EndpointsController) syncService(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    service, err := ec.serviceLister.Services(namespace).Get(name)
    if err != nil {
        if apierrors.IsNotFound(err) {
            err = ec.client.CoreV1().Endpoints(namespace).Delete(name, nil)
            if err != nil && !apierrors.IsNotFound(err) {
                return err
            }
            return nil
        }
        return err
    }

    // 计算Endpoints的逻辑...
}

当Service不存在时,Endpoints Controller会删除对应的Endpoints资源对象。

关键数据结构

在Endpoints Controller的实现中,有几个关键的数据结构需要特别关注:

  1. Endpoints: 表示Service后端的Pod集合。
  2. Service: 表示Kubernetes中的Service资源对象。
  3. Pod: 表示Kubernetes中的Pod资源对象。
  4. Informer: 用于监听资源对象的变化。
  5. Workqueue: 用于存储待处理的任务。

这些数据结构在Endpoints Controller的实现中起到了核心作用,理解它们的设计和用途对于深入理解Endpoints Controller的工作原理至关重要。

并发控制与性能优化

在Kubernetes中,Endpoints Controller需要处理大量的Service和Pod变化事件。为了确保系统的稳定性和性能,Endpoints Controller采用了多种并发控制和性能优化策略。

并发控制

Endpoints Controller通过工作队列(Workqueue)来实现并发控制。工作队列是一个线程安全的队列,用于存储待处理的任务。Endpoints Controller的Worker会从工作队列中取出任务并处理。

func (ec *EndpointsController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer ec.queue.ShutDown()

    if !cache.WaitForCacheSync(stopCh, ec.serviceListerSynced, ec.podListerSynced, ec.endpointsListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(ec.worker, ec.workerLoopPeriod, stopCh)
    }

    <-stopCh
}

func (ec *EndpointsController) worker() {
    for ec.processNextWorkItem() {
    }
}

func (ec *EndpointsController) processNextWorkItem() bool {
    key, quit := ec.queue.Get()
    if quit {
        return false
    }
    defer ec.queue.Done(key)

    err := ec.syncService(key.(string))
    if err == nil {
        ec.queue.Forget(key)
        return true
    }

    utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
    ec.queue.AddRateLimited(key)

    return true
}

Run函数中,Endpoints Controller启动了多个Worker,每个Worker会不断从工作队列中取出任务并处理。通过这种方式,Endpoints Controller能够并发处理多个任务,提高了系统的吞吐量。

性能优化

为了优化性能,Endpoints Controller采用了以下几种策略:

  1. 批量处理: Endpoints Controller会将多个事件合并为一个任务,减少对API Server的请求次数。
  2. 缓存机制: Endpoints Controller通过Informer的缓存机制,减少对API Server的查询次数。
  3. 限流机制: Endpoints Controller通过工作队列的限流机制,防止系统过载。

这些优化策略确保了Endpoints Controller在高负载情况下仍能稳定运行。

常见问题与解决方案

在实际使用中,Endpoints Controller可能会遇到一些常见问题。本节将介绍这些问题及其解决方案。

1. Endpoints未及时更新

问题描述: 当Pod的状态发生变化时,Endpoints Controller未能及时更新Endpoints资源对象。

解决方案: 检查Endpoints Controller的日志,确认是否有错误发生。如果Endpoints Controller的Worker处理任务的速度较慢,可以尝试增加Worker的数量。

2. Endpoints与Pod状态不一致

问题描述: Endpoints资源对象中记录的Pod状态与实际Pod状态不一致。

解决方案: 检查Pod的Selector是否正确,确保Endpoints Controller能够正确选择符合条件的Pod。此外,检查Pod的IP地址和端口信息是否正确。

3. Endpoints Controller性能瓶颈

问题描述: 在高负载情况下,Endpoints Controller的性能出现瓶颈,导致系统响应变慢。

解决方案: 增加Endpoints Controller的Worker数量,优化工作队列的限流策略,减少对API Server的请求次数。

总结

本文详细解析了Kubernetes Endpoints Controller的源码,探讨了其工作原理、核心逻辑以及实现细节。通过对源码的分析,我们了解到Endpoints Controller通过监听Service和Pod的变化,动态维护Service与Pod之间的映射关系。此外,我们还介绍了Endpoints Controller的并发控制与性能优化策略,以及常见问题的解决方案。

通过对Endpoints Controller的深入理解,读者可以更好地掌握Kubernetes的核心组件,并为Kubernetes的二次开发和优化提供参考。希望本文能够帮助读者更好地理解Kubernetes的内部机制,并在实际应用中发挥更大的作用。

推荐阅读:
  1. kubernetes详细介绍
  2. Kubernetes的Endpoints

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

kubernetes

上一篇:Linux查看系统日志的命令

下一篇:怎么分析并探索Docker容器镜像的内容

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》