Golang事件队列开发

SuKai March 28, 2022

前面文章介绍了DaoCloud开源的Clusterpedia通过Informer机制监控Kubernetes资源事件,保存事件对象到队列中,等待事件处理函数将Kubernetes资源对象保存到数据库中。今天一起学习一下DaoCloud开源的Clusterpedia如何实现事件队列的。

下面我们文章首先介绍Clusterpedia如何使用EventQueue的,再来介绍EventQueue如何实现的。主要分以下部分:

1,Clusterpedia事件处理函数HandleDeltas,调用informer.handler,这个handler实现了ResourceEventHandler接口,有OnAdd,OnUpdate,OnDelete方法。

2,实现了ResourceEventHandler接口的ResourceSynchro,ResourceSynchro调用EventQueue接口将对象写入队列

3,实现了EventQueue接口的pressurequeue,pressurequeue负责队列的操作。

事件处理函数HandleDeltas

Clusterpedia直接基于client-go底层的cache.Controller开发了自己的resourceVersionInformer,cache.Controller的Process函数调用HandleDeltas来处理资源事件。根据cache.Deltas的操作类型,调用informer.storage和informer.handler的对应操作。这里的handler指的是ResourceEventHandler。

func (informer *resourceVersionInformer) HandleDeltas(deltas cache.Deltas) error {
   for _, d := range deltas {
      switch d.Type {
      case cache.Replaced, cache.Added, cache.Updated:
         version, exists, err := informer.storage.Get(d.Object)
         if err != nil {
            return err
         }

         if !exists {
            if err := informer.storage.Add(d.Object); err != nil {
               return err
            }

            informer.handler.OnAdd(d.Object)
            break
         }

         if d.Type == cache.Replaced {
            if v := compareResourceVersion(d.Object, version); v <= 0 {
               if v == 0 {
                  informer.handler.OnSync(d.Object)
               }
               break
            }
         }

         if err := informer.storage.Update(d.Object); err != nil {
            return err
         }
         informer.handler.OnUpdate(nil, d.Object)
      case cache.Deleted:
         if err := informer.storage.Delete(d.Object); err != nil {
            return err
         }
         informer.handler.OnDelete(d.Object)
      }
   }
   return nil
}

ResourceEventHandler ResourceSynchro

ResourceSynchro的处理很简单,删除资源对象metadata中clusterpedia相关的字段和annotation,然后调用队列操作。

func (synchro *ResourceSynchro) OnAdd(obj interface{}) {
   // `obj` will not be processed in parallel elsewhere,
   // no deep copy is needed for now.
   //
   // robj := obj.(runtime.Object).DeepCopyObject()

   // Prune object before enqueue.
   //
   // There are many solutions for pruning fields, such as
   // * prunning at the clusterpedia apiserver.
   // * prunning in the storage layer, where neither clustersynchro
   //   nor apiserver are responsible for the pruning process.
   // https://github.com/clusterpedia-io/clusterpedia/issues/4
   synchro.pruneObject(obj.(*unstructured.Unstructured))

   _ = synchro.queue.Add(obj)
}

func (synchro *ResourceSynchro) OnUpdate(_, obj interface{}) {
   // `obj` will not be processed in parallel elsewhere,
   // no deep copy is needed for now.
   //
   // robj := obj.(runtime.Object).DeepCopyObject()

   // https://github.com/clusterpedia-io/clusterpedia/issues/4
   synchro.pruneObject(obj.(*unstructured.Unstructured))
   _ = synchro.queue.Update(obj)
}

func (synchro *ResourceSynchro) OnDelete(obj interface{}) {
   _ = synchro.queue.Delete(obj)
}

func (synchro *ResourceSynchro) OnSync(obj interface{}) {
}

EventQueue队列操作

队列接口的方法包括:新建,更新,删除,取出,事件处理结束,关闭。

type EventQueue interface {
   Add(obj interface{}) error
   Update(obj interface{}) error
   Delete(obj interface{}) error

   Pop() (*Event, error)
   Done(event *Event) error

   Close()
}

pressurequeue

pressurequeue结构体包含:

1,lock,sync.RWMutex读写互斥锁,同时只允许多个读操作,或一个写操作在进行,支持Lock, Unlock, Rlock, RUnlock。在这里只使用了Lock, Unlock写锁定和写解锁,所以这里所有的操作都是互斥的,保证在同一时刻仅有一个线程访问某一个共享资源。

2,cond,sync.Cond条件变量,和互斥锁组合使用,协调想要访问共享资源的线程。当共享资源状态发生变化时,通知其它因此而阻塞的线程,起到线程之间的协调作用。在构造函数NewPressureQueue中,q.cond.L = &q.lock表示通过观察lock来触发条件改变。cond有提供了三个方法:Wait(), Signal(), Broadcast()。当一个goroutine协程调用Wait()方法后,这个goroutine协程会加入到条件变量等待通知的列表里,当调用Signal(), Broadcast()方法会发出条件改变通知,Signal()只唤醒一个等待列表中的协程,Broadcast()唤醒所有等待列表中的协程。

3,processing,sets.String字符串集合,集合中的元素不能重复,与map类似,value为空,实际类型为map[string]struct{},降低内存消耗,提供了集合的Insert, Delete, Has, HasAll, HasAny, Difference,Union操作方法。processing用于保存正在处理中的资源对象。

4,items,map[string]*Event保存事件。items用于保存资源对象的事件。

5,queue,[]string用slice作为队列。queue用于保存等待处理的资源对象。

6,keyFunc,生成key的函数,这里使用资源对象名称作为key。

7,closed,bool类型,队列关闭。


func NewPressureQueue(keyFunc KeyFunc) *pressurequeue {
	if keyFunc == nil {
		panic("keyFunc is required")
	}

	q := &pressurequeue{
		processing: sets.NewString(),
		items:      map[string]*Event{},
		queue:      []string{},
		keyFunc:    keyFunc,
	}
	q.cond.L = &q.lock
	return q
}

type pressurequeue struct {
	lock sync.RWMutex
	cond sync.Cond

	processing sets.String
	items      map[string]*Event
	queue      []string
	keyFunc    KeyFunc
	closed     bool
}

pressurequeue的方法

1,pressurequeue的接口操作方法中,第一步是申请排斥锁,最后结束时解开锁。

2,Add, Update, Delete调用queueActionLocked完成队列操作,传参:操作类型,资源对象。

3,queueActionLocked中调用put将事件保存到队列中,在put之前会先调用pressureEvents将队列中现有的资源对象操作事件(老事件)和新的操作事件进行比较返回结果事件。如果新的事件操作类型是删除,则返回新的删除事件。如果新的事件操作类型是更新,老事件为删除,则返回老的删除事件,老事件为新建,则返回新的事件,老事件也为更新,则返回新的更新事件。如果新的事件操作类型是新建,老事件为删除,则修改新的事件为更新并返回,老事件为更新或者新建,则返回新的新建事件。

4,Pop取出资源对象事件。如果queue长度为0,则执行q.cond.Wait()条件等待,直到有条件改变时再判断queue长度是否为0,如果在等待过程中q.closed为true,表示已经事件队列关闭,直接退出。当queue长度不为0时,从queue中取出第一条资源对象并删除。从items中取出第一条资源对象对应的事件并删除。在processing中插入第一条资源对象。返回事件数据。

5,Done资源对象事件处理结束。在processing中删除这个资源对象,如果在items中存在这个资源对象,表示资源对象在处理过程中又产生了新事件,在queue中重新添加资源对象等待处理。最后发出条件改变通知。

6,put添加资源对象事件,如果processing和items都没有这个资源对象,表示新的资源对象事件,在queue中添加资源对象,然后将事件保存到items中。如果processing中有,表示资源对象事件正在处理中,还没有处理结束,将事件添加到items中。最后发出条件改变通知。

7,Close关闭事件队列,设置q.closed为true。最后发出条件改变通知。

func (q *pressurequeue) Add(obj interface{}) error {
   q.lock.Lock()
   defer q.lock.Unlock()

   return q.queueActionLocked(Added, obj)
}

func (q *pressurequeue) Update(obj interface{}) error {
   q.lock.Lock()
   defer q.lock.Unlock()

   return q.queueActionLocked(Updated, obj)
}

func (q *pressurequeue) Delete(obj interface{}) error {
   q.lock.Lock()
   defer q.lock.Unlock()

   return q.queueActionLocked(Deleted, obj)
}

func (q *pressurequeue) queueActionLocked(action ActionType, obj interface{}) error {
   key, err := q.keyFunc(obj)
   if err != nil {
      return err
   }
   q.put(key, pressureEvents(q.items[key], &Event{Action: action, Object: obj}))
   return nil
}

func (q *pressurequeue) Reput(event *Event) error {
   if event == nil {
      return nil
   }

   key, err := q.keyFunc(event.Object)
   if err != nil {
      return err
   }

   q.lock.Lock()
   defer q.lock.Unlock()

   q.processing.Delete(key)

   event.reputCount++
   q.put(key, pressureEvents(event, q.items[key]))
   return nil
}

func (q *pressurequeue) put(key string, event *Event) {
   if event == nil {
      return
   }

   if !q.processing.Has(key) {
      if _, existed := q.items[key]; !existed {
         q.queue = append(q.queue, key)
      }
   }
   q.items[key] = event
   q.cond.Broadcast()
}

func (q *pressurequeue) Done(event *Event) error {
   key, err := q.keyFunc(event.Object)
   if err != nil {
      return err
   }

   q.lock.Lock()
   defer q.lock.Unlock()

   q.processing.Delete(key)
   if _, existed := q.items[key]; existed {
      q.queue = append(q.queue, key)
   }
   q.cond.Broadcast()
   return nil
}

func (q *pressurequeue) Pop() (*Event, error) {
   q.lock.Lock()
   defer q.lock.Unlock()

   for {
      for len(q.queue) == 0 {
         if q.closed {
            return nil, ErrQueueClosed
         }
         q.cond.Wait()
      }

      key := q.queue[0]
      q.queue = q.queue[1:]

      event, ok := q.items[key]
      delete(q.items, key)
      if !ok || event == nil {
         // TODO(clusterpedia-io): add log
         continue
      }

      q.processing.Insert(key)
      return event, nil
   }
}

func (q *pressurequeue) PopAll() ([]*Event, error) {
   q.lock.Lock()
   defer q.lock.Unlock()
   if len(q.queue) == 0 {
      if q.closed {
         return nil, ErrQueueClosed
      }
      return []*Event{}, nil
   }

   events := make([]*Event, 0, len(q.queue))
   for _, key := range q.queue {
      if event := q.items[key]; event != nil {
         events = append(events, event)
         q.processing.Insert(key)
      }
   }
   q.items = make(map[string]*Event)
   q.queue = q.queue[:0]
   return events, nil
}

func (q *pressurequeue) Close() {
   q.lock.Lock()
   defer q.lock.Unlock()

   q.closed = true
   q.cond.Broadcast()
}

func pressureEvents(older *Event, newer *Event) *Event {
	if newer == nil {
		return older
	}
	if older == nil || newer.Action == Deleted || older.Action == newer.Action {
		return newer
	}

	switch newer.Action {
	case Updated:
		if older.Action == Deleted {
			// TODO: 可以比对 resource version
			// 但是从 informer 中获取的数据应该是可以保证顺序的,所以可以不添加比对
			return older
		}

		if older.Action == Added {
			newer.Action = Added
		}
		return newer
	case Added:
		if older.Action == Deleted {
			newer.Action = Updated
			return newer
		}

		/*
			if older.Action == Updated {
				// TODO(iceber)
				// 正常来说 Updated -> Added 之间,会存在一个 Deleted 事件
			}
		*/
		return newer
	}

	return nil
}

总结,可以看到这个事件队列,保存了三种数据:1,正在处理中的资源对象。2,等待处理的资源对象。3,资源对象对应的事件数据,很好地实现了队列数据的状态变化。通过sync.RWMutex读写排斥锁和sync.Cond条件变量,实现多线程读写共享资源的保护和协调。