如何通过源码分析Informer机制

发布时间:2021-10-12 14:17:41 作者:柒染
来源:亿速云 阅读:160

本篇文章给大家分享的是有关如何通过源码分析Informer机制,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

通过源码分析Informer机制

在之前的文章kubernetes之面向终态设计与控制器中了解主要了解了控制器通过ListAndWatch方式实现终态架构。而并未分析内部实现的细节,故本章主要分析内部实现的细节。

参考项目为官方的sample-controller

其实,每个Kubernetes扩展组件,都可以看做是一个控制器,用于处理自己关心的资源类型(API对象)。而要把自己关系的资源和自己的控制器关联起来,则是通过Informer机制,可称为“通知器”,而Informer与API对象都是一一对应的,因此可实现我们创建一个自定义资源,我们的控制器就可以通过该Informer机制

如何通过源码分析Informer机制

官方架构图

如何通过源码分析Informer机制

通过上图,可知一个Kubernetes的扩展组件主要分为两大部分:client-go组件和自定义的Controller组件。

整体执行流程

原生Controller代码分析

初始化部分
	//https://github.com/kubernetes/sample-controller/blob/master/main.go#L62
	
	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
	exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

	controller := NewController(kubeClient, exampleClient,
		kubeInformerFactory.Apps().V1().Deployments(),
		exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

	// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
	// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
	kubeInformerFactory.Start(stopCh)
	exampleInformerFactory.Start(stopCh)

	if err = controller.Run(2, stopCh); err != nil {
		klog.Fatalf("Error running controller: %s", err.Error())
	}

以上的代码涵盖两部分:

client-go代码分析

用户通过构建Informer并启动后,就会进入到client-go内部的Informer内,主要逻辑如下

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}
//client-go@v2.0.0-alpha.0.0.20180910083459-2cefa64ff137+incompatible/tools/cache/shared_informer.go:189
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

	cfg := &Config{
		//注册DeltaFIFO队列
		Queue:            fifo,
		//注册listerWatcher,后续会和APIServer建立连接
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		//检查是否需要进行Resync,该方法会把需要Resync的listener加入到需要同步的队列中
		ShouldResync:     s.processor.shouldResync,
		//这里先注册用于构建索引和分发事件的方法
		Process: s.HandleDeltas,
	}
	......
	//启动用于缓存比较的方法
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	//启动用于接收事件消息并回调用户注册的ResourceEventHandler
	wg.StartWithChannel(processorStopCh, s.processor.run)
	......
	//运行内部的Controller
	s.controller.Run(stopCh)
}
// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
	......
   //调用reflector中的Run,进行启动ListAndWatch,同APIServer建立连接
   //client-go@v2.0.0-alpha.0.0.20180910083459-2cefa64ff137+incompatible/tools/cache/shared_informer.go:219
   wg.StartWithChannel(stopCh, r.Run)
	//启动定时器,每秒运行一次,用于调用processLoop进行读取数据
	wait.Until(c.processLoop, time.Second, stopCh)
}
Custom Controller 代码分析

以上client-go处理完毕后,会把数据通过用户注册的ResourceEventHandler调用相应的方法。通过自定义的ResourceEventHandler进行预处理,并加入到工作队列(这里不建议处理复杂逻辑,因为一旦该方法阻塞,会导致相应的链路阻塞,而应该把需要处理的事件放入到工作列表中,通过用户侧的协程进行处理)。

//https://github.com/kubernetes/sample-controller/blob/7e92736cc38f37632d2b53e31b9a966e7a91c24a/controller.go#L150
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	
	......
	// Wait for the caches to be synced before starting workers
	if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	klog.Info("Starting workers")
	// Launch two workers to process Foo resources
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

}
	
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
	//死循环,执行业务逻辑
	for c.processNextWorkItem() {
	}
}	
//https://github.com/kubernetes/sample-controller/blob/7e92736cc38f37632d2b53e31b9a966e7a91c24a/controller.go#L186
func (c *Controller) processNextWorkItem() bool {
	//读取工作队列中的数据,在之前通过用户定义的ResourceEventHandler已经加入到了工作队列中,这里区出做处理
	obj, shutdown := c.workqueue.Get()

	if shutdown {
		return false
	}

	// We wrap this block in a func so we can defer c.workqueue.Done.
	err := func(obj interface{}) error {
		// We call Done here so the workqueue knows we have finished
		// processing this item. We also must remember to call Forget if we
		// do not want this work item being re-queued. For example, we do
		// not call Forget if a transient error occurs, instead the item is
		// put back on the workqueue and attempted again after a back-off
		// period.
		defer c.workqueue.Done(obj)
		var key string
		var ok bool
		// We expect strings to come off the workqueue. These are of the
		// form namespace/name. We do this as the delayed nature of the
		// workqueue means the items in the informer cache may actually be
		// more up to date that when the item was initially put onto the
		// workqueue.
		if key, ok = obj.(string); !ok {
			// As the item in the workqueue is actually invalid, we call
			// Forget here else we'd go into a loop of attempting to
			// process a work item that is invalid.
			c.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		// Run the syncHandler, passing it the namespace/name string of the
		// Foo resource to be synced.
		if err := c.syncHandler(key); err != nil {
			// Put the item back on the workqueue to handle any transient errors.
			c.workqueue.AddRateLimited(key)
			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
		}
		// Finally, if no error occurs we Forget this item so it does not
		// get queued again until another change happens.
		c.workqueue.Forget(obj)
		klog.Infof("Successfully synced '%s'", key)
		return nil
	}(obj)

	if err != nil {
		utilruntime.HandleError(err)
		return true
	}

	return true
}
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
	.....
}

至此,已经全部分析完毕。

对Kubernetes做扩展开发,一般都是采用自定义的Controller方式。借助官方提供的client-go组件,已经实现了Informer机制。而我们只需要注册ResourceEventHandler事件,并实现自定义的Controller即可完成扩展。

步骤回顾:

以上就是如何通过源码分析Informer机制,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。

推荐阅读:
  1. 通过源码分析MyBatis的缓存
  2. python如何通过实例了解反射机制

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

informer

上一篇:如何理解FizzBuzzWhizz

下一篇:各种排序算法的编写教程

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》