由 SuKai January 27, 2022
前面文章介绍了通过基于go-restful框架开发API, client-go生成clientset, informers, listers来读取和写入自定义资源,基于kubebuilder开发CRD资源控制器。今天我们通过一个实例来看一下API整个开发过程。
开发步骤
1,通过kubebuilder来开发CRD控制器
2,通过client-gen,lister-gen,informer-gen生成clientset, informers, listers代码
3,开发models实现CRD资源的Kubernetes读写操作,以及读取列表时的排序,分页,过滤
4,开发handler实现CRD资源的API处理
CRD资源的读操作
CRD资源的读操作通过Informer来读取,减少API和Etcd集群的压力。
Informer的主要工作原理为:通过Reflector对资源进行List/Watch,监听Kubernetes对资源的操作事件,把资源对象和操作类型写入到一个DeltaFIFO的队列中。Informer消费该队列,将资源对象存储到本地缓存Indexer中,Indexer中的数据与Etcd集群的数据保持最终一致。
CRD资源的写操作
CRD资源的写操作通过client-go的clientset来完成,包括对资源的Create,Update,Patch,Delete操作。
代码开发
这里我们需要实现上一篇讲的TrackingServer自定义资源的API。TrackingServer主要用于管理机器学习实验跟踪MLflow在k8s里的实例资源。
创建Informer
informerFactories结构体实现了InformerFactory接口,这个接口有两个SharedInformerFactory,一个为Kubernetes资源的informerFactory,一个为本项目自定义资源的aiInformerFactory。aiscopeinformers.NewSharedInformerFactory创建了一个aiInformerFactory实例,这里的NewSharedInformerFactory为代码生成器生成的方法。通过代码生成器创建clientset客户端aiClient,作为参数来创建InformerFactory
// InformerFactory groups the shared informer factories for Kubernetes
// resources and for this project's custom resources.
type InformerFactory interface {
// KubernetesSharedInformerFactory returns the factory for Kubernetes resources.
KubernetesSharedInformerFactory() k8sinformers.SharedInformerFactory
// AIScopeSharedInformerFactory returns the factory for this project's custom resources.
AIScopeSharedInformerFactory() aiscopeinformers.SharedInformerFactory
// Start starts the shared informer factories one by one if they are not nil.
Start(stopCh <-chan struct{})
}
// informerFactories holds the two shared informer factories. Either field is
// left nil when NewInformerFactories is called without the matching client.
type informerFactories struct {
// informerFactory is built from the kubernetes.Interface client.
informerFactory k8sinformers.SharedInformerFactory
// aiInformerFactory is built from the generated versioned.Interface client.
aiInformerFactory aiscopeinformers.SharedInformerFactory
}
// NewInformerFactories builds an InformerFactory from the given clients.
// A factory is only created for a client that is non-nil; the corresponding
// accessor returns a nil interface value otherwise.
func NewInformerFactories(client kubernetes.Interface, aiClient versioned.Interface) InformerFactory {
	var (
		k8sFactory k8sinformers.SharedInformerFactory
		aiFactory  aiscopeinformers.SharedInformerFactory
	)

	if client != nil {
		k8sFactory = k8sinformers.NewSharedInformerFactory(client, defaultResync)
	}
	if aiClient != nil {
		aiFactory = aiscopeinformers.NewSharedInformerFactory(aiClient, defaultResync)
	}

	return &informerFactories{
		informerFactory:   k8sFactory,
		aiInformerFactory: aiFactory,
	}
}
// KubernetesSharedInformerFactory returns the stored Kubernetes informer
// factory, which is nil if no Kubernetes client was supplied at construction.
func (f *informerFactories) KubernetesSharedInformerFactory() k8sinformers.SharedInformerFactory {
return f.informerFactory
}
// AIScopeSharedInformerFactory returns the stored custom-resource informer
// factory, which is nil if no custom-resource client was supplied at construction.
func (f *informerFactories) AIScopeSharedInformerFactory() aiscopeinformers.SharedInformerFactory {
return f.aiInformerFactory
}
TrackingServer Model
定义TrackingServer接口,Operator Struct实现了这个接口。aiclient用于CRD 资源(TrackingServer)的写操作,resourceGetter用于CRD 资源(TrackingServer)的读操作。
// Interface is the TrackingServer model API consumed by the HTTP handlers.
type Interface interface {
// CreateOrUpdateTrackingServer creates the object, or updates it when it carries a resource version.
CreateOrUpdateTrackingServer(namespace string, trackingserver *experimentv1alpha2.TrackingServer) (*experimentv1alpha2.TrackingServer, error)
// PatchTrackingServer applies the given object as a patch to the TrackingServer of the same name.
PatchTrackingServer(namespace string, trackingserver *experimentv1alpha2.TrackingServer) (*experimentv1alpha2.TrackingServer, error)
// DeleteTrackingServer deletes the named TrackingServer in namespace.
DeleteTrackingServer(namespace, name string) error
// ListTrackingServers lists the TrackingServers in namespace according to queryParam.
ListTrackingServers(namespace string, queryParam *query.Query) (*api.ListResult, error)
// DescribeTrackingServer returns the named TrackingServer in namespace.
DescribeTrackingServer(namespace, name string) (*experimentv1alpha2.TrackingServer, error)
}
// Operator implements Interface for TrackingServer resources.
type Operator struct {
// aiclient is the clientset used for write operations (create, update, patch, delete).
aiclient aiscope.Interface
// resourceGetter is used for read operations (get, list).
resourceGetter *resourcesv1alpha2.ResourceGetter
}
// New returns an Operator that writes through aiclient and reads through a
// ResourceGetter built from the given informer factory.
func New(aiclient aiscope.Interface, informers informers.InformerFactory) Interface {
	getter := resourcesv1alpha2.NewResourceGetter(informers)

	op := &Operator{}
	op.aiclient = aiclient
	op.resourceGetter = getter
	return op
}
通过aiclient完成Create, Update, Patch, Delete操作,通过resourceGetter完成Get, List操作
// CreateOrUpdateTrackingServer forces the object's namespace to the given one,
// then creates it when it has no resource version and updates it otherwise.
// It returns whatever the clientset call returns.
func (o *Operator) CreateOrUpdateTrackingServer(namespace string, trackingserver *experimentv1alpha2.TrackingServer) (*experimentv1alpha2.TrackingServer, error) {
	trackingserver.Namespace = namespace
	client := o.aiclient.ExperimentV1alpha2().TrackingServers(namespace)

	// An empty resource version means the object has not been read from the
	// cluster, so it is treated as new.
	if trackingserver.ResourceVersion == "" {
		return client.Create(context.Background(), trackingserver, metav1.CreateOptions{})
	}
	return client.Update(context.Background(), trackingserver, metav1.UpdateOptions{})
}
// PatchTrackingServer serializes trackingserver to JSON and sends it as a
// merge patch against the TrackingServer named trackingserver.Name in namespace.
// A marshalling failure is returned without contacting the cluster.
func (o *Operator) PatchTrackingServer(namespace string, trackingserver *experimentv1alpha2.TrackingServer) (*experimentv1alpha2.TrackingServer, error) {
	payload, marshalErr := json.Marshal(trackingserver)
	if marshalErr != nil {
		return nil, marshalErr
	}

	client := o.aiclient.ExperimentV1alpha2().TrackingServers(namespace)
	return client.Patch(context.Background(), trackingserver.Name, types.MergePatchType, payload, metav1.PatchOptions{})
}
// DeleteTrackingServer deletes the named TrackingServer in namespace through
// the clientset, using default delete options.
func (o *Operator) DeleteTrackingServer(namespace, name string) error {
	client := o.aiclient.ExperimentV1alpha2().TrackingServers(namespace)
	return client.Delete(context.Background(), name, metav1.DeleteOptions{})
}
// ListTrackingServers lists TrackingServers in namespace through the resource
// getter. A failure is logged with klog and returned to the caller.
func (o *Operator) ListTrackingServers(namespace string, queryParam *query.Query) (*api.ListResult, error) {
	list, err := o.resourceGetter.List(experimentv1alpha2.ResourcePluralTrackingServer, namespace, queryParam)
	if err == nil {
		return list, nil
	}

	klog.Error(err)
	return nil, err
}
// DescribeTrackingServer fetches the named TrackingServer in namespace through
// the resource getter and returns it as its concrete type.
func (o *Operator) DescribeTrackingServer(namespace, name string) (*experimentv1alpha2.TrackingServer, error) {
	object, err := o.resourceGetter.Get(experimentv1alpha2.ResourcePluralTrackingServer, namespace, name)
	if err != nil {
		return nil, err
	}

	// Single-value assertion: this panics if the getter ever returns a
	// different concrete type for this resource.
	return object.(*experimentv1alpha2.TrackingServer), nil
}
工厂模式的ResourceGetter实现
首先定义一个接口,包含Get, List方法。
// Interface is the read-only accessor each resource-specific getter implements.
type Interface interface {
// Get retrieves a single object by its namespace and name.
Get(namespace, name string) (runtime.Object, error)
// List retrieves the collection of objects in namespace that match the given query.
List(namespace string, query *query.Query) (*api.ListResult, error)
}
不同的CRD资源实现这个接口,这里trackingserverGetter实现了这个接口
// trackingserverGetter reads TrackingServer objects through a shared informer factory.
type trackingserverGetter struct {
// sharedInformers provides the TrackingServer lister used by Get and List.
sharedInformers informers.SharedInformerFactory
}
// New returns a TrackingServer getter that reads from sharedInformers.
func New(sharedInformers informers.SharedInformerFactory) v1alpha2.Interface {
	getter := &trackingserverGetter{}
	getter.sharedInformers = sharedInformers
	return getter
}
// Get returns the TrackingServer with the given name in namespace, looked up
// through the informer's lister.
func (g *trackingserverGetter) Get(namespace, name string) (runtime.Object, error) {
	lister := g.sharedInformers.Experiment().V1alpha2().TrackingServers().Lister()
	return lister.TrackingServers(namespace).Get(name)
}
// List returns the TrackingServers in namespace that match query's selector,
// then hands them to DefaultList together with this getter's compare and
// filter functions.
func (g *trackingserverGetter) List(namespace string, query *query.Query) (*api.ListResult, error) {
	lister := g.sharedInformers.Experiment().V1alpha2().TrackingServers().Lister()
	items, err := lister.TrackingServers(namespace).List(query.Selector())
	if err != nil {
		return nil, err
	}

	// DefaultList works on runtime.Object, so widen each typed pointer.
	var result []runtime.Object
	for i := range items {
		result = append(result, items[i])
	}
	return v1alpha2.DefaultList(result, query, g.compare, g.filter), nil
}
通过一个Map来存储各个资源的接口
// Register the getter for the core/v1 "namespaces" resource in the
// cluster-scoped map, backed by the Kubernetes informer factory.
clusterResourceGetters[schema.GroupVersionResource{Group: "", Version: "v1", Resource: "namespaces"}] = namespace.New(factory.KubernetesSharedInformerFactory())
// Register the getter for TrackingServer in the namespaced map, backed by the
// project's own informer factory.
namespacedResourceGetters[experimentv1alpha2.SchemeGroupVersion.WithResource(experimentv1alpha2.ResourcePluralTrackingServer)] = trackingserver.New(factory.AIScopeSharedInformerFactory())
当调用ResourceGetter的Get, List方法时,调用TryResource来获取对应资源的接口,然后调用对应资源的Get,List方法
// TryResource looks up the getter registered for the given resource name.
// With clusterScope set, the cluster-scoped getters are searched first; in
// every case the namespaced getters are searched afterwards, so a
// cluster-scoped miss still falls back to a namespaced getter. It returns nil
// when no getter matches.
func (r *ResourceGetter) TryResource(clusterScope bool, resource string) v1alpha2.Interface {
	if clusterScope {
		for gvr, getter := range r.clusterResourceGetters {
			if gvr.Resource != resource {
				continue
			}
			return getter
		}
	}

	for gvr, getter := range r.namespacedResourceGetters {
		if gvr.Resource != resource {
			continue
		}
		return getter
	}

	return nil
}
// Get resolves the getter for resource and fetches the named object from it.
// An empty namespace is treated as a cluster-scoped lookup. It returns
// ErrResourceNotSupported when no getter is registered for resource.
func (r *ResourceGetter) Get(resource, namespace, name string) (runtime.Object, error) {
	if getter := r.TryResource(namespace == "", resource); getter != nil {
		return getter.Get(namespace, name)
	}
	return nil, ErrResourceNotSupported
}
// List resolves the getter for resource and lists the objects matching query
// from it. An empty namespace is treated as a cluster-scoped lookup. It
// returns ErrResourceNotSupported when no getter is registered for resource.
func (r *ResourceGetter) List(resource, namespace string, query *query.Query) (*api.ListResult, error) {
	if getter := r.TryResource(namespace == "", resource); getter != nil {
		return getter.List(namespace, query)
	}
	return nil, ErrResourceNotSupported
}
DefaultList实现排序,分页,过滤
DefaultList里的过滤,遍历列表条目,将过滤字段值和过滤条件逐一匹配,如果有一条不满足就丢弃这个条目。
// Keep only the objects that satisfy every filter in the query; each kept
// object is passed through all transform functions before being collected.
var filtered []runtime.Object
for _, object := range objects {
	matched := true
	for field, value := range q.Filters {
		matched = filterFunc(object, query.Filter{Field: field, Value: value})
		if !matched {
			// one failing filter is enough to drop the object
			break
		}
	}
	if !matched {
		continue
	}

	for _, transform := range transformFuncs {
		object = transform(object)
	}
	filtered = append(filtered, object)
}
TrackingServer的filterFunc进行过滤
// filter reports whether object is a TrackingServer whose metadata satisfies
// filter. Objects of any other type never match.
func (g *trackingserverGetter) filter(object runtime.Object, filter query.Filter) bool {
	if ts, isTrackingServer := object.(*experimentv1alpha2.TrackingServer); isTrackingServer {
		return v1alpha2.DefaultObjectMetaFilter(ts.ObjectMeta, filter)
	}
	return false
}
TrackingServer的filterFunc调用DefaultObjectMetaFilter进行比对
// DefaultObjectMetaFilter reports whether item's metadata satisfies filter.
// The filter field selects which piece of metadata is compared with the
// filter value; a field that is not handled here never matches.
func DefaultObjectMetaFilter(item metav1.ObjectMeta, filter query.Filter) bool {
	want := string(filter.Value)

	switch filter.Field {
	case query.FieldNames:
		// exact match against any entry of a comma-separated name list
		for _, candidate := range strings.Split(want, ",") {
			if candidate == item.Name {
				return true
			}
		}
		return false

	case query.FieldName:
		// /namespaces?page=1&limit=10&name=default (substring match)
		return strings.Contains(item.Name, want)

	case query.FieldUID:
		// /namespaces?page=1&limit=10&uid=a8a8d6cf-f6a5-4fea-9c1b-e57610115706
		return string(item.UID) == want

	case query.FieldNamespace:
		// /deployments?page=1&limit=10&namespace=aiscope-system
		return item.Namespace == want

	case query.FieldOwnerReference:
		// /namespaces?page=1&limit=10&ownerReference=a8a8d6cf-f6a5-4fea-9c1b-e57610115706
		for i := range item.OwnerReferences {
			if string(item.OwnerReferences[i].UID) == want {
				return true
			}
		}
		return false

	case query.FieldOwnerKind:
		// /namespaces?page=1&limit=10&ownerKind=Workspace
		for i := range item.OwnerReferences {
			if item.OwnerReferences[i].Kind == want {
				return true
			}
		}
		return false

	case query.FieldAnnotation:
		// /namespaces?page=1&limit=10&annotation=openpitrix_runtime
		return labelMatch(item.Annotations, want)

	case query.FieldLabel:
		// /namespaces?page=1&limit=10&label=aiscope.io/workspace:system-workspace
		return labelMatch(item.Labels, want)
	}

	return false
}
DefaultList里的排序
// Sort by the q.SortBy field. compareFunc(a, b, field) is used as "a is
// greater than b", so descending order puts i first when i > j, and ascending
// order puts i first when j > i.
//
// The ascending case swaps the arguments instead of negating the result:
// negation would report "less" in both directions for two equal elements,
// which is not a valid ordering for sort.Slice.
sort.Slice(filtered, func(i, j int) bool {
	if !q.Ascending {
		return compareFunc(filtered[i], filtered[j], q.SortBy)
	}
	return compareFunc(filtered[j], filtered[i], q.SortBy)
})
TrackingServer的compareFunc
// compare orders two TrackingServers by the given field using their object
// metadata. It returns false when either argument is not a TrackingServer.
func (g *trackingserverGetter) compare(left runtime.Object, right runtime.Object, field query.Field) bool {
	lhs, leftOK := left.(*experimentv1alpha2.TrackingServer)
	rhs, rightOK := right.(*experimentv1alpha2.TrackingServer)
	if !leftOK || !rightOK {
		return false
	}
	return v1alpha2.DefaultObjectMetaCompare(lhs.ObjectMeta, rhs.ObjectMeta, field)
}
TrackingServer的compareFunc调用DefaultObjectMetaCompare进行比对
// DefaultObjectMetaCompare returns true if left is greater than right.
//
// With sortBy == query.FieldName (?sortBy=name) the names are compared. Every
// other field, including query.FieldCreateTime and
// query.FieldCreationTimeStamp (?sortBy=creationTimestamp), compares creation
// timestamps and falls back to the name when the timestamps are equal.
func DefaultObjectMetaCompare(left, right metav1.ObjectMeta, sortBy query.Field) bool {
	nameGreater := left.Name > right.Name

	if sortBy == query.FieldName {
		return nameGreater
	}

	// compare by name if creation timestamp is equal
	if left.CreationTimestamp.Equal(&right.CreationTimestamp) {
		return nameGreater
	}
	return left.CreationTimestamp.After(right.CreationTimestamp.Time)
}
DefaultList里将过滤后filtered的条目进行分页,将分页的数据返回
total := len(filtered)
if q.Pagination == nil {
q.Pagination = query.NoPagination
}
start, end := q.Pagination.GetValidPagination(total)
return &api.ListResult{
TotalItems: len(filtered),
Items: objectsToInterfaces(filtered[start:end]),
}
计算分页的开始和结束索引
// GetValidPagination converts the pagination settings into a half-open index
// range [startIndex, endIndex) that is safe to use for slicing a list of
// total items.
//
//   - When Limit equals NoPagination.Limit the whole list (0, total) is returned.
//   - When Limit or Offset is negative, or Offset is past the end of the list,
//     the empty range (0, 0) is returned.
//   - Otherwise the range starts at Offset and spans at most Limit items,
//     clamped to total.
func (p *Pagination) GetValidPagination(total int) (startIndex, endIndex int) {
	// no pagination
	if p.Limit == NoPagination.Limit {
		return 0, total
	}

	// out of range
	if p.Limit < 0 || p.Offset < 0 || p.Offset > total {
		return 0, 0
	}

	// Offset <= total holds here, so total-Offset is non-negative. Comparing
	// Limit with the remaining item count, instead of computing Offset+Limit
	// first, keeps a very large Limit from overflowing into a negative end index.
	if remaining := total - p.Offset; p.Limit >= remaining {
		return p.Offset, total
	}
	return p.Offset, p.Offset + p.Limit
}
到这里资源列表的排序,分页,过滤就完成了。
API Handler开发
路由注册
// AddToContainer builds the web service for the experiment API group version,
// registers the TrackingServer routes on it with handlers backed by ep, and
// adds the web service to container. It always returns nil.
func AddToContainer(container *restful.Container, ep model.Interface) error {
// MIME types accepted by the PATCH route.
mimePatch := []string{restful.MIME_JSON, runtime.MimeMergePatchJson, runtime.MimeJsonPatchJson}
ws := runtime.NewWebService(experimentv1alpha2.SchemeGroupVersion)
handler := newHandler(ep)
// trackingservers
// POST: create a TrackingServer in the namespace taken from the path.
ws.Route(ws.POST("/namespaces/{namespace}/trackingservers").
To(handler.CreateTrackingServer).
Reads(experimentv1alpha2.TrackingServer{}).
Param(ws.PathParameter("namespace", "namespace")).
Doc("Create a trackingserver in the specified namespace.").
Returns(http.StatusOK, api.StatusOK, experimentv1alpha2.TrackingServer{}).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.ExperimentTrackingServerTag}))
// PUT: update the named TrackingServer.
ws.Route(ws.PUT("/namespaces/{namespace}/trackingservers/{trackingserver}").
To(handler.UpdateTrackingServer).
Doc("Update trackingserver in the specified namespace.").
Param(ws.PathParameter("namespace", "namespace")).
Param(ws.PathParameter("trackingserver", "trackingserver name")).
Reads(experimentv1alpha2.TrackingServer{}).
Returns(http.StatusOK, api.StatusOK, experimentv1alpha2.TrackingServer{}).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.ExperimentTrackingServerTag}))
// PATCH: patch the named TrackingServer; the only route that restricts its consumed MIME types.
ws.Route(ws.PATCH("/namespaces/{namespace}/trackingservers/{trackingserver}").
To(handler.PatchTrackingServer).
Consumes(mimePatch...).
Doc("Update trackingserver in the specified namespace.").
Param(ws.PathParameter("namespace", "namespace")).
Param(ws.PathParameter("trackingserver", "trackingserver name")).
Reads(experimentv1alpha2.TrackingServer{}).
Returns(http.StatusOK, api.StatusOK, experimentv1alpha2.TrackingServer{}).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.ExperimentTrackingServerTag}))
// GET (collection): list the TrackingServers of a namespace.
ws.Route(ws.GET("/namespaces/{namespace}/trackingservers").
To(handler.ListTrackingServer).
Param(ws.PathParameter("namespace", "namespace")).
Doc("List the trackingservers of the specified namespace for the current user").
Returns(http.StatusOK, api.StatusOK, api.ListResult{}).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.ExperimentTrackingServerTag}))
// GET (item): fetch a single TrackingServer by name.
ws.Route(ws.GET("/namespaces/{namespace}/trackingservers/{trackingserver}").
To(handler.DescribeTrackingServer).
Param(ws.PathParameter("namespace", "namespace")).
Param(ws.PathParameter("trackingserver", "trackingserver name")).
Doc("Retrieve trackingserver details.").
Returns(http.StatusOK, api.StatusOK, experimentv1alpha2.TrackingServer{}).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.ExperimentTrackingServerTag}))
// DELETE: delete a single TrackingServer by name.
// NOTE(review): the declared 200 response type is TrackingServer, while
// handler.DeleteTrackingServer writes servererr.None - confirm which is intended.
ws.Route(ws.DELETE("/namespaces/{namespace}/trackingservers/{trackingserver}").
To(handler.DeleteTrackingServer).
Param(ws.PathParameter("namespace", "namespace")).
Param(ws.PathParameter("trackingserver", "trackingserver name")).
Doc("Delete trackingserver under namespace.").
Returns(http.StatusOK, api.StatusOK, experimentv1alpha2.TrackingServer{}).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.ExperimentTrackingServerTag}))
container.Add(ws)
return nil
}
Create, Update, List, Describe等操作
// CreateTrackingServer handles POST /namespaces/{namespace}/trackingservers.
// It decodes the request body into a TrackingServer, creates it in the
// namespace taken from the path, and writes the created object back.
// A body that cannot be decoded is answered through api.HandleBadRequest; a
// failure from the model layer is answered through api.HandleError.
func (h *handler) CreateTrackingServer(request *restful.Request, response *restful.Response) {
	namespace := request.PathParameter("namespace")

	// Decode into a value, as the update and patch handlers do. Decoding into
	// a nil *TrackingServer would leave the pointer nil for a JSON "null"
	// body, and the model layer dereferences it unconditionally.
	var trackingserver experimentv1alpha2.TrackingServer
	if err := request.ReadEntity(&trackingserver); err != nil {
		api.HandleBadRequest(response, request, err)
		return
	}

	created, err := h.ep.CreateOrUpdateTrackingServer(namespace, &trackingserver)
	if err != nil {
		api.HandleError(response, request, err)
		return
	}

	response.WriteEntity(created)
}
// UpdateTrackingServer handles PUT /namespaces/{namespace}/trackingservers/{trackingserver}.
// The body must decode into a TrackingServer whose name equals the name in the
// URL; otherwise the request is rejected as a bad request. The object is then
// passed to CreateOrUpdateTrackingServer and the result is written back.
func (h *handler) UpdateTrackingServer(request *restful.Request, response *restful.Response) {
	var body experimentv1alpha2.TrackingServer
	if err := request.ReadEntity(&body); err != nil {
		api.HandleBadRequest(response, request, err)
		return
	}

	// The URL is authoritative for which object is addressed; refuse a body
	// that names a different one.
	if nameInURL := request.PathParameter("trackingserver"); nameInURL != body.Name {
		mismatch := fmt.Errorf("the name of the object (%s) does not match the name on the URL (%s)", body.Name, nameInURL)
		api.HandleBadRequest(response, request, mismatch)
		return
	}

	updated, err := h.ep.CreateOrUpdateTrackingServer(request.PathParameter("namespace"), &body)
	if err != nil {
		api.HandleError(response, request, err)
		return
	}

	response.WriteEntity(updated)
}
// PatchTrackingServer handles PATCH /namespaces/{namespace}/trackingservers/{trackingserver}.
// The body is decoded into a TrackingServer, its name is overwritten with the
// name from the URL, and the object is handed to the model layer as a patch.
// The patched object is written back on success.
func (h *handler) PatchTrackingServer(request *restful.Request, response *restful.Response) {
	var body experimentv1alpha2.TrackingServer
	if err := request.ReadEntity(&body); err != nil {
		api.HandleBadRequest(response, request, err)
		return
	}

	// The target is always the object named in the URL, whatever the body says.
	body.Name = request.PathParameter("trackingserver")

	patched, err := h.ep.PatchTrackingServer(request.PathParameter("namespace"), &body)
	if err != nil {
		api.HandleError(response, request, err)
		return
	}

	response.WriteEntity(patched)
}
// ListTrackingServer handles GET /namespaces/{namespace}/trackingservers.
// It parses the query parameters from the request, lists the TrackingServers
// of the namespace through the model layer, and writes the list result.
// A failure from the model layer is answered through api.HandleError.
func (h *handler) ListTrackingServer(request *restful.Request, response *restful.Response) {
	namespace := request.PathParameter("namespace")
	queryParam := query.ParseQueryParameter(request)

	result, err := h.ep.ListTrackingServers(namespace, queryParam)
	if err != nil {
		// Pass the real request, as every other handler does, and stop here
		// so that no second body is written after the error response.
		api.HandleError(response, request, err)
		return
	}

	response.WriteEntity(result)
}
// DescribeTrackingServer handles GET /namespaces/{namespace}/trackingservers/{trackingserver}.
// It looks up the TrackingServer named in the URL and writes it back; a lookup
// failure is answered through api.HandleError.
func (h *handler) DescribeTrackingServer(request *restful.Request, response *restful.Response) {
	found, err := h.ep.DescribeTrackingServer(
		request.PathParameter("namespace"),
		request.PathParameter("trackingserver"),
	)
	if err != nil {
		api.HandleError(response, request, err)
		return
	}

	response.WriteEntity(found)
}
// DeleteTrackingServer handles DELETE /namespaces/{namespace}/trackingservers/{trackingserver}.
// It deletes the TrackingServer named in the URL and writes servererr.None on
// success; a failure is answered through api.HandleError.
func (h *handler) DeleteTrackingServer(request *restful.Request, response *restful.Response) {
	ns := request.PathParameter("namespace")
	name := request.PathParameter("trackingserver")

	if err := h.ep.DeleteTrackingServer(ns, name); err != nil {
		api.HandleError(response, request, err)
		return
	}

	response.WriteEntity(servererr.None)
}
客户端访问API