DaoCloud Clusterpedia: Multi-Cluster Kubernetes Resource Synchronization and Retrieval

SuKai March 27, 2022

Clusterpedia is an open-source project from DaoCloud that synchronizes specified resources from multiple Kubernetes clusters, persists the resource objects to databases such as MySQL, and lets you search, sort, and paginate those resources through Clusterpedia.

In this post we read through the code of Clusterpedia's resource synchronization controller to understand its business flow.

Clusterpedia Architecture

Clusterpedia runs its APIServer and ClusterSynchro Manager inside one Kubernetes cluster, exposing an aggregated API and managing and synchronizing the resources of multiple Kubernetes clusters. The cluster Clusterpedia runs in can be thought of as the management (control) cluster. Clusterpedia persists the synced resources of the business clusters to a MySQL/PostgreSQL database; in the code this part is implemented by the StorageFactory.

(Figure: Clusterpedia architecture)
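Before walking through the flow, here is an illustrative sketch of the ownership hierarchy the rest of the post follows. The type names are made up for illustration; they are not Clusterpedia's actual definitions:

package sketch

import (
   "k8s.io/apimachinery/pkg/runtime/schema"
   "k8s.io/client-go/rest"
)

// One Manager runs in the management cluster and owns one ClusterSynchro
// per PediaCluster, i.e. per business cluster.
type managerSketch struct {
   synchros map[string]*clusterSynchroSketch // keyed by PediaCluster name
}

// A ClusterSynchro holds the business cluster's REST config and one
// ResourceSynchro per synced GVR.
type clusterSynchroSketch struct {
   restConfig       *rest.Config
   resourceSynchros map[schema.GroupVersionResource]*resourceSynchroSketch
}

// A ResourceSynchro knows the GVR/GVK it syncs and owns the event queue
// that its storage workers drain into the database.
type resourceSynchroSketch struct {
   syncKind schema.GroupVersionKind
   events   []interface{} // stands in for the real event queue
}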

The main business flow:

1. In the management cluster, the custom resource PediaCluster (a CRD) is used to store, manage, and reconcile the downstream business clusters.

2. Each PediaCluster resource corresponds to one ClusterSynchro; that is, every business cluster gets a ClusterSynchro instance, which holds the downstream cluster's RestConfig, the storage database-factory interface, and resourceSynchros, a map from each synced GVR to its ResourceSynchro.

3. Each GVR of each cluster corresponds to one ResourceSynchro, which holds the GVR/GVK information, the event queue, and so on.

4. Reconciliation of a PediaCluster resource is mainly responsible for:

a. Creating the corresponding ClusterSynchro, starting the informers for the resources the ClusterSynchro syncs, and dispatching informer events to the ResourceSynchro's event-handler functions; ResourceSynchro implements the ResourceEventHandler interface.

b. The ResourceSynchro event handlers, whose main job is to put the object into the ResourceSynchro's queue (see the sketch after this list).

c. Starting a worker pool that waits on the queue and syncs the objects into the database.
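Steps a, b, and c amount to the standard informer-to-workqueue pattern. Below is a minimal sketch of that pattern with assumed names; Clusterpedia's real ResourceSynchro enqueues richer (action, object) events, as the code later in this post shows:

package sketch

import (
   "k8s.io/client-go/tools/cache"
   "k8s.io/client-go/util/workqueue"
)

type resourceSynchro struct {
   queue workqueue.Interface
}

// resourceSynchro satisfies cache.ResourceEventHandler (step a): every
// informer event is reduced to putting a key on the queue (step b).
func (s *resourceSynchro) OnAdd(obj interface{})          { s.enqueue(obj) }
func (s *resourceSynchro) OnUpdate(_, newObj interface{}) { s.enqueue(newObj) }
func (s *resourceSynchro) OnDelete(obj interface{})       { s.enqueue(obj) }

func (s *resourceSynchro) enqueue(obj interface{}) {
   // DeletionHandlingMetaNamespaceKeyFunc also unwraps the
   // cache.DeletedFinalStateUnknown tombstones handled later in this post.
   key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
   if err != nil {
      return
   }
   s.queue.Add(key)
}

// worker drains the queue and writes to storage (step c).
func (s *resourceSynchro) worker(store func(key string) error) {
   for {
      key, shutdown := s.queue.Get()
      if shutdown {
         return
      }
      _ = store(key.(string)) // a real implementation would requeue on error
      s.queue.Done(key)
   }
}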

Code

clustersynchro-manager configuration options

1. storage.NewStorageFactory creates the storage factory; the supported databases are MySQL and PostgreSQL.

2. Build the management cluster's kubeconfig and create the clientsets for Kubernetes and for the PediaCluster custom resource.

func (o *Options) Config() (*config.Config, error) {
   if err := o.Validate(); err != nil {
      return nil, err
   }

   storagefactory, err := storage.NewStorageFactory(o.Storage.Name, o.Storage.ConfigPath)
   if err != nil {
      return nil, err
   }

   kubeconfig, err := clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig)
   if err != nil {
      return nil, err
   }


   client, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, ClusterSynchroManagerUserAgent))
   if err != nil {
      return nil, err
   }
   crdclient, err := crdclientset.NewForConfig(restclient.AddUserAgent(kubeconfig, ClusterSynchroManagerUserAgent))
   if err != nil {
      return nil, err
   }

   // The excerpt ends here; the full function also wires up leader election
   // before returning the config that NewManager consumes below.
   return &config.Config{
      Client:         client,
      CRDClient:      crdclient,
      StorageFactory: storagefactory,
   }, nil
}

clustersynchro-manager command entry point

Construct the synchromanager and call synchromanager.Run.

func Run(ctx context.Context, c *config.Config) error {
	synchromanager := synchromanager.NewManager(c.Client, c.CRDClient, c.StorageFactory)
	if !c.LeaderElection.LeaderElect {
		synchromanager.Run(1, ctx.Done())
		return nil
	}

	// Leader-election path elided from this excerpt: when LeaderElect is
	// true, synchromanager.Run is started from the elected leader's callback.
	return nil
}

In NewManager, a cluster informer is created to watch the PediaCluster custom resource, and the event handlers addCluster, updateCluster, and deleteCluster are registered.

In Run, the informer factory is started via manager.informerFactory.Start; once cache.WaitForCacheSync reports the cache is in sync, worker goroutines are started with wait.Until to process the business clusters in the queue. Note that the caller hard-codes a single worker.

func NewManager(kubeclient clientset.Interface, client crdclientset.Interface, storage storage.StorageFactory) *Manager {
   factory := externalversions.NewSharedInformerFactory(client, 0)
   clusterinformer := factory.Cluster().V1alpha2().PediaClusters()

   manager := &Manager{
      informerFactory:    factory,
      kubeclient:         kubeclient,
      clusterpediaclient: client,

      storage:         storage,
      clusterlister:   clusterinformer.Lister(),
      clusterInformer: clusterinformer.Informer(),
      queue: workqueue.NewRateLimitingQueue(
         workqueue.NewItemExponentialFailureRateLimiter(2*time.Second, 5*time.Second),
      ),

      synchros: make(map[string]*clustersynchro.ClusterSynchro),
   }

   clusterinformer.Informer().AddEventHandler(
      cache.ResourceEventHandlerFuncs{
         AddFunc:    manager.addCluster,
         UpdateFunc: manager.updateCluster,
         DeleteFunc: manager.deleteCluster,
      },
   )

   return manager
}

func (manager *Manager) Run(workers int, stopCh <-chan struct{}) {
   manager.runLock.Lock()
   defer manager.runLock.Unlock()
   if manager.stopCh != nil {
      klog.Fatal("clustersynchro manager is already running...")
   }
   klog.Info("Start Informer Factory")

   // informerFactory should not be controlled by stopCh
   stopInformer := make(chan struct{})
   manager.informerFactory.Start(stopInformer)
   if !cache.WaitForCacheSync(stopCh, manager.clusterInformer.HasSynced) {
      klog.Fatal("clustersynchro manager: wait for informer factory failed")
   }

   manager.stopCh = stopCh

   klog.InfoS("Start Manager Cluster Worker", "workers", workers)
   var waitGroup sync.WaitGroup
   for i := 0; i < workers; i++ {
      waitGroup.Add(1)

      go func() {
         defer waitGroup.Done()
         wait.Until(manager.worker, time.Second, manager.stopCh)
      }()
   }

   <-manager.stopCh
   klog.Info("receive stop signal, stop...")

   manager.queue.ShutDown()
   waitGroup.Wait()

   klog.Info("wait for cluster synchros stop...")
   manager.synchroWaitGroup.Wait()
   klog.Info("cluster synchro manager stoped.")
}

The worker reconciles business clusters from the queue

1. Get a business cluster from the queue: manager.queue.Get().

2. Look up the PediaCluster custom resource: manager.clusterlister.Get(name).

3. Reconcile the custom resource: manager.reconcileCluster(cluster).

4. Drop the key's retry record from the queue: manager.queue.Forget(key).

func (manager *Manager) worker() {
   for manager.processNextCluster() {
      select {
      case <-manager.stopCh:
         return
      default:
      }
   }
}

func (manager *Manager) processNextCluster() (continued bool) {
   key, shutdown := manager.queue.Get()
   if shutdown {
      return false
   }
   defer manager.queue.Done(key)
   continued = true

   _, name, err := cache.SplitMetaNamespaceKey(key.(string))
   if err != nil {
      klog.Error(err)
      return
   }

   klog.InfoS("reconcile cluster", "cluster", name)
   cluster, err := manager.clusterlister.Get(name)
   if err != nil {
      if apierrors.IsNotFound(err) {
         klog.InfoS("cluster has been deleted", "cluster", name)
         return
      }

      klog.ErrorS(err, "Failed to get cluster from cache", "cluster", name)
      return
   }

   cluster = cluster.DeepCopy()
   if err := manager.reconcileCluster(cluster); err != nil {
      klog.ErrorS(err, "Failed to reconcile cluster", "cluster", name, "num requeues", manager.queue.NumRequeues(key))
      manager.queue.AddRateLimited(key)
      return
   }
   manager.queue.Forget(key)
   return
}

Reconciling the PediaCluster custom resource

1. Check whether the resource is marked for deletion with cluster.DeletionTimestamp.IsZero(). If it is being deleted, remove the finalizer from the object's metadata via controllerutil.RemoveFinalizer and update it; Kubernetes then considers cleanup complete and actually deletes the resource. If it is not being deleted, continue with the logic below.

2. If this is a newly created PediaCluster, add the finalizer to it.

3. Build the business cluster's restConfig.

4. If a synchro already exists but its restConfig differs from the newly built one, the cluster's configuration has changed, so shut the synchro down and discard it.

5. Create a synchro for the business cluster and run it; this mainly starts all the informers held in resourceSynchros so they watch and sync the business cluster's resources.

// if err returned is not nil, cluster will be requeued
func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster) (err error) {
   if !cluster.DeletionTimestamp.IsZero() {
      klog.InfoS("remove cluster", "cluster", cluster.Name)
      if err := manager.removeCluster(cluster.Name); err != nil {
         klog.ErrorS(err, "Failed to remove cluster", cluster.Name)
         return err
      }

      if !controllerutil.ContainsFinalizer(cluster, ClusterSynchroControllerFinalizer) {
         return nil
      }

      // remove finalizer
      controllerutil.RemoveFinalizer(cluster, ClusterSynchroControllerFinalizer)
      if _, err := manager.clusterpediaclient.ClusterV1alpha2().PediaClusters().Update(context.TODO(), cluster, metav1.UpdateOptions{}); err != nil {
         klog.ErrorS(err, "Failed to remove finializer", "cluster", cluster.Name)
         return err
      }
      return nil
   }

   // ensure finalizer
   if !controllerutil.ContainsFinalizer(cluster, ClusterSynchroControllerFinalizer) {
      controllerutil.AddFinalizer(cluster, ClusterSynchroControllerFinalizer)
      cluster, err = manager.clusterpediaclient.ClusterV1alpha2().PediaClusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
      if err != nil {
         klog.ErrorS(err, "Failed to add finializer", "cluster", cluster.Name)
         return err
      }
   }

   config, err := buildClusterConfig(cluster)
   if err != nil {
      // TODO(iceber): update cluster status
      klog.ErrorS(err, "Failed to build cluster config", "cluster", cluster.Name)
      return nil
   }

   manager.synchrolock.RLock()
   synchro := manager.synchros[cluster.Name]
   manager.synchrolock.RUnlock()
   if synchro != nil && !reflect.DeepEqual(synchro.RESTConfig, config) {
      klog.InfoS("cluster config is changed, rebuild cluster synchro", "cluster", cluster.Name)

      synchro.Shutdown(false, false)
      synchro = nil

      // manager.cleanCluster(cluster.Name)
   }

   // create resource synchro
   if synchro == nil {
      // TODO(iceber): set the stop sign of the manager to cluster synchro
      synchro, err = clustersynchro.New(cluster.Name, config, manager.storage, manager)
      if err != nil {
         // TODO(iceber): update cluster status
         // There are many reasons why creating a cluster synchro can fail.
         // How do you gracefully handle different errors?

         klog.ErrorS(err, "Failed to create cluster synchro", "cluster", cluster.Name)
         // Not requeue
         return nil
      }

      manager.synchroWaitGroup.StartWithChannel(manager.stopCh, synchro.Run)
   }

   synchro.SetResources(cluster.Spec.SyncResources)

   manager.synchrolock.Lock()
   manager.synchros[cluster.Name] = synchro
   manager.synchrolock.Unlock()
   return nil
}

SetResources sets which resources of a business cluster to sync

1. From the new PediaCluster's spec, compute the GVRs of the downstream resources that need to be synced.

2. Store the GVRs in sortedGroupResources of the corresponding ClusterSynchro, the per-cluster entry in the Manager's synchros map.

3. Compare against the resources currently synced for the business cluster, and clean up the GVRs removed from sync in resourceVersionCaches.

4. For each newly added GVR, create a ResourceSynchro and start synchro.runStorager, which processes the events in the queue and syncs them into the StorageFactory-backed database.

5. Start a goroutine running synchro.Run, which uses the informer mechanism to watch the business cluster's resource events and write them into the queue.

func (s *ClusterSynchro) SetResources(syncResources []clusterv1alpha2.ClusterGroupResources) {
   var (
      // syncConfigs key is resource's storage gvr
      syncConfigs = map[schema.GroupVersionResource]*syncConfig{}

      sortedGroupResources = []schema.GroupResource{}
      resourceStatuses     = map[schema.GroupResource]*clusterv1alpha2.ClusterResourceStatus{}
   )

   for _, groupResources := range syncResources {
      for _, resource := range groupResources.Resources {
         gr := schema.GroupResource{Group: groupResources.Group, Resource: resource}
         supportedGVKs, err := s.restmapper.KindsFor(gr.WithVersion(""))
         if err != nil {
            klog.ErrorS(fmt.Errorf("cluster does not support resource: %v", err), "Skip resource sync", "cluster", s.name, "resource", gr)
            continue
         }

         syncVersions, isLegacyResource, err := negotiateSyncVersions(groupResources.Versions, supportedGVKs)
         if err != nil {
            klog.InfoS("Skip resource sync", "cluster", s.name, "resource", gr, "reason", err)
            continue
         }

         mapper, err := s.restmapper.RESTMapping(supportedGVKs[0].GroupKind(), supportedGVKs[0].Version)
         if err != nil {
            klog.ErrorS(err, "Skip resource sync", "cluster", s.name, "resource", gr)
            continue
         }

         info := &clusterv1alpha2.ClusterResourceStatus{
            Name:       gr.Resource,
            Kind:       mapper.GroupVersionKind.Kind,
            Namespaced: mapper.Scope.Name() == meta.RESTScopeNameNamespace,
         }

         for _, version := range syncVersions {
            syncResource := gr.WithVersion(version)
            storageConfig, err := s.resourceStorageConfig.NewConfig(syncResource)
            if err != nil {
               // TODO(iceber): set storage error ?
               klog.ErrorS(err, "Failed to create resource storage config", "cluster", s.name, "resource", syncResource)
               continue
            }

            storageResource := storageConfig.StorageGroupResource.WithVersion(storageConfig.StorageVersion.Version)
            if _, ok := syncConfigs[storageResource]; !ok {
               config := &syncConfig{
                  kind:            info.Kind,
                  syncResource:    syncResource,
                  storageResource: storageResource,
                  storageConfig:   storageConfig,
               }

               if syncResource != storageResource {
                  if isLegacyResource {
                     config.convertor = resourcescheme.LegacyResourceScheme
                  } else {
                     config.convertor = resourcescheme.CustomResourceScheme
                  }
               }
               syncConfigs[storageResource] = config
            }

            syncCondition := clusterv1alpha2.ClusterResourceSyncCondition{
               Version:        version,
               StorageVersion: storageConfig.StorageVersion.Version,
               Status:         clusterv1alpha2.SyncStatusPending,
               Reason:         "SynchroCreating",
            }
            if gr != storageConfig.StorageGroupResource {
               storageResource := storageConfig.StorageGroupResource.String()
               syncCondition.StorageResource = &storageResource
            }
            info.SyncConditions = append(info.SyncConditions, syncCondition)
         }

         resourceStatuses[gr] = info
         sortedGroupResources = append(sortedGroupResources, gr)
      }
   }

   s.resourcelock.Lock()
   defer s.resourcelock.Unlock()
   select {
   case <-s.closer:
      return
   default:
   }

   s.sortedGroupResources.Store(sortedGroupResources)
   s.resourceStatuses.Store(resourceStatuses)

   // filter deleted resources
   deleted := map[schema.GroupVersionResource]struct{}{}
   for gvr := range s.resourceVersionCaches {
      if _, ok := syncConfigs[gvr]; !ok {
         deleted[gvr] = struct{}{}
      }
   }

   synchros := s.resourceSynchros.Load().(map[schema.GroupVersionResource]*ResourceSynchro)

   // remove deleted resource synchro
   for gvr := range deleted {
      if handler, ok := synchros[gvr]; ok {
         handler.Close()

         // ensure that no more data is synchronized to the storage.
         <-handler.Closed()
         delete(synchros, gvr)
      }

      if err := s.storage.CleanClusterResource(context.TODO(), s.name, gvr); err != nil {
         klog.ErrorS(err, "Failed to clean cluster resource", "cluster", s.name, "resource", gvr)
         // update resource sync status
         continue
      }

      delete(s.resourceVersionCaches, gvr)
   }

   for gvr, config := range syncConfigs {
      if _, ok := synchros[gvr]; ok {
         continue
      }

      resourceStorage, err := s.storage.NewResourceStorage(config.storageConfig)
      if err != nil {
         klog.ErrorS(err, "Failed to create resource storage", "cluster", s.name, "storage resource", config.storageResource)
         // update resource sync status
         continue
      }

      resourceVersionCache, ok := s.resourceVersionCaches[gvr]
      if !ok {
         resourceVersionCache = informer.NewResourceVersionStorage(cache.DeletionHandlingMetaNamespaceKeyFunc)
         s.resourceVersionCaches[gvr] = resourceVersionCache
      }

      syncKind := config.syncResource.GroupVersion().WithKind(config.kind)
      synchro := newResourceSynchro(s.name, syncKind,
         s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource),
         resourceVersionCache,
         config.convertor,
         resourceStorage,
      )
      s.resourceSynchroWaitGroup.StartWithChannel(s.closer, synchro.runStorager)

      if s.handlerStopCh != nil {
         select {
         case <-s.handlerStopCh:
         default:
            go synchro.Run(s.handlerStopCh)
         }
      }
      synchros[gvr] = synchro
   }
   s.resourceSynchros.Store(synchros)
}

runStorager handles the business cluster's resource events

A worker pool is started to process resource events: it keeps reading pending events from the queue and syncs the resource objects to the database.

func (synchro *ResourceSynchro) runStorager(shutdown <-chan struct{}) {
   go func() {
      <-shutdown
      synchro.Close()
   }()

   synchro.storager(1)
}


func (synchro *ResourceSynchro) storager(worker int) {
	var waitGroup sync.WaitGroup
	for i := 0; i < worker; i++ {
		waitGroup.Add(1)

		go wait.Until(func() {
			defer waitGroup.Done()

			synchro.processResources()
		}, time.Second, synchro.closer)
	}

	waitGroup.Wait()
	close(synchro.closed)
}

func (synchro *ResourceSynchro) processResources() {
	for {
		select {
		case <-synchro.closer:
			return
		default:
		}

		event, err := synchro.queue.Pop()
		if err != nil {
			if err == queue.ErrQueueClosed {
				return
			}

			klog.Error(err)
			continue
		}

		synchro.handleResourceEvent(event)
	}
}

func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) {
	defer func() { _ = synchro.queue.Done(event) }()

	if d, ok := event.Object.(cache.DeletedFinalStateUnknown); ok {
		namespace, name, err := cache.SplitMetaNamespaceKey(d.Key)
		if err != nil {
			klog.Error(err)
			return
		}
		obj := &metav1.PartialObjectMetadata{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: namespace,
				Name:      name,
			},
		}

		if err := synchro.deleteResource(obj); err != nil {
			klog.ErrorS(err, "Failed to handler resource",
				"cluster", synchro.cluster,
				"action", event.Action,
				"resource", synchro.storageResource,
				"namespace", namespace,
				"name", name,
			)
		}
		return
	}

	var err error
	obj := event.Object.(runtime.Object)

	// if synchro.convertor == nil, it means no conversion is needed.
	if synchro.convertor != nil {
		if obj, err = synchro.convertToStorageVersion(obj); err != nil {
			klog.Error(err)
			return
		}
	}
	utils.InjectClusterName(obj, synchro.cluster)

	switch event.Action {
	case queue.Added:
		err = synchro.createOrUpdateResource(obj)
	case queue.Updated:
		err = synchro.updateOrCreateResource(obj)
	case queue.Deleted:
		err = synchro.deleteResource(obj)
	}

	if err != nil {
		o, _ := meta.Accessor(obj)
		klog.ErrorS(err, "Failed to handler resource",
			"cluster", synchro.cluster,
			"action", event.Action,
			"resource", synchro.storageResource,
			"namespace", o.GetNamespace(),
			"name", o.GetName(),
		)
	}
}


func (synchro *ResourceSynchro) createOrUpdateResource(obj runtime.Object) error {
	err := synchro.storage.Create(synchro.ctx, synchro.cluster, obj)
	if genericstorage.IsNodeExist(err) {
		return synchro.storage.Update(synchro.ctx, synchro.cluster, obj)
	}
	return err
}

func (synchro *ResourceSynchro) updateOrCreateResource(obj runtime.Object) error {
	err := synchro.storage.Update(synchro.ctx, synchro.cluster, obj)
	if genericstorage.IsNotFound(err) {
		return synchro.storage.Create(synchro.ctx, synchro.cluster, obj)
	}
	return err
}

func (synchro *ResourceSynchro) deleteResource(obj runtime.Object) error {
	return synchro.storage.Delete(synchro.ctx, synchro.cluster, obj)
}

That concludes the business flow of Clusterpedia syncing resources from multiple Kubernetes clusters into the database.