Developing Your Own Kubernetes Informer

SuKai March 30, 2022

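Anyone who has developed a controller for Kubernetes custom resources (CRDs) has used an Informer. An Informer provides two main functions. First, it syncs data into a local cache. Second, it watches events on Kubernetes resources and, based on each event's operation type, triggers the ResourceEventHandler that was registered in advance.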

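A previous article described how Clusterpedia, open-sourced by DaoCloud, uses the Informer mechanism to sync resources from multiple Kubernetes clusters into databases such as MySQL. So how do you build a simplified, custom Informer? Let's look at how Clusterpedia implements its resource-version Informer for Kubernetes resources.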

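This article is divided into the following parts:

1. The native Kubernetes Informer mechanism and its implementation

2. Clusterpedia's ResourceVersionInformer compared with the native Informer

3. The ResourceVersionInformer code implementation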

The Native Kubernetes Informer Mechanism

Informer Components

[Figure: Informer components]

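Reflector: watches resource objects of a given type through the Kubernetes List/Watch API.

DeltaFIFO Queue: stores the changed objects observed by the Reflector, in FIFO order.

LocalStore: the Informer's local cache of Kubernetes resource objects. It is read through a Lister's List/Get methods, which reduces the load on the APIServer.

WorkQueue: after an event from the DeltaFIFO has been applied to the Store, the registered event handlers are called. In a typical controller these handlers put the object's key into a WorkQueue, and the controller's workers take items from the WorkQueue and process them.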
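The following is a minimal, single-goroutine Go model of the DeltaFIFO idea. It is a sketch under simplifying assumptions and not client-go's implementation. The names deltaFIFO, add, pop, and ErrEmpty are hypothetical, deltas carry only a key, and pop returns an error instead of blocking. It still shows the key property: all pending deltas for one object are grouped under its key and popped together, oldest first, while keys leave the queue in arrival order.

```go
package main

import "errors"

// DeltaType is a simplified version of client-go's delta types.
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

// Delta records one change to one object; the key stands in for the object.
type Delta struct {
	Type DeltaType
	Key  string
}

// ErrEmpty is returned by pop when nothing is queued (the real DeltaFIFO blocks instead).
var ErrEmpty = errors.New("queue is empty")

// deltaFIFO keeps a list of deltas per key plus the FIFO order of keys.
type deltaFIFO struct {
	items map[string][]Delta
	queue []string // each key appears at most once, in first-arrival order
}

func newDeltaFIFO() *deltaFIFO {
	return &deltaFIFO{items: map[string][]Delta{}}
}

// add appends a delta; a key that is already queued keeps its position.
func (f *deltaFIFO) add(key string, t DeltaType) {
	if _, queued := f.items[key]; !queued {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], Delta{Type: t, Key: key})
}

// pop removes the oldest key and returns all of its deltas, oldest first.
func (f *deltaFIFO) pop() ([]Delta, error) {
	if len(f.queue) == 0 {
		return nil, ErrEmpty
	}
	key := f.queue[0]
	f.queue = f.queue[1:]
	deltas := f.items[key]
	delete(f.items, key)
	return deltas, nil
}
```

The real DeltaFIFO also deduplicates consecutive deletions and is safe for concurrent use. Neither behavior is modeled here.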

SharedInformer Implementation

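The code below shows the following:

1. NewSharedIndexInformer initializes processor, which invokes the event handler callbacks registered by the user. NewIndexer creates an indexer that serves as the informer's local cache (store). listerWatcher is set to lw, an implementation of the ListerWatcher interface. It is the Kubernetes client connection used to list and watch resource objects. objectType specifies the type of Kubernetes resource being watched.

2. In SharedInformer's Run, NewDeltaFIFOWithOptions builds a DeltaFIFO instance, fifo, and then a controller is constructed. The controller uses fifo as its queue, uses listerWatcher to watch resource objects, takes the resource type from ObjectType, and uses Process (here s.HandleDeltas) to handle items popped from the queue. s.processor.run starts the listeners that deliver notifications to the user-registered callbacks.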

type SharedInformer interface {
	AddEventHandler(handler ResourceEventHandler)

	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)

	GetStore() Store

	GetController() Controller

	Run(stopCh <-chan struct{})

	HasSynced() bool

	LastSyncResourceVersion() string

	SetWatchErrorHandler(handler WatchErrorHandler) error
}

func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      exampleObject,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
		clock:                           realClock,
	}
	return sharedIndexInformer
}

type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller

	processor             *sharedProcessor
	cacheMutationDetector MutationDetector

	listerWatcher ListerWatcher

	// objectType is an example object of the type this informer is
	// expected to handle.  Only the type needs to be right, except
	// that when that is `unstructured.Unstructured` the object's
	// `"apiVersion"` and `"kind"` must also be right.
	objectType runtime.Object

	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock

	started, stopped bool
	startedLock      sync.Mutex

	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex

	// Called whenever the ListAndWatch drops the connection with an error.
	watchErrorHandler WatchErrorHandler
}

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process:           s.HandleDeltas,
		WatchErrorHandler: s.watchErrorHandler,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}
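The two deferred calls near the end of Run are easy to misread. Deferred calls run in last-in, first-out order. When controller.Run returns, close(processorStopCh) runs first and wg.Wait() runs second, which is how the processor is stopped strictly after the controller. The hypothetical helper below, runWithOrderedStop, reproduces just that ordering. It uses sync.WaitGroup instead of wait.Group.

```go
package main

import "sync"

// recorder collects log lines from several goroutines.
type recorder struct {
	mu     sync.Mutex
	events []string
}

func (r *recorder) log(s string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, s)
}

func (r *recorder) list() []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]string(nil), r.events...)
}

// runWithOrderedStop (hypothetical) mimics the shutdown order in
// sharedIndexInformer.Run: the processor has its own stop channel,
// which is closed only after the blocking controller call returns.
func runWithOrderedStop(controller func(), processor func(stop <-chan struct{})) {
	processorStopCh := make(chan struct{})
	var wg sync.WaitGroup
	defer wg.Wait()              // runs second: wait for the processor to exit
	defer close(processorStopCh) // runs first: tell the processor to stop

	wg.Add(1)
	go func() {
		defer wg.Done()
		processor(processorStopCh)
	}()

	controller() // blocks, like s.controller.Run(stopCh)
}
```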

The controller's functions

Run

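1. NewReflector builds the Reflector from ListerWatcher, ObjectType, Queue, and the other config fields. r.Run is then started in its own goroutine.

2. wait.Until(c.processLoop, time.Second, stopCh) runs processLoop. If processLoop returns, it is called again one second later, and this repeats until stopCh is closed.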

processLoop

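1. obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) pops one object's pending events from the queue and passes them to c.config.Process. That Process function is SharedInformer's HandleDeltas.

2. If processing returns an error and RetryOnError is true, the item is re-added to the queue with AddIfNotPresent, which adds it only if the queue does not already contain that object.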

func (c *controller) Run(stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()
   go func() {
      <-stopCh
      c.config.Queue.Close()
   }()
   r := NewReflector(
      c.config.ListerWatcher,
      c.config.ObjectType,
      c.config.Queue,
      c.config.FullResyncPeriod,
   )
   r.ShouldResync = c.config.ShouldResync
   r.WatchListPageSize = c.config.WatchListPageSize
   r.clock = c.clock
   if c.config.WatchErrorHandler != nil {
      r.watchErrorHandler = c.config.WatchErrorHandler
   }

   c.reflectorMutex.Lock()
   c.reflector = r
   c.reflectorMutex.Unlock()

   var wg wait.Group

   wg.StartWithChannel(stopCh, r.Run)

   wait.Until(c.processLoop, time.Second, stopCh)
   wg.Wait()
}

func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

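The Reflector's watchHandler function

watchHandler receives changes to the watched Kubernetes resource objects and writes each event into the Reflector's store. Here the store is the DeltaFIFO queue fifo created in sharedIndexInformer's Run above. After each event it also records the event's resourceVersion, so that a restarted watch can resume from that point.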

func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
   eventCount := 0

   // Stopping the watcher should be idempotent and if we return from this function there's no way
   // we're coming back in with the same watch interface.
   defer w.Stop()

loop:
   for {
      select {
      case <-stopCh:
         return errorStopRequested
      case err := <-errc:
         return err
      case event, ok := <-w.ResultChan():
         if !ok {
            break loop
         }
         if event.Type == watch.Error {
            return apierrors.FromObject(event.Object)
         }
         if r.expectedType != nil {
            if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
               utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
               continue
            }
         }
         if r.expectedGVK != nil {
            if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
               utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
               continue
            }
         }
         meta, err := meta.Accessor(event.Object)
         if err != nil {
            utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            continue
         }
         newResourceVersion := meta.GetResourceVersion()
         switch event.Type {
         case watch.Added:
            err := r.store.Add(event.Object)
            if err != nil {
               utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
            }
         case watch.Modified:
            err := r.store.Update(event.Object)
            if err != nil {
               utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
            }
         case watch.Deleted:
            // TODO: Will any consumers need access to the "last known
            // state", which is passed in event.Object? If so, may need
            // to change this.
            err := r.store.Delete(event.Object)
            if err != nil {
               utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
            }
         case watch.Bookmark:
            // A `Bookmark` means watch has synced here, just update the resourceVersion
         default:
            utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
         }
         *resourceVersion = newResourceVersion
         r.setLastSyncResourceVersion(newResourceVersion)
         if rvu, ok := r.store.(ResourceVersionUpdater); ok {
            rvu.UpdateResourceVersion(newResourceVersion)
         }
         eventCount++
      }
   }

   watchDuration := r.clock.Since(start)
   if watchDuration < 1*time.Second && eventCount == 0 {
      return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
   }
   klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
   return nil
}
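The core of watchHandler is a dispatch on the event type, followed by recording the event's resourceVersion. The sketch below is stripped down and uses simplified types. The names event, object, store, handleEvents, and recordingStore are hypothetical. It omits the type and GVK checks, watch.Error handling, and the short-watch check.

```go
package main

import "fmt"

type EventType string

// These string values match k8s.io/apimachinery's watch.EventType values.
const (
	EvAdded    EventType = "ADDED"
	EvModified EventType = "MODIFIED"
	EvDeleted  EventType = "DELETED"
	EvBookmark EventType = "BOOKMARK"
)

type object struct{ Key, ResourceVersion string }

type event struct {
	Type   EventType
	Object object
}

// store is the subset of operations watchHandler needs (simplified).
type store interface {
	Add(obj object) error
	Update(obj object) error
	Delete(obj object) error
}

// handleEvents dispatches each event to the store, then records its resourceVersion.
func handleEvents(events <-chan event, s store) (lastRV string, n int) {
	for ev := range events {
		var err error
		switch ev.Type {
		case EvAdded:
			err = s.Add(ev.Object)
		case EvModified:
			err = s.Update(ev.Object)
		case EvDeleted:
			err = s.Delete(ev.Object)
		case EvBookmark:
			// Only the resourceVersion advances.
		default:
			err = fmt.Errorf("unknown event type %q", ev.Type)
		}
		if err != nil {
			fmt.Println("error:", err) // the real code calls utilruntime.HandleError
		}
		lastRV = ev.Object.ResourceVersion
		n++
	}
	return lastRV, n
}

// recordingStore logs the operations applied to it.
type recordingStore struct{ ops []string }

func (r *recordingStore) Add(o object) error    { r.ops = append(r.ops, "add "+o.Key); return nil }
func (r *recordingStore) Update(o object) error { r.ops = append(r.ops, "update "+o.Key); return nil }
func (r *recordingStore) Delete(o object) error { r.ops = append(r.ops, "delete "+o.Key); return nil }
```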

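SharedInformer's HandleDeltas

HandleDeltas processes the events popped from the queue. For each event it calls the indexer operation that matches the event's operation type, saving the object into the local cache.

Once the indexer has been updated, it calls s.processor.distribute to send a notification, which in turn invokes the user-registered callbacks.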

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
   s.blockDeltas.Lock()
   defer s.blockDeltas.Unlock()

   // from oldest to newest
   for _, d := range obj.(Deltas) {
      switch d.Type {
      case Sync, Replaced, Added, Updated:
         s.cacheMutationDetector.AddObject(d.Object)
         if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
            if err := s.indexer.Update(d.Object); err != nil {
               return err
            }

            isSync := false
            switch {
            case d.Type == Sync:
               // Sync events are only propagated to listeners that requested resync
               isSync = true
            case d.Type == Replaced:
               if accessor, err := meta.Accessor(d.Object); err == nil {
                  if oldAccessor, err := meta.Accessor(old); err == nil {
                     // Replaced events that didn't change resourceVersion are treated as resync events
                     // and only propagated to listeners that requested resync
                     isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
                  }
               }
            }
            s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
         } else {
            if err := s.indexer.Add(d.Object); err != nil {
               return err
            }
            s.processor.distribute(addNotification{newObj: d.Object}, false)
         }
      case Deleted:
         if err := s.indexer.Delete(d.Object); err != nil {
            return err
         }
         s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
      }
   }
   return nil
}
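The decision logic of HandleDeltas can be reduced to a map-backed cache that returns the notifications it would distribute. This is a simplified model. The names obj, notification, and handleDeltas are hypothetical, and indexes, the mutation detector, and locking are omitted. It keeps the rules from the code above: an object not yet in the cache produces an add, an existing one produces an update, and a Replaced delta whose resourceVersion is unchanged is flagged as a sync.

```go
package main

// DeltaType values follow client-go's names; everything else is simplified.
type DeltaType string

const (
	Sync     DeltaType = "Sync"
	Replaced DeltaType = "Replaced"
	Added    DeltaType = "Added"
	Updated  DeltaType = "Updated"
	Deleted  DeltaType = "Deleted"
)

// obj stands in for a Kubernetes object: its cache key and resourceVersion.
type obj struct{ Key, RV string }

type Delta struct {
	Type   DeltaType
	Object obj
}

// notification folds add/update/deleteNotification into one struct.
// IsSync marks updates that should only reach listeners asking for resync.
type notification struct {
	Kind   string // "add", "update" or "delete"
	Old    *obj
	New    obj
	IsSync bool
}

// handleDeltas updates the cache first, then records the notification to send.
func handleDeltas(cache map[string]obj, deltas []Delta) []notification {
	var out []notification
	for _, d := range deltas { // from oldest to newest
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			old, exists := cache[d.Object.Key]
			cache[d.Object.Key] = d.Object
			if !exists {
				out = append(out, notification{Kind: "add", New: d.Object})
				continue
			}
			// A Replaced delta with an unchanged resourceVersion is treated as a resync.
			isSync := d.Type == Sync || (d.Type == Replaced && old.RV == d.Object.RV)
			out = append(out, notification{Kind: "update", Old: &old, New: d.Object, IsSync: isSync})
		case Deleted:
			gone := d.Object
			delete(cache, gone.Key)
			out = append(out, notification{Kind: "delete", Old: &gone})
		}
	}
	return out
}
```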

Indexer local cache operations

Kubernetes resource objects are stored in a map (threadSafeMap.items) guarded by a sync.RWMutex.

type ThreadSafeStore interface {
   Add(key string, obj interface{})
   Update(key string, obj interface{})
   Delete(key string)
   Get(key string) (item interface{}, exists bool)
   List() []interface{}
   ListKeys() []string
   Replace(map[string]interface{}, string)
   Index(indexName string, obj interface{}) ([]interface{}, error)
   IndexKeys(indexName, indexKey string) ([]string, error)
   ListIndexFuncValues(name string) []string
   ByIndex(indexName, indexKey string) ([]interface{}, error)
   GetIndexers() Indexers

   // AddIndexers adds more indexers to this store.  If you call this after you already have data
   // in the store, the results are undefined.
   AddIndexers(newIndexers Indexers) error
   // Resync is a no-op and is deprecated
   Resync() error
}

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
   lock  sync.RWMutex
   items map[string]interface{}

   // indexers maps a name to an IndexFunc
   indexers Indexers
   // indices maps a name to an Index
   indices Indices
}

func (c *threadSafeMap) Add(key string, obj interface{}) {
   c.lock.Lock()
   defer c.lock.Unlock()
   oldObject := c.items[key]
   c.items[key] = obj
   c.updateIndices(oldObject, obj, key)
}

func (c *threadSafeMap) Update(key string, obj interface{}) {
   c.lock.Lock()
   defer c.lock.Unlock()
   oldObject := c.items[key]
   c.items[key] = obj
   c.updateIndices(oldObject, obj, key)
}

func (c *threadSafeMap) Delete(key string) {
   c.lock.Lock()
   defer c.lock.Unlock()
   if obj, exists := c.items[key]; exists {
      c.deleteFromIndices(obj, key)
      delete(c.items, key)
   }
}

func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
   c.lock.RLock()
   defer c.lock.RUnlock()
   item, exists = c.items[key]
   return item, exists
}

func (c *threadSafeMap) List() []interface{} {
   c.lock.RLock()
   defer c.lock.RUnlock()
   list := make([]interface{}, 0, len(c.items))
   for _, item := range c.items {
      list = append(list, item)
   }
   return list
}
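To show how the map and its indices fit together, here is a small thread-safe map with a single index by namespace. It is a sketch and not client-go's threadSafeMap. The names safeMap, byNamespace, and IndexKeys are hypothetical, and the index function takes the key rather than the object. Because the index value is derived from the key, Add can overwrite an entry without first removing stale index entries. The real updateIndices has to handle that case.

```go
package main

import (
	"sort"
	"strings"
	"sync"
)

// indexFunc returns the index values for a key (client-go's IndexFunc takes the object).
type indexFunc func(key string) []string

// byNamespace indexes "namespace/name" keys by namespace.
func byNamespace(key string) []string {
	if i := strings.Index(key, "/"); i >= 0 {
		return []string{key[:i]}
	}
	return []string{""}
}

// safeMap holds the items plus one index: index value -> set of keys.
type safeMap struct {
	lock    sync.RWMutex
	items   map[string]interface{}
	index   map[string]map[string]struct{}
	indexFn indexFunc
}

func newSafeMap(fn indexFunc) *safeMap {
	return &safeMap{items: map[string]interface{}{}, index: map[string]map[string]struct{}{}, indexFn: fn}
}

// Add also serves as Update, as threadSafeMap's Add and Update share the same logic.
func (c *safeMap) Add(key string, obj interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.items[key] = obj
	for _, v := range c.indexFn(key) {
		if c.index[v] == nil {
			c.index[v] = map[string]struct{}{}
		}
		c.index[v][key] = struct{}{}
	}
}

func (c *safeMap) Delete(key string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if _, ok := c.items[key]; !ok {
		return
	}
	for _, v := range c.indexFn(key) {
		delete(c.index[v], key)
		if len(c.index[v]) == 0 {
			delete(c.index, v)
		}
	}
	delete(c.items, key)
}

func (c *safeMap) Get(key string) (interface{}, bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	item, ok := c.items[key]
	return item, ok
}

// IndexKeys returns the keys stored under one index value, sorted for stable output.
func (c *safeMap) IndexKeys(value string) []string {
	c.lock.RLock()
	defer c.lock.RUnlock()
	keys := make([]string, 0, len(c.index[value]))
	for k := range c.index[value] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}
```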

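sharedProcessor invokes the user-registered callbacks

After HandleDeltas has updated the local cache, it sends a notification for the event. The notification's type determines which user-registered callback is invoked: OnAdd, OnUpdate, or OnDelete.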

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

func (p *processorListener) run() {
   // this call blocks until the channel is closed.  When a panic happens during the notification
   // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
   // the next notification will be attempted.  This is usually better than the alternative of never
   // delivering again.
   stopCh := make(chan struct{})
   wait.Until(func() {
      for next := range p.nextCh {
         switch notification := next.(type) {
         case updateNotification:
            p.handler.OnUpdate(notification.oldObj, notification.newObj)
         case addNotification:
            p.handler.OnAdd(notification.newObj)
         case deleteNotification:
            p.handler.OnDelete(notification.oldObj)
         default:
            utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
         }
      }
      // the only way to get here is if the p.nextCh is empty and closed
      close(stopCh)
   }, 1*time.Second, stopCh)
}
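The sync/non-sync split in distribute and the type switch in run can be modeled with one buffered channel per listener. In this sketch, listener, processor, and wantsResync are hypothetical simplifications. The real sharedProcessor recomputes syncingListeners from each listener's resync period. The real processorListener also places an unbounded buffer between addCh and nextCh so that a slow handler does not block distribute.

```go
package main

import "sync"

type addNotification struct{ newObj string }
type updateNotification struct{ oldObj, newObj string }
type deleteNotification struct{ oldObj string }

// handler mirrors the shape of ResourceEventHandler (objects simplified to strings).
type handler interface {
	OnAdd(obj string)
	OnUpdate(oldObj, newObj string)
	OnDelete(obj string)
}

// listener is a toy processorListener backed by a single buffered channel.
type listener struct {
	ch          chan interface{}
	wantsResync bool
}

// run blocks until ch is closed, dispatching on the notification's concrete type.
func (l *listener) run(h handler) {
	for next := range l.ch {
		switch n := next.(type) {
		case addNotification:
			h.OnAdd(n.newObj)
		case updateNotification:
			h.OnUpdate(n.oldObj, n.newObj)
		case deleteNotification:
			h.OnDelete(n.oldObj)
		}
	}
}

type processor struct {
	mu        sync.RWMutex
	listeners []*listener
}

// distribute sends sync notifications only to listeners that want resyncs;
// everything else goes to all listeners.
func (p *processor) distribute(n interface{}, isSync bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	for _, l := range p.listeners {
		if isSync && !l.wantsResync {
			continue
		}
		l.ch <- n
	}
}

// recorder is a test handler that logs each callback.
type recorder struct{ calls []string }

func (r *recorder) OnAdd(o string)       { r.calls = append(r.calls, "add "+o) }
func (r *recorder) OnUpdate(o, n string) { r.calls = append(r.calls, "update "+o+"->"+n) }
func (r *recorder) OnDelete(o string)    { r.calls = append(r.calls, "delete "+o) }
```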

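To summarize how Kubernetes's sharedIndexInformer works:

1. The Reflector's watchHandler writes resource object events into the DeltaFIFO queue.

2. SharedInformer's HandleDeltas applies each event to the map inside the local Indexer cache.

3. SharedInformer's HandleDeltas then sends a notification, which invokes the user-registered callbacks.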
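The three steps can be wired together in a few lines of Go. This toy pipeline makes several assumptions: a channel stands in for DeltaFIFO, a plain map stands in for the Indexer, and a single callback stands in for the registered handlers. It only illustrates the ordering that matters. The cache is updated before the callback runs, so a handler that reads the cache sees the new state.

```go
package main

// delta is a simplified event: Type is "Added", "Updated" or "Deleted".
type delta struct {
	Type string
	Key  string
	Obj  string
}

// pipeline wires the three steps: events -> FIFO -> cache -> callback.
func pipeline(events []delta, onEvent func(typ, key string)) map[string]string {
	fifo := make(chan delta, len(events)) // stands in for DeltaFIFO
	cache := map[string]string{}          // stands in for the Indexer

	// Step 1: the "reflector" pushes watch events into the queue.
	go func() {
		for _, e := range events {
			fifo <- e
		}
		close(fifo)
	}()

	// Steps 2 and 3: "HandleDeltas" updates the cache first, then notifies.
	for d := range fifo {
		if d.Type == "Deleted" {
			delete(cache, d.Key)
		} else {
			cache[d.Key] = d.Obj
		}
		onEvent(d.Type, d.Key)
	}
	return cache
}
```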

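Clusterpedia's ResourceVersionInformer Compared with the Native Informer

1. It uses Kubernetes's native Reflector directly to watch resource object changes in each Kubernetes cluster and writes the events into a DeltaFIFO queue.

2. ResourceVersionInformer's HandleDeltas writes into a local cache built on Kubernetes's native ThreadSafeStore interface.

3. ResourceVersionInformer's HandleDeltas no longer sends notifications to user-registered callbacks. Once the local cache has been updated, it calls the ResourceEventHandler interface directly. The ResourceSynchro struct implements ResourceEventHandler.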
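The handler that ResourceVersionInformer calls needs four methods. The interface below is inferred from the calls in HandleDeltas (OnAdd, OnUpdate, OnDelete, OnSync) and may not match Clusterpedia's exact definition. countingSynchro and dispatch are hypothetical stand-ins for ResourceSynchro and for the call sites.

```go
package main

// ResourceEventHandler as inferred from resourceVersionInformer.HandleDeltas.
type ResourceEventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
	OnSync(obj interface{})
}

// countingSynchro is a hypothetical handler that only counts events.
type countingSynchro struct{ adds, updates, deletes, syncs int }

func (s *countingSynchro) OnAdd(obj interface{})               { s.adds++ }
func (s *countingSynchro) OnUpdate(oldObj, newObj interface{}) { s.updates++ }
func (s *countingSynchro) OnDelete(obj interface{})            { s.deletes++ }
func (s *countingSynchro) OnSync(obj interface{})              { s.syncs++ }

// Compile-time check that countingSynchro satisfies the interface.
var _ ResourceEventHandler = &countingSynchro{}

// dispatch calls the handler directly, with no processor or listener channel in between.
func dispatch(h ResourceEventHandler, kind string, obj interface{}) {
	switch kind {
	case "add":
		h.OnAdd(obj)
	case "update":
		h.OnUpdate(nil, obj) // HandleDeltas passes nil as oldObj
	case "delete":
		h.OnDelete(obj)
	case "sync":
		h.OnSync(obj)
	}
}
```

Because there is no processor or per-listener buffer, the handler runs on the goroutine that pops from the DeltaFIFO. A slow handler therefore slows down queue processing directly.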

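ResourceVersionInformer Implementation

The code is largely the same as Kubernetes's sharedIndexInformer, with unneeded methods removed, such as the ones for registering user callbacks. resourceVersionInformer adds a ResourceEventHandler field, and HandleDeltas calls this handler directly after updating the local cache. Unlike the native version, it compares resourceVersions when it receives a Replaced delta for an object that is already cached. An equal version triggers OnSync, an older version is ignored, and only a newer version updates the cache and triggers OnUpdate.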

type ResourceVersionInformer interface {
   Run(stopCh <-chan struct{})
   HasSynced() bool
}

type resourceVersionInformer struct {
   name          string
   storage       *ResourceVersionStorage
   handler       ResourceEventHandler
   controller    cache.Controller
   listerWatcher cache.ListerWatcher
}

func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *ResourceVersionStorage, exampleObject runtime.Object, handler ResourceEventHandler) ResourceVersionInformer {
   if name == "" {
      panic("name is required")
   }

   // storage: NewResourceVersionStorage(cache.DeletionHandlingMetaNamespaceKeyFunc),
   informer := &resourceVersionInformer{
      name:          name,
      listerWatcher: lw,
      storage:       storage,
      handler:       handler,
   }

   config := &cache.Config{
      ListerWatcher: lw,
      ObjectType:    exampleObject,
      RetryOnError:  false,
      Process: func(obj interface{}) error {
         deltas := obj.(cache.Deltas)
         return informer.HandleDeltas(deltas)
      },
      Queue: cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
         KeyFunction:           cache.DeletionHandlingMetaNamespaceKeyFunc,
         KnownObjects:          informer.storage,
         EmitDeltaTypeReplaced: true,
      }),
   }
   informer.controller = NewNamedController(informer.name, config)
   return informer
}

func (informer *resourceVersionInformer) HasSynced() bool {
   return informer.controller.HasSynced()
}

func (informer *resourceVersionInformer) Run(stopCh <-chan struct{}) {
   informer.controller.Run(stopCh)
}

func (informer *resourceVersionInformer) HandleDeltas(deltas cache.Deltas) error {
   for _, d := range deltas {
      switch d.Type {
      case cache.Replaced, cache.Added, cache.Updated:
         version, exists, err := informer.storage.Get(d.Object)
         if err != nil {
            return err
         }

         if !exists {
            if err := informer.storage.Add(d.Object); err != nil {
               return err
            }

            informer.handler.OnAdd(d.Object)
            break
         }

         if d.Type == cache.Replaced {
            if v := compareResourceVersion(d.Object, version); v <= 0 {
               if v == 0 {
                  informer.handler.OnSync(d.Object)
               }
               break
            }
         }

         if err := informer.storage.Update(d.Object); err != nil {
            return err
         }
         informer.handler.OnUpdate(nil, d.Object)
      case cache.Deleted:
         if err := informer.storage.Delete(d.Object); err != nil {
            return err
         }
         informer.handler.OnDelete(d.Object)
      }
   }
   return nil
}
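The article does not show compareResourceVersion. The reconstruction below is an assumption. It takes two strings (a hypothetical signature) and parses both versions as unsigned integers. API servers backed by etcd currently produce numeric resourceVersions, but Kubernetes documents resourceVersion as an opaque string. The hypothetical decide function mirrors the branches of HandleDeltas for Replaced, Added, and Updated deltas and returns which handler method would be called.

```go
package main

import "strconv"

// compareResourceVersion returns -1, 0 or 1, assuming numeric resourceVersions.
func compareResourceVersion(newRV, oldRV string) int {
	n, err1 := strconv.ParseUint(newRV, 10, 64)
	o, err2 := strconv.ParseUint(oldRV, 10, 64)
	if err1 != nil || err2 != nil {
		// Unparseable versions: treat any difference as "newer".
		if newRV == oldRV {
			return 0
		}
		return 1
	}
	switch {
	case n < o:
		return -1
	case n > o:
		return 1
	}
	return 0
}

// decide mirrors HandleDeltas for Replaced/Added/Updated deltas.
func decide(deltaType, newRV, storedRV string, exists bool) string {
	if !exists {
		return "OnAdd"
	}
	if deltaType == "Replaced" {
		if v := compareResourceVersion(newRV, storedRV); v <= 0 {
			if v == 0 {
				return "OnSync"
			}
			return "skip" // an older version from a relist is ignored
		}
	}
	return "OnUpdate"
}
```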

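The controller is essentially the same as the native one. The main difference is that it passes its name to the Reflector through cache.NewNamedReflector. Only Run and processLoop are shown here. The HasSynced and LastSyncResourceVersion methods that cache.Controller also requires are omitted.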

type controller struct {
   name   string
   config cache.Config

   reflectorMutex sync.RWMutex
   reflector      *cache.Reflector
   queue          cache.Queue
}

func NewNamedController(name string, config *cache.Config) cache.Controller {
   return &controller{
      name:   name,
      config: *config,
   }
}

func (c *controller) Run(stopCh <-chan struct{}) {
   defer utilruntime.HandleCrash()
   go func() {
      <-stopCh
      c.config.Queue.Close()
   }()
   r := cache.NewNamedReflector(
      c.name,
      c.config.ListerWatcher,
      c.config.ObjectType,
      c.config.Queue,
      c.config.FullResyncPeriod,
   )
   r.ShouldResync = c.config.ShouldResync
   r.WatchListPageSize = c.config.WatchListPageSize

   c.reflectorMutex.Lock()
   c.reflector = r
   c.reflectorMutex.Unlock()

   var wg wait.Group
   wg.StartWithChannel(stopCh, r.Run)

   wait.Until(c.processLoop, time.Second, stopCh)
   wg.Wait()
}

func (c *controller) processLoop() {
   for {
      obj, err := c.config.Queue.Pop(cache.PopProcessFunc(c.config.Process))
      if err != nil {
         if err == cache.ErrFIFOClosed {
            return
         }
         if c.config.RetryOnError {
            // This is the safe way to re-enqueue.
            _ = c.config.Queue.AddIfNotPresent(obj)
         }
      }
   }
}

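The local cache operates directly on a ThreadSafeStore. Note that ResourceVersionStorage stores only each object's resourceVersion under its key, not the full object. The ListKeys and GetByKey methods required by cache.KeyListerGetter are not shown.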

type ResourceVersionStorage struct {
   keyFunc cache.KeyFunc

   cacheStorage cache.ThreadSafeStore
}

var _ cache.KeyListerGetter = &ResourceVersionStorage{}

func NewResourceVersionStorage(keyFunc cache.KeyFunc) *ResourceVersionStorage {
   return &ResourceVersionStorage{
      cacheStorage: cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}),
      keyFunc:      keyFunc,
   }
}

func (c *ResourceVersionStorage) Add(obj interface{}) error {
   key, err := c.keyFunc(obj)
   if err != nil {
      return cache.KeyError{Obj: obj, Err: err}
   }
   accessor, err := meta.Accessor(obj)
   if err != nil {
      return err
   }

   c.cacheStorage.Add(key, accessor.GetResourceVersion())
   return nil
}

func (c *ResourceVersionStorage) Update(obj interface{}) error {
   key, err := c.keyFunc(obj)
   if err != nil {
      return cache.KeyError{Obj: obj, Err: err}
   }
   accessor, err := meta.Accessor(obj)
   if err != nil {
      return err
   }

   c.cacheStorage.Update(key, accessor.GetResourceVersion())
   return nil
}

func (c *ResourceVersionStorage) Delete(obj interface{}) error {
   key, err := c.keyFunc(obj)
   if err != nil {
      return cache.KeyError{Obj: obj, Err: err}
   }

   c.cacheStorage.Delete(key)
   return nil
}

func (c *ResourceVersionStorage) Get(obj interface{}) (string, bool, error) {
   key, err := c.keyFunc(obj)
   if err != nil {
      return "", false, cache.KeyError{Obj: obj, Err: err}
   }
   version, exists := c.cacheStorage.Get(key)
   if exists {
      return version.(string), exists, nil
   }
   return "", false, nil
}
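Below is a model of ResourceVersionStorage that uses only the standard library. It assumes objects are reduced to namespace, name, and resourceVersion; meta, keyFunc, and versionStorage are hypothetical names. It also includes ListKeys and GetByKey, the two methods cache.KeyListerGetter requires. DeltaFIFO calls these methods through KnownObjects, for example to find objects that disappeared during a relist.

```go
package main

import (
	"sort"
	"sync"
)

type meta struct{ Namespace, Name, ResourceVersion string }

// keyFunc mimics MetaNamespaceKeyFunc: "namespace/name", or "name" when cluster-scoped.
func keyFunc(m meta) string {
	if m.Namespace == "" {
		return m.Name
	}
	return m.Namespace + "/" + m.Name
}

// versionStorage maps each key to a resourceVersion only.
type versionStorage struct {
	mu       sync.RWMutex
	versions map[string]string
}

func newVersionStorage() *versionStorage {
	return &versionStorage{versions: map[string]string{}}
}

func (s *versionStorage) Add(m meta) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.versions[keyFunc(m)] = m.ResourceVersion
}

// Update behaves exactly like Add, as in the original.
func (s *versionStorage) Update(m meta) { s.Add(m) }

func (s *versionStorage) Delete(m meta) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.versions, keyFunc(m))
}

func (s *versionStorage) Get(m meta) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.versions[keyFunc(m)]
	return v, ok
}

// ListKeys returns all keys, sorted for stable output.
func (s *versionStorage) ListKeys() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	keys := make([]string, 0, len(s.versions))
	for k := range s.versions {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// GetByKey returns the stored resourceVersion as the "object".
func (s *versionStorage) GetByKey(key string) (interface{}, bool, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.versions[key]
	return v, ok, nil
}
```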

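Summary

To implement a custom Informer, the main thing to change is the HandleDeltas function that processes the DeltaFIFO queue, adapting it to your business logic. This approach suits scenarios that only need to receive and handle resource object events.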
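As an illustration of swapping in business logic, here is a hypothetical HandleDeltas variant. Instead of updating an Indexer and notifying listeners, it writes every delta straight to a sink, such as a database table. sink, memSink, and handleDeltasToSink are assumptions made for this sketch. A real implementation would also need to handle DeletedFinalStateUnknown objects and decide what a Replaced delta means for rows that already exist.

```go
package main

// delta is a simplified event: Type uses client-go's delta type names.
type delta struct{ Type, Key, Obj string }

// sink is a hypothetical destination, e.g. a database table.
type sink interface {
	Upsert(key, obj string) error
	Remove(key string) error
}

// memSink is an in-memory sink for illustration.
type memSink map[string]string

func (m memSink) Upsert(k, o string) error { m[k] = o; return nil }
func (m memSink) Remove(k string) error    { delete(m, k); return nil }

// handleDeltasToSink replaces the cache-and-notify logic with direct writes.
// Returning an error lets processLoop re-enqueue the item if RetryOnError is set.
func handleDeltasToSink(s sink, deltas []delta) error {
	for _, d := range deltas {
		var err error
		switch d.Type {
		case "Added", "Updated", "Replaced", "Sync":
			err = s.Upsert(d.Key, d.Obj)
		case "Deleted":
			err = s.Remove(d.Key)
		}
		if err != nil {
			return err
		}
	}
	return nil
}
```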