Kubernetes应用平台API开发实战

SuKai January 27, 2022

前面的文章分别介绍了基于go-restful框架开发API、通过代码生成器生成client-go风格的clientset、informers和listers来读取和写入自定义资源,以及基于kubebuilder开发CRD资源控制器。今天我们通过一个实例来看一下API的完整开发过程。

开发步骤

1,通过kubebuilder来开发CRD控制器

2,通过client-gen,lister-gen,informer-gen生成clientset, informers, listers代码

3,开发models,实现CRD资源在Kubernetes中的读写操作,以及读取列表时的排序、分页和过滤

4,开发handler实现CRD资源的API处理

CRD资源的读操作

CRD资源的读操作通过Informer从本地缓存中读取,以减少对API Server和etcd集群的压力。

Informer的主要工作原理为:Reflector通过List-Watch机制监听Kubernetes API Server上资源的变化事件,把资源对象和操作类型写入DeltaFIFO队列;Informer内部的controller从DeltaFIFO中弹出(Pop)这些变化,将资源对象存储到本地缓存Indexer中,并分发给注册的事件处理函数。Indexer中的数据与etcd中的数据保持最终一致(会有短暂延迟),因此读操作可以直接从Indexer获取。

CRD资源的写操作

CRD资源的写操作通过代码生成器生成的clientset来完成,包括对资源的Create、Update、Patch和Delete操作。

代码开发

这里我们要为上一篇介绍的TrackingServer自定义资源实现API。TrackingServer主要用于管理Kubernetes中机器学习实验跟踪工具MLflow的实例资源。

创建Informer

informerFactories结构体实现了InformerFactory接口,该接口提供两个SharedInformerFactory:一个是用于Kubernetes内置资源的informerFactory,另一个是用于本项目自定义资源的aiInformerFactory。aiscopeinformers.NewSharedInformerFactory用于创建aiInformerFactory实例,这里的NewSharedInformerFactory是由代码生成器生成的方法。创建InformerFactory时,需要传入代码生成器生成的clientset客户端aiClient作为参数。

// InformerFactory bundles the shared informer factories used by the API
// server: one for built-in Kubernetes resources and one for the project's
// own custom resources.
type InformerFactory interface {
	// KubernetesSharedInformerFactory returns the factory for built-in Kubernetes resources.
	KubernetesSharedInformerFactory() k8sinformers.SharedInformerFactory
	// AIScopeSharedInformerFactory returns the factory for the aiscope custom resources.
	AIScopeSharedInformerFactory() aiscopeinformers.SharedInformerFactory

	// Start starts the shared informer factories one by one, skipping any that are nil.
	Start(stopCh <-chan struct{})
}

// informerFactories is the default InformerFactory implementation. A field
// stays nil when the corresponding client was not supplied.
type informerFactories struct {
	informerFactory   k8sinformers.SharedInformerFactory
	aiInformerFactory aiscopeinformers.SharedInformerFactory
}

// NewInformerFactories builds an InformerFactory from the given clients.
// A shared factory is only created for a non-nil client; both factories use
// defaultResync as their resync period.
func NewInformerFactories(client kubernetes.Interface, aiClient versioned.Interface) InformerFactory {
	var k8sFactory k8sinformers.SharedInformerFactory
	if client != nil {
		k8sFactory = k8sinformers.NewSharedInformerFactory(client, defaultResync)
	}

	var aiFactory aiscopeinformers.SharedInformerFactory
	if aiClient != nil {
		aiFactory = aiscopeinformers.NewSharedInformerFactory(aiClient, defaultResync)
	}

	return &informerFactories{
		informerFactory:   k8sFactory,
		aiInformerFactory: aiFactory,
	}
}

// KubernetesSharedInformerFactory returns the factory for built-in Kubernetes
// resources, or nil if no Kubernetes client was supplied to NewInformerFactories.
func (f *informerFactories) KubernetesSharedInformerFactory() k8sinformers.SharedInformerFactory {
	return f.informerFactory
}

// AIScopeSharedInformerFactory returns the factory for the aiscope custom
// resources, or nil if no aiscope client was supplied to NewInformerFactories.
func (f *informerFactories) AIScopeSharedInformerFactory() aiscopeinformers.SharedInformerFactory {
	return f.aiInformerFactory
}

// Start starts every non-nil shared informer factory, one by one.
// This method is required for informerFactories to satisfy the
// InformerFactory interface.
//
// A shared informer factory only starts informers that have already been
// requested from it, so obtain the needed informers/listers before calling
// Start.
func (f *informerFactories) Start(stopCh <-chan struct{}) {
	if f.informerFactory != nil {
		f.informerFactory.Start(stopCh)
	}
	if f.aiInformerFactory != nil {
		f.aiInformerFactory.Start(stopCh)
	}
}

TrackingServer Model

为TrackingServer模型定义Interface接口,Operator结构体实现了这个接口。其中aiclient用于CRD资源(TrackingServer)的写操作,resourceGetter用于CRD资源(TrackingServer)的读操作。

// Interface is the TrackingServer model used by the API handlers. Writes go
// through the aiscope clientset; reads are served by the ResourceGetter,
// which is built from the informer factories.
type Interface interface {
	// CreateOrUpdateTrackingServer creates the object, or updates it when it already carries a ResourceVersion.
	CreateOrUpdateTrackingServer(namespace string, trackingserver *experimentv1alpha2.TrackingServer) (*experimentv1alpha2.TrackingServer, error)
	// PatchTrackingServer sends the object as a JSON merge patch.
	PatchTrackingServer(namespace string, trackingserver *experimentv1alpha2.TrackingServer) (*experimentv1alpha2.TrackingServer, error)
	// DeleteTrackingServer deletes the named object.
	DeleteTrackingServer(namespace, name string) error
	// ListTrackingServers lists objects in namespace according to queryParam.
	ListTrackingServers(namespace string, queryParam *query.Query) (*api.ListResult, error)
	// DescribeTrackingServer returns the named object.
	DescribeTrackingServer(namespace, name string) (*experimentv1alpha2.TrackingServer, error)
}

// Operator implements Interface.
type Operator struct {
	// aiclient performs the write operations (create/update/patch/delete).
	aiclient aiscope.Interface
	// resourceGetter performs the read operations (get/list).
	resourceGetter *resourcesv1alpha2.ResourceGetter
}

// New returns an Interface that writes through aiclient and reads through
// a ResourceGetter built from informers.
func New(aiclient aiscope.Interface, informers informers.InformerFactory) Interface {
	getter := resourcesv1alpha2.NewResourceGetter(informers)
	return &Operator{
		resourceGetter: getter,
		aiclient:       aiclient,
	}
}

通过aiclient完成Create、Update、Patch、Delete操作,通过resourceGetter完成Get、List操作。

// CreateOrUpdateTrackingServer writes trackingserver into namespace.
// An object with an empty ResourceVersion is created; otherwise it is updated.
// Note: the caller's object is modified, because its Namespace field is
// overwritten with namespace before the request is sent.
func (o *Operator) CreateOrUpdateTrackingServer(namespace string, trackingserver *experimentv1alpha2.TrackingServer) (*experimentv1alpha2.TrackingServer, error) {
	trackingserver.Namespace = namespace
	servers := o.aiclient.ExperimentV1alpha2().TrackingServers(namespace)

	if trackingserver.ResourceVersion == "" {
		return servers.Create(context.Background(), trackingserver, metav1.CreateOptions{})
	}
	return servers.Update(context.Background(), trackingserver, metav1.UpdateOptions{})
}

// PatchTrackingServer serializes trackingserver to JSON and sends it as a
// JSON merge patch against the object named trackingserver.Name in namespace.
//
// NOTE(review): every field json.Marshal emits, including zero values of
// fields without omitempty, becomes part of the patch and may overwrite
// server-side values. Confirm this is intended.
func (o *Operator) PatchTrackingServer(namespace string, trackingserver *experimentv1alpha2.TrackingServer) (*experimentv1alpha2.TrackingServer, error) {
	patch, marshalErr := json.Marshal(trackingserver)
	if marshalErr != nil {
		return nil, marshalErr
	}

	servers := o.aiclient.ExperimentV1alpha2().TrackingServers(namespace)
	return servers.Patch(context.Background(), trackingserver.Name, types.MergePatchType, patch, metav1.PatchOptions{})
}

func (o *Operator) DeleteTrackingServer(namespace, name string) error {
   return o.aiclient.ExperimentV1alpha2().TrackingServers(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
}

// ListTrackingServers lists TrackingServers in namespace through the
// ResourceGetter, applying queryParam. Errors are logged and returned.
func (o *Operator) ListTrackingServers(namespace string, queryParam *query.Query) (*api.ListResult, error) {
	list, err := o.resourceGetter.List(experimentv1alpha2.ResourcePluralTrackingServer, namespace, queryParam)
	if err == nil {
		return list, nil
	}
	klog.Error(err)
	return nil, err
}

// DescribeTrackingServer returns the TrackingServer called name in namespace,
// read through the ResourceGetter. The type assertion below panics if the
// getter registered for this resource returns a different object type.
func (o *Operator) DescribeTrackingServer(namespace, name string) (*experimentv1alpha2.TrackingServer, error) {
	obj, err := o.resourceGetter.Get(experimentv1alpha2.ResourcePluralTrackingServer, namespace, name)
	if err != nil {
		return nil, err
	}
	return obj.(*experimentv1alpha2.TrackingServer), nil
}

工厂模式的ResourceGetter实现

首先定义一个接口,包含Get, List方法。

// Interface is implemented by every resource-specific getter that is
// registered with the ResourceGetter.
type Interface interface {
	// Get returns the single object called name in namespace.
	Get(namespace, name string) (runtime.Object, error)

	// List returns the collection of objects in namespace that match q.
	List(namespace string, q *query.Query) (*api.ListResult, error)
}

不同的CRD资源实现这个接口,这里trackingserverGetter实现了这个接口

// trackingserverGetter serves TrackingServer reads from the aiscope shared
// informer factory's listers.
type trackingserverGetter struct {
	sharedInformers informers.SharedInformerFactory
}

// New returns the TrackingServer getter backed by sharedInformers.
func New(sharedInformers informers.SharedInformerFactory) v1alpha2.Interface {
	getter := &trackingserverGetter{}
	getter.sharedInformers = sharedInformers
	return getter
}

// Get returns the TrackingServer called name in namespace from the lister.
//
// On error it returns an untyped nil. Returning the lister's result directly
// would wrap a nil *TrackingServer in a non-nil runtime.Object, so a caller
// testing `obj == nil` would be misled.
func (g *trackingserverGetter) Get(namespace, name string) (runtime.Object, error) {
	ts, err := g.sharedInformers.Experiment().V1alpha2().TrackingServers().Lister().TrackingServers(namespace).Get(name)
	if err != nil {
		return nil, err
	}
	return ts, nil
}

// List reads the TrackingServers in namespace that match query.Selector()
// from the lister. It then hands them to v1alpha2.DefaultList for filtering,
// sorting and pagination, using this getter's compare and filter functions.
func (g *trackingserverGetter) List(namespace string, query *query.Query) (*api.ListResult, error) {
	lister := g.sharedInformers.Experiment().V1alpha2().TrackingServers().Lister()
	items, err := lister.TrackingServers(namespace).List(query.Selector())
	if err != nil {
		return nil, err
	}

	var objects []runtime.Object
	for i := range items {
		objects = append(objects, items[i])
	}
	return v1alpha2.DefaultList(objects, query, g.compare, g.filter), nil
}

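通过两个Map分别存储集群级资源(clusterResourceGetters)和命名空间级资源(namespacedResourceGetters)对应的Getter实现,Map的键为资源的GroupVersionResource。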

// Register the cluster-scoped getter for core/v1 namespaces, which reads
// from the Kubernetes shared informer factory.
clusterResourceGetters[schema.GroupVersionResource{Group: "", Version: "v1", Resource: "namespaces"}] = namespace.New(factory.KubernetesSharedInformerFactory())
// Register the namespaced getter for TrackingServers, which reads from the
// aiscope (custom resource) shared informer factory.
namespacedResourceGetters[experimentv1alpha2.SchemeGroupVersion.WithResource(experimentv1alpha2.ResourcePluralTrackingServer)] = trackingserver.New(factory.AIScopeSharedInformerFactory())

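当调用ResourceGetter的Get、List方法时,先调用TryResource获取对应资源的Getter,再调用该Getter的Get、List方法。namespace为空时视为集群级查询:TryResource会先在clusterResourceGetters中查找,找不到时再到namespacedResourceGetters中查找。匹配时只比较资源名(Resource字段)。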

// TryResource returns the getter registered for the plural resource name,
// or nil if none matches. Only the Resource part of each registered key is
// compared; group and version are ignored.
//
// Cluster-scoped getters are searched only when clusterScope is true.
// Namespaced getters are always searched as a fallback.
// NOTE(review): if two keys share a Resource name, map iteration order makes
// the choice nondeterministic.
func (r *ResourceGetter) TryResource(clusterScope bool, resource string) v1alpha2.Interface {
	if clusterScope {
		for gvr, getter := range r.clusterResourceGetters {
			if gvr.Resource != resource {
				continue
			}
			return getter
		}
	}

	for gvr, getter := range r.namespacedResourceGetters {
		if gvr.Resource != resource {
			continue
		}
		return getter
	}

	return nil
}

// Get fetches a single object of the given resource. An empty namespace
// means a cluster-scoped lookup. It returns ErrResourceNotSupported when no
// getter is registered for resource.
func (r *ResourceGetter) Get(resource, namespace, name string) (runtime.Object, error) {
	if getter := r.TryResource(namespace == "", resource); getter != nil {
		return getter.Get(namespace, name)
	}
	return nil, ErrResourceNotSupported
}

// List fetches the objects of the given resource that match query. An empty
// namespace means a cluster-scoped lookup. It returns
// ErrResourceNotSupported when no getter is registered for resource.
func (r *ResourceGetter) List(resource, namespace string, query *query.Query) (*api.ListResult, error) {
	if getter := r.TryResource(namespace == "", resource); getter != nil {
		return getter.List(namespace, query)
	}
	return nil, ErrResourceNotSupported
}

DefaultList实现排序,分页,过滤

DefaultList中的过滤:遍历列表条目,将条目与每个过滤条件逐一匹配,只要有一个条件不满足就丢弃该条目;保留下来的条目会依次经过transformFuncs转换后再加入结果。

var filtered []runtime.Object
for _, object := range objects {
   selected := true
   for field, value := range q.Filters {
      if !filterFunc(object, query.Filter{Field: field, Value: value}) {
         selected = false
         break
      }
   }

   if selected {
      for _, transform := range transformFuncs {
         object = transform(object)
      }
      filtered = append(filtered, object)
   }
}

TrackingServer的filterFunc进行过滤

// filter reports whether object is a TrackingServer whose metadata satisfies
// filter. Objects of any other type never match.
func (g *trackingserverGetter) filter(object runtime.Object, filter query.Filter) bool {
	if ts, ok := object.(*experimentv1alpha2.TrackingServer); ok {
		return v1alpha2.DefaultObjectMetaFilter(ts.ObjectMeta, filter)
	}
	return false
}

TrackingServer的filterFunc调用DefaultObjectMetaFilter进行比对

// DefaultObjectMetaFilter reports whether item's metadata satisfies filter.
// Filters on unknown fields never match.
func DefaultObjectMetaFilter(item metav1.ObjectMeta, filter query.Filter) bool {
	want := string(filter.Value)

	switch filter.Field {
	// Exact match against any entry of a comma-separated list of names.
	case query.FieldNames:
		for _, candidate := range strings.Split(want, ",") {
			if candidate == item.Name {
				return true
			}
		}
		return false

	// Substring match, e.g. /namespaces?page=1&limit=10&name=default
	case query.FieldName:
		return strings.Contains(item.Name, want)

	// Exact match, e.g. /namespaces?page=1&limit=10&uid=a8a8d6cf-f6a5-4fea-9c1b-e57610115706
	case query.FieldUID:
		return string(item.UID) == want

	// Exact match, e.g. /deployments?page=1&limit=10&namespace=aiscope-system
	case query.FieldNamespace:
		return item.Namespace == want

	// Any owner reference with this UID, e.g. /namespaces?page=1&limit=10&ownerReference=a8a8d6cf-...
	case query.FieldOwnerReference:
		for _, ref := range item.OwnerReferences {
			if string(ref.UID) == want {
				return true
			}
		}
		return false

	// Any owner reference of this kind, e.g. /namespaces?page=1&limit=10&ownerKind=Workspace
	case query.FieldOwnerKind:
		for _, ref := range item.OwnerReferences {
			if ref.Kind == want {
				return true
			}
		}
		return false

	// e.g. /namespaces?page=1&limit=10&annotation=openpitrix_runtime
	case query.FieldAnnotation:
		return labelMatch(item.Annotations, want)

	// e.g. /namespaces?page=1&limit=10&label=aiscope.io/workspace:system-workspace
	case query.FieldLabel:
		return labelMatch(item.Labels, want)

	default:
		return false
	}
}

DefaultList里的排序

// sort by sortBy field.
// compareFunc(a, b, field) reports whether a is "greater" than b, so it is
// already a strict less-function for descending order. For ascending order
// the operands are swapped rather than the result negated: !compareFunc(a, b)
// means "a <= b". That is not a strict weak ordering and breaks the
// sort.Slice contract whenever two items compare equal (e.g. same name in
// different namespaces).
sort.Slice(filtered, func(i, j int) bool {
	if q.Ascending {
		return compareFunc(filtered[j], filtered[i], q.SortBy)
	}
	return compareFunc(filtered[i], filtered[j], q.SortBy)
})

TrackingServer的compareFunc

// compare reports whether left is greater than right for the given field,
// comparing the objects' metadata. It returns false if either object is not
// a TrackingServer.
func (g *trackingserverGetter) compare(left runtime.Object, right runtime.Object, field query.Field) bool {
	l, leftOK := left.(*experimentv1alpha2.TrackingServer)
	r, rightOK := right.(*experimentv1alpha2.TrackingServer)
	if !leftOK || !rightOK {
		return false
	}
	return v1alpha2.DefaultObjectMetaCompare(l.ObjectMeta, r.ObjectMeta, field)
}

TrackingServer的compareFunc调用DefaultObjectMetaCompare进行比对

// DefaultObjectMetaCompare reports whether left is greater than right for
// the sortBy field.
//
// ?sortBy=name compares names lexically. Every other value, including the
// creation-timestamp fields and unknown fields, compares creation
// timestamps, falling back to the name when the timestamps are equal.
func DefaultObjectMetaCompare(left, right metav1.ObjectMeta, sortBy query.Field) bool {
	nameGreater := left.Name > right.Name
	if sortBy == query.FieldName {
		return nameGreater
	}
	if left.CreationTimestamp.Equal(&right.CreationTimestamp) {
		return nameGreater
	}
	return left.CreationTimestamp.After(right.CreationTimestamp.Time)
}

DefaultList最后对过滤并排序后的filtered条目进行分页,返回当前页的数据以及过滤后的总条数。

total := len(filtered)

// A query without pagination returns every item. Note that this writes
// NoPagination back into q.
if q.Pagination == nil {
	q.Pagination = query.NoPagination
}
start, end := q.Pagination.GetValidPagination(total)

page := filtered[start:end]
return &api.ListResult{
	TotalItems: total,
	Items:      objectsToInterfaces(page),
}

计算分页的开始和结束索引

// GetValidPagination converts the pagination into [startIndex, endIndex)
// bounds that are safe for slicing a list of total items:
//
//   - a Limit equal to NoPagination.Limit yields the whole range [0, total);
//   - a negative Limit or Offset, or an Offset beyond total, yields [0, 0);
//   - otherwise the range starts at Offset and spans at most Limit items,
//     clamped to total.
func (p *Pagination) GetValidPagination(total int) (startIndex, endIndex int) {
	// no pagination
	if p.Limit == NoPagination.Limit {
		return 0, total
	}

	// out of range
	if p.Limit < 0 || p.Offset < 0 || p.Offset > total {
		return 0, 0
	}

	startIndex = p.Offset
	// Compare Limit with the remaining item count instead of computing
	// Offset+Limit first. Limit presumably comes from the request's limit
	// parameter, and a very large value would overflow the sum into a
	// negative endIndex, making the caller's filtered[start:end] panic.
	if remaining := total - startIndex; p.Limit < remaining {
		endIndex = startIndex + p.Limit
	} else {
		endIndex = total
	}

	return startIndex, endIndex
}

到这里资源列表的排序,分页,过滤就完成了。

API Handler开发

路由注册

// AddToContainer registers the TrackingServer REST routes on container,
// under the web service for experimentv1alpha2.SchemeGroupVersion.
func AddToContainer(container *restful.Container, ep model.Interface) error {
	// The PATCH handler decodes the body into a TrackingServer and the model
	// always sends it as a JSON merge patch. A JSON Patch (RFC 6902) body is an
	// array and cannot be decoded that way, so JSON Patch is not advertised.
	mimePatch := []string{restful.MIME_JSON, runtime.MimeMergePatchJson}

	ws := runtime.NewWebService(experimentv1alpha2.SchemeGroupVersion)
	handler := newHandler(ep)

	// trackingservers
	ws.Route(ws.POST("/namespaces/{namespace}/trackingservers").
		To(handler.CreateTrackingServer).
		Reads(experimentv1alpha2.TrackingServer{}).
		Param(ws.PathParameter("namespace", "namespace")).
		Doc("Create a trackingserver in the specified namespace.").
		Returns(http.StatusOK, api.StatusOK, experimentv1alpha2.TrackingServer{}).
		Metadata(restfulspec.KeyOpenAPITags, []string{constants.ExperimentTrackingServerTag}))
	ws.Route(ws.PUT("/namespaces/{namespace}/trackingservers/{trackingserver}").
		To(handler.UpdateTrackingServer).
		Doc("Update trackingserver in the specified namespace.").
		Param(ws.PathParameter("namespace", "namespace")).
		Param(ws.PathParameter("trackingserver", "trackingserver name")).
		Reads(experimentv1alpha2.TrackingServer{}).
		Returns(http.StatusOK, api.StatusOK, experimentv1alpha2.TrackingServer{}).
		Metadata(restfulspec.KeyOpenAPITags, []string{constants.ExperimentTrackingServerTag}))
	ws.Route(ws.PATCH("/namespaces/{namespace}/trackingservers/{trackingserver}").
		To(handler.PatchTrackingServer).
		Consumes(mimePatch...).
		Doc("Patch trackingserver in the specified namespace.").
		Param(ws.PathParameter("namespace", "namespace")).
		Param(ws.PathParameter("trackingserver", "trackingserver name")).
		Reads(experimentv1alpha2.TrackingServer{}).
		Returns(http.StatusOK, api.StatusOK, experimentv1alpha2.TrackingServer{}).
		Metadata(restfulspec.KeyOpenAPITags, []string{constants.ExperimentTrackingServerTag}))
	ws.Route(ws.GET("/namespaces/{namespace}/trackingservers").
		To(handler.ListTrackingServer).
		Param(ws.PathParameter("namespace", "namespace")).
		Doc("List the trackingservers of the specified namespace for the current user").
		Returns(http.StatusOK, api.StatusOK, api.ListResult{}).
		Metadata(restfulspec.KeyOpenAPITags, []string{constants.ExperimentTrackingServerTag}))
	ws.Route(ws.GET("/namespaces/{namespace}/trackingservers/{trackingserver}").
		To(handler.DescribeTrackingServer).
		Param(ws.PathParameter("namespace", "namespace")).
		Param(ws.PathParameter("trackingserver", "trackingserver name")).
		Doc("Retrieve trackingserver details.").
		Returns(http.StatusOK, api.StatusOK, experimentv1alpha2.TrackingServer{}).
		Metadata(restfulspec.KeyOpenAPITags, []string{constants.ExperimentTrackingServerTag}))
	// NOTE(review): the DELETE handler writes servererr.None on success, not a
	// TrackingServer as documented by Returns below. Confirm the OpenAPI model.
	ws.Route(ws.DELETE("/namespaces/{namespace}/trackingservers/{trackingserver}").
		To(handler.DeleteTrackingServer).
		Param(ws.PathParameter("namespace", "namespace")).
		Param(ws.PathParameter("trackingserver", "trackingserver name")).
		Doc("Delete trackingserver under namespace.").
		Returns(http.StatusOK, api.StatusOK, experimentv1alpha2.TrackingServer{}).
		Metadata(restfulspec.KeyOpenAPITags, []string{constants.ExperimentTrackingServerTag}))

	container.Add(ws)
	return nil
}

Create、Update、Patch、List、Describe、Delete等操作的Handler实现

// CreateTrackingServer handles POST /namespaces/{namespace}/trackingservers.
// The request body is decoded into a TrackingServer and passed to
// CreateOrUpdateTrackingServer, so a body that carries a resourceVersion is
// treated as an update.
func (h *handler) CreateTrackingServer(request *restful.Request, response *restful.Response) {
	namespace := request.PathParameter("namespace")

	// Decode into a value, not a nil pointer. With a pointer, a JSON body of
	// `null` decodes without error but leaves the pointer nil, and the model
	// would then dereference it and panic.
	var trackingserver experimentv1alpha2.TrackingServer
	if err := request.ReadEntity(&trackingserver); err != nil {
		api.HandleBadRequest(response, request, err)
		return
	}

	created, err := h.ep.CreateOrUpdateTrackingServer(namespace, &trackingserver)
	if err != nil {
		api.HandleError(response, request, err)
		return
	}

	response.WriteEntity(created)
}

// UpdateTrackingServer handles PUT .../trackingservers/{trackingserver}.
// The body's metadata.name must equal the name in the URL; otherwise the
// request is rejected as a bad request.
func (h *handler) UpdateTrackingServer(request *restful.Request, response *restful.Response) {
	namespace := request.PathParameter("namespace")
	urlName := request.PathParameter("trackingserver")

	var body experimentv1alpha2.TrackingServer
	if err := request.ReadEntity(&body); err != nil {
		api.HandleBadRequest(response, request, err)
		return
	}

	// The body must describe the same object that the URL addresses.
	if body.Name != urlName {
		mismatch := fmt.Errorf("the name of the object (%s) does not match the name on the URL (%s)", body.Name, urlName)
		api.HandleBadRequest(response, request, mismatch)
		return
	}

	updated, err := h.ep.CreateOrUpdateTrackingServer(namespace, &body)
	if err != nil {
		api.HandleError(response, request, err)
		return
	}
	response.WriteEntity(updated)
}

// PatchTrackingServer handles PATCH .../trackingservers/{trackingserver}.
// The body is decoded into a TrackingServer whose name is forced to the URL
// name, then forwarded to the model's PatchTrackingServer.
func (h *handler) PatchTrackingServer(request *restful.Request, response *restful.Response) {
	namespace := request.PathParameter("namespace")
	urlName := request.PathParameter("trackingserver")

	var patch experimentv1alpha2.TrackingServer
	if err := request.ReadEntity(&patch); err != nil {
		api.HandleBadRequest(response, request, err)
		return
	}

	// The URL is authoritative for which object is patched.
	patch.Name = urlName

	patched, err := h.ep.PatchTrackingServer(namespace, &patch)
	if err != nil {
		api.HandleError(response, request, err)
		return
	}
	response.WriteEntity(patched)
}

// ListTrackingServer handles GET /namespaces/{namespace}/trackingservers.
// Query parameters (paging, sorting, filtering) are parsed by
// query.ParseQueryParameter and forwarded to the model.
func (h *handler) ListTrackingServer(request *restful.Request, response *restful.Response) {
	namespace := request.PathParameter("namespace")
	queryParam := query.ParseQueryParameter(request)

	result, err := h.ep.ListTrackingServers(namespace, queryParam)
	if err != nil {
		// Pass the request like the other handlers do, and stop here.
		// Falling through would write a second (nil) entity after the error
		// response has already been written.
		api.HandleError(response, request, err)
		return
	}

	response.WriteEntity(result)
}

// DescribeTrackingServer handles GET .../trackingservers/{trackingserver}
// and responds with the named TrackingServer.
func (h *handler) DescribeTrackingServer(request *restful.Request, response *restful.Response) {
	namespace := request.PathParameter("namespace")
	name := request.PathParameter("trackingserver")

	found, err := h.ep.DescribeTrackingServer(namespace, name)
	if err != nil {
		api.HandleError(response, request, err)
		return
	}
	response.WriteEntity(found)
}

// DeleteTrackingServer handles DELETE .../trackingservers/{trackingserver}
// and responds with servererr.None on success.
func (h *handler) DeleteTrackingServer(request *restful.Request, response *restful.Response) {
	namespace, name := request.PathParameter("namespace"), request.PathParameter("trackingserver")

	if err := h.ep.DeleteTrackingServer(namespace, name); err != nil {
		api.HandleError(response, request, err)
		return
	}
	response.WriteEntity(servererr.None)
}

客户端访问API

image-20220127180948639

image-20220127181045240

image-20220127181112191