由 SuKai May 2, 2022
在基于Kubernetes平台开发过程中,经常需要访问不同Kuberntes版本的集群资源,这时就需要去获取所访问集群支持的资源版本信息。下面我们一起来看一下,如何来实现Kubernetes资源类型发现?
文章分为下面几部分为大家介绍:
1,kubectl命令查看Kubernetes集群资源
2,client-go库获取Kubernetes集群资源
3,controller-runtime动态获取Kubernetes集群资源
kubectl命令
kubectl命令行工具提供api-resources,api-versions命令来查看kubernetes集群当前支持的资源类型。
kubectl api-resources查看k8s所有资源和版本列表。
sukai@sukai:~$ kubectl api-resources
NAME SHORTNAMES APIVERSION NAMESPACED KIND
bindings v1 true Binding
componentstatuses cs v1 false ComponentStatus
configmaps cm v1 true ConfigMap
endpoints ep v1 true Endpoints
events ev v1 true Event
limitranges limits v1 true LimitRange
namespaces ns v1 false Namespace
nodes no v1 false Node
persistentvolumeclaims pvc v1 true PersistentVolumeClaim
persistentvolumes pv v1 false PersistentVolume
pods po v1 true Pod
...
kubectl api-versions查看k8s的API组和版本列表
sukai@sukai:~$ kubectl api-versions
admissionregistration.k8s.io/v1
apiextensions.k8s.io/v1
apiregistration.k8s.io/v1
apps/v1
authentication.k8s.io/v1
authorization.k8s.io/v1
autoscaling/v1
autoscaling/v2
autoscaling/v2beta1
autoscaling/v2beta2
batch/v1
batch/v1beta1
certificates.k8s.io/v1
cluster.clusterpedia.io/v1alpha2
clusterpedia.io/v1beta1
coordination.k8s.io/v1
crd.projectcalico.org/v1
dex.coreos.com/v1
discovery.k8s.io/v1
discovery.k8s.io/v1beta1
events.k8s.io/v1
events.k8s.io/v1beta1
experiment.aiscope/v1alpha2
flowcontrol.apiserver.k8s.io/v1beta1
flowcontrol.apiserver.k8s.io/v1beta2
iam.aiscope/v1alpha2
networking.k8s.io/v1
node.k8s.io/v1
node.k8s.io/v1beta1
policy/v1
policy/v1beta1
rbac.authorization.k8s.io/v1
scheduling.k8s.io/v1
storage.k8s.io/v1
storage.k8s.io/v1beta1
tenant.aiscope/v1alpha2
traefik.containo.us/v1alpha1
v1
kubectl命令api-resources的代码
1,创建一个discoveryclient
2,discoveryclient.ServerPreferredResources()获取资源和首选版本的列表
3,schema.ParseGroupVersion将"group/version"字符串转换为GroupVersion结构体,如果命令行参数指定了过滤条件,对APIResource资源进行过滤,过滤条件包括:资源组,全局资源还是命名空间资源,资源支持的动作如get,list,watch,create,update等。
4,打印出资源列表
// RunAPIResources does the work
func (o *APIResourceOptions) RunAPIResources(cmd *cobra.Command, f cmdutil.Factory) error {
w := printers.GetNewTabWriter(o.Out)
defer w.Flush()
discoveryclient, err := f.ToDiscoveryClient()
if err != nil {
return err
}
if !o.Cached {
// Always request fresh data from the server
discoveryclient.Invalidate()
}
errs := []error{}
lists, err := discoveryclient.ServerPreferredResources()
if err != nil {
errs = append(errs, err)
}
resources := []groupResource{}
groupChanged := cmd.Flags().Changed("api-group")
nsChanged := cmd.Flags().Changed("namespaced")
for _, list := range lists {
if len(list.APIResources) == 0 {
continue
}
gv, err := schema.ParseGroupVersion(list.GroupVersion)
if err != nil {
continue
}
for _, resource := range list.APIResources {
if len(resource.Verbs) == 0 {
continue
}
// filter apiGroup
if groupChanged && o.APIGroup != gv.Group {
continue
}
// filter namespaced
if nsChanged && o.Namespaced != resource.Namespaced {
continue
}
// filter to resources that support the specified verbs
if len(o.Verbs) > 0 && !sets.NewString(resource.Verbs...).HasAll(o.Verbs...) {
continue
}
resources = append(resources, groupResource{
APIGroup: gv.Group,
APIGroupVersion: gv.String(),
APIResource: resource,
})
}
}
if o.NoHeaders == false && o.Output != "name" {
if err = printContextHeaders(w, o.Output); err != nil {
return err
}
}
sort.Stable(sortableResource{resources, o.SortBy})
for _, r := range resources {
switch o.Output {
case "name":
name := r.APIResource.Name
if len(r.APIGroup) > 0 {
name += "." + r.APIGroup
}
if _, err := fmt.Fprintf(w, "%s\n", name); err != nil {
errs = append(errs, err)
}
case "wide":
if _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%v\t%s\t%v\n",
r.APIResource.Name,
strings.Join(r.APIResource.ShortNames, ","),
r.APIGroupVersion,
r.APIResource.Namespaced,
r.APIResource.Kind,
r.APIResource.Verbs); err != nil {
errs = append(errs, err)
}
case "":
if _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%v\t%s\n",
r.APIResource.Name,
strings.Join(r.APIResource.ShortNames, ","),
r.APIGroupVersion,
r.APIResource.Namespaced,
r.APIResource.Kind); err != nil {
errs = append(errs, err)
}
}
}
if len(errs) > 0 {
return errors.NewAggregate(errs)
}
return nil
}
ToDiscoveryClient
1,ToRESTConfig构造rest.Config
2,配置突发流量和缓存目录,这里注释可以看到资源组请求是很频繁,需要配置速率限制,防止突发流量导致APIServer无法提供其他服务,另外这里还设置了两个本地缓存,一个用于缓存APIResource,一个用于HTTP缓存,缓存期限为10分钟。
3,调用NewCachedDiscoveryClientForConfig创建CachedDiscoveryClient,CachedDiscoveryClient对DiscoveryClient做了一层封装cacheRoundTripper,cacheRoundTripper通过http.RoundTripper做本地缓存,如果本地缓存存在,那么直接返回给kubectl,不再请求k8s APIServer。
// ToDiscoveryClient implements RESTClientGetter.
// Expects the AddFlags method to have been called.
// Returns a CachedDiscoveryInterface using a computed RESTConfig.
func (f *ConfigFlags) ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error) {
config, err := f.ToRESTConfig()
if err != nil {
return nil, err
}
// The more groups you have, the more discovery requests you need to make.
// given 25 groups (our groups + a few custom resources) with one-ish version each, discovery needs to make 50 requests
// double it just so we don't end up here again for a while. This config is only used for discovery.
config.Burst = f.discoveryBurst
cacheDir := defaultCacheDir
// retrieve a user-provided value for the "cache-dir"
// override httpCacheDir and discoveryCacheDir if user-value is given.
if f.CacheDir != nil {
cacheDir = *f.CacheDir
}
httpCacheDir := filepath.Join(cacheDir, "http")
discoveryCacheDir := computeDiscoverCacheDir(filepath.Join(cacheDir, "discovery"), config.Host)
return diskcached.NewCachedDiscoveryClientForConfig(config, discoveryCacheDir, httpCacheDir, time.Duration(10*time.Minute))
}
// NewCachedDiscoveryClientForConfig creates a new DiscoveryClient for the given config, and wraps
// the created client in a CachedDiscoveryClient. The provided configuration is updated with a
// custom transport that understands cache responses.
// We receive two distinct cache directories for now, in order to preserve old behavior
// which makes use of the --cache-dir flag value for storing cache data from the CacheRoundTripper,
// and makes use of the hardcoded destination (~/.kube/cache/discovery/...) for storing
// CachedDiscoveryClient cache data. If httpCacheDir is empty, the restconfig's transport will not
// be updated with a roundtripper that understands cache responses.
// If discoveryCacheDir is empty, cached server resource data will be looked up in the current directory.
func NewCachedDiscoveryClientForConfig(config *restclient.Config, discoveryCacheDir, httpCacheDir string, ttl time.Duration) (*CachedDiscoveryClient, error) {
if len(httpCacheDir) > 0 {
// update the given restconfig with a custom roundtripper that
// understands how to handle cache responses.
config = restclient.CopyConfig(config)
config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return newCacheRoundTripper(httpCacheDir, rt)
})
}
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, err
}
return newCachedDiscoveryClient(discoveryClient, discoveryCacheDir, ttl), nil
}
ServerPreferredResources
1,ServerGroups获取资源组列表,通过RestClient请求/api和/apis,得到k8s支持的GroupVersion列表
2,fetchGroupVersionResources根据GroupVersion列表,通过RestClient并发请求/api/v1和/apis/groupVersion获取GroupVersion对应的Resources资源列表。
3,将获取数据处理返回APIResourceList
// ServerPreferredResources uses the provided discovery interface to look up preferred resources
func ServerPreferredResources(d DiscoveryInterface) ([]*metav1.APIResourceList, error) {
serverGroupList, err := d.ServerGroups()
if err != nil {
return nil, err
}
groupVersionResources, failedGroups := fetchGroupVersionResources(d, serverGroupList)
result := []*metav1.APIResourceList{}
grVersions := map[schema.GroupResource]string{} // selected version of a GroupResource
grAPIResources := map[schema.GroupResource]*metav1.APIResource{} // selected APIResource for a GroupResource
gvAPIResourceLists := map[schema.GroupVersion]*metav1.APIResourceList{} // blueprint for a APIResourceList for later grouping
for _, apiGroup := range serverGroupList.Groups {
for _, version := range apiGroup.Versions {
groupVersion := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}
apiResourceList, ok := groupVersionResources[groupVersion]
if !ok {
continue
}
// create empty list which is filled later in another loop
emptyAPIResourceList := metav1.APIResourceList{
GroupVersion: version.GroupVersion,
}
gvAPIResourceLists[groupVersion] = &emptyAPIResourceList
result = append(result, &emptyAPIResourceList)
for i := range apiResourceList.APIResources {
apiResource := &apiResourceList.APIResources[i]
if strings.Contains(apiResource.Name, "/") {
continue
}
gv := schema.GroupResource{Group: apiGroup.Name, Resource: apiResource.Name}
if _, ok := grAPIResources[gv]; ok && version.Version != apiGroup.PreferredVersion.Version {
// only override with preferred version
continue
}
grVersions[gv] = version.Version
grAPIResources[gv] = apiResource
}
}
}
// group selected APIResources according to GroupVersion into APIResourceLists
for groupResource, apiResource := range grAPIResources {
version := grVersions[groupResource]
groupVersion := schema.GroupVersion{Group: groupResource.Group, Version: version}
apiResourceList := gvAPIResourceLists[groupVersion]
apiResourceList.APIResources = append(apiResourceList.APIResources, *apiResource)
}
if len(failedGroups) == 0 {
return result, nil
}
return result, &ErrGroupDiscoveryFailed{Groups: failedGroups}
}
// ServerGroups returns the supported groups, with information like supported versions and the
// preferred version.
func (d *DiscoveryClient) ServerGroups() (apiGroupList *metav1.APIGroupList, err error) {
// Get the groupVersions exposed at /api
v := &metav1.APIVersions{}
err = d.restClient.Get().AbsPath(d.LegacyPrefix).Do(context.TODO()).Into(v)
apiGroup := metav1.APIGroup{}
if err == nil && len(v.Versions) != 0 {
apiGroup = apiVersionsToAPIGroup(v)
}
if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {
return nil, err
}
// Get the groupVersions exposed at /apis
apiGroupList = &metav1.APIGroupList{}
err = d.restClient.Get().AbsPath("/apis").Do(context.TODO()).Into(apiGroupList)
if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {
return nil, err
}
// to be compatible with a v1.0 server, if it's a 403 or 404, ignore and return whatever we got from /api
if err != nil && (errors.IsNotFound(err) || errors.IsForbidden(err)) {
apiGroupList = &metav1.APIGroupList{}
}
// prepend the group retrieved from /api to the list if not empty
if len(v.Versions) != 0 {
apiGroupList.Groups = append([]metav1.APIGroup{apiGroup}, apiGroupList.Groups...)
}
return apiGroupList, nil
}
// fetchServerResourcesForGroupVersions uses the discovery client to fetch the resources for the specified groups in parallel.
func fetchGroupVersionResources(d DiscoveryInterface, apiGroups *metav1.APIGroupList) (map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error) {
groupVersionResources := make(map[schema.GroupVersion]*metav1.APIResourceList)
failedGroups := make(map[schema.GroupVersion]error)
wg := &sync.WaitGroup{}
resultLock := &sync.Mutex{}
for _, apiGroup := range apiGroups.Groups {
for _, version := range apiGroup.Versions {
groupVersion := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}
wg.Add(1)
go func() {
defer wg.Done()
defer utilruntime.HandleCrash()
apiResourceList, err := d.ServerResourcesForGroupVersion(groupVersion.String())
// lock to record results
resultLock.Lock()
defer resultLock.Unlock()
if err != nil {
// TODO: maybe restrict this to NotFound errors
failedGroups[groupVersion] = err
}
if apiResourceList != nil {
// even in case of error, some fallback might have been returned
groupVersionResources[groupVersion] = apiResourceList
}
}()
}
}
wg.Wait()
return groupVersionResources, failedGroups
}
func (d *DiscoveryClient) ServerGroups() (apiGroupList *metav1.APIGroupList, err error) {
// Get the groupVersions exposed at /api
v := &metav1.APIVersions{}
err = d.restClient.Get().AbsPath(d.LegacyPrefix).Do(context.TODO()).Into(v)
apiGroup := metav1.APIGroup{}
if err == nil && len(v.Versions) != 0 {
apiGroup = apiVersionsToAPIGroup(v)
}
if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {
return nil, err
}
// Get the groupVersions exposed at /apis
apiGroupList = &metav1.APIGroupList{}
err = d.restClient.Get().AbsPath("/apis").Do(context.TODO()).Into(apiGroupList)
if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {
return nil, err
}
// to be compatible with a v1.0 server, if it's a 403 or 404, ignore and return whatever we got from /api
if err != nil && (errors.IsNotFound(err) || errors.IsForbidden(err)) {
apiGroupList = &metav1.APIGroupList{}
}
// prepend the group retrieved from /api to the list if not empty
if len(v.Versions) != 0 {
apiGroupList.Groups = append([]metav1.APIGroup{apiGroup}, apiGroupList.Groups...)
}
return apiGroupList, nil
}
func fetchGroupVersionResources(d DiscoveryInterface, apiGroups *metav1.APIGroupList) (map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error) {
groupVersionResources := make(map[schema.GroupVersion]*metav1.APIResourceList)
failedGroups := make(map[schema.GroupVersion]error)
wg := &sync.WaitGroup{}
resultLock := &sync.Mutex{}
for _, apiGroup := range apiGroups.Groups {
for _, version := range apiGroup.Versions {
groupVersion := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}
wg.Add(1)
go func() {
defer wg.Done()
defer utilruntime.HandleCrash()
apiResourceList, err := d.ServerResourcesForGroupVersion(groupVersion.String())
// lock to record results
resultLock.Lock()
defer resultLock.Unlock()
if err != nil {
// TODO: maybe restrict this to NotFound errors
failedGroups[groupVersion] = err
}
if apiResourceList != nil {
// even in case of error, some fallback might have been returned
groupVersionResources[groupVersion] = apiResourceList
}
}()
}
}
wg.Wait()
return groupVersionResources, failedGroups
}
client-go
kubectl CachedDiscoveryClient对client-go的DiscoveryClient进行了封装,增加了Cache缓存层,减少DiscoveryClient请求资源列表时对APIServer的请求压力。
client-go获取资源列表的过程包括:
1,discovery.NewDiscoveryClientForConfig创建discoveryClient。
2,ServerGroups返回GroupVersion列表,包括支持的资源组,支持的版本以及首选的版本,比如v1(core组),apps/v1等。
3,ServerResourcesForGroupVersion返回APIResourceList,包括各个资源组版本下的资源信息,比如pods, deployments,CRD自定义资源,Aggregated API聚合API资源等。
client-go还提供了GetAPIGroupResources方法,将上面步骤进行了整合,直接返回资源列表APIGroupResources,提供开发使用。
那么获取了资源列表以后,可以做什么呢?在开发过程中经常需要进行资源操作,在调用API时需要GVR和GVK进行转换,或者只知道资源,不知道k8s实际资源的版本,那么就需要根据资源查询版本,然后再调用API。
k8s定义了RESTMapper接口,根据资源列表APIGroupResources构建RESTMapper,提供资源映射查询,比如KindsFor根据资源GVR映射GVK,ResourcesFor根据GVR查询GVR优先级。k8s API支持多种RESTMapper,比如:DefaultRESTMapper资源无法匹配到版本时,返回runtime.Scheme定义的默认版本。MultiRESTMapper,一组RESTMapper,资源在一个RESTMapper无法匹配时,进行下一个RESTMapper匹配。PriorityRESTMapper基于某个RESTMapper封装,先从RESTMapper匹配出资源,然后根据优先级选择资源。
client-go通过NewDiscoveryRESTMapper根据资源列表APIGroupResources,构造了一个PriorityRESTMapper。这个PriorityRESTMapper又是基于MultiRESTMapper进行封装的。这样就可以通过PriorityRESTMapper进行资源发现。
controller-runtime
下面我们再来看一下controller-runtime框架又是如何基于client-go来实现自己的资源发现的。
NewDynamicRESTMapper
运行时动态发现资源类型:
1,NewDiscoveryClientForConfig创建DiscoveryClient类型的客户端
2,构造dynamicRESTMapper,dynamicRESTMapper实现了RESTMapper接口。可以看到dynamicRESTMapper,包含了一个速率限制器,一个newMapper的函数。newMapper调用GetAPIGroupResources获取资源列表,NewDiscoveryRESTMapper根据资源列表,构造了一个dynamicRESTMapper。
3,通过newMapper初始化dynamicRESTMapper.staticMapper,也就是请求了k8s APIServer获取资源并生成了dynamicRESTMapper保存到staticMapper。
func NewDynamicRESTMapper(cfg *rest.Config, opts ...DynamicRESTMapperOption) (meta.RESTMapper, error) {
client, err := discovery.NewDiscoveryClientForConfig(cfg)
if err != nil {
return nil, err
}
drm := &dynamicRESTMapper{
limiter: rate.NewLimiter(rate.Limit(defaultRefillRate), defaultLimitSize),
newMapper: func() (meta.RESTMapper, error) {
groupResources, err := restmapper.GetAPIGroupResources(client)
if err != nil {
return nil, err
}
return restmapper.NewDiscoveryRESTMapper(groupResources), nil
},
}
for _, opt := range opts {
if err = opt(drm); err != nil {
return nil, err
}
}
if !drm.lazy {
if err := drm.setStaticMapper(); err != nil {
return nil, err
}
}
return drm, nil
}
func (drm *dynamicRESTMapper) setStaticMapper() error {
newMapper, err := drm.newMapper()
if err != nil {
return err
}
drm.staticMapper = newMapper
return nil
}
下面看一下dynamicRESTMapper如何进行资源的版本列表查询和组资源下资源列表查询的。
流程比较简单,通过checkAndReload进行资源列表查询,如果返回NoResourceMatchError错误,那么就调用setStaticMapper重新请求k8s APIServer获取资源生成dynamicRESTMapper的staticMapper,再次进行查询,如果还查询不到那么返回错误,并且在这个过程中有了速率限制器的并发限制drm.limiter.Allow。也就是说,当资源查询不到时触发一次资源查询更新restmapper,再查一次。
func (drm *dynamicRESTMapper) KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error) {
if err := drm.init(); err != nil {
return nil, err
}
var gvks []schema.GroupVersionKind
err := drm.checkAndReload(&meta.NoResourceMatchError{}, func() error {
var err error
gvks, err = drm.staticMapper.KindsFor(resource)
return err
})
return gvks, err
}
func (drm *dynamicRESTMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) {
if err := drm.init(); err != nil {
return nil, err
}
var gvrs []schema.GroupVersionResource
err := drm.checkAndReload(&meta.NoResourceMatchError{}, func() error {
var err error
gvrs, err = drm.staticMapper.ResourcesFor(input)
return err
})
return gvrs, err
}
// checkAndReload attempts to call the given callback, which is assumed to be dependent
// on the data in the restmapper.
//
// If the callback returns an error that matches the given error, it will attempt to reload
// the RESTMapper's data and re-call the callback once that's occurred.
// If the callback returns any other error, the function will return immediately regardless.
//
// It will take care of ensuring that reloads are rate-limited and that extraneous calls
// aren't made. If a reload would exceed the limiters rate, it returns the error return by
// the callback.
// It's thread-safe, and worries about thread-safety for the callback (so the callback does
// not need to attempt to lock the restmapper).
func (drm *dynamicRESTMapper) checkAndReload(needsReloadErr error, checkNeedsReload func() error) error {
// first, check the common path -- data is fresh enough
// (use an IIFE for the lock's defer)
err := func() error {
drm.mu.RLock()
defer drm.mu.RUnlock()
return checkNeedsReload()
}()
// NB(directxman12): `Is` and `As` have a confusing relationship --
// `Is` is like `== or does this implement .Is`, whereas `As` says
// `can I type-assert into`
needsReload := errors.As(err, &needsReloadErr)
if !needsReload {
return err
}
// if the data wasn't fresh, we'll need to try and update it, so grab the lock...
drm.mu.Lock()
defer drm.mu.Unlock()
// ... and double-check that we didn't reload in the meantime
err = checkNeedsReload()
needsReload = errors.As(err, &needsReloadErr)
if !needsReload {
return err
}
// we're still stale, so grab a rate-limit token if we can...
if !drm.limiter.Allow() {
// return error from static mapper here, we have refreshed often enough (exceeding rate of provided limiter)
// so that client's can handle this the same way as a "normal" NoResourceMatchError / NoKindMatchError
return err
}
// ...reload...
if err := drm.setStaticMapper(); err != nil {
return err
}
// ...and return the results of the closure regardless
return checkNeedsReload()
}
总结
Kubernetes的DiscoveryClient提供了Kubernetes支持资源列表查询,k8s API提供多种RESTMapper提供资源GVR/GVK映射查询,资源版本列表查询。client-go基于DiscoveryClient提供了GetAPIGroupResources方法直接返回APIGroupResources,NewDiscoveryRESTMapper通过APIGroupResources构造PriorityRESTMapper。而controller-runtime使用client-go的GetAPIGroupResources获取资源后,构造了dynamicRESTMapper,dynamicRESTMapper增加了速率限制和查询出错重试,进行了优化。