由 SuKai April 13, 2022
Karmada是华为云开源的,提供应用的跨云多集群统一管理和部署。今天一起来看一下Karmada的控制器,怎么来完成集群纳管和Kubernetes应用工作负载在成员集群创建的。
基本概念
Cluster
在Karmada集群管理中,将Kubernetes集群分为两种:控制面集群,成员集群。控制面集群是Karmada管理组件所在的集群。成员集群为纳管的业务集群。Cluster自定义资源指的是成员集群,指定成员集群的API地址,访问密钥,伪装身份,同步模式。同步模式分为Push和Pull,控制面集群能够主动建立与成员集群的连接,创建资源的为Push模式,控制面集群不能与成员集群主动建立连接的,成员集群建立与控制面集群的连接,监听控制面集群的资源事件,创建资源的为Pull模式。
PropagationPolicy
Namespaced下发策略,Kubernetes资源对象下发到成员集群的策略,指定资源对象,成员集群。ClusterPropagationPolicy,非Namespaced下发策略。PropagationPolicy只能下发自己所在命名空间的资源,ClusterPropagationPolicy能够下发集群范围的除了系统保留命名空间外的任意命名空间的资源。
OverridePolicy
Namespaced差异策略,Kubernetes资源对象下发到某些集群时,使用指定的参数值。ClusterOverridePolicy,非Namespaced差异策略。
ResourceBinding
根据下发策略生成ResourceBinding,指定具体版本的Kubernetes资源对象与下发策略PropagationPolicy绑定。ClusterResourceBinding绑定Kubernetes资源与ClusterPropagationPolicy。
Work
下发到成员集群的资源列表,可以为Deployment, Configmap, Service, Role, CRD等Kubernetes资源。控制面集群通过Work来实现对成员集群的资源控制。Push模式下,控制面集群监听Work资源事件,执行对成员集群的资源对象操作。Pull模式下,成员集群Agent监听Work资源事件,并在成员集群里执行资源对象的操作。
Karmada用户操作
1,添加成员集群,创建Cluster自定义资源的实例对象
2,创建Kubernetes的资源,这里就是Kubernetes原生的Deployment, Statefulset, Daemonset, Job, Configmap, Secret, Service等。
3,创建下发策略PropagationPolicy,指定资源对象部署到哪些集群中。
4,创建差异策略OverridePolicy,指定资源对象在哪些集群中覆盖哪些参数值。
Karmada用户操作完成后,由Karmada控制器进行资源调谐,最终完成在指定的成员集群中创建Kubernetes资源对象。
Karmada控制器
ClusterController
负责调谐Cluster自定义资源,创建集群对应的execution命名空间。Cluster为纳入管理的业务成员集群。
ClusterStatusController
负责调谐Cluster子资源Status,检测并更新成员集群的健康状态、资源使用信息、集群版本信息、集群主机资源信息以及支持的APIResource。
HpaController
Pod水平自动扩展控制器,负责调谐HorizontalPodAutoscaler自定义资源。在成员集群创建对应的HorizontalPodAutoscaler。
BindingController
包括两个控制器,ResourceBindingController监听Work, OveridePolicy, ClusterOverridePolicy资源的事件,调谐ResourceBinding负责创建对应的Work,指定要创建的Kubernetes工作负载Workload。ClusterResourceBindingController主要调谐ClusterResourceBinding。
ExecutionController
负责调谐Work自定义资源,在指定集群创建用户定义的Kubernetes工作负载Workload。
WorkStatusController
负责调谐Work的子资源Status,将引用的资源信息保存到Status中。
NamespaceController
监听Cluster自定义资源事件,创建Work,指定创建namespace。
ServiceExportController
监听Work事件,启动对应集群的Informer,监听serviceexports,endpointslices事件,将事件处理后放入worker队列。
EndpointSliceController
监听Work事件,根据Work定义,创建对应的EndpointSlice。
ServiceImportController
调谐ServiceImport资源,创建Kubernetes Service资源对象。
UnifiedAuthController
监听Cluster, ClusterRole, ClusterRoleBinding资源事件,生成Work指定创建ClusterRole, ClusterRoleBinding资源对象。
FederatedResourceQuotaSyncController
调谐FederatedResourceQuota,监听Cluster事件,创建Work指定创建ResourceQuota资源。
FederatedResourceQuotaStatusController
调谐FederatedResourceQuota子资源Status,更新Status信息。
Karmada控制器执行流程
1,创建一个成员集群Cluster资源实例
2,ClusterController创建成员集群对应的执行命名空间executionNamespace。
3,UnifiedAuthController生成创建成员集群ClusterRole, ClusterRoleBinding的Work。
4,ExecutionController根据Work中的集群和资源对象,在成员集群中执行对应操作,比如创建ClusterRole, ClusterRoleBinding等。
5,ClusterStatusController更新成员集群的健康状态,集群及主机信息。
6,成员集群加入成功后,用户创建Kubernetes资源对象模板,下发策略PropagationPolicy,差异配置策略OverridePolicy。
7,resourceDetector根据下发策略,创建ResourceBinding绑定资源与策略。
8,BindingController根据ResourceBinding,查找并应用OverridePolicy,生成创建成员集群资源的Work。
9,ExecutionController根据Work中的集群和资源对象,在成员集群中执行对应操作。
10,WorkStatusController保存Work完成的资源信息。
代码实现
Karmada基于controller-runtime框架开发控制器。
控制器流程:
1,构造controller manager实例controllerManager
2,初始化controllers并启动
在命令行执行函数Run中:
1,通过controller-runtime的GetConfig()获取controllerManager所在Kubernetes集群的RestConfig。
2,配置QPS和Burst,限制Kubernetes API请求,限制每秒请求数,突发峰值请求个数上限。
3,NewManager构造controllerManager实例,配置自定义资源的Schema,选举配置,存活检查和指标监控。
4,调用setupControllers初始化controllers。
func Run(ctx context.Context, opts *options.Options) error {
klog.Infof("karmada-controller-manager version: %s", version.Get())
config, err := controllerruntime.GetConfig()
if err != nil {
panic(err)
}
config.QPS, config.Burst = opts.KubeAPIQPS, opts.KubeAPIBurst
controllerManager, err := controllerruntime.NewManager(config, controllerruntime.Options{
Scheme: gclient.NewSchema(),
SyncPeriod: &opts.ResyncPeriod.Duration,
LeaderElection: opts.LeaderElection.LeaderElect,
LeaderElectionID: opts.LeaderElection.ResourceName,
LeaderElectionNamespace: opts.LeaderElection.ResourceNamespace,
LeaderElectionResourceLock: opts.LeaderElection.ResourceLock,
HealthProbeBindAddress: net.JoinHostPort(opts.BindAddress, strconv.Itoa(opts.SecurePort)),
LivenessEndpointName: "/healthz",
MetricsBindAddress: opts.MetricsBindAddress,
Controller: v1alpha1.ControllerConfigurationSpec{
GroupKindConcurrency: map[string]int{
workv1alpha1.SchemeGroupVersion.WithKind("Work").GroupKind().String(): opts.ConcurrentWorkSyncs,
workv1alpha2.SchemeGroupVersion.WithKind("ResourceBinding").GroupKind().String(): opts.ConcurrentResourceBindingSyncs,
workv1alpha2.SchemeGroupVersion.WithKind("ClusterResourceBinding").GroupKind().String(): opts.ConcurrentClusterResourceBindingSyncs,
clusterv1alpha1.SchemeGroupVersion.WithKind("Cluster").GroupKind().String(): opts.ConcurrentClusterSyncs,
schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Namespace"}.GroupKind().String(): opts.ConcurrentNamespaceSyncs,
},
},
})
if err != nil {
klog.Errorf("failed to build controller manager: %v", err)
return err
}
if err := controllerManager.AddHealthzCheck("ping", healthz.Ping); err != nil {
klog.Errorf("failed to add health check endpoint: %v", err)
return err
}
setupControllers(controllerManager, opts, ctx.Done())
// blocks until the context is done.
if err := controllerManager.Start(ctx); err != nil {
klog.Errorf("controller manager exits unexpectedly: %v", err)
return err
}
// never reach here
return nil
}
在setupControllers中:
1,创建dynamicClientSet, discoverClientSet 客户端连接和discovery连接。
2,构造overrideManager实例,该实例的ApplyOverridePolicies方法根据资源对象和集群名称查找OverridePolicy和ClusterOverridePolicy,资源对象应用差异化配置策略并返回匹配的差异化配置策略。
3,定义忽略下发的资源skippedResourceConfig和命名空间skippedPropagatingNamespaces。
4,初始化controlPlaneInformerManager,用于创建控制面集群资源的Informer。SingleClusterInformerManager接口包含方法:ForResource(resource schema.GroupVersionResource, handler cache.ResourceEventHandler)用于构建指定资源的DynamicSharedInformer,Lister(resource schema.GroupVersionResource) cache.GenericLister用于在informer store中查找资源。
5,初始化resourceInterpreter资源解释器,资源在下发到成员集群前,匹配资源类型和操作,调用解释器相应的方法。方法包括:HookEnabled查看资源类型对应的操作是否有WebHook存在,调用对应的WebHook;GetReplicas用于获取资源对象的当前副本数量和期望副本数量;ReviseReplica修改资源对象副本数量;Retain根据期望配置和运行状态配置比较,保持资源对象值比如资源对象为Pod时,Pod的NodeName,ServiceAccountName,Volumes,VolumeMounts的配置项是不修改的。
6,初始化objectWatcher用于对成员集群的资源对象操作:创建,更新,删除。
7,初始化resourceDetector控制器,监听propagationPolicy,clusterPropagationPolicy资源事件进行调谐。
8,初始化dependenciesDistributor控制器,自动下发相关资源到成员集群。
9,初始化clusterAPIClusterDetector控制器,用于自动发现cluster-api的Cluster资源来创建Karmada的Cluster。监听cluster-api的Cluster资源,生成成员集群Cluster。cluster-api是Kubernetes的一个子项目,用来管理Kubernetes集群生命周期,可以通过cluster-api来创建、升级、操作多个Kubernetes集群基础设施。
10,初始化controllerContext,在controllerManager的context中保存ClientSet连接和配置信息。
11,StartControllers启动所有控制器。
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
restConfig := mgr.GetConfig()
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
discoverClientSet := discovery.NewDiscoveryClientForConfigOrDie(restConfig)
overrideManager := overridemanager.New(mgr.GetClient())
skippedResourceConfig := util.NewSkippedResourceConfig()
if err := skippedResourceConfig.Parse(opts.SkippedPropagatingAPIs); err != nil {
// The program will never go here because the parameters have been checked
return
}
skippedPropagatingNamespaces := map[string]struct{}{}
for _, ns := range opts.SkippedPropagatingNamespaces {
skippedPropagatingNamespaces[ns] = struct{}{}
}
controlPlaneInformerManager := informermanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan)
resourceInterpreter := resourceinterpreter.NewResourceInterpreter("", controlPlaneInformerManager)
if err := mgr.Add(resourceInterpreter); err != nil {
klog.Fatalf("Failed to setup custom resource interpreter: %v", err)
}
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter)
resourceDetector := &detector.ResourceDetector{
DiscoveryClientSet: discoverClientSet,
Client: mgr.GetClient(),
InformerManager: controlPlaneInformerManager,
RESTMapper: mgr.GetRESTMapper(),
DynamicClient: dynamicClientSet,
SkippedResourceConfig: skippedResourceConfig,
SkippedPropagatingNamespaces: skippedPropagatingNamespaces,
ResourceInterpreter: resourceInterpreter,
EventRecorder: mgr.GetEventRecorderFor("resource-detector"),
ConcurrentResourceTemplateSyncs: opts.ConcurrentResourceTemplateSyncs,
ConcurrentResourceBindingSyncs: opts.ConcurrentResourceBindingSyncs,
RateLimiterOptions: opts.RateLimiterOpts,
}
if err := mgr.Add(resourceDetector); err != nil {
klog.Fatalf("Failed to setup resource detector: %v", err)
}
if features.FeatureGate.Enabled(features.PropagateDeps) {
dependenciesDistributor := &dependenciesdistributor.DependenciesDistributor{
Client: mgr.GetClient(),
DynamicClient: dynamicClientSet,
InformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,
RESTMapper: mgr.GetRESTMapper(),
}
if err := mgr.Add(dependenciesDistributor); err != nil {
klog.Fatalf("Failed to setup dependencies distributor: %v", err)
}
}
setupClusterAPIClusterDetector(mgr, opts, stopChan)
controllerContext := controllerscontext.Context{
Mgr: mgr,
ObjectWatcher: objectWatcher,
Opts: controllerscontext.Options{
Controllers: opts.Controllers,
ClusterMonitorPeriod: opts.ClusterMonitorPeriod,
ClusterMonitorGracePeriod: opts.ClusterMonitorGracePeriod,
ClusterStartupGracePeriod: opts.ClusterStartupGracePeriod,
ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency,
ClusterLeaseDuration: opts.ClusterLeaseDuration,
ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ClusterAPIQPS: opts.ClusterAPIQPS,
ClusterAPIBurst: opts.ClusterAPIBurst,
SkippedPropagatingNamespaces: opts.SkippedPropagatingNamespaces,
ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
RateLimiterOptions: opts.RateLimiterOpts,
},
StopChan: stopChan,
DynamicClientSet: dynamicClientSet,
OverrideManager: overrideManager,
ControlPlaneInformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,
}
if err := controllers.StartControllers(controllerContext); err != nil {
klog.Fatalf("error starting controllers: %v", err)
}
// Ensure the InformerManager stops when the stop channel closes
go func() {
<-stopChan
informermanager.StopInstance()
}()
}
总结,上面Karmada控制器主要完成了成员集群导入和用户创建资源下发工作,当使用Push模式时,这些工作都在控制面集群完成,当使用Pull模式时,ClusterStatus, Execution, WorkStatus, ServiceExport等控制器则在成员集群中工作。