go-restful框架开发Kubernetes应用平台

SuKai January 7, 2022

go-restful是一个golang语言实现的RESTful库,Kubernetes APIServer使用它实现RESTful API。下面我们一起简单看一下Kubesphere如何使用go-restful的。

Container

Container逻辑上是WebService的集合,功能上可以实现多终端的效果。 它包括一组restful.WebService和一个http.ServeMux对象,使用RouteSelector进行请求派发。

Webservice

WebService逻辑上是Route的集合,功能上主要是为一组Route统一设置包括root path,请求响应的数据类型等一些通用的属性。

Route

路由包含两种,一种是标准JSR311接口规范的实现RouterJSR311,一种是快速路由CurlyRouter。 CurlyRouter支持正则表达式和动态参数,相比RouterJSR11更加轻量级,apiserver中使用的就是这种路由。 一种Route的设定包含:请求方法(http Method),请求路径(URL Path),输入输出类型(JSON/YAML)以及对应的回掉函数restful.RouteFunction,响应内容类型(Accept)等。

代码示例:一个API聚合服务,包含两种服务,一种是/api, /apis两个路径请求代理给Kubernetes,另一种是本地注册API提供服务。

定义APIServer结构体

type APIServer struct {
   ServerCount int

   Server *http.Server

   Config *apiserverconfig.Config

   // webservice container, where all webservice defines
   container *restful.Container

   KubernetesClient k8s.Client
}

APIServer构造函数

初始化kubernets Client和http server

func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIServer, error) {
   apiServer := &apiserver.APIServer{
      Config:     s.Config,
   }

   kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions)
   if err != nil {
      return nil, err
   }
   apiServer.KubernetesClient = kubernetesClient

   server := &http.Server{
      Addr: fmt.Sprintf(":%d", s.GenericServerRunOptions.InsecurePort),
   }

   apiServer.Server = server

   return apiServer, nil
}

初始化Container

func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {
   s.container = restful.NewContainer()

   // 设定路由为CurlyRouter(快速路由)
   s.container.Router(restful.CurlyRouter{})

   for _, ws := range s.container.RegisteredWebServices() {
      klog.V(2).Infof("%s", ws.RootPath())
   }
   
   // container作为http server的handler
   s.Server.Handler = s.container

   // 注册服务
   s.installAIscopeAPIs()

   // handler chain
   s.buildHandlerChain(stopCh)

   return nil
}

服务创建与注册

func (s *APIServer) installAIscopeAPIs() {
   urlruntime.Must(version.AddToContainer(s.container, s.KubernetesClient.Discovery()))
}

创建一个webservice,path为/aiapis/{GroupVersion}的服务,这里GroupVersion为空。添加了一条"/version"路由。最后API请求URL为/aiapis/version。

func AddToContainer(container *restful.Container, k8sDiscovery discovery.DiscoveryInterface) error {
   webservice := runtime.NewWebService(schema.GroupVersion{})

   webservice.Route(webservice.GET("/version").
      To(func(request *restful.Request, response *restful.Response) {
         ksVersion := version.Get()

         if k8sDiscovery != nil {
            k8sVersion, err := k8sDiscovery.ServerVersion()
            if err == nil {
               ksVersion.Kubernetes = k8sVersion
            } else {
               klog.Errorf("Failed to get kubernetes version, error %v", err)
            }
         }

         response.WriteAsJson(ksVersion)
      })).
      Doc("AIScope version")

   container.Add(webservice)

   return nil
}
const (
   ApiRootPath = "/aiapis"
)

func NewWebService(gv schema.GroupVersion) *restful.WebService {
   webservice := restful.WebService{}
   webservice.Path(ApiRootPath + "/" + gv.String()).
      Produces(restful.MIME_JSON)

   return &webservice
}

Handler Chain

通过WithRequestInfo解析API请求的信息,WithKubeAPIServer根据API请求信息判断是否代理请求给Kubernetes

func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) {
   requestInfoResolver := &request.RequestInfoFactory{
      APIPrefixes:          sets.NewString("api", "apis"),
   }

   handler := s.Server.Handler
   handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{})

   handler = filters.WithRequestInfo(handler, requestInfoResolver)

   s.Server.Handler = handler
}

WithRequestInfo,调用NewRequestInfo解析请求信息保存到ctx中

func WithRequestInfo(handler http.Handler, resolver request.RequestInfoResolver) http.Handler {
   return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {

      ctx := req.Context()
      info, err := resolver.NewRequestInfo(req)
      if err != nil {
         responsewriters.InternalError(w, req, fmt.Errorf("failed to crate RequestInfo: %v", err))
         return
      }

      req = req.WithContext(request.WithRequestInfo(ctx, info))
      handler.ServeHTTP(w, req)
   })
}

NewRequestInfo根据URL路径,解析请求信息,这里根据APIPrefix是否为IsKubernetesRequest请求

var kubernetesAPIPrefixes = sets.NewString("api", "apis")

func (r *RequestInfoFactory) NewRequestInfo(req *http.Request) (*RequestInfo, error) {
   requestInfo := RequestInfo{
      IsKubernetesRequest: false,
      RequestInfo: &k8srequest.RequestInfo{
         Path: req.URL.Path,
         Verb: req.Method,
      },
      Workspace: api.WorkspaceNone,
      Cluster:   api.ClusterNone,
      SourceIP:  iputil.RemoteIp(req),
      UserAgent: req.UserAgent(),
   }

   defer func() {
      prefix := requestInfo.APIPrefix
      if prefix == "" {
         currentParts := splitPath(requestInfo.Path)
         // Proxy discovery API
         if len(currentParts) > 0 && len(currentParts) < 3 {
            prefix = currentParts[0]
         }
      }
      if kubernetesAPIPrefixes.Has(prefix) {
         requestInfo.IsKubernetesRequest = true
      }
   }()

   currentParts := splitPath(req.URL.Path)
   if len(currentParts) < 3 {
      return &requestInfo, nil
   }

   if !r.APIPrefixes.Has(currentParts[0]) {
      // return a non-resource request
      return &requestInfo, nil
   }
   requestInfo.APIPrefix = currentParts[0]


   return &requestInfo, nil
}

WithKubeAPIServer handler首先创建Kubernetes代理defaultTransport,再RequestInfoFrom从请求的context中获取信息,如果info.IsKubernetesRequest是Kubernetes请求,httpProxy代理请求到Kubernetes。

func WithKubeAPIServer(handler http.Handler, config *rest.Config, failed proxy.ErrorResponder) http.Handler {
   kubernetes, _ := url.Parse(config.Host)
   defaultTransport, err := rest.TransportFor(config)
   if err != nil {
      klog.Errorf("Unable to create transport from rest.Config: %v", err)
      return handler
   }

   return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
      info, ok := request.RequestInfoFrom(req.Context())
      if !ok {
         err := errors.New("Unable to retrieve request info from request")
         klog.Error(err)
         responsewriters.InternalError(w, req, err)
      }
      if info.IsKubernetesRequest {
         s := *req.URL
         s.Host = kubernetes.Host
         s.Scheme = kubernetes.Scheme

         // make sure we don't override kubernetes's authorization
         req.Header.Del("Authorization")
         httpProxy := proxy.NewUpgradeAwareHandler(&s, defaultTransport, true, false, failed)
         httpProxy.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(defaultTransport, defaultTransport)
         httpProxy.ServeHTTP(w, req)
         return
      }

      handler.ServeHTTP(w, req)
   })
}

运行APIServer

func (s *APIServer) Run(ctx context.Context) (err error) {

	shutdownCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		<-ctx.Done()
		_ = s.Server.Shutdown(shutdownCtx)
	}()

	klog.V(0).Infof("Start listening on %s", s.Server.Addr)
	if s.Server.TLSConfig != nil {
		err = s.Server.ListenAndServeTLS("", "")
	} else {
		err = s.Server.ListenAndServe()
	}

	return err
}

image-20211224165449608

image-20211224165526336

image-20211224165547792