Kubernetes Container Storage Interface (CSI) Plugin: node-agent

SuKai August 26, 2022

A Kubernetes CSI plugin consists of two parts: a controller and a node-agent. In this article we look at OpenEBS's LVM storage plugin to see how Kubernetes uses LVM logical volumes to store data. LVM (Logical Volume Manager) is a logical layer built on top of disks and partitions that makes disk partition management far more flexible. Kubernetes can use LVM on local disks for high storage performance, and because LVM supports snapshots it can provide Copy-On-Write thin-provisioned storage for KubeVirt virtual machines, making it a very nice lightweight hyper-converged solution.

This article covers the following parts: LVM basics, the components of the node-agent, and the node-agent implementation.

LVM Basics


A PV (physical volume) is LVM's basic storage unit; one PV corresponds to a disk or a disk partition. A VG (volume group) is made up of physical volumes. An LV (logical volume) is carved out of a volume group; one volume group can hold multiple logical volumes, and each logical volume belongs to exactly one volume group.

An LVM snapshot provides a point-in-time view of a volume, typically used for backups: the backup is taken from the snapshot while the origin volume stays in use. A snapshot initially contains only its own metadata, not a copy of the origin's data. It uses Copy-On-Write (COW): when a block on the origin volume is about to change, the original data of that block is first copied into the snapshot.

Thin Provisioned Logical Volumes allocate storage on demand, effectively virtualizing the storage: a thin pool is created inside a volume group, and thin volumes are then created from the thin pool (see the lvcreate examples later in this article).

node-agent Components

The node-agent consists of three custom resource controllers and one gRPC service.

gRPC Service

Kubelet calls the node-agent's gRPC interface to mount a volume to a directory on the host; when kubelet creates the container, it then mounts that directory into the container.

The NodeServer interface looks like this:

type NodeServer interface {
   // Stage the volume at a staging (temporary) path on the node
   NodeStageVolume(context.Context, *NodeStageVolumeRequest) (*NodeStageVolumeResponse, error)
   // Unstage the volume from the staging path
   NodeUnstageVolume(context.Context, *NodeUnstageVolumeRequest) (*NodeUnstageVolumeResponse, error)
   // Publish the volume: mount the LVM logical volume at the requested path on the host
   NodePublishVolume(context.Context, *NodePublishVolumeRequest) (*NodePublishVolumeResponse, error)
   // Unpublish the volume: unmount the LVM logical volume from the host
   NodeUnpublishVolume(context.Context, *NodeUnpublishVolumeRequest) (*NodeUnpublishVolumeResponse, error)
   // Return volume statistics: capacity and inodes
   NodeGetVolumeStats(context.Context, *NodeGetVolumeStatsRequest) (*NodeGetVolumeStatsResponse, error)
   // Expand the volume
   NodeExpandVolume(context.Context, *NodeExpandVolumeRequest) (*NodeExpandVolumeResponse, error)
   // Return the capabilities supported by this node service, e.g. volume expansion and volume stats
   NodeGetCapabilities(context.Context, *NodeGetCapabilitiesRequest) (*NodeGetCapabilitiesResponse, error)
   // Return node information, mainly the topology derived from the node's labels
   NodeGetInfo(context.Context, *NodeGetInfoRequest) (*NodeGetInfoResponse, error)
}
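
The node-agent serves this interface over gRPC on a unix-domain socket that kubelet connects to (the socket is typically advertised to kubelet by the node-driver-registrar sidecar). As a minimal sketch of the wiring — not the driver's actual server code, and the endpoint path is an assumption:

import (
   "net"

   "github.com/container-storage-interface/spec/lib/go/csi"
   "google.golang.org/grpc"
   "k8s.io/klog"
)

// serveNodePlugin exposes a csi.NodeServer implementation on a unix socket.
func serveNodePlugin(endpoint string, ns csi.NodeServer) {
   lis, err := net.Listen("unix", endpoint) // e.g. /var/lib/kubelet/plugins/<driver>/csi.sock (assumed path)
   if err != nil {
      klog.Fatalf("failed to listen on %s: %v", endpoint, err)
   }
   srv := grpc.NewServer()
   csi.RegisterNodeServer(srv, ns) // register the NodeServer built by NewNode below
   if err := srv.Serve(lis); err != nil {
      klog.Fatalf("grpc server exited: %v", err)
   }
}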

The Three Controllers

They watch and reconcile the LVMNode, LVMSnapshot, and LVMVolume custom resources. When the plugin's controller component creates LVMSnapshot or LVMVolume resources, the reconcile logic carries out the corresponding operations on the target host node.

LVMNode: one resource per Kubernetes node, recording the VG (volume group) information on that host.

LVMSnapshot: manages the snapshot lifecycle — creating, updating, and deleting snapshots.

LVMVolume: manages the lifecycle of LV logical volumes.
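
As a rough illustration of the shapes involved — only the fields referenced later in this article are shown, and the real CRDs in openebs/lvm-localpv carry more fields:

// Simplified sketch of the LVMVolume spec fields used by the node-agent code below.
type LVMVolumeSpec struct {
   VolGroup      string // VG the LV lives in (filled in by the controller once chosen)
   VgPattern     string // regex used to pick a VG when VolGroup is not set
   Capacity      string // requested size in bytes, stored as a string
   ThinProvision string // "yes" to carve the LV out of a thin pool
}

// VolumeGroup mirrors the VG fields used in getVgPriorityList and in LVMNode's VG list.
type VolumeGroup struct {
   Name string
   Free resource.Quantity // free space, from k8s.io/apimachinery/pkg/api/resource
}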

node-agent Implementation

A node struct is defined that implements the NodeServer interface of the CSI spec; the NewNode constructor starts the three custom resource controllers.

// node is the server implementation
// for CSI NodeServer
type node struct {
   driver *CSIDriver
}

// NewNode returns a new instance
// of CSI NodeServer
func NewNode(d *CSIDriver) csi.NodeServer {
   var ControllerMutex = sync.RWMutex{}

   // set up signals so we handle the first shutdown signal gracefully
   stopCh := signals.SetupSignalHandler()

   // start the lvm node resource watcher
   go func() {
      err := lvmnode.Start(&ControllerMutex, stopCh)
      if err != nil {
         klog.Fatalf("Failed to start LVM node controller: %s", err.Error())
      }
   }()

   // start the lvmvolume watcher
   go func() {
      err := volume.Start(&ControllerMutex, stopCh)
      if err != nil {
         klog.Fatalf("Failed to start LVM volume management controller: %s", err.Error())
      }
   }()

   // start the lvm snapshot watcher
   go func() {
      err := snapshot.Start(&ControllerMutex, stopCh)
      if err != nil {
         klog.Fatalf("Failed to start LVM volume snapshot management controller: %s", err.Error())
      }
   }()

   if d.config.ListenAddress != "" {
      exposeMetrics(d.config.ListenAddress, d.config.MetricsPath, d.config.DisableExporterMetrics)
   }

   return &node{
      driver: d,
   }
}

NodePublishVolume Service

GetVolAndMountInfo returns the LVMVolume resource and the MountInfo mount details. It reads the filesystem type (FSType), target path, mount options, read-only flag, and volume name from the kubelet request, then looks up the corresponding LVMVolume custom resource by volume name.

getPodLVInfo returns the pod's UID and the LVGroup (volume group) name, both taken from the request's VolumeContext.

Publishing a volume comes in two flavors: block volume and mounted volume. A block volume is exposed inside the pod container as a raw block device; a mounted volume is a filesystem mount, which the container sees as a directory.

// GetVolAndMountInfo get volume and mount info from node csi volume request
func GetVolAndMountInfo(
	req *csi.NodePublishVolumeRequest,
) (*apis.LVMVolume, *lvm.MountInfo, error) {
	var mountinfo lvm.MountInfo

	mountinfo.FSType = req.GetVolumeCapability().GetMount().GetFsType()
	mountinfo.MountPath = req.GetTargetPath()
	mountinfo.MountOptions = append(mountinfo.MountOptions, req.GetVolumeCapability().GetMount().GetMountFlags()...)

	if req.GetReadonly() {
		mountinfo.MountOptions = append(mountinfo.MountOptions, "ro")
	}

	volName := strings.ToLower(req.GetVolumeId())

	getOptions := metav1.GetOptions{}
	vol, err := volbuilder.NewKubeclient().
		WithNamespace(lvm.LvmNamespace).
		Get(volName, getOptions)

	if err != nil {
		return nil, nil, err
	}

	return vol, &mountinfo, nil
}

func getPodLVInfo(req *csi.NodePublishVolumeRequest) (*lvm.PodLVInfo, error) {
	var podLVInfo lvm.PodLVInfo
	var ok bool
	if podLVInfo.UID, ok = req.VolumeContext["csi.storage.k8s.io/pod.uid"]; !ok {
		return nil, errors.New("csi.storage.k8s.io/pod.uid key missing in VolumeContext")
	}
	if podLVInfo.LVGroup, ok = req.VolumeContext["openebs.io/volgroup"]; !ok {
		return nil, errors.New("openebs.io/volgroup key missing in VolumeContext")
	}
	return &podLVInfo, nil
}

// NodePublishVolume publishes (mounts) the volume
// at the corresponding node at a given path
//
// This implements csi.NodeServer
func (ns *node) NodePublishVolume(
   ctx context.Context,
   req *csi.NodePublishVolumeRequest,
) (*csi.NodePublishVolumeResponse, error) {

   var (
      err error
   )

   if err = ns.validateNodePublishReq(req); err != nil {
      return nil, err
   }

   vol, mountInfo, err := GetVolAndMountInfo(req)
   if err != nil {
      return nil, status.Error(codes.Internal, err.Error())
   }

   podLVinfo, err := getPodLVInfo(req)
   if err != nil {
      klog.Warningf("PodLVInfo could not be obtained for volume_id: %s, err = %v", req.VolumeId, err)
   }
   switch req.GetVolumeCapability().GetAccessType().(type) {
   case *csi.VolumeCapability_Block:
      // attempt block mount operation on the requested path
      err = lvm.MountBlock(vol, mountInfo, podLVinfo)
   case *csi.VolumeCapability_Mount:
      // attempt filesystem mount operation on the requested path
      err = lvm.MountFilesystem(vol, mountInfo, podLVinfo)
   }

   if err != nil {
      return nil, err
   }

   return &csi.NodePublishVolumeResponse{}, nil
}

Mounting a Block Device

MountBlock bind-mounts the logical volume's device file onto the target path specified by kubelet.

// MountBlock mounts the block disk to the specified path
func MountBlock(vol *apis.LVMVolume, mountinfo *MountInfo, podLVInfo *PodLVInfo) error {
   target := mountinfo.MountPath
   volume := vol.Spec.VolGroup + "/" + vol.Name
   devicePath := DevPath + volume

   mountopt := []string{"bind"}

   mounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: utilexec.New()}

   // Create the mount point as a file since bind mount device node requires it to be a file
   err := makeFile(target)
   if err != nil {
      return status.Errorf(codes.Internal, "Could not create target file %q: %v", target, err)
   }

   // do the bind mount of the device at the target path
   if err := mounter.Mount(devicePath, target, "", mountopt); err != nil {
      if removeErr := os.Remove(target); removeErr != nil {
         return status.Errorf(codes.Internal, "Could not remove mount target %q: %v", target, removeErr)
      }
      return status.Errorf(codes.Internal, "mount failed at %v err : %v", target, err)
   }

   klog.Infof("NodePublishVolume mounted block device %s at %s", devicePath, target)

   if ioLimitsEnabled && podLVInfo != nil {
      if err := setIOLimits(vol, podLVInfo, devicePath); err != nil {
         klog.Warningf(": error setting io limits for podUid %s, device %s, err=%v", podLVInfo.UID, devicePath, err)
      } else {
         klog.Infof("lvm: io limits set for podUid %s, device %s", podLVInfo.UID, devicePath)
      }
   }
   return nil
}
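
makeFile is not shown above; a plausible sketch of it — an assumption, not the driver's exact code — simply ensures an empty regular file exists at the target, because a block device can only be bind-mounted onto a file:

// makeFile creates an empty file at pathname if it does not already exist.
func makeFile(pathname string) error {
   f, err := os.OpenFile(pathname, os.O_CREATE|os.O_RDONLY, 0644)
   if err != nil {
      return err
   }
   return f.Close()
}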

Mounting a Block Device from the Command Line

sukai@ubuntu-01:~$ sudo mkdir /data1
sukai@ubuntu-01:~$ sudo touch /data1/thinvol
sukai@ubuntu-01:~$ sudo mount /dev/data1/thinvol /data1/thinvol --bind
sukai@ubuntu-01:~$ sudo ls -al /data1/thinvol
brw-rw---- 1 root disk 253, 4 Aug 26 15:15 /data1/thinvol
sukai@ubuntu-01:~$ sudo mkfs /data1/thinvol
mke2fs 1.45.5 (07-Jan-2020)
Discarding device blocks: done
Creating filesystem with 262144 4k blocks and 65536 inodes
Filesystem UUID: 8b3b439f-96f1-4104-9d95-33b91a1f654d
Superblock backups stored on blocks:
        32768, 98304, 163840, 229376

Allocating group tables: done
Writing inode tables: done
Writing superblocks and filesystem accounting information: done

sukai@ubuntu-01:~$

Mounting a Filesystem

The logical volume is formatted and then mounted at the target directory.

// MountVolume mounts the disk to the specified path
func MountVolume(vol *apis.LVMVolume, mount *MountInfo, podLVInfo *PodLVInfo) error {
   volume := vol.Spec.VolGroup + "/" + vol.Name
   mounted, err := verifyMountRequest(vol, mount.MountPath)
   if err != nil {
      return err
   }

   if mounted {
      klog.Infof("lvm : already mounted %s => %s", volume, mount.MountPath)
      return nil
   }

   devicePath := DevPath + volume

   err = FormatAndMountVol(devicePath, mount)
   if err != nil {
      return status.Errorf(
         codes.Internal,
         "failed to format and mount the volume error: %s",
         err.Error(),
      )
   }

   klog.Infof("lvm: volume %v mounted %v fs %v", volume, mount.MountPath, mount.FSType)

   if ioLimitsEnabled && podLVInfo != nil {
      if err := setIOLimits(vol, podLVInfo, devicePath); err != nil {
         klog.Warningf("lvm: error setting io limits: podUid %s, device %s, err=%v", podLVInfo.UID, devicePath, err)
      } else {
         klog.Infof("lvm: io limits set for podUid %v, device %s", podLVInfo.UID, devicePath)
      }
   }

   return nil
}

// MountFilesystem mounts the disk to the specified path
func MountFilesystem(vol *apis.LVMVolume, mount *MountInfo, podinfo *PodLVInfo) error {
   if err := os.MkdirAll(mount.MountPath, 0755); err != nil {
      return status.Errorf(codes.Internal, "Could not create dir {%q}, err: %v", mount.MountPath, err)
   }

   return MountVolume(vol, mount, podinfo)
}
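
FormatAndMountVol is referenced but not listed here. A hedged sketch of the idea, reusing the same SafeFormatAndMount helper (from k8s.io/mount-utils and k8s.io/utils/exec) seen in MountBlock: it runs mkfs only when the device has no filesystem yet, then mounts it with the requested fstype and options.

// Sketch only: format devicePath if needed, then mount it per MountInfo.
func formatAndMountVol(devicePath string, mountinfo *MountInfo) error {
   mounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: utilexec.New()}
   return mounter.FormatAndMount(devicePath, mountinfo.MountPath, mountinfo.FSType, mountinfo.MountOptions)
}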

Mounting a Filesystem from the Command Line

sukai@ubuntu-01:~$ sudo mkfs /dev/data1/thinvol2
mke2fs 1.45.5 (07-Jan-2020)
Discarding device blocks: done
Creating filesystem with 262144 4k blocks and 65536 inodes
Filesystem UUID: 3957607d-cdfd-46b3-bc56-1c7ab7864570
Superblock backups stored on blocks:
        32768, 98304, 163840, 229376

Allocating group tables: done
Writing inode tables: done
Writing superblocks and filesystem accounting information: done

sukai@ubuntu-01:~$ sudo mkdir /data1/thinvol2
sukai@ubuntu-01:~$ sudo mount /dev/data1/thinvol2 /data1/thinvol2/
sukai@ubuntu-01:~$ sudo ls -al /data1/thinvol2/
total 24
drwxr-xr-x 3 root root  4096 Aug 26 15:22 .
drwxr-xr-x 3 root root  4096 Aug 26 15:23 ..
drwx------ 2 root root 16384 Aug 26 15:22 lost+found
sukai@ubuntu-01:~$

Configuring IO Limits

If IO limits are enabled, setIOLimits is called after the mount to configure read/write iops and bps limits for the logical volume through cgroups.

func setIOLimits(vol *apis.LVMVolume, podLVInfo *PodLVInfo, devicePath string) error {
	if podLVInfo == nil {
		return errors.New("PodLVInfo is missing. Skipping setting IOLimits")
	}
	capacityBytes, err := strconv.ParseUint(vol.Spec.Capacity, 10, 64)
	if err != nil {
		klog.Warning("error parsing LVMVolume.Spec.Capacity. Skipping setting IOLimits", err)
		return err
	}
	capacityGB := uint64(math.Ceil(float64(capacityBytes) / (1024 * 1024 * 1024)))
	klog.Infof("Capacity of device in GB: %v", capacityGB)
	riops := getRIopsPerGB(podLVInfo.LVGroup) * capacityGB
	wiops := getWIopsPerGB(podLVInfo.LVGroup) * capacityGB
	rbps := getRBpsPerGB(podLVInfo.LVGroup) * capacityGB
	wbps := getWBpsPerGB(podLVInfo.LVGroup) * capacityGB
	klog.Infof("Setting iolimits for podUId %s, device %s: riops=%v, wiops=%v, rbps=%v, wbps=%v",
		podLVInfo.UID, devicePath, riops, wiops, rbps, wbps,
	)
	err = iolimit.SetIOLimits(&iolimit.Request{
		DeviceName:       devicePath,
		PodUid:           podLVInfo.UID,
		ContainerRuntime: getContainerRuntime(),
		IOLimit: &iolimit.IOMax{
			Riops: riops,
			Wiops: wiops,
			Rbps:  rbps,
			Wbps:  wbps,
		},
	})
	if err != nil {
		return err
	}
	return nil
}
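
The iolimit package itself is not shown. For orientation only: under cgroup v1, per-device throttles are written into the blkio controller files of the pod's cgroup as "MAJ:MIN value" lines. A rough sketch of that mechanism — not the library's actual implementation, and the cgroup directory layout depends on the container runtime and cgroup version:

// Sketch: apply a read-iops throttle for one device in a pod's blkio cgroup (cgroup v1).
func writeReadIopsThrottle(cgroupDir, devMajMin string, riops uint64) error {
   line := fmt.Sprintf("%s %d", devMajMin, riops)
   path := filepath.Join(cgroupDir, "blkio.throttle.read_iops_device")
   return os.WriteFile(path, []byte(line), 0644)
}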

NodeUnpublishVolume Service

It calls UmountVolume to unmount the volume and remove the mount path.

// NodeUnpublishVolume unpublishes (unmounts) the volume
// from the corresponding node from the given path
//
// This implements csi.NodeServer
func (ns *node) NodeUnpublishVolume(
   ctx context.Context,
   req *csi.NodeUnpublishVolumeRequest,
) (*csi.NodeUnpublishVolumeResponse, error) {

   var (
      err error
      vol *apis.LVMVolume
   )

   if err = ns.validateNodeUnpublishReq(req); err != nil {
      return nil, err
   }

   targetPath := req.GetTargetPath()
   volumeID := req.GetVolumeId()

   if vol, err = lvm.GetLVMVolume(volumeID); err != nil {
      return nil, status.Errorf(codes.Internal,
         "not able to get the LVMVolume %s err : %s",
         volumeID, err.Error())
   }

   err = lvm.UmountVolume(vol, targetPath)

   if err != nil {
      return nil, status.Errorf(codes.Internal,
         "unable to umount the volume %s err : %s",
         volumeID, err.Error())
   }
   klog.Infof("hostpath: volume %s path: %s has been unmounted.",
      volumeID, targetPath)

   return &csi.NodeUnpublishVolumeResponse{}, nil
}

// UmountVolume unmounts the volume and the corresponding mount path is removed
func UmountVolume(vol *apis.LVMVolume, targetPath string,
) error {
	mounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: utilexec.New()}

	dev, ref, err := mount.GetDeviceNameFromMount(mounter, targetPath)
	if err != nil {
		klog.Errorf(
			"lvm: umount volume: failed to get device from mnt: %s\nError: %v",
			targetPath, err,
		)
		return err
	}

	// device has already been un-mounted, return successful
	if len(dev) == 0 || ref == 0 {
		klog.Warningf(
			"Warning: Unmount skipped because volume %s not mounted: %v",
			vol.Name, targetPath,
		)
		return nil
	}

	if pathExists, pathErr := mount.PathExists(targetPath); pathErr != nil {
		return fmt.Errorf("error checking if path exists: %v", pathErr)
	} else if !pathExists {
		klog.Warningf(
			"Warning: Unmount skipped because path does not exist: %v",
			targetPath,
		)
		return nil
	}

	if err = mounter.Unmount(targetPath); err != nil {
		klog.Errorf(
			"lvm: failed to unmount %s: path %s err: %v",
			vol.Name, targetPath, err,
		)
		return err
	}

	if err := os.Remove(targetPath); err != nil {
		klog.Errorf("lvm: failed to remove mount path vol %s err : %v", vol.Name, err)
	}

	klog.Infof("umount done %s path %v", vol.Name, targetPath)

	return nil
}

NodeGetInfo Service

Returns the node's topology information as key/value pairs taken from the node's labels, including "openebs.io/nodename" and any keys configured in the ALLOWED_TOPOLOGIES environment variable.

// NodeGetInfo returns node details
//
// This implements csi.NodeServer
func (ns *node) NodeGetInfo(
   ctx context.Context,
   req *csi.NodeGetInfoRequest,
) (*csi.NodeGetInfoResponse, error) {

   node, err := k8sapi.GetNode(ns.driver.config.NodeID)
   if err != nil {
      klog.Errorf("failed to get the node %s", ns.driver.config.NodeID)
      return nil, err
   }
   /*
    * The driver will support all the keys and values defined in the node's label.
    * if nodes are labeled with the below keys and values
    * map[beta.kubernetes.io/arch:amd64 beta.kubernetes.io/os:linux kubernetes.io/arch:amd64 kubernetes.io/hostname:pawan-node-1 kubernetes.io/os:linux node-role.kubernetes.io/worker:true openebs.io/zone:zone1 openebs.io/zpool:ssd]
    * The driver will support below key and values
    * {
    * beta.kubernetes.io/arch:amd64
    * beta.kubernetes.io/os:linux
    * kubernetes.io/arch:amd64
    * kubernetes.io/hostname:pawan-node-1
    * kubernetes.io/os:linux
    * node-role.kubernetes.io/worker:true
    * openebs.io/zone:zone1
    * openebs.io/zpool:ssd
    * }
    */

   // add driver's topology key
   topology := map[string]string{
      lvm.LVMTopologyKey: ns.driver.config.NodeID,
   }

   // support topologykeys from env ALLOWED_TOPOLOGIES
   allowedTopologies := os.Getenv("ALLOWED_TOPOLOGIES")
   allowedKeys := strings.Split(allowedTopologies, ",")
   for _, key := range allowedKeys {
      if key != "" {
         v, ok := node.Labels[key]
         if ok {
            topology[key] = v
         }
      }
   }

   return &csi.NodeGetInfoResponse{
      NodeId: ns.driver.config.NodeID,
      AccessibleTopology: &csi.Topology{
         Segments: topology,
      },
   }, nil
}
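
For example (hypothetical node name and labels), with ALLOWED_TOPOLOGIES=kubernetes.io/hostname,openebs.io/zone the returned AccessibleTopology segments would look like:

// Hypothetical result for a node "node-1" labeled openebs.io/zone=zone1:
segments := map[string]string{
   "openebs.io/nodename":    "node-1", // the driver's own topology key
   "kubernetes.io/hostname": "node-1",
   "openebs.io/zone":        "zone1",
}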

NodeGetCapabilities Service

Returns the capabilities supported by this node service: volume statistics and volume expansion.

// NodeGetCapabilities returns capabilities supported
// by this node service
//
// This implements csi.NodeServer
func (ns *node) NodeGetCapabilities(
   ctx context.Context,
   req *csi.NodeGetCapabilitiesRequest,
) (*csi.NodeGetCapabilitiesResponse, error) {

   return &csi.NodeGetCapabilitiesResponse{
      Capabilities: []*csi.NodeServiceCapability{
         {
            Type: &csi.NodeServiceCapability_Rpc{
               Rpc: &csi.NodeServiceCapability_RPC{
                  Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
               },
            },
         },
         {
            Type: &csi.NodeServiceCapability_Rpc{
               Rpc: &csi.NodeServiceCapability_RPC{
                  Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
               },
            },
         },
      },
   }, nil
}

NodeExpandVolume Service

When the logical volume is smaller than the capacity declared in the LVMVolume resource, lvextend is run to grow the LV, with the -r flag added so the filesystem is resized as well. If the volume is used in block mode, the filesystem is not resized. If the filesystem is btrfs, it is resized with the btrfs command instead of relying on lvextend.

// TODO
// Verify if this needs to be implemented
//
// NodeExpandVolume resizes the filesystem if required
//
// If ControllerExpandVolumeResponse returns true in
// node_expansion_required then FileSystemResizePending
// condition will be added to PVC and NodeExpandVolume
// operation will be queued on kubelet
//
// This implements csi.NodeServer
func (ns *node) NodeExpandVolume(
   ctx context.Context,
   req *csi.NodeExpandVolumeRequest,
) (*csi.NodeExpandVolumeResponse, error) {
   volumeID := req.GetVolumeId()
   if req.GetVolumePath() == "" || volumeID == "" {
      return nil, status.Errorf(
         codes.InvalidArgument,
         "path not provided for NodeExpandVolume Request %s",
         volumeID,
      )
   }

   vol, err := lvm.GetLVMVolume(volumeID)

   if err != nil {
      return nil, status.Errorf(
         codes.NotFound,
         "failed to handle NodeExpandVolume Request for %s, {%s}",
         req.VolumeId,
         err.Error(),
      )
   }

   isBlockMode := req.GetVolumeCapability().GetBlock() != nil
   fsType := req.GetVolumeCapability().GetMount().GetFsType()

   resizeFS := true
   if isBlockMode || fsType == "btrfs" {
      // In case of volume block mode (or) btrfs filesystem mode
      // lvm doesn't expand the fs natively
      resizeFS = false
   }

   err = lvm.ResizeLVMVolume(vol, resizeFS)
   if err != nil {
      return nil, status.Errorf(
         codes.Internal,
         "failed to handle NodeExpandVolume Request for %s, {%s}",
         req.VolumeId,
         err.Error(),
      )
   }

   // Expand btrfs filesystem
   if fsType == "btrfs" {
      err = btrfs.ResizeBTRFS(req.GetVolumePath())
      if err != nil {
         return nil, status.Errorf(
            codes.Internal,
            "failed to handle NodeExpandVolume Request for %s, {%s}",
            req.VolumeId,
            err.Error(),
         )
      }
   }

   return &csi.NodeExpandVolumeResponse{
      CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
   }, nil
}

Extending an LV from the Command Line

sukai@ubuntu-01:~$ sudo df -Th /data1/thinvol2
Filesystem                 Type  Size  Used Avail Use% Mounted on
/dev/mapper/data1-thinvol2 ext2 1007M   24K  956M   1% /data1/thinvol2

sukai@ubuntu-01:~$
sukai@ubuntu-01:~$ sudo lvextend /dev/data1/thinvol2 -L 5G -r
  WARNING: Sum of all thin volume sizes (7.00 GiB) exceeds the size of thin pools (2.00 GiB).
  WARNING: You have not turned on protection against thin pools running out of space.
  WARNING: Set activation/thin_pool_autoextend_threshold below 100 to trigger automatic extension of thin pools before they get full.
  Size of logical volume data1/thinvol2 changed from 1.00 GiB (256 extents) to 5.00 GiB (1280 extents).
  Logical volume data1/thinvol2 successfully resized.
resize2fs 1.45.5 (07-Jan-2020)
Filesystem at /dev/mapper/data1-thinvol2 is mounted on /data1/thinvol2; on-line resizing required
old_desc_blocks = 1, new_desc_blocks = 1
The filesystem on /dev/mapper/data1-thinvol2 is now 1310720 (4k) blocks long.

sukai@ubuntu-01:~$ sudo df -Th /data1/thinvol2
Filesystem                 Type  Size  Used Avail Use% Mounted on
/dev/mapper/data1-thinvol2 ext2  5.0G   24K  4.8G   1% /data1/thinvol2
sukai@ubuntu-01:~$

NodeGetVolumeStats Service

Returns capacity and inode usage statistics for the LV (via statfs on the mount path).

// NodeGetVolumeStats returns statistics for the
// given volume
func (ns *node) NodeGetVolumeStats(
   ctx context.Context,
   req *csi.NodeGetVolumeStatsRequest,
) (*csi.NodeGetVolumeStatsResponse, error) {

   volID := req.GetVolumeId()
   path := req.GetVolumePath()

   if len(volID) == 0 {
      return nil, status.Error(codes.InvalidArgument, "volume id is not provided")
   }
   if len(path) == 0 {
      return nil, status.Error(codes.InvalidArgument, "path is not provided")
   }

   if !mount.IsMountPath(path) {
      return nil, status.Error(codes.NotFound, "path is not a mount path")
   }

   var sfs unix.Statfs_t
   if err := unix.Statfs(path, &sfs); err != nil {
      return nil, status.Errorf(codes.Internal, "statfs on %s failed: %v", path, err)
   }

   var usage []*csi.VolumeUsage
   usage = append(usage, &csi.VolumeUsage{
      Unit:      csi.VolumeUsage_BYTES,
      Total:     int64(sfs.Blocks) * int64(sfs.Bsize),
      Used:      int64(sfs.Blocks-sfs.Bfree) * int64(sfs.Bsize),
      Available: int64(sfs.Bavail) * int64(sfs.Bsize),
   })
   usage = append(usage, &csi.VolumeUsage{
      Unit:      csi.VolumeUsage_INODES,
      Total:     int64(sfs.Files),
      Used:      int64(sfs.Files - sfs.Ffree),
      Available: int64(sfs.Ffree),
   })

   return &csi.NodeGetVolumeStatsResponse{Usage: usage}, nil
}

LVMNode, LVMSnapshot, and LVMVolume Controller Code

LVMNode

A timer fires every 60 seconds (the controller's poll interval) and adds one item to the work queue; the item is the namespace/name key of the node this agent runs on (lvm.LvmNamespace + "/" + lvm.NodeID).

// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *NodeController) Run(threadiness int, stopCh <-chan struct{}) error {
...
   timer := time.NewTimer(0)
   defer timer.Stop()
   for {
      select {
      case <-timer.C:
      case <-stopCh:
         klog.Info("Shutting down Node controller")
         return nil
      }
      item := lvm.LvmNamespace + "/" + lvm.NodeID
      c.workqueue.Add(item) // add the item to worker queue.
      timer.Reset(c.pollInterval)
   }
}

The syncNode reconcile logic mainly refreshes the list of VG volume groups recorded for the node.

// syncNode is the function which tries to converge to a desired state for the
// LVMNode
func (c *NodeController) syncNode(namespace string, name string) error {
   // ... (the existing LVMNode object for this node, if any, is fetched here into `node`)

   vgs, err := c.listLVMVolumeGroup()
   if err != nil {
      return err
   }

   if node == nil { // if it doesn't exists, create lvm node object
      if node, err = nodebuilder.NewBuilder().
         WithNamespace(namespace).WithName(name).
         WithVolumeGroups(vgs).
         WithOwnerReferences(c.ownerRef).
         Build(); err != nil {
         return err
      }

      klog.Infof("lvm node controller: creating new node object for %+v", node)
      if _, err = nodebuilder.NewKubeclient().WithNamespace(namespace).Create(node); err != nil {
         return errors.Errorf("create lvm node %s/%s: %v", namespace, name, err)
      }
      klog.Infof("lvm node controller: created node object %s/%s", namespace, name)
      return nil
   }

   // ... (otherwise the existing LVMNode object is updated with the current VG list)

   return nil
}
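
listLVMVolumeGroup is not shown; the VG data ultimately comes from LVM's own reporting tools. A hedged sketch of the general idea — the exact flags and the parsing in the driver differ:

// Sketch: list volume groups as JSON using the vgs CLI.
func listVolumeGroups() ([]byte, error) {
   cmd := exec.Command("vgs", "--reportformat", "json", "--units", "b")
   return cmd.CombinedOutput()
}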

LVMVolume

The controller watches LVMVolume resources. If vol.Spec.VolGroup is already set, it tries to create the LV in that volume group and, on success, updates the volume status. Otherwise it builds a list of candidate VGs and tries to create the logical volume in each one in turn.

// syncVol is the function which tries to converge to a desired state for the
// LVMVolume
func (c *VolController) syncVol(vol *apis.LVMVolume) error {
...

   // if there is already a volGroup field set for lvmvolume resource,
   // we'll first try to create a volume in that volume group.
   if vol.Spec.VolGroup != "" {
      err = lvm.CreateVolume(vol)
      if err == nil {
         return lvm.UpdateVolInfo(vol, lvm.LVMStatusReady)
      }
   }

   vgs, err := c.getVgPriorityList(vol)
   if err != nil {
      return err
   }

   if len(vgs) == 0 {
      err = fmt.Errorf("no vg available to serve volume request having regex=%q & capacity=%q",
         vol.Spec.VgPattern, vol.Spec.Capacity)
      klog.Errorf("lvm volume %v - %v", vol.Name, err)
   } else {
      for _, vg := range vgs {
         // first update volGroup field in lvm volume resource for ensuring
         // idempotency and avoiding volume leaks during crash.
         if vol, err = lvm.UpdateVolGroup(vol, vg.Name); err != nil {
            klog.Errorf("failed to update volGroup to %v: %v", vg.Name, err)
            return err
         }
         if err = lvm.CreateVolume(vol); err == nil {
            return lvm.UpdateVolInfo(vol, lvm.LVMStatusReady)
         }
      }
   }
...
}

getVgPriorityList returns the candidate volume groups on the node: the VG name must match the vol.Spec.VgPattern regular expression and the VG must have enough free space, and the result is sorted by free space in ascending order (best fit first). For instance, if two matching VGs have 3 GiB and 10 GiB free and the request is 2 GiB, both qualify and the 3 GiB VG is tried first.

// getVgPriorityList returns ordered list of volume groups from higher to lower
// priority to use for provisioning a lvm volume. As of now, we are prioritizing
// the vg having least amount free space available to fit the volume.
func (c *VolController) getVgPriorityList(vol *apis.LVMVolume) ([]apis.VolumeGroup, error) {
   re, err := regexp.Compile(vol.Spec.VgPattern)
   if err != nil {
      return nil, fmt.Errorf("invalid regular expression %v for lvm volume %s: %v",
         vol.Spec.VgPattern, vol.Name, err)
   }
   capacity, err := strconv.Atoi(vol.Spec.Capacity)
   if err != nil {
      return nil, fmt.Errorf("invalid requested capacity %v for lvm volume %s: %v",
         vol.Spec.Capacity, vol.Name, err)
   }

   vgs, err := lvm.ListLVMVolumeGroup(true)
   if err != nil {
      return nil, fmt.Errorf("failed to list vgs available on node: %v", err)
   }
   filteredVgs := make([]apis.VolumeGroup, 0)
   for _, vg := range vgs {
      if !re.MatchString(vg.Name) {
         continue
      }
      // skip the vgs capacity comparison in case of thin provision enable volume
      if vol.Spec.ThinProvision != "yes" {
         // filter vgs having insufficient capacity.
         if vg.Free.Value() < int64(capacity) {
            continue
         }
      }
      filteredVgs = append(filteredVgs, vg)
   }

   // prioritize the volume group having less free space available.
   sort.SliceStable(filteredVgs, func(i, j int) bool {
      return filteredVgs[i].Free.Cmp(filteredVgs[j].Free) < 0
   })
   return filteredVgs, nil
}

buildLVMCreateArgs assembles the lvcreate arguments: the volume size, the target volume group, and whether to create a thin-provisioned volume.

// buildLVMCreateArgs returns lvcreate command for the volume
func buildLVMCreateArgs(vol *apis.LVMVolume) []string {
   var LVMVolArg []string

   volume := vol.Name
   size := vol.Spec.Capacity + "b"
   // thinpool name required for thinProvision volumes
   pool := vol.Spec.VolGroup + "_thinpool"

   if len(vol.Spec.Capacity) != 0 {
      // check if thin pool exists for given volumegroup requested thin volume
      if strings.TrimSpace(vol.Spec.ThinProvision) != YES {
         LVMVolArg = append(LVMVolArg, "-L", size)
      } else if !lvThinExists(vol.Spec.VolGroup, pool) {
         // thinpool size can't be equal or greater than actual volumegroup size
         LVMVolArg = append(LVMVolArg, "-L", getThinPoolSize(vol.Spec.VolGroup, vol.Spec.Capacity))
      }
   }

   // command to create thinpool and thin volume if thinProvision is enabled
   // `lvcreate -L 1G -T lvmvg/mythinpool -V 1G -n thinvol`
   if strings.TrimSpace(vol.Spec.ThinProvision) == YES {
      LVMVolArg = append(LVMVolArg, "-T", vol.Spec.VolGroup+"/"+pool, "-V", size)
   }

   if len(vol.Spec.VolGroup) != 0 {
      LVMVolArg = append(LVMVolArg, "-n", volume)
   }

   if strings.TrimSpace(vol.Spec.ThinProvision) != YES {
      LVMVolArg = append(LVMVolArg, vol.Spec.VolGroup)
   }

   // -y is used to wipe the signatures before creating LVM volume
   LVMVolArg = append(LVMVolArg, "-y")
   return LVMVolArg
}
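
Reading the argument builder above, the resulting invocations look roughly like this (volume and VG names are hypothetical):

# regular 1 GiB volume in VG lvmvg
lvcreate -L 1073741824b -n <lv-name> lvmvg -y

# 1 GiB thin volume when the thin pool lvmvg_thinpool does not exist yet
lvcreate -L <thinpool-size> -T lvmvg/lvmvg_thinpool -V 1073741824b -n <lv-name> -y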

Creating Logical Volumes from the Command Line

# create three PV physical volumes
sukai@ubuntu-01:~$ sudo pvcreate /dev/sda1
  Physical volume "/dev/sda1" successfully created.
sukai@ubuntu-01:~$ sudo pvcreate /dev/sda2
  Physical volume "/dev/sda2" successfully created.
sukai@ubuntu-01:~$ sudo pvcreate /dev/sda3
  Physical volume "/dev/sda3" successfully created.

# create two VG volume groups
sukai@ubuntu-01:~$ sudo vgcreate data1 /dev/sda1
  Volume group "data1" successfully created
sukai@ubuntu-01:~$ sudo vgcreate data2 /dev/sda2
  Volume group "data2" successfully created

# create the thin pool mythinpool (1G) in VG data1, and a 1G thin volume thinvol in it
sukai@ubuntu-01:~$ sudo lvcreate -L 1G -T data1/mythinpool -V 1G -n thinvol
  Thin pool volume with chunk size 64.00 KiB can address at most 15.81 TiB of data.
  Logical volume "thinvol" created.
sukai@ubuntu-01:~$ sudo lvs
  LV         VG    Attr       LSize Pool       Origin Data%  Meta%  Move Log Cpy%Sync Convert
  mythinpool data1 twi-aotz-- 1.00g                   0.00   11.04
  thinvol    data1 Vwi-a-tz-- 1.00g mythinpool        0.00
sukai@ubuntu-01:~$
sukai@ubuntu-01:~$ sudo lvcreate -L 1G -T data1/mythinpool2 -V 1G -n thinvol2
  Thin pool volume with chunk size 64.00 KiB can address at most 15.81 TiB of data.
  Logical volume "thinvol2" created.
sukai@ubuntu-01:~$

LVMSnapshot

The controller watches LVMSnapshot resources; the syncSnap reconcile logic creates and deletes snapshots (and updates the snapshot status) by invoking the LVM command line tools lvcreate and lvremove.

// syncSnap is the function which tries to converge to a desired state for the
// LVMSnapshot
func (c *SnapController) syncSnap(snap *apis.LVMSnapshot) error {
   var err error
   // LVMSnapshot should be deleted. Check if deletion timestamp is set
   if c.isDeletionCandidate(snap) {
      err = lvm.DestroySnapshot(snap)
      if err == nil {
         err = lvm.RemoveSnapFinalizer(snap)
      }
   } else {
      // if the status of the snapshot resource is Pending, then
      // we create the snapshot on the machine
      if snap.Status.State == lvm.LVMStatusPending {
         err = lvm.CreateSnapshot(snap)
         if err == nil {
            err = lvm.UpdateSnapInfo(snap)
         }
      }
   }
   return err
}

// CreateSnapshot creates the lvm volume snapshot
func CreateSnapshot(snap *apis.LVMSnapshot) error {

	volume := snap.Labels[LVMVolKey]

	snapVolume := snap.Spec.VolGroup + "/" + getLVMSnapName(snap.Name)

	args := buildLVMSnapCreateArgs(snap)
	cmd := exec.Command(LVCreate, args...)
	out, err := cmd.CombinedOutput()

	if err != nil {
		klog.Errorf("lvm: could not create snapshot %s cmd %v error: %s", snapVolume, args, string(out))
		return err
	}

	klog.Infof("created snapshot %s from %s", snapVolume, volume)
	return nil

}

Creating a Snapshot from the Command Line

sukai@ubuntu-01:~$ sudo lvcreate --snapshot --name mysnapshot-thinvol --permission r /dev/data1/thinvol
  WARNING: Sum of all thin volume sizes (2.00 GiB) exceeds the size of thin pool data1/mythinpool (1.00 GiB).
  WARNING: You have not turned on protection against thin pools running out of space.
  WARNING: Set activation/thin_pool_autoextend_threshold below 100 to trigger automatic extension of thin pools before they get full.
  Logical volume "mysnapshot-thinvol" created.
sukai@ubuntu-01:~$

Summary

The node-agent component of a CSI plugin is responsible for the operations that must run on the host. When the plugin's controller component creates logical volume or snapshot custom resources, the node-agent executes the corresponding volume and snapshot commands on the host; when kubelet calls the CSI gRPC service, the node-agent mounts the device on the host and hands it to the pod container.