By SuKai, March 27, 2022
Clusterpedia is an open-source project from DaoCloud that synchronizes specified resources across multiple clusters, persisting the resource objects to a database such as MySQL so that they can be searched, sorted, and paginated through Clusterpedia.
Today we will walk through the code of Clusterpedia's resource synchronization controller to understand its business flow.
Clusterpedia Architecture
Clusterpedia runs its APIServer and ClusterSynchro Manager in a single Kubernetes cluster, exposing an aggregated API to manage and synchronize resources from multiple Kubernetes clusters. The cluster hosting Clusterpedia can be thought of as the management (or control) cluster. Clusterpedia persists the synchronized resources of the business clusters to a MySQL/PostgreSQL database; in the code this part is implemented by the StorageFactory.
Main business flow (a sketch of a PediaCluster object follows this list):
1. In the management cluster, the PediaCluster custom resource (CRD) is used to store, manage, and reconcile the downstream business clusters.
2. Each PediaCluster resource corresponds to one ClusterSynchro; that is, each business cluster has one ClusterSynchro instance, which holds the downstream cluster's RestConfig, the storage database factory interface, and resourceSynchros, a map from each synchronized GVR to its ResourceSynchro.
3. Each GVR of each cluster corresponds to one ResourceSynchro, which holds the GVR/GVK information, the event queue, and so on.
4. Reconciling a PediaCluster resource mainly involves:
a. Creating the corresponding ClusterSynchro and starting the informers for the resources it synchronizes; informer events are dispatched to the ResourceSynchro's event-handler functions. ResourceSynchro implements the ResourceEventHandler interface.
b. The ResourceSynchro event handlers mainly push the object into the ResourceSynchro's queue.
c. Starting a worker pool that drains the queue and synchronizes the objects to the database.
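To make this concrete, here is a minimal sketch of a PediaCluster object as a Go literal. Only Spec.SyncResources and the ClusterGroupResources fields (Group, Versions, Resources) actually appear in the code walked through below; the spec type name and the connection fields are assumptions:
// a hypothetical PediaCluster registering one business cluster
// (type and field names outside SyncResources are assumed)
cluster := &clusterv1alpha2.PediaCluster{
	ObjectMeta: metav1.ObjectMeta{Name: "cluster-example"},
	Spec: clusterv1alpha2.ClusterSpec{
		// API server address and credentials omitted
		SyncResources: []clusterv1alpha2.ClusterGroupResources{
			{Group: "apps", Resources: []string{"deployments"}},
			{Group: "", Versions: []string{"v1"}, Resources: []string{"pods"}},
		},
	},
}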
Code
clustersynchro-manager configuration
1. storage.NewStorageFactory creates the storage factory; supported databases: MySQL and PostgreSQL.
2. Build the kubeconfig for the management cluster and create the clientsets for Kubernetes and for the PediaCluster custom resource.
func (o *Options) Config() (*config.Config, error) {
	if err := o.Validate(); err != nil {
		return nil, err
	}

	storagefactory, err := storage.NewStorageFactory(o.Storage.Name, o.Storage.ConfigPath)
	if err != nil {
		return nil, err
	}

	kubeconfig, err := clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig)
	if err != nil {
		return nil, err
	}

	client, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, ClusterSynchroManagerUserAgent))
	if err != nil {
		return nil, err
	}

	crdclient, err := crdclientset.NewForConfig(restclient.AddUserAgent(kubeconfig, ClusterSynchroManagerUserAgent))
	if err != nil {
		return nil, err
	}

	// assemble the Config consumed by the command's Run
	// (other fields are elided in this excerpt)
	return &config.Config{
		Client:         client,
		CRDClient:      crdclient,
		StorageFactory: storagefactory,
	}, nil
}
clustersynchro-manager command entry point
Construct the synchromanager and run synchromanager.Run.
func Run(ctx context.Context, c *config.Config) error {
	synchromanager := synchromanager.NewManager(c.Client, c.CRDClient, c.StorageFactory)

	if !c.LeaderElection.LeaderElect {
		synchromanager.Run(1, ctx.Done())
		return nil
	}

	// the leader-election branch is elided from this excerpt
	return nil
}
NewManager creates the cluster informer, which watches the PediaCluster custom resource, and registers the event handlers addCluster, updateCluster, and deleteCluster.
Run starts the informer factory with manager.informerFactory.Start, waits for the cache to sync with cache.WaitForCacheSync, and then starts the worker pool with wait.Until to process the business clusters in the queue. Note that the caller hardcodes a single worker here (Run(1, ctx.Done())).
func NewManager(kubeclient clientset.Interface, client crdclientset.Interface, storage storage.StorageFactory) *Manager {
	factory := externalversions.NewSharedInformerFactory(client, 0)
	clusterinformer := factory.Cluster().V1alpha2().PediaClusters()

	manager := &Manager{
		informerFactory:    factory,
		kubeclient:         kubeclient,
		clusterpediaclient: client,
		storage:            storage,
		clusterlister:      clusterinformer.Lister(),
		clusterInformer:    clusterinformer.Informer(),
		queue: workqueue.NewRateLimitingQueue(
			workqueue.NewItemExponentialFailureRateLimiter(2*time.Second, 5*time.Second),
		),
		synchros: make(map[string]*clustersynchro.ClusterSynchro),
	}

	clusterinformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    manager.addCluster,
			UpdateFunc: manager.updateCluster,
			DeleteFunc: manager.deleteCluster,
		},
	)

	return manager
}
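The three handlers registered above are not included in this excerpt. A plausible minimal sketch (the key function used here is an assumption; updateCluster and deleteCluster would follow the same pattern) is that each handler reduces the event to a namespace/name key and enqueues it for the worker:
// sketch: reduce the informer event to a queue key for the worker
func (manager *Manager) addCluster(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		klog.Error(err)
		return
	}
	manager.queue.Add(key)
}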
func (manager *Manager) Run(workers int, stopCh <-chan struct{}) {
	manager.runLock.Lock()
	defer manager.runLock.Unlock()
	if manager.stopCh != nil {
		klog.Fatal("clustersynchro manager is already running...")
	}

	klog.Info("Start Informer Factory")

	// informerFactory should not be controlled by stopCh
	stopInformer := make(chan struct{})
	manager.informerFactory.Start(stopInformer)
	if !cache.WaitForCacheSync(stopCh, manager.clusterInformer.HasSynced) {
		klog.Fatal("clustersynchro manager: wait for informer factory failed")
	}

	manager.stopCh = stopCh

	klog.InfoS("Start Manager Cluster Worker", "workers", workers)
	var waitGroup sync.WaitGroup
	for i := 0; i < workers; i++ {
		waitGroup.Add(1)
		go func() {
			defer waitGroup.Done()
			wait.Until(manager.worker, time.Second, manager.stopCh)
		}()
	}

	<-manager.stopCh
	klog.Info("receive stop signal, stop...")

	manager.queue.ShutDown()
	waitGroup.Wait()

	klog.Info("wait for cluster synchros stop...")
	manager.synchroWaitGroup.Wait()
	klog.Info("cluster synchro manager stopped.")
}
The worker reconciles the business clusters in the queue:
1. Get a business cluster from the queue: manager.queue.Get().
2. Look up the PediaCluster custom resource: manager.clusterlister.Get(name).
3. Reconcile the custom resource: manager.reconcileCluster(cluster).
4. On success, clear the item's rate-limiting history: manager.queue.Forget(key).
func (manager *Manager) worker() {
	for manager.processNextCluster() {
		select {
		case <-manager.stopCh:
			return
		default:
		}
	}
}

func (manager *Manager) processNextCluster() (continued bool) {
	key, shutdown := manager.queue.Get()
	if shutdown {
		return false
	}
	defer manager.queue.Done(key)
	continued = true

	_, name, err := cache.SplitMetaNamespaceKey(key.(string))
	if err != nil {
		klog.Error(err)
		return
	}

	klog.InfoS("reconcile cluster", "cluster", name)
	cluster, err := manager.clusterlister.Get(name)
	if err != nil {
		if apierrors.IsNotFound(err) {
			klog.InfoS("cluster has been deleted", "cluster", name)
			return
		}
		klog.ErrorS(err, "Failed to get cluster from cache", "cluster", name)
		return
	}

	cluster = cluster.DeepCopy()
	if err := manager.reconcileCluster(cluster); err != nil {
		klog.ErrorS(err, "Failed to reconcile cluster", "cluster", name, "num requeues", manager.queue.NumRequeues(key))
		manager.queue.AddRateLimited(key)
		return
	}
	manager.queue.Forget(key)
	return
}
Reconciling the PediaCluster custom resource:
1. Check whether the resource is marked for deletion with cluster.DeletionTimestamp.IsZero(). If it is, remove the cluster's synchro, strip the finalizer from the resource's metadata with controllerutil.RemoveFinalizer, and save the update; Kubernetes then considers the cleanup complete and actually deletes the resource. If the resource is not being deleted, continue with the logic below.
2. If the PediaCluster is newly created, add the finalizer to it.
3. Build the restConfig for the business cluster (buildClusterConfig is sketched after this list).
4. If synchro is non-nil and its restConfig differs from the new one, the cluster configuration has changed, so shut the synchro down and discard it.
5. Create a synchro for the business cluster and run it via Run(); this mainly starts all the informers held in resourceSynchros to watch and synchronize the business cluster's resources.
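buildClusterConfig itself is not part of this excerpt. A minimal sketch of what it must produce, with the spec field names (APIServer, CAData, TokenData) assumed purely for illustration:
// sketch: turn a PediaCluster spec into a rest.Config for the business cluster
func buildClusterConfig(cluster *clusterv1alpha2.PediaCluster) (*rest.Config, error) {
	if cluster.Spec.APIServer == "" { // field name assumed
		return nil, errors.New("cluster API server address is required")
	}
	config := &rest.Config{Host: cluster.Spec.APIServer}
	config.TLSClientConfig.CAData = cluster.Spec.CAData // assumed
	config.BearerToken = string(cluster.Spec.TokenData) // assumed
	return config, nil
}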
// if err returned is not nil, cluster will be requeued
func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster) (err error) {
	if !cluster.DeletionTimestamp.IsZero() {
		klog.InfoS("remove cluster", "cluster", cluster.Name)
		if err := manager.removeCluster(cluster.Name); err != nil {
			klog.ErrorS(err, "Failed to remove cluster", cluster.Name)
			return err
		}

		if !controllerutil.ContainsFinalizer(cluster, ClusterSynchroControllerFinalizer) {
			return nil
		}

		// remove finalizer
		controllerutil.RemoveFinalizer(cluster, ClusterSynchroControllerFinalizer)
		if _, err := manager.clusterpediaclient.ClusterV1alpha2().PediaClusters().Update(context.TODO(), cluster, metav1.UpdateOptions{}); err != nil {
			klog.ErrorS(err, "Failed to remove finalizer", "cluster", cluster.Name)
			return err
		}
		return nil
	}

	// ensure finalizer
	if !controllerutil.ContainsFinalizer(cluster, ClusterSynchroControllerFinalizer) {
		controllerutil.AddFinalizer(cluster, ClusterSynchroControllerFinalizer)
		cluster, err = manager.clusterpediaclient.ClusterV1alpha2().PediaClusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
		if err != nil {
			klog.ErrorS(err, "Failed to add finalizer", "cluster", cluster.Name)
			return err
		}
	}

	config, err := buildClusterConfig(cluster)
	if err != nil {
		// TODO(iceber): update cluster status
		klog.ErrorS(err, "Failed to build cluster config", "cluster", cluster.Name)
		return nil
	}

	manager.synchrolock.RLock()
	synchro := manager.synchros[cluster.Name]
	manager.synchrolock.RUnlock()
	if synchro != nil && !reflect.DeepEqual(synchro.RESTConfig, config) {
		klog.InfoS("cluster config is changed, rebuild cluster synchro", "cluster", cluster.Name)
		synchro.Shutdown(false, false)
		synchro = nil
		// manager.cleanCluster(cluster.Name)
	}

	// create resource synchro
	if synchro == nil {
		// TODO(iceber): set the stop sign of the manager to cluster synchro
		synchro, err = clustersynchro.New(cluster.Name, config, manager.storage, manager)
		if err != nil {
			// TODO(iceber): update cluster status
			// There are many reasons why creating a cluster synchro can fail.
			// How do you gracefully handle different errors?
			klog.ErrorS(err, "Failed to create cluster synchro", "cluster", cluster.Name)

			// Not requeue
			return nil
		}
		manager.synchroWaitGroup.StartWithChannel(manager.stopCh, synchro.Run)
	}

	synchro.SetResources(cluster.Spec.SyncResources)

	manager.synchrolock.Lock()
	manager.synchros[cluster.Name] = synchro
	manager.synchrolock.Unlock()
	return nil
}
SetResources configures which of the business cluster's resources are synchronized (a short illustration of the sync/storage version split follows this list):
1. From the new PediaCluster's configuration, build the set of downstream GVRs to synchronize.
2. Store the sorted GVR list on the ClusterSynchro (s.sortedGroupResources).
3. Compare against the resources currently being synchronized and, via resourceVersionCaches, clean up the synchronized GVRs that were removed.
4. For each newly added GVR, create a ResourceSynchro and run synchro.runStorager to process the messages in its queue, i.e. to synchronize resource events into the StorageFactory's database.
5. Start a goroutine running synchro.Run, which uses the informer machinery to watch the business cluster's resource events and write them into the queue.
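As a quick illustration of that version split (the concrete versions here are assumptions for the example): a Deployment watched at apps/v1beta1 on an older cluster may be stored under a different storage version, in which case a convertor must be attached.
// watched (sync) version on the business cluster
syncResource := schema.GroupVersionResource{Group: "apps", Version: "v1beta1", Resource: "deployments"}
// version used by the storage layer
storageResource := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
// when they differ, SetResources picks LegacyResourceScheme (built-in resources)
// or CustomResourceScheme (custom resources) as config.convertor
needsConvertor := syncResource != storageResource // true here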
func (s *ClusterSynchro) SetResources(syncResources []clusterv1alpha2.ClusterGroupResources) {
	var (
		// syncConfigs key is resource's storage gvr
		syncConfigs = map[schema.GroupVersionResource]*syncConfig{}

		sortedGroupResources = []schema.GroupResource{}
		resourceStatuses     = map[schema.GroupResource]*clusterv1alpha2.ClusterResourceStatus{}
	)
	for _, groupResources := range syncResources {
		for _, resource := range groupResources.Resources {
			gr := schema.GroupResource{Group: groupResources.Group, Resource: resource}
			supportedGVKs, err := s.restmapper.KindsFor(gr.WithVersion(""))
			if err != nil {
				klog.ErrorS(fmt.Errorf("Cluster not supported resource: %v", err), "Skip resource sync", "cluster", s.name, "resource", gr)
				continue
			}

			syncVersions, isLegacyResource, err := negotiateSyncVersions(groupResources.Versions, supportedGVKs)
			if err != nil {
				klog.InfoS("Skip resource sync", "cluster", s.name, "resource", gr, "reason", err)
				continue
			}

			mapper, err := s.restmapper.RESTMapping(supportedGVKs[0].GroupKind(), supportedGVKs[0].Version)
			if err != nil {
				klog.ErrorS(err, "Skip resource sync", "cluster", s.name, "resource", gr)
				continue
			}

			info := &clusterv1alpha2.ClusterResourceStatus{
				Name:       gr.Resource,
				Kind:       mapper.GroupVersionKind.Kind,
				Namespaced: mapper.Scope.Name() == meta.RESTScopeNameNamespace,
			}

			for _, version := range syncVersions {
				syncResource := gr.WithVersion(version)
				storageConfig, err := s.resourceStorageConfig.NewConfig(syncResource)
				if err != nil {
					// TODO(iceber): set storage error ?
					klog.ErrorS(err, "Failed to create resource storage config", "cluster", s.name, "resource", syncResource)
					continue
				}

				storageResource := storageConfig.StorageGroupResource.WithVersion(storageConfig.StorageVersion.Version)
				if _, ok := syncConfigs[storageResource]; !ok {
					config := &syncConfig{
						kind:            info.Kind,
						syncResource:    syncResource,
						storageResource: storageResource,
						storageConfig:   storageConfig,
					}

					if syncResource != storageResource {
						if isLegacyResource {
							config.convertor = resourcescheme.LegacyResourceScheme
						} else {
							config.convertor = resourcescheme.CustomResourceScheme
						}
					}
					syncConfigs[storageResource] = config
				}

				syncCondition := clusterv1alpha2.ClusterResourceSyncCondition{
					Version:        version,
					StorageVersion: storageConfig.StorageVersion.Version,
					Status:         clusterv1alpha2.SyncStatusPending,
					Reason:         "SynchroCreating",
				}
				if gr != storageConfig.StorageGroupResource {
					storageResource := storageConfig.StorageGroupResource.String()
					syncCondition.StorageResource = &storageResource
				}
				info.SyncConditions = append(info.SyncConditions, syncCondition)
			}

			resourceStatuses[gr] = info
			sortedGroupResources = append(sortedGroupResources, gr)
		}
	}

	s.resourcelock.Lock()
	defer s.resourcelock.Unlock()
	select {
	case <-s.closer:
		return
	default:
	}

	s.sortedGroupResources.Store(sortedGroupResources)
	s.resourceStatuses.Store(resourceStatuses)

	// filter deleted resources
	deleted := map[schema.GroupVersionResource]struct{}{}
	for gvr := range s.resourceVersionCaches {
		if _, ok := syncConfigs[gvr]; !ok {
			deleted[gvr] = struct{}{}
		}
	}

	synchros := s.resourceSynchros.Load().(map[schema.GroupVersionResource]*ResourceSynchro)

	// remove deleted resource synchro
	for gvr := range deleted {
		if handler, ok := synchros[gvr]; ok {
			handler.Close()

			// ensure that no more data is synchronized to the storage.
			<-handler.Closed()
			delete(synchros, gvr)
		}

		if err := s.storage.CleanClusterResource(context.TODO(), s.name, gvr); err != nil {
			klog.ErrorS(err, "Failed to clean cluster resource", "cluster", s.name, "resource", gvr)
			// update resource sync status
			continue
		}
		delete(s.resourceVersionCaches, gvr)
	}

	for gvr, config := range syncConfigs {
		if _, ok := synchros[gvr]; ok {
			continue
		}

		resourceStorage, err := s.storage.NewResourceStorage(config.storageConfig)
		if err != nil {
			klog.ErrorS(err, "Failed to create resource storage", "cluster", s.name, "storage resource", config.storageResource)
			// update resource sync status
			continue
		}

		resourceVersionCache, ok := s.resourceVersionCaches[gvr]
		if !ok {
			resourceVersionCache = informer.NewResourceVersionStorage(cache.DeletionHandlingMetaNamespaceKeyFunc)
			s.resourceVersionCaches[gvr] = resourceVersionCache
		}

		syncKind := config.syncResource.GroupVersion().WithKind(config.kind)
		synchro := newResourceSynchro(s.name, syncKind,
			s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource),
			resourceVersionCache,
			config.convertor,
			resourceStorage,
		)
		s.resourceSynchroWaitGroup.StartWithChannel(s.closer, synchro.runStorager)

		if s.handlerStopCh != nil {
			select {
			case <-s.handlerStopCh:
			default:
				go synchro.Run(s.handlerStopCh)
			}
		}
		synchros[gvr] = synchro
	}
	s.resourceSynchros.Store(synchros)
}
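The informer side started by synchro.Run is not shown in this excerpt. ResourceSynchro implements cache.ResourceEventHandler; a plausible minimal sketch (the exact queue method names are assumptions) is that each handler simply records the event in the synchro's event queue, to be drained by the storage workers described next:
// sketch: informer callbacks push events into the synchro's queue
func (synchro *ResourceSynchro) OnAdd(obj interface{}) {
	_ = synchro.queue.Add(obj)
}

func (synchro *ResourceSynchro) OnUpdate(oldObj, newObj interface{}) {
	_ = synchro.queue.Update(newObj)
}

func (synchro *ResourceSynchro) OnDelete(obj interface{}) {
	_ = synchro.queue.Delete(obj)
}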
runStorager processes the business cluster's resource events
It starts a worker pool that keeps popping pending events from the queue and synchronizes the resource objects to the database.
func (synchro *ResourceSynchro) runStorager(shutdown <-chan struct{}) {
	go func() {
		<-shutdown
		synchro.Close()
	}()

	synchro.storager(1)
}

func (synchro *ResourceSynchro) storager(worker int) {
	var waitGroup sync.WaitGroup
	for i := 0; i < worker; i++ {
		waitGroup.Add(1)
		go wait.Until(func() {
			defer waitGroup.Done()
			synchro.processResources()
		}, time.Second, synchro.closer)
	}

	waitGroup.Wait()
	close(synchro.closed)
}

func (synchro *ResourceSynchro) processResources() {
	for {
		select {
		case <-synchro.closer:
			return
		default:
		}

		event, err := synchro.queue.Pop()
		if err != nil {
			if err == queue.ErrQueueClosed {
				return
			}

			klog.Error(err)
			continue
		}

		synchro.handleResourceEvent(event)
	}
}
func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) {
	defer func() { _ = synchro.queue.Done(event) }()

	if d, ok := event.Object.(cache.DeletedFinalStateUnknown); ok {
		namespace, name, err := cache.SplitMetaNamespaceKey(d.Key)
		if err != nil {
			klog.Error(err)
			return
		}
		obj := &metav1.PartialObjectMetadata{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: namespace,
				Name:      name,
			},
		}
		if err := synchro.deleteResource(obj); err != nil {
			klog.ErrorS(err, "Failed to handle resource",
				"cluster", synchro.cluster,
				"action", event.Action,
				"resource", synchro.storageResource,
				"namespace", namespace,
				"name", name,
			)
		}
		return
	}

	var err error
	obj := event.Object.(runtime.Object)

	// if synchro.convertor == nil, it means no conversion is needed.
	if synchro.convertor != nil {
		if obj, err = synchro.convertToStorageVersion(obj); err != nil {
			klog.Error(err)
			return
		}
	}
	utils.InjectClusterName(obj, synchro.cluster)

	switch event.Action {
	case queue.Added:
		err = synchro.createOrUpdateResource(obj)
	case queue.Updated:
		err = synchro.updateOrCreateResource(obj)
	case queue.Deleted:
		err = synchro.deleteResource(obj)
	}
	if err != nil {
		o, _ := meta.Accessor(obj)
		klog.ErrorS(err, "Failed to handle resource",
			"cluster", synchro.cluster,
			"action", event.Action,
			"resource", synchro.storageResource,
			"namespace", o.GetNamespace(),
			"name", o.GetName(),
		)
	}
}
func (synchro *ResourceSynchro) createOrUpdateResource(obj runtime.Object) error {
	err := synchro.storage.Create(synchro.ctx, synchro.cluster, obj)
	if genericstorage.IsNodeExist(err) {
		return synchro.storage.Update(synchro.ctx, synchro.cluster, obj)
	}
	return err
}

func (synchro *ResourceSynchro) updateOrCreateResource(obj runtime.Object) error {
	err := synchro.storage.Update(synchro.ctx, synchro.cluster, obj)
	if genericstorage.IsNotFound(err) {
		return synchro.storage.Create(synchro.ctx, synchro.cluster, obj)
	}
	return err
}

func (synchro *ResourceSynchro) deleteResource(obj runtime.Object) error {
	return synchro.storage.Delete(synchro.ctx, synchro.cluster, obj)
}
This concludes the walkthrough of how Clusterpedia synchronizes resources from multiple Kubernetes clusters to the database.