By SuKai, April 14, 2022
Previous articles covered Karmada's controller flow and how Karmada propagates Kubernetes resources to member clusters. In this article we look at the Karmada scheduler and how it decides where propagated resources should run.
As described in the controller walkthrough: after the user creates a Kubernetes resource template, a PropagationPolicy, and an OverridePolicy, the resourceDetector creates a ResourceBinding that binds the resource to its propagation policy. The BindingController then takes the ResourceBinding, looks up and applies any OverridePolicy, and generates the Work objects that create the resource in member clusters; finally the ExecutionController executes the Work in each member cluster to complete the Kubernetes resource operation.
In terms of flow, the Karmada scheduler runs after the ResourceBinding is created and before the BindingController does its work. Functionally, the scheduler is responsible for distributing Kubernetes resource objects across member clusters: its algorithm decides which member clusters should run the resource and how many replicas each should get, and it saves the result in the Clusters field of the ResourceBinding.
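For a concrete picture of that output, below is a minimal sketch of the kind of value the scheduler writes into ResourceBinding.Spec.Clusters. The cluster names and replica counts are made up; the sketch assumes Karmada's workv1alpha2.TargetCluster type (cluster name plus replica count), which backs the Clusters field.

package main

import (
    "fmt"

    workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
)

func main() {
    // Hypothetical scheduling result: 3 replicas split across two member clusters.
    // The scheduler stores a slice like this in ResourceBinding.Spec.Clusters,
    // which the BindingController later turns into per-cluster Work objects.
    suggested := []workv1alpha2.TargetCluster{
        {Name: "member1", Replicas: 2},
        {Name: "member2", Replicas: 1},
    }
    fmt.Println(suggested)
}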
Workflow
1. Watch events and enqueue: informers watch ResourceBinding and PropagationPolicy events and add the corresponding ResourceBinding to the work queue.
2. Dequeue a ResourceBinding: a worker is started every second to take a ResourceBinding off the queue and process it, looping over the work queue until it is drained.
3. Fetch the placement rules: using the PropagationPolicy name recorded in the ResourceBinding's annotations, look up the policy's placement, which includes cluster affinity, cluster tolerations, spread constraints, and the replica scheduling strategy.
4. Decide whether to schedule: compare the new placement with the old one to determine whether the ResourceBinding needs (re)scheduling.
5. Schedule the ResourceBinding and compute the replica assignment for each member cluster:
a. Filter plugins (FilterPlugin) check cluster affinity and tolerations to find the matching clusters.
b. Score plugins (ScorePlugin) score the matching clusters to rank them by priority.
c. Apply the SpreadConstraint rules to arrive at the final set of target member clusters.
d. Assign replicas. There are two replica scheduling types: Duplicated and Divided. With Duplicated, every member cluster gets the same number of replicas as the template. With Divided, the preference can be PreferenceWeighted or PreferenceAggregated: the former splits replicas by weight ratio, while the latter compares the total replica count to decide whether to scale up or scale down and then assigns replicas based on each member cluster's maximum available replicas (a simplified sketch of the weighted split follows this list).
6. Update the Clusters field of the ResourceBinding with the computed per-cluster replica assignment.
7. Once one ResourceBinding has been processed, continue with the next.
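As referenced in step 5d, here is a simplified sketch of the PreferenceWeighted split. It is not the Karmada implementation (which also handles deterministic ordering, scale up/down, and cluster capacity); it only illustrates the proportional arithmetic, and all names in it are hypothetical.

package main

import "fmt"

// divideByWeight splits total replicas across clusters in proportion to their
// weights, handing out any remainder one replica at a time. The real scheduler
// orders clusters deterministically; a Go map does not, which is fine for a sketch.
func divideByWeight(total int32, weights map[string]int64) map[string]int32 {
    var sum int64
    for _, w := range weights {
        sum += w
    }
    assigned := make(map[string]int32, len(weights))
    var used int32
    for name, w := range weights {
        assigned[name] = int32(int64(total) * w / sum)
        used += assigned[name]
    }
    for name := range weights {
        if used >= total {
            break
        }
        assigned[name]++
        used++
    }
    return assigned
}

func main() {
    // e.g. 6 replicas split 2:1 between member1 and member2 -> 4 and 2.
    fmt.Println(divideByWeight(6, map[string]int64{"member1": 2, "member2": 1}))
}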
Code Implementation
Filter and Score Plugins
The scheduling algorithm instance is constructed with four plugins: cluster affinity, taint toleration, installed resource API, and cluster locality.
algorithm := core.NewGenericScheduler(schedulerCache, []string{clusteraffinity.Name, tainttoleration.Name, apiinstalled.Name, clusterlocality.Name})
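The plugins registered above implement the framework's filter and score contracts. The sketch below states those contracts as Go interfaces, with the method signatures read off the plugin code quoted in this section; the package name and import paths are assumptions about the Karmada repo layout, and the real interfaces may carry extra methods such as a plugin Name().

// Package pluginsketch is illustrative only; the real interfaces live in the
// Karmada scheduler framework package.
package pluginsketch

import (
    "context"

    clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
    policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
    workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
    "github.com/karmada-io/karmada/pkg/scheduler/framework"
)

// FilterPlugin decides whether a single member cluster can host the resource.
type FilterPlugin interface {
    Filter(ctx context.Context, placement *policyv1alpha1.Placement,
        resource *workv1alpha2.ObjectReference, cluster *clusterv1alpha1.Cluster) *framework.Result
}

// ScorePlugin ranks a feasible member cluster with an integer score.
type ScorePlugin interface {
    Score(ctx context.Context, placement *policyv1alpha1.Placement,
        spec *workv1alpha2.ResourceBindingSpec, cluster *clusterv1alpha1.Cluster) (int64, *framework.Result)
}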
ClusterAffinity (cluster affinity): a filter plugin. It checks 1) whether the cluster name appears in the affinity's exclude list, and 2) whether the cluster's labels and fields match the label and field selectors in the affinity. It returns a success or unschedulable result.
// Filter checks if the cluster matched the placement cluster affinity constraint.
func (p *ClusterAffinity) Filter(ctx context.Context, placement *policyv1alpha1.Placement, resource *workv1alpha2.ObjectReference, cluster *clusterv1alpha1.Cluster) *framework.Result {
    affinity := placement.ClusterAffinity
    if affinity != nil {
        if util.ClusterMatches(cluster, *affinity) {
            return framework.NewResult(framework.Success)
        }
        return framework.NewResult(framework.Unschedulable, "cluster is not matched the placement cluster affinity constraint")
    }

    // If no clusters specified and it is not excluded, mark it matched
    return framework.NewResult(framework.Success)
}
func ClusterMatches(cluster *clusterv1alpha1.Cluster, affinity policyv1alpha1.ClusterAffinity) bool {
    for _, clusterName := range affinity.ExcludeClusters {
        if clusterName == cluster.Name {
            return false
        }
    }

    // match rules:
    // case  LabelSelector  ClusterNames  FieldSelector  Rule
    // 1     not-empty      not-empty     not-empty      match selector, name and field
    // 2     not-empty      empty         empty          match selector only
    // 3     not-empty      not-empty     empty          match selector, name
    // 4     not-empty      empty         not-empty      match selector, field
    // 5     empty          not-empty     not-empty      match name, field
    // 6     empty          not-empty     empty          match name only
    // 7     empty          empty         not-empty      match field only
    // 8     empty          empty         empty          match all

    if affinity.LabelSelector != nil {
        var s labels.Selector
        var err error
        if s, err = metav1.LabelSelectorAsSelector(affinity.LabelSelector); err != nil {
            return false
        }
        if !s.Matches(labels.Set(cluster.GetLabels())) {
            return false
        }
    }

    if affinity.FieldSelector != nil {
        var matchFields labels.Selector
        var err error
        if matchFields, err = lifted.NodeSelectorRequirementsAsSelector(affinity.FieldSelector.MatchExpressions); err != nil {
            return false
        }
        clusterFields := extractClusterFields(cluster)
        if matchFields != nil && !matchFields.Matches(clusterFields) {
            return false
        }
    }

    return ClusterNamesMatches(cluster, affinity.ClusterNames)
}
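As a quick usage sketch of the matching rules above, the following program builds a member cluster and a ClusterAffinity and calls util.ClusterMatches. The cluster object, labels, and policy values are hypothetical, and the import paths are assumed to follow the Karmada repo layout.

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
    policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
    "github.com/karmada-io/karmada/pkg/util"
)

func main() {
    // Hypothetical member cluster labelled region=cn-north.
    cluster := &clusterv1alpha1.Cluster{
        ObjectMeta: metav1.ObjectMeta{
            Name:   "member1",
            Labels: map[string]string{"region": "cn-north"},
        },
    }
    // Affinity that excludes member2 and selects clusters by label.
    affinity := policyv1alpha1.ClusterAffinity{
        ExcludeClusters: []string{"member2"},
        LabelSelector: &metav1.LabelSelector{
            MatchLabels: map[string]string{"region": "cn-north"},
        },
    }
    fmt.Println(util.ClusterMatches(cluster, affinity)) // true: not excluded, labels match
}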
TaintToleration (taint toleration): a filter plugin. It checks whether the cluster's taints are tolerated by the tolerations in the propagation policy's placement and returns a success or unschedulable result.
// Filter checks if the given tolerations in placement tolerate cluster's taints.
func (p *TaintToleration) Filter(ctx context.Context, placement *policyv1alpha1.Placement, resource *workv1alpha2.ObjectReference, cluster *clusterv1alpha1.Cluster) *framework.Result {
    filterPredicate := func(t *corev1.Taint) bool {
        // now only interested in NoSchedule taint which means do not allow new resource to schedule onto the cluster unless they tolerate the taint
        // todo: support NoExecute taint
        return t.Effect == corev1.TaintEffectNoSchedule
    }

    taint, isUntolerated := v1helper.FindMatchingUntoleratedTaint(cluster.Spec.Taints, placement.ClusterTolerations, filterPredicate)
    if !isUntolerated {
        return framework.NewResult(framework.Success)
    }

    return framework.NewResult(framework.Unschedulable, fmt.Sprintf("cluster had taint {%s: %s}, that the propagation policy didn't tolerate",
        taint.Key, taint.Value))
}
APIInstalled (installed resource API): a filter plugin. It compares the APIs enabled in the cluster against the API version and kind of the resource being propagated and returns a success or unschedulable result.
// Filter checks if the API(CRD) of the resource is installed in the target cluster.
func (p *APIInstalled) Filter(ctx context.Context, placement *policyv1alpha1.Placement, resource *workv1alpha2.ObjectReference, cluster *clusterv1alpha1.Cluster) *framework.Result {
    if !helper.IsAPIEnabled(cluster.Status.APIEnablements, resource.APIVersion, resource.Kind) {
        klog.V(2).Infof("Cluster(%s) not fit as missing API(%s, kind=%s)", cluster.Name, resource.APIVersion, resource.Kind)
        return framework.NewResult(framework.Unschedulable, "no such API resource")
    }

    return framework.NewResult(framework.Success)
}
ClusterLocality (previously scheduled clusters): a score plugin. If the member cluster appears in the ResourceBinding's Clusters list and that list has replicas assigned, the plugin returns the maximum score (100); if the total assigned replicas is zero or the cluster is not in the list, it returns the minimum score (0). The Clusters list holds the member clusters from the previous scheduling round.
// Score returns the maximum score for clusters that already hold replicas from
// the previous scheduling round, and the minimum score otherwise.
func (p *ClusterLocality) Score(ctx context.Context, placement *policyv1alpha1.Placement,
    spec *workv1alpha2.ResourceBindingSpec, cluster *clusterv1alpha1.Cluster) (int64, *framework.Result) {
    if len(spec.Clusters) == 0 {
        return framework.MinClusterScore, framework.NewResult(framework.Success)
    }

    replicas := util.GetSumOfReplicas(spec.Clusters)
    if replicas <= 0 {
        return framework.MinClusterScore, framework.NewResult(framework.Success)
    }

    if spec.TargetContains(cluster.Name) {
        return framework.MaxClusterScore, framework.NewResult(framework.Success)
    }

    return framework.MinClusterScore, framework.NewResult(framework.Success)
}
Scheduling Flow
1. findClustersThatFit: run the filter plugins to get the list of feasible member clusters.
2. prioritizeClusters: run the score plugins to score the feasible member clusters.
3. selectClusters: apply the spread constraints to get the final list of member clusters.
4. assignReplicas: assign replicas to the member clusters according to the replica scheduling strategy.
func (g *genericScheduler) Schedule(ctx context.Context, placement *policyv1alpha1.Placement, spec *workv1alpha2.ResourceBindingSpec) (result ScheduleResult, err error) {
    clusterInfoSnapshot := g.schedulerCache.Snapshot()
    if clusterInfoSnapshot.NumOfClusters() == 0 {
        return result, fmt.Errorf("no clusters available to schedule")
    }

    feasibleClusters, err := g.findClustersThatFit(ctx, g.scheduleFramework, placement, &spec.Resource, clusterInfoSnapshot)
    if err != nil {
        return result, fmt.Errorf("failed to findClustersThatFit: %v", err)
    }
    if len(feasibleClusters) == 0 {
        return result, fmt.Errorf("no clusters fit")
    }
    klog.V(4).Infof("feasible clusters found: %v", feasibleClusters)

    clustersScore, err := g.prioritizeClusters(ctx, g.scheduleFramework, placement, spec, feasibleClusters)
    if err != nil {
        return result, fmt.Errorf("failed to prioritizeClusters: %v", err)
    }
    klog.V(4).Infof("feasible clusters scores: %v", clustersScore)

    clusters, err := g.selectClusters(clustersScore, placement, spec)
    if err != nil {
        return result, fmt.Errorf("failed to select clusters: %v", err)
    }
    klog.V(4).Infof("selected clusters: %v", clusters)

    clustersWithReplicas, err := g.assignReplicas(clusters, placement.ReplicaScheduling, spec)
    if err != nil {
        return result, fmt.Errorf("failed to assignReplicas: %v", err)
    }
    result.SuggestedClusters = clustersWithReplicas

    return result, nil
}
Summary
The Karmada scheduler handles the scheduling of Kubernetes resources across multiple clusters: its algorithm computes how many replicas each member cluster should run and hands that result to the controllers, which then operate on resources in the member clusters. The scheduler only changes the desired state of the resource; the actual workload scheduling is done by each member cluster's own Kubernetes scheduler.