由 SuKai March 30, 2022
我们在Kubernetes自定义资源CRD控制器开发中都使用过Informer,Informer主要提供了两个功能:1,同步数据到本地缓存。2,监听Kubernetes资源的事件,根据对应的事件操作类型,触发事先注册好的ResourceEventHandler。
前面文章介绍了DaoCloud开源的Clusterpedia通过Informer机制实现了同步多个Kubernetes集群资源到MySQL等数据库。那么如何开发一个简化的自定义的Informer呢?下面我们一起看一下DaoCloud开源的Clusterpedia如何实现Kubernetes资源版本Informer的。
文章内容分以下几部分:
1,Kubernetes原生的Informer机制与实现
2,Clusterpedia的ResourceVersionInformer与原生的Informer比较
3,ResourceVersionInformer代码实现
Kubernetes原生的Informer机制
Informer组件
Reflector:反射器,通过Kubernetes的List/Watch API监控指定类型的资源对象。
DeltaFIFO Queue:将Reflector监控到的变化的对象存放在这个FIFO队列中。
LocalStore:Informer的本地缓存,缓存Kubernetes资源对象,可以被Lister的List/Get方法访问,减少对APIServer的访问压力。
WorkQueue:工作队列并不属于Informer本身。通常由用户注册的ResourceEventHandler回调函数把资源对象的key加入WorkQueue,再由自定义控制器的worker从WorkQueue中取出key并执行调谐逻辑,从而把事件接收与业务处理解耦。
SharedInformer实现
这里可以看到:
1,NewSharedIndexInformer中,初始化了processor,这个processor用于回调用户注册的事件处理回调函数,NewIndexer初始化了一个indexer,这个indexer是作为informer的store本地缓存。listerWatcher指定ListerWatcher接口lw,这个是Kubernetes的client连接用于监控Kubernetes资源对象。objectType指定监控的Kubernetes资源类型。
2,SharedInformer的Run中,NewDeltaFIFOWithOptions构造了一个DeltaFIFO实例fifo,构造了一个controller,这个controller使用fifo队列,使用listerWatcher监控资源对象,ObjectType指定资源类型,Process指定controller处理队列的函数。s.processor.run启动监听通知,调用用户注册的回调函数处理。
// SharedInformer is the client-go informer interface as quoted in this article.
// It covers handler registration, access to the local Store and the Controller,
// lifecycle (Run / HasSynced) and watch diagnostics.
// NOTE(review): depending on the client-go version the upstream interface may
// declare additional methods; this is the subset shown here.
type SharedInformer interface {
	// AddEventHandler registers a handler that uses the informer's default resync period.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod registers a handler with its own requested resync period.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the informer's local cache.
	GetStore() Store
	// GetController returns the controller that drives the Reflector -> DeltaFIFO pipeline.
	GetController() Controller
	// Run starts the informer; it runs until stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced reports whether the informer has completed its initial sync
	// (presumably: the initial list has been processed — see client-go docs).
	HasSynced() bool
	// LastSyncResourceVersion returns the last resourceVersion the reflector observed.
	LastSyncResourceVersion() string
	// SetWatchErrorHandler sets the handler called whenever ListAndWatch drops
	// the connection with an error.
	SetWatchErrorHandler(handler WatchErrorHandler) error
}
// NewSharedIndexInformer builds an unstarted sharedIndexInformer for objects of
// the same type as exampleObject, fed by lw.
//
// The local cache is an Indexer keyed by DeletionHandlingMetaNamespaceKeyFunc
// with the supplied indexers. defaultEventHandlerResyncPeriod seeds both the
// reflector's resync check period and the default per-handler resync period.
// The same real clock is shared by the informer and its processor.
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	clk := &clock.RealClock{}
	typeName := fmt.Sprintf("%T", exampleObject)

	return &sharedIndexInformer{
		processor: &sharedProcessor{clock: clk},
		clock:     clk,

		indexer:       NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher: lw,
		objectType:    exampleObject,

		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,

		cacheMutationDetector: NewCacheMutationDetector(typeName),
	}
}
// sharedIndexInformer is client-go's shared informer implementation. It holds:
//   - an Indexer used as the local cache;
//   - a sharedProcessor that fans notifications out to registered handlers;
//   - the ListerWatcher and object type that the Controller created in Run
//     uses to keep that cache in sync.
type sharedIndexInformer struct {
	// indexer is the local cache; HandleDeltas applies every delta to it.
	indexer Indexer
	// controller is created in Run from a Config whose queue is a DeltaFIFO.
	controller Controller
	// processor distributes add/update/delete notifications to listeners.
	processor *sharedProcessor
	// cacheMutationDetector is given every object HandleDeltas stores.
	cacheMutationDetector MutationDetector
	// listerWatcher is passed to the controller (and its Reflector) to list/watch objects.
	listerWatcher ListerWatcher
	// objectType is an example object of the type this informer is
	// expected to handle. Only the type needs to be right, except
	// that when that is `unstructured.Unstructured` the object's
	// `"apiVersion"` and `"kind"` must also be right.
	objectType runtime.Object
	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock
	// started is set when Run creates the controller; stopped is set when Run
	// exits. Both are guarded by startedLock.
	started, stopped bool
	startedLock      sync.Mutex
	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex
	// Called whenever the ListAndWatch drops the connection with an error.
	watchErrorHandler WatchErrorHandler
}
// Run wires the informer together and runs it until stopCh is closed:
//  1. build a DeltaFIFO whose known objects are the indexer;
//  2. create the controller (under startedLock) with HandleDeltas as its Process func;
//  3. start the cache mutation detector and the sharedProcessor in background goroutines;
//  4. run the controller, which blocks until stopCh is closed.
//
// Deferred calls run in reverse order on the way out: stopped is set to true,
// then processorStopCh is closed, then Run waits for the processor and detector
// goroutines. The processor therefore always stops strictly after the controller.
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	// The indexer is handed to the FIFO as KnownObjects so the queue can see what
	// is already cached; EmitDeltaTypeReplaced produces the Replaced deltas that
	// HandleDeltas treats specially.
	// NOTE(review): exact FIFO semantics of these options — confirm in the DeltaFIFO docs.
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})
	// RetryOnError is false: a batch whose Process call fails is not re-enqueued
	// by the controller's processLoop.
	cfg := &Config{
		Queue:             fifo,
		ListerWatcher:     s.listerWatcher,
		ObjectType:        s.objectType,
		FullResyncPeriod:  s.resyncCheckPeriod,
		RetryOnError:      false,
		ShouldResync:      s.processor.shouldResync,
		Process:           s.HandleDeltas,
		WatchErrorHandler: s.watchErrorHandler,
	}
	// Create the controller and flip started under startedLock, so that other code
	// holding the lock observes both changes together.
	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.controller = New(cfg)
		// Propagate the informer's (test-replaceable) clock to the concrete controller.
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()
	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)
	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}
controller的函数
Run
1,NewReflector使用ListerWatcher, ObjectType, Queue等构造Reflector反射器
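2,Reflector的Run通过wg.StartWithChannel在单独的goroutine中运行;随后wait.Until(c.processLoop, time.Second, stopCh)启动processLoop。processLoop内部是一个阻塞的for循环,并不是每隔1秒调用一次;只有当processLoop返回后,wait.Until才会间隔1秒再次调用它,直到stopCh被关闭为止。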
processLoop
1,obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))从队列中取出某一个资源对象累积的全部事件(Deltas),并调用c.config.Process进行处理;这里的Process也就是SharedInformer的HandleDeltas。
2,如果Pop返回ErrFIFOClosed,说明队列已被关闭(stopCh已关闭),processLoop直接返回;如果是其他处理错误且RetryOnError为true,则通过AddIfNotPresent在该对象不在队列中时将其重新加入队列,否则该错误被忽略。
// Run builds a Reflector from the Config (ListerWatcher, ObjectType, Queue,
// FullResyncPeriod) that feeds the queue, and runs it in a background goroutine.
// It then drives processLoop until stopCh is closed, and finally waits for the
// reflector goroutine to finish.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	// Close the queue once stopCh fires so that processLoop, which returns on
	// ErrFIFOClosed, can exit.
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}
	// Publish the fully configured reflector under reflectorMutex
	// (presumably for concurrent readers of c.reflector).
	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()
	var wg wait.Group
	wg.StartWithChannel(stopCh, r.Run)
	// processLoop blocks while it drains the queue. Per wait.Until's contract it
	// is only re-invoked (after one second) if it returns before stopCh is closed.
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}
// processLoop keeps popping the next batch of Deltas from the queue and hands
// it to c.config.Process (for a shared informer, HandleDeltas).
//
// It returns only when the queue reports ErrFIFOClosed. On any other error the
// popped object is put back with AddIfNotPresent when RetryOnError is set;
// otherwise the error is dropped.
func (c *controller) processLoop() {
	for {
		item, popErr := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		switch {
		case popErr == nil:
			// Processed successfully; fetch the next item.
		case popErr == ErrFIFOClosed:
			return
		case c.config.RetryOnError:
			// This is the safe way to re-enqueue.
			c.config.Queue.AddIfNotPresent(item)
		}
	}
}
Reflector的watchHandler函数
watchHandler负责监听Kubernetes资源对象的变化,并根据事件类型(Added/Modified/Deleted)调用反射器store的Add/Update/Delete方法保存资源对象事件;这里的store也就是上面sharedIndexInformer的Run函数中初始化的DeltaFIFO队列fifo。每处理一个事件,watchHandler还会记录最新的resourceVersion。
// watchHandler consumes events from the watch w and mirrors them into r.store
// (for an informer, the DeltaFIFO):
//   - Added   -> store.Add
//   - Modified -> store.Update
//   - Deleted -> store.Delete
//   - Bookmark -> nothing stored
//
// After every accepted event it records the event's resourceVersion in
// *resourceVersion, in the reflector's last-sync version and, if the store
// implements ResourceVersionUpdater, in the store.
//
// It returns:
//   - errorStopRequested when stopCh is closed;
//   - any error received on errc;
//   - the API error carried by a watch.Error event;
//   - a "very short watch" error if the result channel closed within one second
//     with no events;
//   - nil otherwise.
//
// Events whose type or GVK does not match the expected ones, or whose metadata
// cannot be read, are reported via utilruntime.HandleError and skipped.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	eventCount := 0
	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()
loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			// The server closed the watch: leave the loop and do the short-watch check below.
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrors.FromObject(event.Object)
			}
			if r.expectedType != nil {
				if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
					continue
				}
			}
			if r.expectedGVK != nil {
				if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
					continue
				}
			}
			// NOTE: the local variable `meta` shadows the meta package from here
			// to the end of this loop iteration.
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.GetResourceVersion()
			// Store errors are reported but do not abort the watch.
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			// Record progress so a subsequent watch can resume from this version.
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			if rvu, ok := r.store.(ResourceVersionUpdater); ok {
				rvu.UpdateResourceVersion(newResourceVersion)
			}
			eventCount++
		}
	}
	// A watch that closes almost immediately with no events is reported as an
	// error rather than as a normal close.
	watchDuration := r.clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
	return nil
}
SharedInformer的HandleDeltas
HandleDeltas将队列中取出的资源对象事件进行处理,根据事件的操作类型,调用indexer对应的操作,将资源对象保存到本地缓存。
当indexer保存结束后,调用s.processor.distribute发送通知,调用用户注册的回调函数。
// HandleDeltas is the controller's Process function. It receives the Deltas
// popped for one object and applies them, oldest first, to the indexer, then
// notifies listeners:
//   - Sync/Replaced/Added/Updated, object already cached: Update, then
//     updateNotification{old, new}.
//   - Sync/Replaced/Added/Updated, object not cached: Add, then addNotification.
//   - Deleted: Delete, then deleteNotification.
//
// Sync deltas, and Replaced deltas whose resourceVersion did not change, are
// distributed with isSync=true, i.e. only to listeners that requested resync.
// blockDeltas is held for the whole batch. The first indexer error aborts the
// batch and is returned.
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()
	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				isSync := false
				switch {
				case d.Type == Sync:
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							// Replaced events that didn't change resourceVersion are treated as resync events
							// and only propagated to listeners that requested resync
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				// Not cached yet (or the lookup failed): treat it as a new object.
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}
Index本地缓存操作
本地缓存由threadSafeMap实现:使用一个受读写锁保护的map保存Kubernetes资源对象,并同时维护索引。
// ThreadSafeStore is the key-addressed, concurrency-safe storage used as the
// informer's local cache. In this article it is implemented by threadSafeMap.
type ThreadSafeStore interface {
	// Basic key/value operations.
	Add(key string, obj interface{})
	Update(key string, obj interface{})
	Delete(key string)
	Get(key string) (item interface{}, exists bool)
	List() []interface{}
	ListKeys() []string
	Replace(map[string]interface{}, string)
	// Index queries, driven by the Indexers registered on the store.
	Index(indexName string, obj interface{}) ([]interface{}, error)
	IndexKeys(indexName, indexKey string) ([]string, error)
	ListIndexFuncValues(name string) []string
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	GetIndexers() Indexers
	// AddIndexers adds more indexers to this store. If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
	// Resync is a no-op and is deprecated
	Resync() error
}
// threadSafeMap implements ThreadSafeStore
// using a plain map guarded by a sync.RWMutex. The mutating methods shown here
// (Add/Update/Delete) take the write lock; Get and List take the read lock.
type threadSafeMap struct {
	lock sync.RWMutex
	// items holds the stored objects, keyed by object key.
	items map[string]interface{}
	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}
// Add stores obj under key and refreshes the indices. Its body was
// line-for-line identical to Update's, so it delegates to Update: any existing
// entry under key is replaced and re-indexed.
func (c *threadSafeMap) Add(key string, obj interface{}) {
	c.Update(key, obj)
}
// Update stores obj under key (overwriting any previous value) and re-indexes
// it against whatever was stored there before. The previous value is nil when
// key was absent. It holds the write lock for the whole operation.
func (c *threadSafeMap) Update(key string, obj interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()

	previous := c.items[key]
	c.items[key] = obj
	c.updateIndices(previous, obj, key)
}
// Delete removes the entry under key together with its index entries.
// Deleting a key that is not present is a no-op.
func (c *threadSafeMap) Delete(key string) {
	c.lock.Lock()
	defer c.lock.Unlock()

	stored, found := c.items[key]
	if !found {
		return
	}
	c.deleteFromIndices(stored, key)
	delete(c.items, key)
}
// Get returns the object stored under key and whether it was present
// (nil, false when absent). It takes only the read lock.
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	if v, ok := c.items[key]; ok {
		return v, true
	}
	return nil, false
}
// List returns a snapshot slice of every stored object, taken under the read
// lock. The order follows Go map iteration and is therefore unspecified.
func (c *threadSafeMap) List() []interface{} {
	c.lock.RLock()
	defer c.lock.RUnlock()

	snapshot := make([]interface{}, len(c.items))
	i := 0
	for _, v := range c.items {
		snapshot[i] = v
		i++
	}
	return snapshot
}
sharedProcessor调用用户注册的回调函数
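当HandleDeltas完成本地缓存操作后,会调用distribute发送资源对象事件通知:distribute把通知写入每个processorListener的addCh(从addCh转发到nextCh由此处未展示的pop函数完成),processorListener的run再从nextCh中读取通知,并根据通知类型调用用户注册的OnAdd/OnUpdate/OnDelete回调函数。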
// distribute hands a notification to listeners while holding the read lock on
// the listener sets. Resync notifications (sync == true) go only to
// p.syncingListeners; all other notifications go to p.listeners.
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	targets := p.listeners
	if sync {
		targets = p.syncingListeners
	}
	for _, l := range targets {
		l.add(obj)
	}
}
// add hands a notification to this listener by sending it on p.addCh.
// NOTE(review): whether this send can block depends on how addCh is created and
// drained (in client-go a separate pop goroutine forwards addCh into nextCh) —
// confirm against processorListener.pop.
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}
// run delivers every notification received on p.nextCh to the listener's
// handler, dispatching on its type:
//   - updateNotification -> OnUpdate(old, new)
//   - addNotification -> OnAdd(new)
//   - deleteNotification -> OnDelete(old)
//
// Unknown types are reported via utilruntime.HandleError. It returns once
// nextCh has been closed and drained.
func (p *processorListener) run() {
	// this call blocks until the channel is closed. When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted. This is usually better than the alternative of never
	// delivering again.
	// stopCh is local: it is closed only after nextCh has been drained and
	// closed, which is what tells wait.Until to stop re-running the loop.
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}
总结Kubernetes的sharedIndexInformer实现过程:
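1,反射器Reflector通过List/Watch获取资源对象,其中watchHandler将Watch到的资源对象事件写入DeltaFIFO队列。
2,controller的processLoop从DeltaFIFO中Pop出某个资源对象的事件,交给SharedInformer的HandleDeltas处理;HandleDeltas将资源对象写入本地缓存Indexer的map中。
3,SharedInformer的HandleDeltas随后发送通知,由processorListener调用用户注册的回调函数。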
Clusterpedia的ResourceVersionInformer与原生的Informer比较
1,直接使用Kubernetes原生的反射器Reflector监听不同Kubernetes集群的资源对象变化,将事件写入到DeltaFIFO队列。
2,ResourceVersionInformer的HandleDeltas将结果写入本地缓存。与原生Indexer不同,这个本地缓存只保存每个资源对象的resourceVersion(而不是完整对象),底层使用Kubernetes原生的ThreadSafeStore接口。
3,ResourceVersionInformer的HandleDeltas不再通过processor分发通知来调用用户注册的回调函数,而是在本地缓存更新完成后,直接调用ResourceEventHandler接口的OnAdd/OnUpdate/OnDelete/OnSync方法;结构体ResourceSynchro实现了ResourceEventHandler接口。
ResourceVersionInformer的实现
代码上基本和Kubernetes的sharedIndexInformer一致,去掉了不需要的方法,比如注册用户回调函数方法。resourceVersionInformer增加了一个ResourceEventHandler。HandleDeltas保存完本地缓存后,直接调用事件处理函数ResourceEventHandler。
// ResourceVersionInformer is the trimmed-down informer used by Clusterpedia.
// It can only be run and polled for sync status. Events go to the
// ResourceEventHandler passed to NewResourceVersionInformer; there is no
// method for registering additional handlers.
type ResourceVersionInformer interface {
	// Run runs the informer with the given stop channel.
	Run(stopCh <-chan struct{})
	// HasSynced reports the underlying controller's sync status.
	HasSynced() bool
}
// resourceVersionInformer is the ResourceVersionInformer implementation
// built by NewResourceVersionInformer.
type resourceVersionInformer struct {
	// name is also used as the name of the controller (and its reflector).
	name string
	// storage caches only each object's resourceVersion, keyed by object key;
	// it is also the DeltaFIFO's KnownObjects.
	storage *ResourceVersionStorage
	// handler receives OnAdd/OnUpdate/OnDelete/OnSync calls from HandleDeltas.
	handler ResourceEventHandler
	// controller runs the reflector and pops Deltas into HandleDeltas.
	controller cache.Controller
	listerWatcher cache.ListerWatcher
}
// NewResourceVersionInformer builds an unstarted informer for objects of the
// same type as exampleObject, listed and watched through lw.
//
// storage is used both as the informer's resourceVersion cache and as the
// DeltaFIFO's KnownObjects. Popped Deltas are handed to HandleDeltas, which
// calls handler directly. Failed batches are not retried (RetryOnError is false).
// It panics if name is empty.
func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *ResourceVersionStorage, exampleObject runtime.Object, handler ResourceEventHandler) ResourceVersionInformer {
	if name == "" {
		panic("name is required")
	}

	informer := &resourceVersionInformer{
		name:          name,
		storage:       storage,
		handler:       handler,
		listerWatcher: lw,
	}

	// DeltaFIFO.Pop always hands the process func a cache.Deltas batch.
	process := func(obj interface{}) error {
		return informer.HandleDeltas(obj.(cache.Deltas))
	}

	queue := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
		KeyFunction:           cache.DeletionHandlingMetaNamespaceKeyFunc,
		KnownObjects:          storage,
		EmitDeltaTypeReplaced: true,
	})

	informer.controller = NewNamedController(name, &cache.Config{
		ListerWatcher: lw,
		ObjectType:    exampleObject,
		RetryOnError:  false,
		Process:       process,
		Queue:         queue,
	})
	return informer
}
// HasSynced reports whether the underlying controller has synced; it simply
// delegates to the controller's HasSynced.
func (informer *resourceVersionInformer) HasSynced() bool {
	ctrl := informer.controller
	return ctrl.HasSynced()
}
// Run runs the underlying controller and blocks for as long as the
// controller's Run does. For the controller built by NewNamedController, that
// is until stopCh is closed.
func (informer *resourceVersionInformer) Run(stopCh <-chan struct{}) {
	ctrl := informer.controller
	ctrl.Run(stopCh)
}
// HandleDeltas applies one object's Deltas (in order) to the resourceVersion
// storage and calls the handler directly:
//   - Deleted: remove from storage, then OnDelete.
//   - Added/Updated/Replaced for an object not yet stored: Add, then OnAdd.
//   - Replaced whose resourceVersion equals the stored one: OnSync only.
//   - Replaced whose resourceVersion compares lower than the stored one:
//     ignored (stale).
//   - Any other Added/Updated/Replaced: Update, then OnUpdate(nil, obj).
//     The previous object is not kept, so old is always nil.
//
// Other delta types are ignored. The first storage error aborts the batch and
// is returned.
func (informer *resourceVersionInformer) HandleDeltas(deltas cache.Deltas) error {
	for _, delta := range deltas {
		obj := delta.Object

		switch delta.Type {
		case cache.Deleted:
			if err := informer.storage.Delete(obj); err != nil {
				return err
			}
			informer.handler.OnDelete(obj)

		case cache.Replaced, cache.Added, cache.Updated:
			stored, found, err := informer.storage.Get(obj)
			if err != nil {
				return err
			}
			if !found {
				if err := informer.storage.Add(obj); err != nil {
					return err
				}
				informer.handler.OnAdd(obj)
				continue
			}

			if delta.Type == cache.Replaced {
				order := compareResourceVersion(obj, stored)
				if order == 0 {
					informer.handler.OnSync(obj)
					continue
				}
				if order < 0 {
					continue
				}
			}

			if err := informer.storage.Update(obj); err != nil {
				return err
			}
			informer.handler.OnUpdate(nil, obj)
		}
	}
	return nil
}
Controller与原生的controller基本一致,区别在于使用NewNamedReflector为Reflector指定名称,并且没有为Reflector设置clock和WatchErrorHandler。
// controller mirrors client-go's controller. It runs a named Reflector that
// feeds config.Queue and a processLoop that pops from it into config.Process.
type controller struct {
	// name is passed to cache.NewNamedReflector.
	name string
	// config is a by-value copy of the Config given to NewNamedController.
	config cache.Config
	// reflectorMutex guards reflector, which is assigned in Run.
	reflectorMutex sync.RWMutex
	reflector      *cache.Reflector
	// NOTE(review): queue is never set by NewNamedController and is not read
	// by Run or processLoop, which use config.Queue. It looks unused — confirm
	// against the rest of the package before removing it.
	queue cache.Queue
}
// NewNamedController returns a cache.Controller whose reflector will be named
// name. The Config struct is copied (shallowly), so reassigning fields of
// *config afterwards does not affect the returned controller.
func NewNamedController(name string, config *cache.Config) cache.Controller {
	c := &controller{name: name}
	c.config = *config
	return c
}
// Run creates a Reflector named c.name that lists/watches config.ListerWatcher
// into config.Queue and runs it in a background goroutine. It then drives
// processLoop until stopCh is closed, and finally waits for the reflector
// goroutine to finish.
//
// NOTE(review): unlike client-go's controller.Run, this does not set a clock
// or config.WatchErrorHandler on the reflector.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	// Close the queue once stopCh fires so that processLoop, which returns on
	// cache.ErrFIFOClosed, can exit.
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	// The name identifies this reflector in its log and error messages.
	r := cache.NewNamedReflector(
		c.name,
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.WatchListPageSize = c.config.WatchListPageSize
	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()
	var wg wait.Group
	wg.StartWithChannel(stopCh, r.Run)
	// processLoop blocks while draining the queue; wait.Until only re-invokes it
	// (after one second) if it returns before stopCh is closed.
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}
// processLoop drains the queue until it is closed. Each Pop hands the Deltas
// accumulated for one object to c.config.Process (for resourceVersionInformer,
// its HandleDeltas).
//
// Processing errors are reported through utilruntime.HandleError instead of
// being silently discarded. With RetryOnError false, which is how
// NewResourceVersionInformer configures it, a failed batch is not retried, so
// this report is the only trace of the failure. With RetryOnError true, the
// object is re-enqueued via AddIfNotPresent (only if it is not already
// queued); a failure to re-enqueue is reported as well.
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(cache.PopProcessFunc(c.config.Process))
		if err == nil {
			continue
		}
		if err == cache.ErrFIFOClosed {
			// Run closes the queue once stopCh fires; this is the loop's only exit.
			return
		}
		utilruntime.HandleError(err)
		if !c.config.RetryOnError {
			continue
		}
		// This is the safe way to re-enqueue.
		if requeueErr := c.config.Queue.AddIfNotPresent(obj); requeueErr != nil {
			utilruntime.HandleError(requeueErr)
		}
	}
}
本地缓存ResourceVersionStorage直接基于ThreadSafeStore实现,只保存资源对象key到resourceVersion的映射
// ResourceVersionStorage is the informer's local cache. Instead of whole
// objects it stores only each object's resourceVersion string, keyed by
// keyFunc(obj), in a cache.ThreadSafeStore.
type ResourceVersionStorage struct {
	// keyFunc derives the cache key for an object.
	keyFunc cache.KeyFunc
	// cacheStorage maps object key -> resourceVersion string.
	cacheStorage cache.ThreadSafeStore
}

// Compile-time check that ResourceVersionStorage satisfies cache.KeyListerGetter,
// which lets it serve as the DeltaFIFO's KnownObjects.
// NOTE(review): the ListKeys/GetByKey methods this requires are not shown in this excerpt.
var _ cache.KeyListerGetter = &ResourceVersionStorage{}
// NewResourceVersionStorage returns an empty storage that derives cache keys
// with keyFunc. The backing ThreadSafeStore is created with no indexers.
func NewResourceVersionStorage(keyFunc cache.KeyFunc) *ResourceVersionStorage {
	s := &ResourceVersionStorage{keyFunc: keyFunc}
	s.cacheStorage = cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{})
	return s
}
// Add records obj's resourceVersion under the key produced by keyFunc. Only the
// version string is kept, not the object.
//
// A key-derivation failure is returned as cache.KeyError. If obj's metadata
// cannot be accessed, the error from meta.Accessor is returned unchanged.
func (c *ResourceVersionStorage) Add(obj interface{}) error {
	key, keyErr := c.keyFunc(obj)
	if keyErr != nil {
		return cache.KeyError{Obj: obj, Err: keyErr}
	}
	objMeta, metaErr := meta.Accessor(obj)
	if metaErr != nil {
		return metaErr
	}
	rv := objMeta.GetResourceVersion()
	c.cacheStorage.Add(key, rv)
	return nil
}
// Update overwrites the resourceVersion stored under obj's key with obj's
// current resourceVersion.
//
// A key-derivation failure is returned as cache.KeyError. If obj's metadata
// cannot be accessed, the error from meta.Accessor is returned unchanged.
func (c *ResourceVersionStorage) Update(obj interface{}) error {
	key, keyErr := c.keyFunc(obj)
	if keyErr != nil {
		return cache.KeyError{Obj: obj, Err: keyErr}
	}
	objMeta, metaErr := meta.Accessor(obj)
	if metaErr != nil {
		return metaErr
	}
	rv := objMeta.GetResourceVersion()
	c.cacheStorage.Update(key, rv)
	return nil
}
// Delete drops the entry stored under obj's key. Unlike Add and Update, it does
// not call meta.Accessor, so only keyFunc has to succeed on obj. A key-derivation
// failure is returned as cache.KeyError.
func (c *ResourceVersionStorage) Delete(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err == nil {
		c.cacheStorage.Delete(key)
		return nil
	}
	return cache.KeyError{Obj: obj, Err: err}
}
// Get returns the resourceVersion cached for obj's key and whether an entry
// exists:
//   - missing entry: ("", false, nil);
//   - key-derivation failure: ("", false, cache.KeyError).
//
// Stored values are always the strings written by Add/Update.
func (c *ResourceVersionStorage) Get(obj interface{}) (string, bool, error) {
	key, err := c.keyFunc(obj)
	if err != nil {
		return "", false, cache.KeyError{Obj: obj, Err: err}
	}
	stored, found := c.cacheStorage.Get(key)
	if !found {
		return "", false, nil
	}
	return stored.(string), true, nil
}
总结
如果要实现一个自定义的Informer,主要需要修改的就是处理DeltaFIFO队列的HandleDeltas函数,按照业务逻辑进行定制。这种用法适用于只需要接收并处理资源对象事件、而不需要在本地缓存完整资源对象的场景。