由 SuKai August 29, 2022
前面文章介绍了CSI插件node-agent组件,node-agent主要实现了需要在主机节点上才能进行的操作功能,包含了LVMNode, LVMVolume, LVMSnapshot三个自定义资源控制器和NodeServer接口服务。那么LVMVolume,LVMSnapshot这些自定义资源又是由谁创建的呢?今天我们一起看一下OpenEBS LVM存储插件的Controller部分,了解Controller如何被编排调度系统Container Orchestration system(CO)侧Kubernetes调用,Controller又如何创建自定义资源给Node-Agent调谐的。
之前我们了解到CO侧Kubernetes开发了一系列组件如external-provisioner,external-attacher,external-snapshotter,external-resizer,这些组件通过Sidecar的方式和Plugin插件驱动侧一起运行,直接调用插件Controller的gRPC服务,实现存储卷的管理。下面我们来看一下Controller的接口服务。
Controller Interface包括以下方法
// ControllerServer is the server API for Controller service.
//
// The per-method notes below describe how the OpenEBS LVM plugin uses each
// RPC, as annotated by the article author.
type ControllerServer interface {
// CreateVolume creates a volume by creating an LVMVolume custom resource
// instance.
CreateVolume(context.Context, *CreateVolumeRequest) (*CreateVolumeResponse, error)
// DeleteVolume deletes a volume by deleting its LVMVolume instance.
DeleteVolume(context.Context, *DeleteVolumeRequest) (*DeleteVolumeResponse, error)
// ControllerPublishVolume attaches a volume to the given node. Not
// implemented, because LVM is a node-local device.
ControllerPublishVolume(context.Context, *ControllerPublishVolumeRequest) (*ControllerPublishVolumeResponse, error)
// ControllerUnpublishVolume detaches a volume. Not implemented.
ControllerUnpublishVolume(context.Context, *ControllerUnpublishVolumeRequest) (*ControllerUnpublishVolumeResponse, error)
// ValidateVolumeCapabilities checks whether a volume access mode is
// supported, e.g. single-/multi-node read-write (Read-Write-Once).
ValidateVolumeCapabilities(context.Context, *ValidateVolumeCapabilitiesRequest) (*ValidateVolumeCapabilitiesResponse, error)
// ListVolumes lists all volumes. Not implemented.
ListVolumes(context.Context, *ListVolumesRequest) (*ListVolumesResponse, error)
// GetCapacity returns capacity information (available storage space) for
// the given node.
GetCapacity(context.Context, *GetCapacityRequest) (*GetCapacityResponse, error)
// ControllerGetCapabilities returns the features the controller supports,
// e.g. create/delete volume, expand volume, create/delete snapshot and
// get capacity.
ControllerGetCapabilities(context.Context, *ControllerGetCapabilitiesRequest) (*ControllerGetCapabilitiesResponse, error)
// CreateSnapshot creates a snapshot of the given volume.
CreateSnapshot(context.Context, *CreateSnapshotRequest) (*CreateSnapshotResponse, error)
// DeleteSnapshot deletes the given volume snapshot.
DeleteSnapshot(context.Context, *DeleteSnapshotRequest) (*DeleteSnapshotResponse, error)
// ListSnapshots lists all snapshots of the given volume. Not implemented.
ListSnapshots(context.Context, *ListSnapshotsRequest) (*ListSnapshotsResponse, error)
// ControllerExpandVolume resizes (expands) a volume.
ControllerExpandVolume(context.Context, *ControllerExpandVolumeRequest) (*ControllerExpandVolumeResponse, error)
}
ControllerGetCapabilities服务
返回Controller支持的功能列表。CO侧Kubernetes调用此接口检查Controller支持的服务,也就是Controller实现了哪些服务接口可供CO调用。
// ControllerGetCapabilities reports the capabilities of this controller.
//
// The response wraps the capability list stored in cs.capabilities. The
// request is not inspected and the returned error is always nil.
//
// This implements csi.ControllerServer
func (cs *controller) ControllerGetCapabilities(
	ctx context.Context,
	req *csi.ControllerGetCapabilitiesRequest,
) (*csi.ControllerGetCapabilitiesResponse, error) {
	return &csi.ControllerGetCapabilitiesResponse{Capabilities: cs.capabilities}, nil
}
// newControllerCapabilities builds the list of RPC capabilities advertised
// by this controller: create/delete volume, expand volume, create/delete
// snapshot and get capacity, in that order.
func newControllerCapabilities() []*csi.ControllerServiceCapability {
	rpcTypes := []csi.ControllerServiceCapability_RPC_Type{
		csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
		csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
		csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
		csi.ControllerServiceCapability_RPC_GET_CAPACITY,
	}

	var capabilities []*csi.ControllerServiceCapability
	for _, rpcType := range rpcTypes {
		// wrap every RPC type in the capability envelope expected by CSI
		capabilities = append(capabilities, &csi.ControllerServiceCapability{
			Type: &csi.ControllerServiceCapability_Rpc{
				Rpc: &csi.ControllerServiceCapability_RPC{Type: rpcType},
			},
		})
	}
	return capabilities
}
CreateVolume服务
创建存储卷过程
1,NewVolumeParams方法,从请求体中获得信息
a, 从storageclass参数中获取卷组信息,匹配逻辑卷组的条件vgpattern, volgroup
b, 从storageclass参数中获取调度策略,是否可共享挂载(shared),是否精简配置(thin provision)。
调度策略分为:SpaceWeighted空间权重(按实际使用空间来评估),可用空间大小,可用空间越大优先级越高。CapacityWeighted容量权重(按已经分配容量大小评估),已经分配空间占比,分配越少优先级越高。VolumeWeighted逻辑卷权重,逻辑卷数量越少优先级越高。
c, PVC持久卷声明的命名空间和名称,PV持久卷名称
2,contentSource指定存储卷的数据源,代码没有实现这部分功能。但从代码可以看出,数据源区分为Snapshot和Volume两种,目前两种情况都直接返回Unimplemented。
3,BeginCreateVolume,创建逻辑卷前在LeakProtectionController的claimsInProgress中记录逻辑卷正在处理中。创建finishCreateVolume函数,创建逻辑卷结束后添加finalizer Annotation标签。
4,调用CreateLVMVolume创建逻辑卷
5,返回CreateVolumeResponse,返回结构体Volume的AccessibleTopology表明这个PV持久卷所在主机,Kubernetes将使用此信息调度Pod工作负载到这台主机。
// CreateVolume provisions a volume.
//
// The request is validated and its storage-class parameters parsed; either
// failure yields an InvalidArgument status. Requests carrying a snapshot or
// volume content source are rejected with Unimplemented. Otherwise the volume
// is registered with the leak-protection controller, created through
// CreateLVMVolume, reported via sendEventOrIgnore, and returned together with
// the topology of its owner node and its volume-group context.
func (cs *controller) CreateVolume(
	ctx context.Context,
	req *csi.CreateVolumeRequest,
) (*csi.CreateVolumeResponse, error) {
	if err := cs.validateVolumeCreateReq(req); err != nil {
		return nil, status.Error(codes.InvalidArgument, err.Error())
	}

	params, err := NewVolumeParams(req.GetParameters())
	if err != nil {
		return nil, status.Errorf(codes.InvalidArgument,
			"failed to parse csi volume params: %v", err)
	}

	volName := strings.ToLower(req.GetName())
	size := getRoundedCapacity(req.GetCapacityRange().GetRequiredBytes())
	contentSource := req.GetVolumeContentSource()

	// neither restoring from a snapshot nor cloning a volume is supported
	if contentSource != nil &&
		(contentSource.GetSnapshot() != nil || contentSource.GetVolume() != nil) {
		return nil, status.Error(codes.Unimplemented, "")
	}

	// mark volume for leak protection if pvc gets deleted
	// before the creation of pv.
	finishCreateVolume, err := cs.leakProtection.BeginCreateVolume(
		volName, params.PVCNamespace, params.PVCName)
	if err != nil {
		return nil, err
	}
	defer finishCreateVolume()

	vol, err := CreateLVMVolume(ctx, req, params)
	if err != nil {
		return nil, err
	}

	sendEventOrIgnore(params.PVCName, volName,
		strconv.FormatInt(size, 10),
		"lvm-localpv", analytics.VolumeProvision)

	// the topology tells the CO on which node the volume lives
	topology := map[string]string{
		lvm.LVMTopologyKey: vol.Spec.OwnerNodeID,
	}
	volumeContext := map[string]string{
		lvm.VolGroupKey:       vol.Spec.VolGroup,
		lvm.OpenEBSCasTypeKey: lvm.LVMCasTypeName,
	}

	resp := csipayload.NewCreateVolumeResponseBuilder().
		WithName(volName).
		WithCapacity(size).
		WithTopology(topology).
		WithContext(volumeContext).
		WithContentSource(contentSource).
		Build()
	return resp, nil
}
创建LVM逻辑卷,主要流程如下:
1,获取主机列表,getNodeMap获取主机节点列表,
2,schd.Scheduler调度主机,算法获取调度主机优先级列表
3,volbuilder.NewBuilder构建LVMVolume对象,初始状态为Pending
4,lvm.ProvisionVolume创建LVMVolume
5,waitForLVMVolume等待逻辑卷创建结果
// CreateLVMVolume creates the LVMVolume custom resource for a CSI
// CreateVolume request and waits for the result via waitForLVMVolume.
//
// The volume name is the lower-cased request name and the capacity is the
// rounded required size, rendered as a base-10 string.
//
// If an LVMVolume with that name can already be fetched, no new resource is
// scheduled or provisioned: the existing volume is waited on and returned,
// or an AlreadyExists status is returned when its capacity differs from the
// requested one. Previously the lookup result was discarded, so a retried
// request attempted to provision the same resource again.
//
// Otherwise a node is picked by the scheduler (Internal status when the node
// map cannot be built or no node is selected), an LVMVolume object in Pending
// state is built and handed to lvm.ProvisionVolume.
func CreateLVMVolume(ctx context.Context, req *csi.CreateVolumeRequest,
	params *VolumeParams) (*lvmapi.LVMVolume, error) {
	volName := strings.ToLower(req.GetName())
	// GetRequiredBytes is nil-safe, unlike the direct field access, and
	// matches how CreateVolume reads the same value.
	capacity := strconv.FormatInt(getRoundedCapacity(
		req.GetCapacityRange().GetRequiredBytes()), 10)

	// Reuse an already existing volume. Any lookup error (including
	// "not found") falls through to provisioning, as before.
	if vol, err := lvm.GetLVMVolume(volName); err == nil && vol != nil {
		// NOTE(review): assumes Spec.Capacity holds the same decimal byte
		// string that is passed to WithCapacity below — confirm.
		if vol.Spec.Capacity != capacity {
			return nil, status.Errorf(codes.AlreadyExists,
				"volume %s already present", volName)
		}
		vol, _, err = waitForLVMVolume(ctx, vol)
		return vol, err
	}

	nmap, err := getNodeMap(params.Scheduler, params.VgPattern)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "get node map failed : %s", err.Error())
	}

	// run the scheduler
	selected := schd.Scheduler(req, nmap)
	if len(selected) == 0 {
		return nil, status.Error(codes.Internal, "scheduler failed, not able to select a node to create the PV")
	}

	// the first entry is the most preferred node
	owner := selected[0]
	klog.Infof("scheduling the volume %s/%s on node %s",
		params.VgPattern.String(), volName, owner)

	volObj, err := volbuilder.NewBuilder().
		WithName(volName).
		WithCapacity(capacity).
		WithVgPattern(params.VgPattern.String()).
		WithOwnerNode(owner).
		WithVolumeStatus(lvm.LVMStatusPending).
		WithShared(params.Shared).
		WithThinProvision(params.ThinProvision).Build()
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	vol, err := lvm.ProvisionVolume(volObj)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "not able to provision the volume %s", err.Error())
	}

	vol, _, err = waitForLVMVolume(ctx, vol)
	return vol, err
}
getNodeMap根据调度策略返回主机Map,key为主机名,value随策略不同分别为:Volume数量(VolumeWeighted),已经分配的Volume容量(CapacityWeighted),MaxInt64减去已经使用空间量(SpaceWeighted,未指定策略时的默认值)。
// getNodeMap returns the node-weight map for the scheduler policy named by
// schd, restricted to volume groups matching vgPattern. VolumeWeighted and
// CapacityWeighted select their own maps; SpaceWeighted and any other value
// use the space-weighted map.
func getNodeMap(schd string, vgPattern *regexp.Regexp) (map[string]int64, error) {
	switch schd {
	case VolumeWeighted:
		return getVolumeWeightedMap(vgPattern)
	case CapacityWeighted:
		return getCapacityWeightedMap(vgPattern)
	default:
		// SpaceWeighted is also the fallback when nothing is specified
		return getSpaceWeightedMap(vgPattern)
	}
}
Scheduler调度PV创建的主机
getNodeList根据请求的Preferred参数(Preferred为空时使用Requisite),找到匹配标签的主机列表。
runScheduler返回主机优先级列表。在getNodeList匹配到的主机中,未分配过逻辑卷的主机(不在NodeMap中)优先级高于已经使用的主机;已经使用的主机(同时存在于NodeMap中)再按权重值从小到大排序,追加在列表末尾。
// Scheduler schedules the PV as per topology constraints for
// the given node weight.
//
// It returns the candidate nodes in order of preference. The preferred
// topologies of the request are used, falling back to the requisite ones
// when no preferred topology is given. An empty result is returned (and the
// reason logged) when the request carries no accessibility requirements or
// topology, or when no node can be listed for them.
func Scheduler(req *csi.CreateVolumeRequest, nmap map[string]int64) []string {
	requirements := req.AccessibilityRequirements
	if requirements == nil {
		klog.Errorf("scheduler: Accessibility Requirements not provided")
		return nil
	}

	topologies := requirements.Preferred
	if len(topologies) == 0 {
		// if preferred list is empty, use the requisite
		topologies = requirements.Requisite
	}
	if len(topologies) == 0 {
		klog.Errorf("scheduler: topology information not provided")
		return nil
	}

	candidates, err := getNodeList(topologies)
	switch {
	case err != nil:
		klog.Errorf("scheduler: can not get the nodelist err : %v", err.Error())
		return candidates
	case len(candidates) == 0:
		klog.Errorf("scheduler: nodelist is empty")
		return candidates
	case len(candidates) == 1:
		// if there is a single node, schedule it on that
		return candidates
	}

	return runScheduler(candidates, nmap)
}
// getNodeList gets the nodelist which satisfies the topology info.
//
// A node is selected when, for at least one entry of topo, every segment
// key/value pair equals the node's label of the same key. Each node appears
// at most once, in the order returned by k8sapi.ListNodes.
func getNodeList(topo []*csi.Topology) ([]string, error) {
	nodes, err := k8sapi.ListNodes(metav1.ListOptions{})
	if err != nil {
		return nil, err
	}

	// satisfies reports whether labels contain all segments of t
	satisfies := func(labels map[string]string, t *csi.Topology) bool {
		for key, want := range t.Segments {
			if labels[key] != want {
				return false
			}
		}
		return true
	}

	var nodelist []string
	for _, node := range nodes.Items {
		for _, t := range topo {
			if satisfies(node.Labels, t) {
				nodelist = append(nodelist, node.Name)
				// one matching topology is enough for this node
				break
			}
		}
	}
	return nodelist, nil
}
// runScheduler orders the filtered nodes by preference.
//
// Nodes without an entry in nmap come first, in their nodelist order. Nodes
// that do have an entry follow, sorted by ascending weight.
func runScheduler(nodelist []string, nmap map[string]int64) []string {
	var ordered []string
	var weighted []kv

	for _, name := range nodelist {
		weight, occupied := nmap[name]
		if !occupied {
			// non occupied nodes go to the beginning of the list
			ordered = append(ordered, name)
			continue
		}
		weighted = append(weighted, kv{name, weight})
	}

	// lowest weight first
	sort.Slice(weighted, func(a, b int) bool {
		return weighted[a].Value < weighted[b].Value
	})

	// occupied nodes are appended after the unoccupied ones
	for i := range weighted {
		ordered = append(ordered, weighted[i].Key)
	}
	return ordered
}
这里我们就列举一下CreateVolume服务,其他服务基本逻辑都差不多,构建LVMVolume, LVMSnapshot资源进行操作。
总结
Kubernetes开发的CO侧组件监听Kubernetes资源,调用Plugin侧Controller服务,Controller服务完成自定义资源的操作,再由Node-Agent进行自定义资源的调谐。可以看到Controller组件只实现了部分接口,存储厂商可以根据存储需要进行接口实现,CO通过调用ControllerGetCapabilities服务获取支持的接口服务列表。