Golang事件队列开发
前面文章介绍了DaoCloud开源的Clusterpedia通过Informer机制监控Kubernetes资源事件,保存事件对象到队列中,等待事件处理函数将Kubernetes资源对象保存到数据库中。今天一起学习一下DaoCloud开源的Clusterpedia如何实现事件队列的。 下面我们文章首先介绍Clusterpedia如何使用EventQueue的,再来介绍EventQueue如何实现的。主要分以下部分: 1,Clusterpedia事件处理函数HandleDeltas,调用informer.handler,这个handler实现了ResourceEventHandler接口,有OnAdd,OnUpdate,OnDelete方法。 2,实现了ResourceEventHandler接口的ResourceSynchro,ResourceSynchro调用EventQueue接口将对象写入队列 3,实现了EventQueue接口的pressurequeue,pressurequeue负责队列的操作。 事件处理函数HandleDeltas Clusterpedia直接基于client-go底层的cache.Controller开发了自己的resourceVersionInformer,cache.Controller的Process函数调用HandleDeltas来处理资源事件。根据cache.Deltas的操作类型,调用informer.storage和informer.handler的对应操作。这里的handler指的是ResourceEventHandler。 func (informer *resourceVersionInformer) HandleDeltas(deltas cache.Deltas) error { for _, d := range deltas { switch d.Type { case cache.Replaced, cache.Added, cache.Updated: version, exists, err := informer.storage.Get(d.Object) if err != nil { return err } if !exists { if err := informer.storage.Add(d.Object); err != nil { return err } informer.handler.OnAdd(d.Object) break } if d.Type == cache.Replaced { if v := compareResourceVersion(d.Object, version); v <= 0 { if v == 0 { informer.