By SuKai, August 26, 2022
A Kubernetes CSI plugin consists of two parts: a controller and a node-agent. In this post we walk through the OpenEBS LVM storage plugin to see how OpenEBS lets Kubernetes store data on LVM logical volumes. LVM (Logical Volume Manager) is a logical layer built on top of disks and partitions that makes disk partition management more flexible. Kubernetes can use LVM on local disks for better storage performance, and LVM's snapshot capability provides Copy-On-Write based thin-provisioned storage for KubeVirt virtual machines, which makes it a nice lightweight hyper-converged solution.
This article covers the following parts: LVM basic concepts, the components of the node-agent, and the node-agent code implementation.
LVM Basic Concepts
A PV (physical volume) is LVM's basic storage building block; one PV corresponds to a disk or a disk partition. A VG (volume group) is made up of physical volumes. An LV (logical volume) is created on top of a volume group; one volume group can hold multiple logical volumes, and each logical volume belongs to exactly one volume group.
An LVM snapshot provides a point-in-time view of a file system for creating backups: the backup is taken from the snapshot while the volume itself stays in use. Initially a snapshot contains only a little metadata about itself and none of the source logical volume's actual data. Snapshots use Copy-On-Write (COW): when data in an original block is about to change, the data in the source block is copied first and then modified.
LVM thin provisioned logical volumes allocate storage on demand, which effectively virtualizes storage management. A thin pool is created inside a volume group, and thin volumes are then created on top of the thin pool.
node-agent Components
The node-agent consists of three custom resource controllers and one gRPC service.
gRPC Service
The kubelet calls the node-agent's gRPC service to mount a volume to a directory on the host; when the kubelet creates a container, it then mounts that directory (or device file) into the container.
The NodeServer interface is as follows:
type NodeServer interface {
	// NodeStageVolume mounts the volume at a staging path
	NodeStageVolume(context.Context, *NodeStageVolumeRequest) (*NodeStageVolumeResponse, error)
	// NodeUnstageVolume unmounts the volume from the staging path
	NodeUnstageVolume(context.Context, *NodeUnstageVolumeRequest) (*NodeUnstageVolumeResponse, error)
	// NodePublishVolume publishes the volume: the LVM logical volume is mounted
	// at the requested path on the host
	NodePublishVolume(context.Context, *NodePublishVolumeRequest) (*NodePublishVolumeResponse, error)
	// NodeUnpublishVolume unpublishes the volume: the LVM logical volume is
	// unmounted from the host
	NodeUnpublishVolume(context.Context, *NodeUnpublishVolumeRequest) (*NodeUnpublishVolumeResponse, error)
	// NodeGetVolumeStats returns volume usage statistics (capacity and inodes)
	NodeGetVolumeStats(context.Context, *NodeGetVolumeStatsRequest) (*NodeGetVolumeStatsResponse, error)
	// NodeExpandVolume expands the volume capacity
	NodeExpandVolume(context.Context, *NodeExpandVolumeRequest) (*NodeExpandVolumeResponse, error)
	// NodeGetCapabilities returns the capabilities supported by the node service,
	// e.g. volume expansion and volume statistics
	NodeGetCapabilities(context.Context, *NodeGetCapabilitiesRequest) (*NodeGetCapabilitiesResponse, error)
	// NodeGetInfo returns node information, mainly the topology taken from node labels
	NodeGetInfo(context.Context, *NodeGetInfoRequest) (*NodeGetInfoResponse, error)
}
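Serving this interface is plumbing that the kubelet talks to over a unix socket. The following is a minimal sketch, not the plugin's actual server code, of how a NodeServer implementation gets registered on a gRPC server; the function name and socket handling are illustrative, while csi.RegisterNodeServer, net.Listen and grpc.NewServer are the standard CSI and gRPC APIs.
package node

import (
	"net"
	"os"

	"google.golang.org/grpc"
	"k8s.io/klog"

	"github.com/container-storage-interface/spec/lib/go/csi"
)

// serveNodeServer registers a csi.NodeServer implementation on a gRPC server
// listening on a unix socket (the real plugin takes the socket path from its
// configuration).
func serveNodeServer(ns csi.NodeServer, socket string) error {
	// remove a stale socket file left over from a previous run
	if err := os.Remove(socket); err != nil && !os.IsNotExist(err) {
		return err
	}
	lis, err := net.Listen("unix", socket)
	if err != nil {
		return err
	}
	srv := grpc.NewServer()
	csi.RegisterNodeServer(srv, ns)
	klog.Infof("CSI NodeServer listening on %s", socket)
	return srv.Serve(lis)
}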
Three Controllers
The controllers watch and reconcile the LVMNode, LVMSnapshot, and LVMVolume custom resources. When the plugin's controller component creates LVMSnapshot or LVMVolume resources, the reconcile logic of these controllers performs the corresponding operations on the host node.
LVMNode: one CR corresponds to one Kubernetes node and records the VG volume group information on that host.
LVMSnapshot: manages the snapshot lifecycle, i.e. creating, updating, and deleting snapshots.
LVMVolume: manages the lifecycle of LV logical volumes.
node-agent Code Implementation
The node struct implements the NodeServer interface defined by the CSI spec, and the NewNode constructor starts the three custom resource controllers.
// node is the server implementation
// for CSI NodeServer
type node struct {
driver *CSIDriver
}
// NewNode returns a new instance
// of CSI NodeServer
func NewNode(d *CSIDriver) csi.NodeServer {
var ControllerMutex = sync.RWMutex{}
// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
// start the lvm node resource watcher
go func() {
err := lvmnode.Start(&ControllerMutex, stopCh)
if err != nil {
klog.Fatalf("Failed to start LVM node controller: %s", err.Error())
}
}()
// start the lvmvolume watcher
go func() {
err := volume.Start(&ControllerMutex, stopCh)
if err != nil {
klog.Fatalf("Failed to start LVM volume management controller: %s", err.Error())
}
}()
// start the lvm snapshot watcher
go func() {
err := snapshot.Start(&ControllerMutex, stopCh)
if err != nil {
klog.Fatalf("Failed to start LVM volume snapshot management controller: %s", err.Error())
}
}()
if d.config.ListenAddress != "" {
exposeMetrics(d.config.ListenAddress, d.config.MetricsPath, d.config.DisableExporterMetrics)
}
return &node{
driver: d,
}
}
NodePublishVolume Service
GetVolAndMountInfo returns the LVMVolume resource and the MountInfo mount information. It reads the file system type (FSType), mount path, mount options, read-only flag, and volume name from the kubelet's request, then looks up the LVMVolume custom resource instance by volume name.
getPodLVInfo returns the pod UID and the LVGroup volume group name; these keys are populated in the VolumeContext by the kubelet only when pod info on mount is enabled for the CSI driver.
A volume can be published in two ways: as a block volume or as a mounted volume. A block volume exposes the LV as a block device inside the pod container; a mounted volume exposes a file system, seen inside the container as a directory.
// GetVolAndMountInfo get volume and mount info from node csi volume request
func GetVolAndMountInfo(
req *csi.NodePublishVolumeRequest,
) (*apis.LVMVolume, *lvm.MountInfo, error) {
var mountinfo lvm.MountInfo
mountinfo.FSType = req.GetVolumeCapability().GetMount().GetFsType()
mountinfo.MountPath = req.GetTargetPath()
mountinfo.MountOptions = append(mountinfo.MountOptions, req.GetVolumeCapability().GetMount().GetMountFlags()...)
if req.GetReadonly() {
mountinfo.MountOptions = append(mountinfo.MountOptions, "ro")
}
volName := strings.ToLower(req.GetVolumeId())
getOptions := metav1.GetOptions{}
vol, err := volbuilder.NewKubeclient().
WithNamespace(lvm.LvmNamespace).
Get(volName, getOptions)
if err != nil {
return nil, nil, err
}
return vol, &mountinfo, nil
}
func getPodLVInfo(req *csi.NodePublishVolumeRequest) (*lvm.PodLVInfo, error) {
var podLVInfo lvm.PodLVInfo
var ok bool
if podLVInfo.UID, ok = req.VolumeContext["csi.storage.k8s.io/pod.uid"]; !ok {
return nil, errors.New("csi.storage.k8s.io/pod.uid key missing in VolumeContext")
}
if podLVInfo.LVGroup, ok = req.VolumeContext["openebs.io/volgroup"]; !ok {
return nil, errors.New("openebs.io/volgroup key missing in VolumeContext")
}
return &podLVInfo, nil
}
// NodePublishVolume publishes (mounts) the volume
// at the corresponding node at a given path
//
// This implements csi.NodeServer
func (ns *node) NodePublishVolume(
ctx context.Context,
req *csi.NodePublishVolumeRequest,
) (*csi.NodePublishVolumeResponse, error) {
var (
err error
)
if err = ns.validateNodePublishReq(req); err != nil {
return nil, err
}
vol, mountInfo, err := GetVolAndMountInfo(req)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
podLVinfo, err := getPodLVInfo(req)
if err != nil {
klog.Warningf("PodLVInfo could not be obtained for volume_id: %s, err = %v", req.VolumeId, err)
}
switch req.GetVolumeCapability().GetAccessType().(type) {
case *csi.VolumeCapability_Block:
// attempt block mount operation on the requested path
err = lvm.MountBlock(vol, mountInfo, podLVinfo)
case *csi.VolumeCapability_Mount:
// attempt filesystem mount operation on the requested path
err = lvm.MountFilesystem(vol, mountInfo, podLVinfo)
}
if err != nil {
return nil, err
}
return &csi.NodePublishVolumeResponse{}, nil
}
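For reference, this is roughly the kind of request the kubelet sends for a mounted (file system) volume. The values below are purely illustrative: the volume ID, pod UID placeholder, and volume group name are made up, and the target path just follows the usual kubelet directory layout. For a block volume, the access type would be *csi.VolumeCapability_Block instead and the target path would be a file rather than a directory.
package example

import "github.com/container-storage-interface/spec/lib/go/csi"

// An illustrative NodePublishVolumeRequest for a mounted (file system) volume.
// Every concrete value below is made up for the example.
var req = &csi.NodePublishVolumeRequest{
	VolumeId:   "pvc-2b4d9f6a-example",
	TargetPath: "/var/lib/kubelet/pods/<pod-uid>/volumes/kubernetes.io~csi/pvc-2b4d9f6a-example/mount",
	VolumeCapability: &csi.VolumeCapability{
		AccessType: &csi.VolumeCapability_Mount{
			Mount: &csi.VolumeCapability_MountVolume{FsType: "ext4"},
		},
	},
	VolumeContext: map[string]string{
		"csi.storage.k8s.io/pod.uid": "<pod-uid>",
		"openebs.io/volgroup":        "lvmvg",
	},
	Readonly: false,
}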
Mounting a Block Device
MountBlock bind-mounts the logical volume's device file to the target location specified by the kubelet.
// MountBlock mounts the block disk to the specified path
func MountBlock(vol *apis.LVMVolume, mountinfo *MountInfo, podLVInfo *PodLVInfo) error {
target := mountinfo.MountPath
volume := vol.Spec.VolGroup + "/" + vol.Name
devicePath := DevPath + volume
mountopt := []string{"bind"}
mounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: utilexec.New()}
// Create the mount point as a file since bind mount device node requires it to be a file
err := makeFile(target)
if err != nil {
return status.Errorf(codes.Internal, "Could not create target file %q: %v", target, err)
}
// do the bind mount of the device at the target path
if err := mounter.Mount(devicePath, target, "", mountopt); err != nil {
if removeErr := os.Remove(target); removeErr != nil {
return status.Errorf(codes.Internal, "Could not remove mount target %q: %v", target, removeErr)
}
return status.Errorf(codes.Internal, "mount failed at %v err : %v", target, err)
}
klog.Infof("NodePublishVolume mounted block device %s at %s", devicePath, target)
if ioLimitsEnabled && podLVInfo != nil {
if err := setIOLimits(vol, podLVInfo, devicePath); err != nil {
klog.Warningf(": error setting io limits for podUid %s, device %s, err=%v", podLVInfo.UID, devicePath, err)
} else {
klog.Infof("lvm: io limits set for podUid %s, device %s", podLVInfo.UID, devicePath)
}
}
return nil
}
Mounting a Block Device from the Command Line
sukai@ubuntu-01:~$ sudo mkdir /data1
sukai@ubuntu-01:~$ sudo touch /data1/thinvol
sukai@ubuntu-01:~$ sudo mount /dev/data1/thinvol /data1/thinvol --bind
sukai@ubuntu-01:~$ sudo ls -al /data1/thinvol
brw-rw---- 1 root disk 253, 4 Aug 26 15:15 /data1/thinvol
sukai@ubuntu-01:~$ sudo mkfs /data1/thinvol
mke2fs 1.45.5 (07-Jan-2020)
Discarding device blocks: done
Creating filesystem with 262144 4k blocks and 65536 inodes
Filesystem UUID: 8b3b439f-96f1-4104-9d95-33b91a1f654d
Superblock backups stored on blocks:
32768, 98304, 163840, 229376
Allocating group tables: done
Writing inode tables: done
Writing superblocks and filesystem accounting information: done
sukai@ubuntu-01:~$
Mounting a File System
The logical volume is formatted and then mounted to the specified directory.
// MountVolume mounts the disk to the specified path
func MountVolume(vol *apis.LVMVolume, mount *MountInfo, podLVInfo *PodLVInfo) error {
volume := vol.Spec.VolGroup + "/" + vol.Name
mounted, err := verifyMountRequest(vol, mount.MountPath)
if err != nil {
return err
}
if mounted {
klog.Infof("lvm : already mounted %s => %s", volume, mount.MountPath)
return nil
}
devicePath := DevPath + volume
err = FormatAndMountVol(devicePath, mount)
if err != nil {
return status.Errorf(
codes.Internal,
"failed to format and mount the volume error: %s",
err.Error(),
)
}
klog.Infof("lvm: volume %v mounted %v fs %v", volume, mount.MountPath, mount.FSType)
if ioLimitsEnabled && podLVInfo != nil {
if err := setIOLimits(vol, podLVInfo, devicePath); err != nil {
klog.Warningf("lvm: error setting io limits: podUid %s, device %s, err=%v", podLVInfo.UID, devicePath, err)
} else {
klog.Infof("lvm: io limits set for podUid %v, device %s", podLVInfo.UID, devicePath)
}
}
return nil
}
// MountFilesystem mounts the disk to the specified path
func MountFilesystem(vol *apis.LVMVolume, mount *MountInfo, podinfo *PodLVInfo) error {
if err := os.MkdirAll(mount.MountPath, 0755); err != nil {
return status.Errorf(codes.Internal, "Could not create dir {%q}, err: %v", mount.MountPath, err)
}
return MountVolume(vol, mount, podinfo)
}
Mounting a File System from the Command Line
sukai@ubuntu-01:~$ sudo mkfs /dev/data1/thinvol2
mke2fs 1.45.5 (07-Jan-2020)
Discarding device blocks: done
Creating filesystem with 262144 4k blocks and 65536 inodes
Filesystem UUID: 3957607d-cdfd-46b3-bc56-1c7ab7864570
Superblock backups stored on blocks:
32768, 98304, 163840, 229376
Allocating group tables: done
Writing inode tables: done
Writing superblocks and filesystem accounting information: done
sukai@ubuntu-01:~$ sudo mkdir /data1/thinvol2
sukai@ubuntu-01:~$ sudo mount /dev/data1/thinvol2 /data1/thinvol2/
sukai@ubuntu-01:~$ sudo ls -al /data1/thinvol2/
total 24
drwxr-xr-x 3 root root 4096 Aug 26 15:22 .
drwxr-xr-x 3 root root 4096 Aug 26 15:23 ..
drwx------ 2 root root 16384 Aug 26 15:22 lost+found
sukai@ubuntu-01:~$
Configuring IO Limits
If IO limiting is enabled, after the mount completes setIOLimits configures read/write iops and bps limits for the logical volume through cgroups.
func setIOLimits(vol *apis.LVMVolume, podLVInfo *PodLVInfo, devicePath string) error {
if podLVInfo == nil {
return errors.New("PodLVInfo is missing. Skipping setting IOLimits")
}
capacityBytes, err := strconv.ParseUint(vol.Spec.Capacity, 10, 64)
if err != nil {
klog.Warning("error parsing LVMVolume.Spec.Capacity. Skipping setting IOLimits", err)
return err
}
capacityGB := uint64(math.Ceil(float64(capacityBytes) / (1024 * 1024 * 1024)))
klog.Infof("Capacity of device in GB: %v", capacityGB)
riops := getRIopsPerGB(podLVInfo.LVGroup) * capacityGB
wiops := getWIopsPerGB(podLVInfo.LVGroup) * capacityGB
rbps := getRBpsPerGB(podLVInfo.LVGroup) * capacityGB
wbps := getWBpsPerGB(podLVInfo.LVGroup) * capacityGB
klog.Infof("Setting iolimits for podUId %s, device %s: riops=%v, wiops=%v, rbps=%v, wbps=%v",
podLVInfo.UID, devicePath, riops, wiops, rbps, wbps,
)
err = iolimit.SetIOLimits(&iolimit.Request{
DeviceName: devicePath,
PodUid: podLVInfo.UID,
ContainerRuntime: getContainerRuntime(),
IOLimit: &iolimit.IOMax{
Riops: riops,
Wiops: wiops,
Rbps: rbps,
Wbps: wbps,
},
})
if err != nil {
return err
}
return nil
}
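The iolimit package itself is not shown in this post. Conceptually, on a host using cgroup v2 these per-pod limits end up in the pod cgroup's io.max file, keyed by the device's major:minor numbers. The sketch below only illustrates that mechanism under the assumption of cgroup v2; the function name, the podCgroupDir parameter, and the zero-means-unlimited convention are assumptions for the example, not the plugin's actual implementation.
package iolimits

import (
	"fmt"
	"os"
	"path/filepath"

	"golang.org/x/sys/unix"
)

// writeIOMax writes a cgroup v2 io.max entry for devicePath into the pod's
// cgroup directory. podCgroupDir is assumed to be something like
// /sys/fs/cgroup/kubepods.slice/.../pod<uid>.slice (the exact layout depends
// on the container runtime and cgroup driver).
func writeIOMax(podCgroupDir, devicePath string, riops, wiops, rbps, wbps uint64) error {
	var st unix.Stat_t
	if err := unix.Stat(devicePath, &st); err != nil {
		return err
	}
	// io.max uses the keyword "max" for "no limit"; treat a zero limit as unlimited here
	val := func(v uint64) string {
		if v == 0 {
			return "max"
		}
		return fmt.Sprintf("%d", v)
	}
	// one line per device: "MAJOR:MINOR riops=... wiops=... rbps=... wbps=..."
	entry := fmt.Sprintf("%d:%d riops=%s wiops=%s rbps=%s wbps=%s\n",
		unix.Major(uint64(st.Rdev)), unix.Minor(uint64(st.Rdev)),
		val(riops), val(wiops), val(rbps), val(wbps))
	return os.WriteFile(filepath.Join(podCgroupDir, "io.max"), []byte(entry), 0o600)
}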
NodeUnpublishVolume Service
It calls UmountVolume, which unmounts the volume and removes the corresponding mount path.
// NodeUnpublishVolume unpublishes (unmounts) the volume
// from the corresponding node from the given path
//
// This implements csi.NodeServer
func (ns *node) NodeUnpublishVolume(
ctx context.Context,
req *csi.NodeUnpublishVolumeRequest,
) (*csi.NodeUnpublishVolumeResponse, error) {
var (
err error
vol *apis.LVMVolume
)
if err = ns.validateNodeUnpublishReq(req); err != nil {
return nil, err
}
targetPath := req.GetTargetPath()
volumeID := req.GetVolumeId()
if vol, err = lvm.GetLVMVolume(volumeID); err != nil {
return nil, status.Errorf(codes.Internal,
"not able to get the LVMVolume %s err : %s",
volumeID, err.Error())
}
err = lvm.UmountVolume(vol, targetPath)
if err != nil {
return nil, status.Errorf(codes.Internal,
"unable to umount the volume %s err : %s",
volumeID, err.Error())
}
klog.Infof("hostpath: volume %s path: %s has been unmounted.",
volumeID, targetPath)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
// UmountVolume unmounts the volume and the corresponding mount path is removed
func UmountVolume(vol *apis.LVMVolume, targetPath string,
) error {
mounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: utilexec.New()}
dev, ref, err := mount.GetDeviceNameFromMount(mounter, targetPath)
if err != nil {
klog.Errorf(
"lvm: umount volume: failed to get device from mnt: %s\nError: %v",
targetPath, err,
)
return err
}
// device has already been un-mounted, return successful
if len(dev) == 0 || ref == 0 {
klog.Warningf(
"Warning: Unmount skipped because volume %s not mounted: %v",
vol.Name, targetPath,
)
return nil
}
if pathExists, pathErr := mount.PathExists(targetPath); pathErr != nil {
return fmt.Errorf("error checking if path exists: %v", pathErr)
} else if !pathExists {
klog.Warningf(
"Warning: Unmount skipped because path does not exist: %v",
targetPath,
)
return nil
}
if err = mounter.Unmount(targetPath); err != nil {
klog.Errorf(
"lvm: failed to unmount %s: path %s err: %v",
vol.Name, targetPath, err,
)
return err
}
if err := os.Remove(targetPath); err != nil {
klog.Errorf("lvm: failed to remove mount path vol %s err : %v", vol.Name, err)
}
klog.Infof("umount done %s path %v", vol.Name, targetPath)
return nil
}
NodeGetInfo Service
Returns the node's topology information as label key/value pairs, consisting of the driver's own topology key "openebs.io/nodename" plus any keys listed in the ALLOWED_TOPOLOGIES environment variable that are present in the node's labels.
// NodeGetInfo returns node details
//
// This implements csi.NodeServer
func (ns *node) NodeGetInfo(
ctx context.Context,
req *csi.NodeGetInfoRequest,
) (*csi.NodeGetInfoResponse, error) {
node, err := k8sapi.GetNode(ns.driver.config.NodeID)
if err != nil {
klog.Errorf("failed to get the node %s", ns.driver.config.NodeID)
return nil, err
}
/*
* The driver will support all the keys and values defined in the node's label.
* if nodes are labeled with the below keys and values
* map[beta.kubernetes.io/arch:amd64 beta.kubernetes.io/os:linux kubernetes.io/arch:amd64 kubernetes.io/hostname:pawan-node-1 kubernetes.io/os:linux node-role.kubernetes.io/worker:true openebs.io/zone:zone1 openebs.io/zpool:ssd]
* The driver will support below key and values
* {
* beta.kubernetes.io/arch:amd64
* beta.kubernetes.io/os:linux
* kubernetes.io/arch:amd64
* kubernetes.io/hostname:pawan-node-1
* kubernetes.io/os:linux
* node-role.kubernetes.io/worker:true
* openebs.io/zone:zone1
* openebs.io/zpool:ssd
* }
*/
// add driver's topology key
topology := map[string]string{
lvm.LVMTopologyKey: ns.driver.config.NodeID,
}
// support topologykeys from env ALLOWED_TOPOLOGIES
allowedTopologies := os.Getenv("ALLOWED_TOPOLOGIES")
allowedKeys := strings.Split(allowedTopologies, ",")
for _, key := range allowedKeys {
if key != "" {
v, ok := node.Labels[key]
if ok {
topology[key] = v
}
}
}
return &csi.NodeGetInfoResponse{
NodeId: ns.driver.config.NodeID,
AccessibleTopology: &csi.Topology{
Segments: topology,
},
}, nil
}
NodeGetCapabilities Service
Returns the capabilities supported by the node service: volume statistics and volume expansion.
// NodeGetCapabilities returns capabilities supported
// by this node service
//
// This implements csi.NodeServer
func (ns *node) NodeGetCapabilities(
ctx context.Context,
req *csi.NodeGetCapabilitiesRequest,
) (*csi.NodeGetCapabilitiesResponse, error) {
return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability{
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
},
},
},
},
}, nil
}
NodeExpandVolume Service
When the logical volume is smaller than the capacity defined in the LVMVolume, lvextend is run to grow the logical volume, with the -r flag added to resize the file system as well. If the volume is published in block mode, no file system resize is performed. If the file system is btrfs, the file system is resized with the btrfs command instead of through lvextend.
// TODO
// Verify if this needs to be implemented
//
// NodeExpandVolume resizes the filesystem if required
//
// If ControllerExpandVolumeResponse returns true in
// node_expansion_required then FileSystemResizePending
// condition will be added to PVC and NodeExpandVolume
// operation will be queued on kubelet
//
// This implements csi.NodeServer
func (ns *node) NodeExpandVolume(
ctx context.Context,
req *csi.NodeExpandVolumeRequest,
) (*csi.NodeExpandVolumeResponse, error) {
volumeID := req.GetVolumeId()
if req.GetVolumePath() == "" || volumeID == "" {
return nil, status.Errorf(
codes.InvalidArgument,
"path not provided for NodeExpandVolume Request %s",
volumeID,
)
}
vol, err := lvm.GetLVMVolume(volumeID)
if err != nil {
return nil, status.Errorf(
codes.NotFound,
"failed to handle NodeExpandVolume Request for %s, {%s}",
req.VolumeId,
err.Error(),
)
}
isBlockMode := req.GetVolumeCapability().GetBlock() != nil
fsType := req.GetVolumeCapability().GetMount().GetFsType()
resizeFS := true
if isBlockMode || fsType == "btrfs" {
// In case of volume block mode (or) btrfs filesystem mode
// lvm doesn't expand the fs natively
resizeFS = false
}
err = lvm.ResizeLVMVolume(vol, resizeFS)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to handle NodeExpandVolume Request for %s, {%s}",
req.VolumeId,
err.Error(),
)
}
// Expand btrfs filesystem
if fsType == "btrfs" {
err = btrfs.ResizeBTRFS(req.GetVolumePath())
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to handle NodeExpandVolume Request for %s, {%s}",
req.VolumeId,
err.Error(),
)
}
}
return &csi.NodeExpandVolumeResponse{
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
}, nil
}
Extending an LV from the Command Line
sukai@ubuntu-01:~$ sudo df -Th /data1/thinvol2
Filesystem Type Size Used Avail Use% Mounted on
/dev/mapper/data1-thinvol2 ext2 1007M 24K 956M 1% /data1/thinvol2
sukai@ubuntu-01:~$
sukai@ubuntu-01:~$ sudo lvextend /dev/data1/thinvol2 -L 5G -r
WARNING: Sum of all thin volume sizes (7.00 GiB) exceeds the size of thin pools (2.00 GiB).
WARNING: You have not turned on protection against thin pools running out of space.
WARNING: Set activation/thin_pool_autoextend_threshold below 100 to trigger automatic extension of thin pools before they get full.
Size of logical volume data1/thinvol2 changed from 1.00 GiB (256 extents) to 5.00 GiB (1280 extents).
Logical volume data1/thinvol2 successfully resized.
resize2fs 1.45.5 (07-Jan-2020)
Filesystem at /dev/mapper/data1-thinvol2 is mounted on /data1/thinvol2; on-line resizing required
old_desc_blocks = 1, new_desc_blocks = 1
The filesystem on /dev/mapper/data1-thinvol2 is now 1310720 (4k) blocks long.
sukai@ubuntu-01:~$ sudo df -Th /data1/thinvol2
Filesystem Type Size Used Avail Use% Mounted on
/dev/mapper/data1-thinvol2 ext2 5.0G 24K 4.8G 1% /data1/thinvol2
sukai@ubuntu-01:~$
NodeGetVolumeStats Service
Returns the capacity and inode usage statistics of the LV logical volume.
// NodeGetVolumeStats returns statistics for the
// given volume
func (ns *node) NodeGetVolumeStats(
ctx context.Context,
req *csi.NodeGetVolumeStatsRequest,
) (*csi.NodeGetVolumeStatsResponse, error) {
volID := req.GetVolumeId()
path := req.GetVolumePath()
if len(volID) == 0 {
return nil, status.Error(codes.InvalidArgument, "volume id is not provided")
}
if len(path) == 0 {
return nil, status.Error(codes.InvalidArgument, "path is not provided")
}
if !mount.IsMountPath(path) {
return nil, status.Error(codes.NotFound, "path is not a mount path")
}
var sfs unix.Statfs_t
if err := unix.Statfs(path, &sfs); err != nil {
return nil, status.Errorf(codes.Internal, "statfs on %s failed: %v", path, err)
}
var usage []*csi.VolumeUsage
usage = append(usage, &csi.VolumeUsage{
Unit: csi.VolumeUsage_BYTES,
Total: int64(sfs.Blocks) * int64(sfs.Bsize),
Used: int64(sfs.Blocks-sfs.Bfree) * int64(sfs.Bsize),
Available: int64(sfs.Bavail) * int64(sfs.Bsize),
})
usage = append(usage, &csi.VolumeUsage{
Unit: csi.VolumeUsage_INODES,
Total: int64(sfs.Files),
Used: int64(sfs.Files - sfs.Ffree),
Available: int64(sfs.Ffree),
})
return &csi.NodeGetVolumeStatsResponse{Usage: usage}, nil
}
Controller Code for LVMNode, LVMSnapshot, and LVMVolume
LVMNode
Run starts a timer and, every 60 seconds, adds an item keyed by the local node (LvmNamespace/NodeID) to the work queue.
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *NodeController) Run(threadiness int, stopCh <-chan struct{}) error {
...
timer := time.NewTimer(0)
defer timer.Stop()
for {
select {
case <-timer.C:
case <-stopCh:
klog.Info("Shutting down Node controller")
return nil
}
item := lvm.LvmNamespace + "/" + lvm.NodeID
c.workqueue.Add(item) // add the item to worker queue.
timer.Reset(c.pollInterval)
}
}
The syncNode reconcile logic mainly keeps the list of VG volume groups on the LVMNode object up to date. The excerpt below shows the creation path, taken when the LVMNode object does not exist yet.
// syncNode is the function which tries to converge to a desired state for the
// LVMNode
func (c *NodeController) syncNode(namespace string, name string) error {
// the lookup of the existing LVMNode object via the lister is omitted in this excerpt
vgs, err := c.listLVMVolumeGroup()
if err != nil {
return err
}
if node == nil { // if it doesn't exist, create the lvm node object
if node, err = nodebuilder.NewBuilder().
WithNamespace(namespace).WithName(name).
WithVolumeGroups(vgs).
WithOwnerReferences(c.ownerRef).
Build(); err != nil {
return err
}
klog.Infof("lvm node controller: creating new node object for %+v", node)
if _, err = nodebuilder.NewKubeclient().WithNamespace(namespace).Create(node); err != nil {
return errors.Errorf("create lvm node %s/%s: %v", namespace, name, err)
}
klog.Infof("lvm node controller: created node object %s/%s", namespace, name)
return nil
}
return nil
}
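listLVMVolumeGroup (not shown above) is what reads the volume group state from the host, ultimately via the lvm2 tools. Purely as an illustration of the idea, and not the plugin's actual implementation, the volume groups can be discovered with the JSON report output of vgs; the struct and function names below are made up, while the flags and JSON field names follow the lvm2 report format.
package vgdiscovery

import (
	"encoding/json"
	"os/exec"
)

// vgsReport mirrors the JSON emitted by `vgs --reportformat json`.
type vgsReport struct {
	Report []struct {
		VG []struct {
			Name string `json:"vg_name"`
			Size string `json:"vg_size"`
			Free string `json:"vg_free"`
		} `json:"vg"`
	} `json:"report"`
}

// listVolumeGroups lists the volume groups visible on this host.
func listVolumeGroups() (*vgsReport, error) {
	// --units b --nosuffix keeps sizes as plain byte counts, easy to parse
	out, err := exec.Command("vgs", "--reportformat", "json",
		"--units", "b", "--nosuffix", "-o", "vg_name,vg_size,vg_free").CombinedOutput()
	if err != nil {
		return nil, err
	}
	var rep vgsReport
	if err := json.Unmarshal(out, &rep); err != nil {
		return nil, err
	}
	return &rep, nil
}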
LVMVolume
The controller watches LVMVolume resources. If vol.Spec.VolGroup is already set, it first tries to create the volume in that volume group and, on success, updates the volume status. Otherwise it looks up the candidate VG volume groups and tries to create the logical volume in each of them in turn.
// syncVol is the function which tries to converge to a desired state for the
// LVMVolume
func (c *VolController) syncVol(vol *apis.LVMVolume) error {
...
// if there is already a volGroup field set for lvmvolume resource,
// we'll first try to create a volume in that volume group.
if vol.Spec.VolGroup != "" {
err = lvm.CreateVolume(vol)
if err == nil {
return lvm.UpdateVolInfo(vol, lvm.LVMStatusReady)
}
}
vgs, err := c.getVgPriorityList(vol)
if err != nil {
return err
}
if len(vgs) == 0 {
err = fmt.Errorf("no vg available to serve volume request having regex=%q & capacity=%q",
vol.Spec.VgPattern, vol.Spec.Capacity)
klog.Errorf("lvm volume %v - %v", vol.Name, err)
} else {
for _, vg := range vgs {
// first update volGroup field in lvm volume resource for ensuring
// idempotency and avoiding volume leaks during crash.
if vol, err = lvm.UpdateVolGroup(vol, vg.Name); err != nil {
klog.Errorf("failed to update volGroup to %v: %v", vg.Name, err)
return err
}
if err = lvm.CreateVolume(vol); err == nil {
return lvm.UpdateVolInfo(vol, lvm.LVMStatusReady)
}
}
}
...
}
getVgPriorityList returns the prioritized list of volume groups on the host: the VG name must match the vol.Spec.VgPattern regular expression, the VG must have enough free space, and the result is sorted by free space in ascending order, so the VG with the least free space that still fits is tried first.
// getVgPriorityList returns ordered list of volume groups from higher to lower
// priority to use for provisioning a lvm volume. As of now, we are prioritizing
// the vg having least amount free space available to fit the volume.
func (c *VolController) getVgPriorityList(vol *apis.LVMVolume) ([]apis.VolumeGroup, error) {
re, err := regexp.Compile(vol.Spec.VgPattern)
if err != nil {
return nil, fmt.Errorf("invalid regular expression %v for lvm volume %s: %v",
vol.Spec.VgPattern, vol.Name, err)
}
capacity, err := strconv.Atoi(vol.Spec.Capacity)
if err != nil {
return nil, fmt.Errorf("invalid requested capacity %v for lvm volume %s: %v",
vol.Spec.Capacity, vol.Name, err)
}
vgs, err := lvm.ListLVMVolumeGroup(true)
if err != nil {
return nil, fmt.Errorf("failed to list vgs available on node: %v", err)
}
filteredVgs := make([]apis.VolumeGroup, 0)
for _, vg := range vgs {
if !re.MatchString(vg.Name) {
continue
}
// skip the vgs capacity comparison in case of thin provision enable volume
if vol.Spec.ThinProvision != "yes" {
// filter vgs having insufficient capacity.
if vg.Free.Value() < int64(capacity) {
continue
}
}
filteredVgs = append(filteredVgs, vg)
}
// prioritize the volume group having less free space available.
sort.SliceStable(filteredVgs, func(i, j int) bool {
return filteredVgs[i].Free.Cmp(filteredVgs[j].Free) < 0
})
return filteredVgs, nil
}
buildLVMCreateArgs builds the lvcreate command arguments: it sets the logical volume size, the volume group, and whether thin provisioning is used.
// buildLVMCreateArgs returns the lvcreate command arguments for the volume
func buildLVMCreateArgs(vol *apis.LVMVolume) []string {
var LVMVolArg []string
volume := vol.Name
size := vol.Spec.Capacity + "b"
// thinpool name required for thinProvision volumes
pool := vol.Spec.VolGroup + "_thinpool"
if len(vol.Spec.Capacity) != 0 {
// check if thin pool exists for given volumegroup requested thin volume
if strings.TrimSpace(vol.Spec.ThinProvision) != YES {
LVMVolArg = append(LVMVolArg, "-L", size)
} else if !lvThinExists(vol.Spec.VolGroup, pool) {
// thinpool size can't be equal or greater than actual volumegroup size
LVMVolArg = append(LVMVolArg, "-L", getThinPoolSize(vol.Spec.VolGroup, vol.Spec.Capacity))
}
}
// command to create thinpool and thin volume if thinProvision is enabled
// `lvcreate -L 1G -T lvmvg/mythinpool -V 1G -n thinvol`
if strings.TrimSpace(vol.Spec.ThinProvision) == YES {
LVMVolArg = append(LVMVolArg, "-T", vol.Spec.VolGroup+"/"+pool, "-V", size)
}
if len(vol.Spec.VolGroup) != 0 {
LVMVolArg = append(LVMVolArg, "-n", volume)
}
if strings.TrimSpace(vol.Spec.ThinProvision) != YES {
LVMVolArg = append(LVMVolArg, vol.Spec.VolGroup)
}
// -y is used to wipe the signatures before creating LVM volume
LVMVolArg = append(LVMVolArg, "-y")
return LVMVolArg
}
Creating Logical Volumes from the Command Line
# Create three PV physical volumes
sukai@ubuntu-01:~$ sudo pvcreate /dev/sda1
Physical volume "/dev/sda1" successfully created.
sukai@ubuntu-01:~$ sudo pvcreate /dev/sda2
Physical volume "/dev/sda2" successfully created.
sukai@ubuntu-01:~$ sudo pvcreate /dev/sda3
Physical volume "/dev/sda3" successfully created.
# Create two VG volume groups
sukai@ubuntu-01:~$ sudo vgcreate data1 /dev/sda1
Volume group "data1" successfully created
sukai@ubuntu-01:~$ sudo vgcreate data2 /dev/sda2
Volume group "data2" successfully created
# Create a 1G thin pool mythinpool in the data1 volume group, and a 1G thin volume thinvol in it
sukai@ubuntu-01:~$ sudo lvcreate -L 1G -T data1/mythinpool -V 1G -n thinvol
Thin pool volume with chunk size 64.00 KiB can address at most 15.81 TiB of data.
Logical volume "thinvol" created.
sukai@ubuntu-01:~$ sudo lvs
LV VG Attr LSize Pool Origin Data% Meta% Move Log Cpy%Sync Convert
mythinpool data1 twi-aotz-- 1.00g 0.00 11.04
thinvol data1 Vwi-a-tz-- 1.00g mythinpool 0.00
sukai@ubuntu-01:~$
sukai@ubuntu-01:~$ sudo lvcreate -L 1G -T data1/mythinpool2 -V 1G -n thinvol2
Thin pool volume with chunk size 64.00 KiB can address at most 15.81 TiB of data.
Logical volume "thinvol2" created.
sukai@ubuntu-01:~$
LVMSnapshot
The controller watches LVMSnapshot resources. The syncSnap reconcile logic handles creating and deleting snapshot volumes (and updating the snapshot status); the operations are performed with the LVM command-line tools lvcreate and lvremove.
// syncSnap is the function which tries to converge to a desired state for the
// LVMSnapshot
func (c *SnapController) syncSnap(snap *apis.LVMSnapshot) error {
var err error
// LVMSnapshot should be deleted. Check if deletion timestamp is set
if c.isDeletionCandidate(snap) {
err = lvm.DestroySnapshot(snap)
if err == nil {
err = lvm.RemoveSnapFinalizer(snap)
}
} else {
// if the status of the snapshot resource is Pending, then
// we create the snapshot on the machine
if snap.Status.State == lvm.LVMStatusPending {
err = lvm.CreateSnapshot(snap)
if err == nil {
err = lvm.UpdateSnapInfo(snap)
}
}
}
return err
}
// CreateSnapshot creates the lvm volume snapshot
func CreateSnapshot(snap *apis.LVMSnapshot) error {
volume := snap.Labels[LVMVolKey]
snapVolume := snap.Spec.VolGroup + "/" + getLVMSnapName(snap.Name)
args := buildLVMSnapCreateArgs(snap)
cmd := exec.Command(LVCreate, args...)
out, err := cmd.CombinedOutput()
if err != nil {
klog.Errorf("lvm: could not create snapshot %s cmd %v error: %s", snapVolume, args, string(out))
return err
}
klog.Infof("created snapshot %s from %s", snapVolume, volume)
return nil
}
Creating a Snapshot from the Command Line
sukai@ubuntu-01:~$ sudo lvcreate --snapshot --name mysnapshot-thinvol --permission r /dev/data1/thinvol
WARNING: Sum of all thin volume sizes (2.00 GiB) exceeds the size of thin pool data1/mythinpool (1.00 GiB).
WARNING: You have not turned on protection against thin pools running out of space.
WARNING: Set activation/thin_pool_autoextend_threshold below 100 to trigger automatic extension of thin pools before they get full.
Logical volume "mysnapshot-thinvol" created.
sukai@ubuntu-01:~$
Summary
The node-agent component of a CSI plugin is responsible for the operations that must run on the host. When the plugin's controller component creates the logical volume and snapshot custom resources, the node-agent executes the corresponding volume and snapshot commands on the host. When the kubelet calls the CSI gRPC service, the node-agent mounts the device on the host and makes it available to the pod's containers.