由 SuKai March 28, 2022
前面文章介绍了DaoCloud开源的Clusterpedia通过Informer机制监控Kubernetes资源事件,保存事件对象到队列中,等待事件处理函数将Kubernetes资源对象保存到数据库中。今天一起学习一下DaoCloud开源的Clusterpedia如何实现事件队列的。
下面我们文章首先介绍Clusterpedia如何使用EventQueue的,再来介绍EventQueue如何实现的。主要分以下部分:
1,Clusterpedia事件处理函数HandleDeltas,调用informer.handler,这个handler实现了ResourceEventHandler接口,有OnAdd,OnUpdate,OnDelete方法。
2,实现了ResourceEventHandler接口的ResourceSynchro,ResourceSynchro调用EventQueue接口将对象写入队列
3,实现了EventQueue接口的pressurequeue,pressurequeue负责队列的操作。
事件处理函数HandleDeltas
Clusterpedia直接基于client-go底层的cache.Controller开发了自己的resourceVersionInformer,cache.Controller的Process函数调用HandleDeltas来处理资源事件。根据cache.Deltas的操作类型,调用informer.storage和informer.handler的对应操作。这里的handler指的是ResourceEventHandler。
func (informer *resourceVersionInformer) HandleDeltas(deltas cache.Deltas) error {
for _, d := range deltas {
switch d.Type {
case cache.Replaced, cache.Added, cache.Updated:
version, exists, err := informer.storage.Get(d.Object)
if err != nil {
return err
}
if !exists {
if err := informer.storage.Add(d.Object); err != nil {
return err
}
informer.handler.OnAdd(d.Object)
break
}
if d.Type == cache.Replaced {
if v := compareResourceVersion(d.Object, version); v <= 0 {
if v == 0 {
informer.handler.OnSync(d.Object)
}
break
}
}
if err := informer.storage.Update(d.Object); err != nil {
return err
}
informer.handler.OnUpdate(nil, d.Object)
case cache.Deleted:
if err := informer.storage.Delete(d.Object); err != nil {
return err
}
informer.handler.OnDelete(d.Object)
}
}
return nil
}
ResourceEventHandler ResourceSynchro
ResourceSynchro的处理很简单,删除资源对象metadata中clusterpedia相关的字段和annotation,然后调用队列操作。
func (synchro *ResourceSynchro) OnAdd(obj interface{}) {
// `obj` will not be processed in parallel elsewhere,
// no deep copy is needed for now.
//
// robj := obj.(runtime.Object).DeepCopyObject()
// Prune object before enqueue.
//
// There are many solutions for pruning fields, such as
// * prunning at the clusterpedia apiserver.
// * prunning in the storage layer, where neither clustersynchro
// nor apiserver are responsible for the pruning process.
// https://github.com/clusterpedia-io/clusterpedia/issues/4
synchro.pruneObject(obj.(*unstructured.Unstructured))
_ = synchro.queue.Add(obj)
}
func (synchro *ResourceSynchro) OnUpdate(_, obj interface{}) {
// `obj` will not be processed in parallel elsewhere,
// no deep copy is needed for now.
//
// robj := obj.(runtime.Object).DeepCopyObject()
// https://github.com/clusterpedia-io/clusterpedia/issues/4
synchro.pruneObject(obj.(*unstructured.Unstructured))
_ = synchro.queue.Update(obj)
}
func (synchro *ResourceSynchro) OnDelete(obj interface{}) {
_ = synchro.queue.Delete(obj)
}
func (synchro *ResourceSynchro) OnSync(obj interface{}) {
}
EventQueue队列操作
队列接口的方法包括:新建,更新,删除,取出,事件处理结束,关闭。
type EventQueue interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
Pop() (*Event, error)
Done(event *Event) error
Close()
}
pressurequeue
pressurequeue结构体包含:
1,lock,sync.RWMutex读写互斥锁,同时只允许多个读操作,或一个写操作在进行,支持Lock, Unlock, Rlock, RUnlock。在这里只使用了Lock, Unlock写锁定和写解锁,所以这里所有的操作都是互斥的,保证在同一时刻仅有一个线程访问某一个共享资源。
2,cond,sync.Cond条件变量,和互斥锁组合使用,协调想要访问共享资源的线程。当共享资源状态发生变化时,通知其它因此而阻塞的线程,起到线程之间的协调作用。在构造函数NewPressureQueue中,q.cond.L = &q.lock表示通过观察lock来触发条件改变。cond有提供了三个方法:Wait(), Signal(), Broadcast()。当一个goroutine协程调用Wait()方法后,这个goroutine协程会加入到条件变量等待通知的列表里,当调用Signal(), Broadcast()方法会发出条件改变通知,Signal()只唤醒一个等待列表中的协程,Broadcast()唤醒所有等待列表中的协程。
3,processing,sets.String字符串集合,集合中的元素不能重复,与map类似,value为空,实际类型为map[string]struct{},降低内存消耗,提供了集合的Insert, Delete, Has, HasAll, HasAny, Difference,Union操作方法。processing用于保存正在处理中的资源对象。
4,items,map[string]*Event保存事件。items用于保存资源对象的事件。
5,queue,[]string用slice作为队列。queue用于保存等待处理的资源对象。
6,keyFunc,生成key的函数,这里使用资源对象名称作为key。
7,closed,bool类型,队列关闭。
func NewPressureQueue(keyFunc KeyFunc) *pressurequeue {
if keyFunc == nil {
panic("keyFunc is required")
}
q := &pressurequeue{
processing: sets.NewString(),
items: map[string]*Event{},
queue: []string{},
keyFunc: keyFunc,
}
q.cond.L = &q.lock
return q
}
type pressurequeue struct {
lock sync.RWMutex
cond sync.Cond
processing sets.String
items map[string]*Event
queue []string
keyFunc KeyFunc
closed bool
}
pressurequeue的方法
1,pressurequeue的接口操作方法中,第一步是申请排斥锁,最后结束时解开锁。
2,Add, Update, Delete调用queueActionLocked完成队列操作,传参:操作类型,资源对象。
3,queueActionLocked中调用put将事件保存到队列中,在put之前会先调用pressureEvents将队列中现有的资源对象操作事件(老事件)和新的操作事件进行比较返回结果事件。如果新的事件操作类型是删除,则返回新的删除事件。如果新的事件操作类型是更新,老事件为删除,则返回老的删除事件,老事件为新建,则返回新的事件,老事件也为更新,则返回新的更新事件。如果新的事件操作类型是新建,老事件为删除,则修改新的事件为更新并返回,老事件为更新或者新建,则返回新的新建事件。
4,Pop取出资源对象事件。如果queue长度为0,则执行q.cond.Wait()条件等待,直到有条件改变时再判断queue长度是否为0,如果在等待过程中q.closed为true,表示已经事件队列关闭,直接退出。当queue长度不为0时,从queue中取出第一条资源对象并删除。从items中取出第一条资源对象对应的事件并删除。在processing中插入第一条资源对象。返回事件数据。
5,Done资源对象事件处理结束。在processing中删除这个资源对象,如果在items中存在这个资源对象,表示资源对象在处理过程中又产生了新事件,在queue中重新添加资源对象等待处理。最后发出条件改变通知。
6,put添加资源对象事件,如果processing和items都没有这个资源对象,表示新的资源对象事件,在queue中添加资源对象,然后将事件保存到items中。如果processing中有,表示资源对象事件正在处理中,还没有处理结束,将事件添加到items中。最后发出条件改变通知。
7,Close关闭事件队列,设置q.closed为true。最后发出条件改变通知。
func (q *pressurequeue) Add(obj interface{}) error {
q.lock.Lock()
defer q.lock.Unlock()
return q.queueActionLocked(Added, obj)
}
func (q *pressurequeue) Update(obj interface{}) error {
q.lock.Lock()
defer q.lock.Unlock()
return q.queueActionLocked(Updated, obj)
}
func (q *pressurequeue) Delete(obj interface{}) error {
q.lock.Lock()
defer q.lock.Unlock()
return q.queueActionLocked(Deleted, obj)
}
func (q *pressurequeue) queueActionLocked(action ActionType, obj interface{}) error {
key, err := q.keyFunc(obj)
if err != nil {
return err
}
q.put(key, pressureEvents(q.items[key], &Event{Action: action, Object: obj}))
return nil
}
func (q *pressurequeue) Reput(event *Event) error {
if event == nil {
return nil
}
key, err := q.keyFunc(event.Object)
if err != nil {
return err
}
q.lock.Lock()
defer q.lock.Unlock()
q.processing.Delete(key)
event.reputCount++
q.put(key, pressureEvents(event, q.items[key]))
return nil
}
func (q *pressurequeue) put(key string, event *Event) {
if event == nil {
return
}
if !q.processing.Has(key) {
if _, existed := q.items[key]; !existed {
q.queue = append(q.queue, key)
}
}
q.items[key] = event
q.cond.Broadcast()
}
func (q *pressurequeue) Done(event *Event) error {
key, err := q.keyFunc(event.Object)
if err != nil {
return err
}
q.lock.Lock()
defer q.lock.Unlock()
q.processing.Delete(key)
if _, existed := q.items[key]; existed {
q.queue = append(q.queue, key)
}
q.cond.Broadcast()
return nil
}
func (q *pressurequeue) Pop() (*Event, error) {
q.lock.Lock()
defer q.lock.Unlock()
for {
for len(q.queue) == 0 {
if q.closed {
return nil, ErrQueueClosed
}
q.cond.Wait()
}
key := q.queue[0]
q.queue = q.queue[1:]
event, ok := q.items[key]
delete(q.items, key)
if !ok || event == nil {
// TODO(clusterpedia-io): add log
continue
}
q.processing.Insert(key)
return event, nil
}
}
func (q *pressurequeue) PopAll() ([]*Event, error) {
q.lock.Lock()
defer q.lock.Unlock()
if len(q.queue) == 0 {
if q.closed {
return nil, ErrQueueClosed
}
return []*Event{}, nil
}
events := make([]*Event, 0, len(q.queue))
for _, key := range q.queue {
if event := q.items[key]; event != nil {
events = append(events, event)
q.processing.Insert(key)
}
}
q.items = make(map[string]*Event)
q.queue = q.queue[:0]
return events, nil
}
func (q *pressurequeue) Close() {
q.lock.Lock()
defer q.lock.Unlock()
q.closed = true
q.cond.Broadcast()
}
func pressureEvents(older *Event, newer *Event) *Event {
if newer == nil {
return older
}
if older == nil || newer.Action == Deleted || older.Action == newer.Action {
return newer
}
switch newer.Action {
case Updated:
if older.Action == Deleted {
// TODO: 可以比对 resource version
// 但是从 informer 中获取的数据应该是可以保证顺序的,所以可以不添加比对
return older
}
if older.Action == Added {
newer.Action = Added
}
return newer
case Added:
if older.Action == Deleted {
newer.Action = Updated
return newer
}
/*
if older.Action == Updated {
// TODO(iceber)
// 正常来说 Updated -> Added 之间,会存在一个 Deleted 事件
}
*/
return newer
}
return nil
}
总结,可以看到这个事件队列,保存了三种数据:1,正在处理中的资源对象。2,等待处理的资源对象。3,资源对象对应的事件数据,很好地实现了队列数据的状态变化。通过sync.RWMutex读写排斥锁和sync.Cond条件变量,实现多线程读写共享资源的保护和协调。