Kubernetes容器存储接口CSI插件controller

SuKai August 29, 2022

前面文章介绍了CSI插件node-agent组件,node-agent主要实现了需要在主机节点上才能进行的操作功能,包含了LVMNode, LVMVolume, LVMSnapshot三个自定义资源控制器和NodeServer接口服务。那么LVMVolume,LVMSnapshot这些自定义资源又是由谁创建的呢?今天我们一起看一下OpenEBS LVM存储插件的Controller部分,了解Controller如何被编排调度系统Container Orchestration system(CO)侧Kubernetes调用,Controller又如何创建自定义资源给Node-Agent调谐的。

之前我们了解到CO侧Kubernetes开发了一系列组件如external-provisioner,external-attacher,external-snapshotter,external-resizer,这些组件通过Sidecar的方式和Plugin插件驱动侧一起运行,直接调用插件Controller的gRPC服务,实现存储卷的管理。下面我们来看一下Controller的接口服务。

Controller Interface包括以下方法

// ControllerServer is the server API for Controller service.
// The per-method notes below describe how the OpenEBS LVM driver discussed
// in this article handles each RPC.
type ControllerServer interface {
   // CreateVolume creates a volume; this driver does so by creating an
   // LVMVolume custom resource instance.
   CreateVolume(context.Context, *CreateVolumeRequest) (*CreateVolumeResponse, error)
   // DeleteVolume deletes a volume, i.e. deletes the LVMVolume instance.
   DeleteVolume(context.Context, *DeleteVolumeRequest) (*DeleteVolumeResponse, error)
   // ControllerPublishVolume attaches a volume to a given node. Not
   // implemented by this driver, because LVM volumes are local devices.
   ControllerPublishVolume(context.Context, *ControllerPublishVolumeRequest) (*ControllerPublishVolumeResponse, error)
   // ControllerUnpublishVolume detaches a volume from a node. Not implemented.
   ControllerUnpublishVolume(context.Context, *ControllerUnpublishVolumeRequest) (*ControllerUnpublishVolumeResponse, error)
   // ValidateVolumeCapabilities checks whether the requested volume access
   // modes are supported, e.g. single-/multi-node read-write (ReadWriteOnce).
   ValidateVolumeCapabilities(context.Context, *ValidateVolumeCapabilitiesRequest) (*ValidateVolumeCapabilitiesResponse, error)
   // ListVolumes lists all volumes. Not implemented.
   ListVolumes(context.Context, *ListVolumesRequest) (*ListVolumesResponse, error)
   // GetCapacity returns capacity information (available storage space)
   // for the specified node.
   GetCapacity(context.Context, *GetCapacityRequest) (*GetCapacityResponse, error)
   // ControllerGetCapabilities returns the features the controller supports,
   // e.g. create/delete volume, expand volume, create/delete snapshot and
   // get capacity.
   ControllerGetCapabilities(context.Context, *ControllerGetCapabilitiesRequest) (*ControllerGetCapabilitiesResponse, error)
   // CreateSnapshot creates a snapshot of the specified volume.
   CreateSnapshot(context.Context, *CreateSnapshotRequest) (*CreateSnapshotResponse, error)
   // DeleteSnapshot deletes the specified volume snapshot.
   DeleteSnapshot(context.Context, *DeleteSnapshotRequest) (*DeleteSnapshotResponse, error)
   // ListSnapshots lists all snapshots of the specified volume. Not implemented.
   ListSnapshots(context.Context, *ListSnapshotsRequest) (*ListSnapshotsResponse, error)
   // ControllerExpandVolume resizes (expands) a volume.
   ControllerExpandVolume(context.Context, *ControllerExpandVolumeRequest) (*ControllerExpandVolumeResponse, error)
}

ControllerGetCapabilities服务

返回Controller支持的功能列表。CO侧Kubernetes调用此接口检查Controller支持的服务,也就是Controller实现了哪些服务接口可供CO调用。

// ControllerGetCapabilities reports the controller capability list held by
// this server (cs.capabilities). The CO calls it to learn which Controller
// RPCs the plugin implements and may therefore be invoked.
//
// This implements csi.ControllerServer
func (cs *controller) ControllerGetCapabilities(
	ctx context.Context,
	req *csi.ControllerGetCapabilitiesRequest,
) (*csi.ControllerGetCapabilitiesResponse, error) {
	return &csi.ControllerGetCapabilitiesResponse{Capabilities: cs.capabilities}, nil
}

// newControllerCapabilities builds the controller RPC capability list:
// create/delete volume, expand volume, create/delete snapshot and get
// capacity, in that order.
func newControllerCapabilities() []*csi.ControllerServiceCapability {
	rpcTypes := []csi.ControllerServiceCapability_RPC_Type{
		csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
		csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
		csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
		csi.ControllerServiceCapability_RPC_GET_CAPACITY,
	}

	capabilities := make([]*csi.ControllerServiceCapability, len(rpcTypes))
	for i, rpcType := range rpcTypes {
		// Wrap each RPC type in the oneof envelope the CSI message expects.
		capabilities[i] = &csi.ControllerServiceCapability{
			Type: &csi.ControllerServiceCapability_Rpc{
				Rpc: &csi.ControllerServiceCapability_RPC{Type: rpcType},
			},
		}
	}
	return capabilities
}

CreateVolume服务

创建存储卷过程

1,NewVolumeParams方法,从请求体中获得信息

a, 从storageclass参数中获取卷组信息,匹配逻辑卷组的条件vgpattern, volgroup

b, 从storageclass参数中获取调度策略(Scheduler)、是否为共享卷(Shared)以及是否精简配置(ThinProvision)。

调度策略分为:SpaceWeighted空间权重(按实际使用空间来评估),可用空间大小,可用空间越大优先级越高。CapacityWeighted容量权重(按已经分配容量大小评估),已经分配空间占比,分配越少优先级越高。VolumeWeighted逻辑卷权重,逻辑卷数量越少优先级越高。

c, PVC持久卷声明的命名空间和名称,PV持久卷名称

2,contentSource指定存储卷的数据源。当前代码没有实现这部分功能(直接返回Unimplemented错误),但可以看出代码将数据源区分为Snapshot(快照)和Volume(已有存储卷)两类。

3,BeginCreateVolume,创建逻辑卷前在LeakProtectionController的claimsInProgress中记录逻辑卷正在处理中。创建finishCreateVolume函数,创建逻辑卷结束后添加finalizer Annotation标签。

4,调用CreateLVMVolume创建逻辑卷

5,返回CreateVolumeResponse,返回结构体Volume的AccessibleTopology表明这个PV持久卷所在主机,Kubernetes将使用此信息调度Pod工作负载到这台主机。

// CreateVolume provisions a volume for a CSI CreateVolume request.
//
// It validates the request and parses the StorageClass parameters. Volume
// content sources (cloning from a snapshot or from another volume) are
// rejected with codes.Unimplemented. The claim is registered with leak
// protection for the duration of the call, and creation is delegated to
// CreateLVMVolume. The response reports the owner node as the volume's
// accessible topology, which the CO uses to place workloads using the PV.
func (cs *controller) CreateVolume(
	ctx context.Context,
	req *csi.CreateVolumeRequest,
) (*csi.CreateVolumeResponse, error) {
	if err := cs.validateVolumeCreateReq(req); err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	params, err := NewVolumeParams(req.GetParameters())
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument,
			"failed to parse csi volume params: %v", err)
	}

	volName := strings.ToLower(req.GetName())
	size := getRoundedCapacity(req.GetCapacityRange().GetRequiredBytes())
	contentSource := req.GetVolumeContentSource()

	// Neither snapshot nor volume content sources are supported yet.
	if contentSource != nil &&
		(contentSource.GetSnapshot() != nil || contentSource.GetVolume() != nil) {
		return nil, status.Error(codes.Unimplemented, "")
	}

	// Mark the volume for leak protection in case the PVC is deleted
	// before the PV gets created; the returned func ends that window.
	finishCreateVolume, err := cs.leakProtection.BeginCreateVolume(volName,
		params.PVCNamespace, params.PVCName)
	if err != nil {
		return nil, err
	}
	defer finishCreateVolume()

	vol, err := CreateLVMVolume(ctx, req, params)
	if err != nil {
		return nil, err
	}

	sendEventOrIgnore(params.PVCName, volName,
		strconv.FormatInt(int64(size), 10),
		"lvm-localpv", analytics.VolumeProvision)

	topology := map[string]string{
		lvm.LVMTopologyKey: vol.Spec.OwnerNodeID,
	}
	volumeContext := map[string]string{
		lvm.VolGroupKey:       vol.Spec.VolGroup,
		lvm.OpenEBSCasTypeKey: lvm.LVMCasTypeName,
	}

	builder := csipayload.NewCreateVolumeResponseBuilder()
	builder = builder.WithName(volName)
	builder = builder.WithCapacity(size)
	builder = builder.WithTopology(topology)
	builder = builder.WithContext(volumeContext)
	builder = builder.WithContentSource(contentSource)
	return builder.Build(), nil
}

创建LVM逻辑卷,主要流程如下:

1,获取主机列表:getNodeMap根据调度策略获取主机节点的权重Map,

2,schd.Scheduler调度主机:根据调度算法获取调度主机优先级列表

3,lvm.ProvisionVolume创建LVMVolume

4,waitForLVMVolume等待逻辑卷创建结果

// CreateLVMVolume creates the LVMVolume custom resource backing a CSI
// CreateVolume request and waits for its result via waitForLVMVolume.
//
// The CO may retry CreateVolume with the same name. An LVMVolume that
// already exists is therefore reused rather than provisioned a second time:
//   - if it is already being deleted, codes.Aborted is returned;
//   - if its capacity differs from the request, codes.AlreadyExists is returned;
//   - otherwise the existing object is waited on and returned.
//
// For a new volume it:
//  1. builds the node weight map for params.Scheduler;
//  2. ranks the candidate nodes with schd.Scheduler;
//  3. creates the LVMVolume owned by the top-ranked node in the Pending state;
//  4. waits for the result.
//
// Failures in those steps are reported as codes.Internal status errors.
func CreateLVMVolume(ctx context.Context, req *csi.CreateVolumeRequest,
	params *VolumeParams) (*lvmapi.LVMVolume, error) {
	volName := strings.ToLower(req.GetName())
	// capacity_range is optional in CSI. The Get accessor is nil-safe,
	// whereas reading .RequiredBytes from a nil range panics.
	capacity := strconv.FormatInt(getRoundedCapacity(
		req.GetCapacityRange().GetRequiredBytes()), 10)

	// NOTE(review): any lookup error is treated as "volume does not exist yet"
	// and creation proceeds. Separating NotFound from other API errors needs
	// k8s.io/apimachinery/pkg/api/errors.IsNotFound (TODO).
	if existing, err := lvm.GetLVMVolume(volName); err == nil && existing != nil {
		if existing.DeletionTimestamp != nil {
			return nil, status.Errorf(codes.Aborted,
				"volume %s: previous deletion not complete", volName)
		}
		if existing.Spec.Capacity != capacity {
			return nil, status.Errorf(codes.AlreadyExists,
				"volume %s already exists with capacity %s, requested %s",
				volName, existing.Spec.Capacity, capacity)
		}
		existing, _, err = waitForLVMVolume(ctx, existing)
		return existing, err
	}

	nmap, err := getNodeMap(params.Scheduler, params.VgPattern)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "get node map failed : %s", err.Error())
	}

	// Run the scheduler; the first entry is the most preferred node.
	selected := schd.Scheduler(req, nmap)
	if len(selected) == 0 {
		return nil, status.Error(codes.Internal, "scheduler failed, not able to select a node to create the PV")
	}

	owner := selected[0]
	klog.Infof("scheduling the volume %s/%s on node %s",
		params.VgPattern.String(), volName, owner)

	volObj, err := volbuilder.NewBuilder().
		WithName(volName).
		WithCapacity(capacity).
		WithVgPattern(params.VgPattern.String()).
		WithOwnerNode(owner).
		WithVolumeStatus(lvm.LVMStatusPending).
		WithShared(params.Shared).
		WithThinProvision(params.ThinProvision).Build()
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	vol, err := lvm.ProvisionVolume(volObj)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "not able to provision the volume %s", err.Error())
	}
	vol, _, err = waitForLVMVolume(ctx, vol)
	return vol, err
}

getNodeMap根据调度策略返回主机Map,key为主机名,value为主机权重值:VolumeWeighted为逻辑卷数量,CapacityWeighted为已经分配的逻辑卷容量,SpaceWeighted(默认策略)为MaxInt64减去可用空间量(可用空间越大,值越小)。调度时值越小的主机优先级越高。

// getNodeMap returns the per-node weight map for the scheduler named by schd,
// restricted to volume groups matching vgPattern.
// SpaceWeighted is used when schd is SpaceWeighted, empty or unrecognised.
func getNodeMap(schd string, vgPattern *regexp.Regexp) (map[string]int64, error) {
	switch schd {
	case VolumeWeighted:
		return getVolumeWeightedMap(vgPattern)
	case CapacityWeighted:
		return getCapacityWeightedMap(vgPattern)
	default:
		// Covers SpaceWeighted explicitly and acts as the fallback.
		return getSpaceWeightedMap(vgPattern)
	}
}

Scheduler调度PV创建的主机

getNodeList根据请求中的拓扑要求(优先使用Preferred,Preferred为空时使用Requisite),找到标签满足其中任一拓扑全部Segments的主机列表。

runScheduler返回按优先级排序的主机列表。在getNodeList筛选出的主机中,不在NodeMap中的主机(代码注释称为未被占用的主机)排在最前面。在NodeMap中存在的主机按权重值从小到大排序,排在其后。如果只有一个候选主机,Scheduler会直接返回该主机,不再调用runScheduler。

// Scheduler returns the candidate nodes for the PV, ordered by preference.
//
// Candidates are the nodes whose labels satisfy the request's topology. The
// Preferred topologies are used, or Requisite when Preferred is empty.
// When more than one node qualifies, runScheduler orders them using nmap.
// An empty result means no node could be selected; the reason is logged.
func Scheduler(req *csi.CreateVolumeRequest, nmap map[string]int64) []string {
	areq := req.AccessibilityRequirements
	if areq == nil {
		klog.Errorf("scheduler: Accessibility Requirements not provided")
		return nil
	}

	topo := areq.Preferred
	if len(topo) == 0 {
		topo = areq.Requisite
	}
	if len(topo) == 0 {
		klog.Errorf("scheduler: topology information not provided")
		return nil
	}

	candidates, err := getNodeList(topo)
	switch {
	case err != nil:
		klog.Errorf("scheduler: can not get the nodelist err : %v", err.Error())
		return candidates
	case len(candidates) == 0:
		klog.Errorf("scheduler: nodelist is empty")
		return candidates
	case len(candidates) == 1:
		// A single eligible node needs no ranking.
		return candidates
	}

	return runScheduler(candidates, nmap)
}

// getNodeList returns the names of cluster nodes matching at least one of the
// given topologies. A node matches a topology when, for every segment
// key/value, the node's label with that key equals the value. Nodes keep the
// order returned by the node listing. An error from listing nodes is returned
// with a nil list.
func getNodeList(topo []*csi.Topology) ([]string, error) {
	nodes, err := k8sapi.ListNodes(metav1.ListOptions{})
	if err != nil {
		return nil, err
	}

	// satisfies reports whether labels contain every segment of t.
	satisfies := func(labels map[string]string, t *csi.Topology) bool {
		for k, v := range t.Segments {
			if labels[k] != v {
				return false
			}
		}
		return true
	}

	var nodelist []string
	for _, node := range nodes.Items {
		for _, t := range topo {
			if satisfies(node.Labels, t) {
				nodelist = append(nodelist, node.Name)
				break // one matching topology is enough
			}
		}
	}
	return nodelist, nil
}

// runScheduler orders the candidate nodes in nodelist by preference.
//
// Nodes without an entry in nmap come first, in nodelist order. Nodes with an
// entry follow, sorted by ascending weight, so a lower value is preferred.
// Nodes with equal weight keep their nodelist order, which makes the result
// deterministic for a given input. Entries in nmap for nodes that are not in
// nodelist are ignored.
func runScheduler(nodelist []string, nmap map[string]int64) []string {
	preferred := make([]string, 0, len(nodelist))
	var weighted []kv

	for _, node := range nodelist {
		if weight, ok := nmap[node]; ok {
			weighted = append(weighted, kv{Key: node, Value: weight})
		} else {
			// Nodes with no weight entry (not yet occupied) go first.
			preferred = append(preferred, node)
		}
	}

	// SliceStable keeps ties in nodelist order. sort.Slice gives no ordering
	// guarantee for equal elements, which made the chosen node arbitrary.
	sort.SliceStable(weighted, func(i, j int) bool {
		return weighted[i].Value < weighted[j].Value
	})

	for _, entry := range weighted {
		preferred = append(preferred, entry.Key)
	}
	return preferred
}

这里我们就列举一下CreateVolume服务,其他服务基本逻辑都差不多,构建LVMVolume, LVMSnapshot资源进行操作。

总结

Kubernetes开发的CO侧组件监听Kubernetes资源,调用Plugin侧Controller服务,Controller服务完成自定义资源的操作,再由Node-Agent进行自定义资源的调谐。可以看到Controller组件只实现了部分接口,存储厂商可以根据存储需要进行接口实现,CO通过调用ControllerGetCapabilities服务获取支持的接口服务列表。