由 SuKai January 7, 2022
go-restful是一个golang语言实现的RESTful库,Kubernetes APIServer使用它实现RESTful API。下面我们一起简单看一下Kubesphere如何使用go-restful的。
Container
Container逻辑上是WebService的集合,功能上可以实现多终端的效果。 它包括一组restful.WebService和一个http.ServeMux对象,使用RouteSelector进行请求派发。
WebService
WebService逻辑上是Route的集合,功能上主要是为一组Route统一设置包括root path,请求响应的数据类型等一些通用的属性。
Route
路由包含两种,一种是标准JSR311接口规范的实现RouterJSR311,一种是快速路由CurlyRouter。 CurlyRouter支持正则表达式和动态参数,相比RouterJSR311更加轻量级,apiserver中使用的就是这种路由。 一条Route的设定包含:请求方法(HTTP Method),请求路径(URL Path),输入输出类型(JSON/YAML)以及对应的回调函数restful.RouteFunction,响应内容类型(Accept)等。
代码示例:一个API聚合服务,包含两种服务,一种是/api, /apis两个路径请求代理给Kubernetes,另一种是本地注册API提供服务。
定义APIServer结构体
// APIServer bundles the HTTP server with the go-restful container and the
// Kubernetes client used by the API handlers and the proxy filter.
type APIServer struct {
// ServerCount is not referenced by the code shown here.
// NOTE(review): presumably the number of server instances — confirm.
ServerCount int
// Server is the underlying HTTP server; PrepareRun installs its Handler
// and Run starts it.
Server *http.Server
// Config holds the API server configuration supplied by the run options.
Config *apiserverconfig.Config
// container is the go-restful container on which every WebService is registered.
container *restful.Container
// KubernetesClient supplies the discovery client and rest config used by
// the version endpoint and the Kubernetes proxy filter.
KubernetesClient k8s.Client
}
APIServer构造函数
初始化Kubernetes Client和HTTP Server
// NewAPIServer builds an APIServer from the run options: it creates the
// Kubernetes client and an HTTP server listening on the configured insecure
// port. The go-restful container and the handler chain are not set up here.
//
// stopCh is accepted but not used by this function. If the Kubernetes client
// cannot be created, that error is returned unchanged.
func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIServer, error) {
	client, err := k8s.NewKubernetesClient(s.KubernetesOptions)
	if err != nil {
		return nil, err
	}

	listenAddr := fmt.Sprintf(":%d", s.GenericServerRunOptions.InsecurePort)
	return &apiserver.APIServer{
		Config:           s.Config,
		KubernetesClient: client,
		Server:           &http.Server{Addr: listenAddr},
	}, nil
}
初始化Container
// PrepareRun creates the go-restful container, registers the API web services
// on it, and installs the resulting handler chain on the HTTP server.
//
// stopCh is forwarded to buildHandlerChain. The returned error is always nil.
func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {
	s.container = restful.NewContainer()
	// Use CurlyRouter (the fast router).
	s.container.Router(restful.CurlyRouter{})

	// The container is the innermost handler of the HTTP server.
	s.Server.Handler = s.container

	// Register the web services.
	s.installAIscopeAPIs()

	// Log the root path of every registered web service. This must come after
	// registration: a freshly created container has nothing to list.
	for _, ws := range s.container.RegisteredWebServices() {
		klog.V(2).Infof("%s", ws.RootPath())
	}

	// Wrap the container with the request filters.
	s.buildHandlerChain(stopCh)
	return nil
}
服务创建与注册
// installAIscopeAPIs registers the project's own web services on the
// container. The registration error is handed to urlruntime.Must
// (presumably panicking on a non-nil error — confirm against that package).
func (s *APIServer) installAIscopeAPIs() {
	discoveryClient := s.KubernetesClient.Discovery()
	err := version.AddToContainer(s.container, discoveryClient)
	urlruntime.Must(err)
}
创建一个webservice,path为/aiapis/{GroupVersion}的服务,这里GroupVersion为空。添加了一条"/version"路由。最后API请求URL为/aiapis/version。
// AddToContainer creates a web service via runtime.NewWebService, adds a
// GET /version route to it, and registers it on container.
//
// The route handler reports the value of version.Get. When k8sDiscovery is
// non-nil, the Kubernetes server version is included as well; a discovery
// failure is logged and the response is still sent without it.
//
// The returned error is always nil.
func AddToContainer(container *restful.Container, k8sDiscovery discovery.DiscoveryInterface) error {
	webservice := runtime.NewWebService(schema.GroupVersion{})

	// Doc is chained on the value returned by Route, not on the route builder.
	// NOTE(review): this documents the web service rather than the route — confirm that is intended.
	webservice.Route(webservice.GET("/version").
		To(func(request *restful.Request, response *restful.Response) {
			ksVersion := version.Get()

			if k8sDiscovery != nil {
				k8sVersion, err := k8sDiscovery.ServerVersion()
				if err != nil {
					klog.Errorf("Failed to get kubernetes version, error %v", err)
				} else {
					ksVersion.Kubernetes = k8sVersion
				}
			}

			// Log a failed write instead of silently dropping the error.
			if err := response.WriteAsJson(ksVersion); err != nil {
				klog.Errorf("Failed to write version response, error %v", err)
			}
		})).
		Doc("AIScope version")

	container.Add(webservice)
	return nil
}
// ApiRootPath is the URL prefix that NewWebService puts in front of every
// web service path.
const ApiRootPath = "/aiapis"
// NewWebService returns a new WebService whose root path is
// ApiRootPath + "/" + gv.String() and which produces JSON responses.
func NewWebService(gv schema.GroupVersion) *restful.WebService {
	ws := new(restful.WebService)

	rootPath := ApiRootPath + "/" + gv.String()
	ws.Path(rootPath)
	ws.Produces(restful.MIME_JSON)

	return ws
}
Handler Chain
通过WithRequestInfo解析API请求的信息,WithKubeAPIServer根据API请求信息判断是否代理请求给Kubernetes
// buildHandlerChain wraps the server's current handler with the request
// filters and installs the result on the server.
//
// WithRequestInfo is applied last, making it the outermost wrapper; the
// WithKubeAPIServer filter sits inside it, in front of the original handler.
// stopCh is accepted but not used here.
func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) {
	inner := s.Server.Handler
	withProxy := filters.WithKubeAPIServer(inner, s.KubernetesClient.Config(), &errorResponder{})

	resolver := &request.RequestInfoFactory{
		APIPrefixes: sets.NewString("api", "apis"),
	}
	s.Server.Handler = filters.WithRequestInfo(withProxy, resolver)
}
WithRequestInfo,调用NewRequestInfo解析请求信息保存到ctx中
// WithRequestInfo returns a handler that resolves the RequestInfo of each
// incoming request, stores it in the request context, and then calls handler.
//
// If the resolver returns an error, an internal-error response is written and
// handler is not called.
func WithRequestInfo(handler http.Handler, resolver request.RequestInfoResolver) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		info, err := resolver.NewRequestInfo(req)
		if err != nil {
			responsewriters.InternalError(w, req, fmt.Errorf("failed to create RequestInfo: %v", err))
			return
		}

		ctx := request.WithRequestInfo(req.Context(), info)
		handler.ServeHTTP(w, req.WithContext(ctx))
	})
}
NewRequestInfo根据URL路径解析请求信息,这里根据APIPrefix判断该请求是否为Kubernetes请求(IsKubernetesRequest)
// kubernetesAPIPrefixes holds the path prefixes ("api", "apis") that mark a
// request as a Kubernetes API request.
var kubernetesAPIPrefixes = sets.NewString("api", "apis")
// NewRequestInfo derives a RequestInfo from the request's URL path.
//
// APIPrefix is set to the first path segment when the path has at least three
// segments and that segment is in r.APIPrefixes. IsKubernetesRequest is set
// when the effective prefix is in kubernetesAPIPrefixes; for paths with one or
// two segments the first segment is used as the effective prefix.
//
// The returned error is always nil.
func (r *RequestInfoFactory) NewRequestInfo(req *http.Request) (*RequestInfo, error) {
	info := &RequestInfo{
		IsKubernetesRequest: false,
		RequestInfo: &k8srequest.RequestInfo{
			Path: req.URL.Path,
			Verb: req.Method,
		},
		Workspace: api.WorkspaceNone,
		Cluster:   api.ClusterNone,
		SourceIP:  iputil.RemoteIp(req),
		UserAgent: req.UserAgent(),
	}

	parts := splitPath(req.URL.Path)

	// prefix is what gets matched against kubernetesAPIPrefixes below.
	var prefix string
	switch n := len(parts); {
	case n == 0:
		// Empty path: nothing to classify.
	case n < 3:
		// Short paths are proxied as discovery API requests.
		prefix = parts[0]
	case r.APIPrefixes.Has(parts[0]):
		info.APIPrefix = parts[0]
		prefix = parts[0]
	}

	if kubernetesAPIPrefixes.Has(prefix) {
		info.IsKubernetesRequest = true
	}
	return info, nil
}
WithKubeAPIServer handler首先创建访问Kubernetes的代理传输defaultTransport,再通过RequestInfoFrom从请求的context中获取请求信息;如果info.IsKubernetesRequest为true,即该请求是Kubernetes请求,则由httpProxy把请求代理到Kubernetes。
// WithKubeAPIServer returns a handler that proxies Kubernetes API requests to
// the API server described by config and passes every other request on to
// handler.
//
// Whether a request is a Kubernetes request is read from the RequestInfo
// stored in the request context; if none is present, an internal-error
// response is written and the request is not processed further. If
// config.Host cannot be parsed or the transport cannot be built, the error is
// logged and handler is returned unwrapped. failed is passed to the proxy as
// its error responder.
func WithKubeAPIServer(handler http.Handler, config *rest.Config, failed proxy.ErrorResponder) http.Handler {
	kubernetes, err := url.Parse(config.Host)
	if err != nil {
		// Without a valid target URL the proxy would dereference nil on every request.
		klog.Errorf("Unable to parse kubernetes host %q: %v", config.Host, err)
		return handler
	}

	defaultTransport, err := rest.TransportFor(config)
	if err != nil {
		klog.Errorf("Unable to create transport from rest.Config: %v", err)
		return handler
	}

	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		info, ok := request.RequestInfoFrom(req.Context())
		if !ok {
			err := errors.New("Unable to retrieve request info from request")
			klog.Error(err)
			responsewriters.InternalError(w, req, err)
			// Stop here: a response has been written and info must not be used.
			return
		}

		if !info.IsKubernetesRequest {
			handler.ServeHTTP(w, req)
			return
		}

		// Copy the request URL and point the copy at the Kubernetes API server.
		s := *req.URL
		s.Host = kubernetes.Host
		s.Scheme = kubernetes.Scheme

		// make sure we don't override kubernetes's authorization
		req.Header.Del("Authorization")
		httpProxy := proxy.NewUpgradeAwareHandler(&s, defaultTransport, true, false, failed)
		httpProxy.UpgradeTransport = proxy.NewUpgradeRequestRoundTripper(defaultTransport, defaultTransport)
		httpProxy.ServeHTTP(w, req)
	})
}
运行APIServer
// Run starts the HTTP server and blocks until it stops, returning the error
// from ListenAndServeTLS (when s.Server.TLSConfig is set) or ListenAndServe.
// A background goroutine waits for ctx to be cancelled and then calls
// Shutdown on the server; the Shutdown error is deliberately discarded.
func (s *APIServer) Run(ctx context.Context) (err error) {
	// NOTE(review): the deferred stop cancels shutdownCtx as soon as Run
	// returns, which may cut an in-progress Shutdown short — confirm intent.
	shutdownCtx, stop := context.WithCancel(context.Background())
	defer stop()

	go func() {
		<-ctx.Done()
		_ = s.Server.Shutdown(shutdownCtx)
	}()

	klog.V(0).Infof("Start listening on %s", s.Server.Addr)

	if s.Server.TLSConfig == nil {
		return s.Server.ListenAndServe()
	}
	return s.Server.ListenAndServeTLS("", "")
}