您好,登录后才能下订单哦!
Kubernetes 是一个开源的容器编排平台,旨在自动化应用程序的部署、扩展和管理。其核心设计理念之一是“面向终态”(Declarative State),即用户只需声明期望的系统状态,Kubernetes 会自动确保系统达到并维持该状态。为了实现这一目标,Kubernetes 引入了控制器的概念。本文将深入探讨 Kubernetes 中如何面向终态设计,以及控制器在这一过程中的作用。
面向终态是一种声明式的系统设计方法,用户只需描述系统的期望状态,而不需要关心如何达到这一状态。Kubernetes 通过 API 对象(如 Pod、Service、Deployment 等)来表示系统的期望状态,并通过控制器来确保系统实际状态与期望状态一致。
控制器是 Kubernetes 中的核心组件,负责监控系统的实际状态,并根据期望状态进行调整。控制器通过不断地比较实际状态与期望状态,执行必要的操作来消除两者之间的差异。
Kubernetes 允许用户通过 CRD 定义自己的资源类型。CRD 是扩展 Kubernetes API 的一种方式,用户可以通过定义 CRD 来描述自定义的期望状态。
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: myresources.example.com
spec:
  group: example.com
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                replicas:
                  type: integer
  scope: Namespaced
  names:
    plural: myresources
    singular: myresource
    kind: MyResource
    shortNames:
      - mr
定义 CRD 后,用户需要编写自定义控制器来处理这些资源。自定义控制器通常包括 Informer、Workqueue 和 Reconciler 三个部分。
package main
import (
    "context"
    "fmt"
    "time"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog/v2"
    clientset "example.com/myresources/pkg/generated/clientset/versioned"
    informers "example.com/myresources/pkg/generated/informers/externalversions"
    listers "example.com/myresources/pkg/generated/listers/example/v1"
)
type Controller struct {
    kubeclientset    kubernetes.Interface
    myresourceClient clientset.Interface
    myresourceLister listers.MyResourceLister
    myresourceSynced cache.InformerSynced
    workqueue        workqueue.RateLimitingInterface
}
func NewController(
    kubeclientset kubernetes.Interface,
    myresourceClient clientset.Interface,
    myresourceInformer informers.MyResourceInformer) *Controller {
    controller := &Controller{
        kubeclientset:    kubeclientset,
        myresourceClient: myresourceClient,
        myresourceLister: myresourceInformer.Lister(),
        myresourceSynced: myresourceInformer.Informer().HasSynced,
        workqueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MyResources"),
    }
    myresourceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueMyResource,
        UpdateFunc: func(old, new interface{}) {
            controller.enqueueMyResource(new)
        },
        DeleteFunc: controller.enqueueMyResource,
    })
    return controller
}
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
    defer runtime.HandleCrash()
    defer c.workqueue.ShutDown()
    if ok := cache.WaitForCacheSync(stopCh, c.myresourceSynced); !ok {
        return fmt.Errorf("failed to wait for caches to sync")
    }
    for i := 0; i < threadiness; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }
    <-stopCh
    return nil
}
func (c *Controller) runWorker() {
    for c.processNextWorkItem() {
    }
}
func (c *Controller) processNextWorkItem() bool {
    obj, shutdown := c.workqueue.Get()
    if shutdown {
        return false
    }
    err := func(obj interface{}) error {
        defer c.workqueue.Done(obj)
        var key string
        var ok bool
        if key, ok = obj.(string); !ok {
            c.workqueue.Forget(obj)
            return nil
        }
        if err := c.syncHandler(key); err != nil {
            c.workqueue.AddRateLimited(key)
            return fmt.Errorf("error syncing '%s': %s", key, err.Error())
        }
        c.workqueue.Forget(obj)
        return nil
    }(obj)
    if err != nil {
        runtime.HandleError(err)
        return true
    }
    return true
}
func (c *Controller) syncHandler(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
        return nil
    }
    myresource, err := c.myresourceLister.MyResources(namespace).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            runtime.HandleError(fmt.Errorf("myresource '%s' in work queue no longer exists", key))
            return nil
        }
        return err
    }
    fmt.Printf("Sync/Add/Update for MyResource %s\n", myresource.GetName())
    return nil
}
func (c *Controller) enqueueMyResource(obj interface{}) {
    var key string
    var err error
    if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
        runtime.HandleError(err)
        return
    }
    c.workqueue.Add(key)
}
Kubernetes 的面向终态设计理念通过控制器实现了系统状态的自动化管理。用户只需声明期望状态,控制器会自动确保系统达到并维持该状态。通过自定义资源定义和控制器,用户可以灵活扩展 Kubernetes 的功能,满足各种复杂的应用场景。掌握面向终态的设计方法和控制器的实现原理,对于深入理解和使用 Kubernetes 具有重要意义。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。