SuKai

Golang实现WorkerPool文件上传

在前面文章中介绍了golang开发rpc命令行工具,今天继续后续功能的介绍。 场景描述: 1,客户端运行一个daemon程序,执行文件上传任务,使用boltdb数据库记录任务执行状态。支持继续上传未完成文件。 2,客户端命令行通过rpc调用客户端daemon程序API,进行文件上传任务管理,包括任务创建,查看,启停。 通过这个功能开发,我们将golang并发编程的goroutine协程,channel通道,context上下文, Mutex互斥锁, WaitGroup协程同步等技术得到应用。 基本知识: WaiGroup 等待一组协程执行完成后继续向下执行,WaitGroup内部有一个计数器,从0开始计数,有3个方法:Add(),Done(), Wait()。Add()添加计数,Done()减掉一个计数,Wait()执行阻塞,直到WaitGroup数量变成0。 Select select和channel配合使用,通过select可以监听多个channel的I/O读写事件。 select { case <-ctx.Done(): p.log.Error("the context error \n") return context.Canceled default: } 如果没有default分支,select会阻塞在多个channel上,对多个channel进行监控。如果有default分支,多个channel都没有满足,则执行default分支。 Context Golang的Context称之为上下文,用来跟踪goroutine关系链,传递通知,达到控制他们的目的。主要用法是,传递取消信号,传递数据。 下面是一个传递取消信号的使用过程,首先context.Background()返回一个空的Context,一般用于整个context tree的根节点。context.WithCancel()返回一个ctx可取消的Sub Context,作为Run的参数传入goroutine,这样可以使用ctx跟踪这个Goroutine。cancel()调用Sub Context的取消函数,向关联的Goroutine发送一个"取消"通知。在Run函数中,接收ctx.Done的cancel通知,做相关清理后退出。 ctx, cancel := context.WithCancel(context.Background()) go pool.Run(ctx) cancel() for { select { case <-ctx.Done(): return } } Mutext互斥锁 Mutext互斥锁在同一时间只被一个goroutine访问,不区分读写。有两个方法:Lock()和Unlock()。当一个goroutine申请了Lock(),那么另一个goroutine申请Lock()时会阻塞等待直到Unlock()释放锁。 multipart/form请求 multipart/form请求是http Post方法,可以发送文件和消息,在请求的Header中包含一个特殊头信息Content-Type: multipart/form-data; boundary=,boundary的值为随机计算生成的值,用于分隔上传多个form-data的间隔。 POST /raw/v1alpha2/rawdatas/6214cb78ff3f2903536f5751/images HTTP/1.1 Host: 127.0.0.1:9090 Content-Length: 546 Content-Type: multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW ----WebKitFormBoundary7MA4YWxkTrZu0gW Content-Disposition: form-data; name="files"; filename="/C:/Users/ycsk0/Pictures/ubuntu-install.png" Content-Type: image/png (data) ----WebKitFormBoundary7MA4YWxkTrZu0gW Content-Disposition: form-data; name="files"; filename="/C:/Users/ycsk0/Pictures/windows store.

继续阅读

Golang操作BoltDB数据库

boltdb是一个go语言开发的嵌入式kv数据库。 今天我们一起看看golang如何操作BoltDB,再看看Etcd是如何操作BoltDB的,Etcd对BoltDB做了很多优化,复杂度提高很多。 Etcd是一个基于Raft开发的分布式key-value存储。Kubernetes使用Etcd来作为集群数据的存储。Etcd存储层包含预写日志(WAL)、快照(Snapshot)、boltdb。其中WAL与Snapshot实现了故障恢复和数据回滚或重做,让数据尽量不丢失,boltdb则保存了集群元数据和用户写入的数据。 使用场景 在自动驾驶汽车算法模型开发中,需要大量的2D图片和3D点云图片,我们将这些图片上传到打标系统,进行数据标注,标注的数据用于算法模型的数据集。 这些图片文件的上传,需要有一个有状态的客户端来完成文件上传工作,客户端可以在本地创建上传任务,记录上传位置,任务启停等功能。 这里我们使用go来开发客户端,选择boltdb这种简单的嵌入式kv数据库作为存储。 BoltDB基本概念 DB数据库文件 BoltDB的数据库只有一个文件,一个进程打开此文件后,其他进程无法使用,只有等这个进程释放文件锁。对应关系型数据库中database。 Bucket 对应关系型数据库中的table,操作有CreateBucket, CreateBucketIfNotExists, DeleteBucket。 Key/Value键值对 boltdb中键值对都是使用字节数组值进行存储,keys以字节排序的顺序存储在一个bucket中。 Transaction事务 boltdb在某一时刻,只允许一个读写事务或者允许多个只读事务。用户可以通过db的Begin方法启动一个事务,通过Rollback和Commit方法自己控制提交和回滚,Close关闭事务。同时boltdb还提供了内置隐式事务Update, View, Batch方法。 读写事务 db.Update()启动一个读写事务,可重复读的事务。return error,回滚整个修改,return nil提交修改。 只读事务 db.View()打开一个只读事务,无法做写入操作。 批处理读写事务 db.Batch()多次读写事务合并为一次事务,使用上和Update读写事务相同,boltdb自动将其分批,分批写入磁盘减少并发读写事务等待磁盘I/O的开销。 代码学习 普通使用案例 定义Client接口,实现CreateBucketIfNotExists创建Bucket,Put写入更新数据,Delete删除数据,Range查询单条数据或数据列表,ForEach遍历数据。可以看到只使用了两种不同的事务Update和View来完成操作。 主要操作有三种Get, Put, Delete,这里只用到了Put,Delete操作,通过Range获取单条或者多条数据,另外通过Cursor游标,Seek偏移,Next来遍历数据。 package boltdb import ( "bytes" "math" "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt" "go.uber.org/zap" ) type client struct { db *bbolt.DB log *zap.Logger } type Client interface { CreateBucketIfNotExists(bucket Bucket) Put(bucketType Bucket, key, value []byte) Delete(bucketType Bucket, key []byte) Range(bucketType Bucket, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) ForEach(bucketType Bucket, visitor func(k, v []byte) error) Close() error } func NewClient(path string, log *zap.

继续阅读

Kubernetes应用系统API用户认证与鉴权 -- 鉴权篇

上篇文章讲了通过LDAP和Dex实现两种不同的API用户认证的方式,用户认证是识别API请求的来源,让系统知道请求的访问者是谁。应用系统可以根据请求的身份,来判断该用户可以访问哪些API接口获得数据,这就是用户鉴权。下面我们一起来看一下,在Kubernetes平台上,如何通过RBAC来实现资源访问限制,同样我们还是通过Kubesphere的代码来学习。 鉴权实现的过程: 1,构建一个HandlerChain,HandlerChain中包含了:WithAuthentication, WithRequestInfo和WithAuthorization用户认证和鉴权的三个Filter以及WithAuditing Filter用于审计日志。Filter可以理解为web 框架的Middleware。 2,filters.WithAuthentication上篇文章介绍过,完成用户身份识别和分发Token,将用户信息写入Context。 3,filters.WithRequestInfo处理API请求信息,将请求信息保存到context中。请求信息包括:Path, Verb, APIPrefix, APIGroup, APIVersion, Namespace, Resource, Name这些RBAC鉴权需要用到的资源信息,以及客户端信息等。 4,filters.WithAuthorization根据RequestInfo和RBAC规则,判断请求的合法性,终止或者允许请求继续下去。 代码实现: 1,HandlerChain处理请求 这里可以看到有两个鉴权,一个是pathAuthorizer,一个是RBACAuthorizer,pathAuthorizer定义了哪些URL Path不需要鉴权,RBACAuthorizer顾名思义就是通过RBAC鉴权,后面我们详细介绍这两个鉴权。 var authorizers authorizer.Authorizer switch s.Config.AuthorizationOptions.Mode { case authorization.AlwaysAllow: authorizers = authorizerfactory.NewAlwaysAllowAuthorizer() case authorization.AlwaysDeny: authorizers = authorizerfactory.NewAlwaysDenyAuthorizer() default: fallthrough case authorization.RBAC: excludedPaths := []string{"/oauth/*", "/kapis/config.kubesphere.io/*", "/kapis/version", "/kapis/metrics"} pathAuthorizer, _ := path.NewAuthorizer(excludedPaths) amOperator := am.NewReadOnlyOperator(s.InformerFactory, s.DevopsClient) authorizers = unionauthorizer.New(pathAuthorizer, rbac.NewRBACAuthorizer(amOperator)) } handler = filters.WithAuthorization(handler, authorizers) handler = filters.WithAuthentication(handler, authn) handler = filters.WithRequestInfo(handler, requestInfoResolver) s.

继续阅读

Kubernetes应用系统API用户认证与鉴权 -- 认证篇

前面文章介绍了Kubernetes应用系统用户管理,实现了外部用户存储(LDAP)与Kubernetes用户映射,下面以Kubesphere为参考,一起看一下如何实现Kubernetes应用系统用户的认证。 应用平台用户认证主要实现两个功能: 1,用户身份鉴别 用户身份认证可以通过LDAP进行用户名和密码认证,也可以通过第三方认证服务进行OAuth认证。 2,Token管理 Client端认证通过后,带上获取Token请求API。应用系统根据Token提取用户ID,进行请求权限鉴别。同时应用系统负责Token的生命周期管理。 基本概念: OpenID Connect(OIDC)是基于OAuth 2.0身份认证协议,增加了OAuth 2.0中末定义的规范,例如scope, Claim用户信息字段。 代码实现 基本认证方式 API路由 // legacy auth API legacy := &restful.WebService{} legacy.Path("/aiapis/iam.aiscope/v1alpha2/login"). Consumes(restful.MIME_JSON). Produces(restful.MIME_JSON) legacy.Route(legacy.POST(""). To(handler.login). Deprecate(). Doc("Aiscope APIs support token-based authentication via the Authtoken request header. The POST Login API is used to retrieve the authentication token. After the authentication token is obtained, it must be inserted into the Authtoken header for all requests."). Reads(LoginRequest{}). Returns(http.StatusOK, api.StatusOK, oauth.Token{}). Metadata(restfulspec.

继续阅读

Kubernetes应用平台API开发实战

前面文章介绍了通过基于go-restful框架开发API, client-go生成clientset, informers, listers来读取和写入自定义资源,基于kubebuilder开发CRD资源控制器。今天我们通过一个实例来看一下API整个开发过程。 开发步骤 1,通过kubebuilder来开发CRD控制器 2,通过client-gen,lister-gen,informer-gen生成clientset, informers, listers代码 3,开发models实现CRD资源的kubernets读写操作,读取列表时的排序,分页,过滤 4,开发handler实现CRD资源的API处理 CRD资源的读操作 CRD资源的读操作通过Informer来读取,减少API和Etcd集群的压力。 Informer的主要工作原理为:通过Reflector反射来监听Kubernetes对资源的操作事件,把资源对象和操作类型写入到一个DeltaFIFO的队列中。Reflector消费队列,将资源对象存储到indexer,indexer与Etcd集群的数据完全保持一致。 CRD资源的写操作 CRD资源的写通过client-go的clientset来完成对资源的Create,Update,Patch操作。 代码开发 这里我们需要实现上一篇讲的TrackingServer自定义资源的API。TrackingServer主要用于管理机器学习实验跟踪MLflow在k8s里的实例资源。 创建Informer informerFactories结构体实现了InformerFactory接口,这个接口有两个SharedInformerFactory,一个为Kubernetes资源的informerFactory,一个为本项目自定义资源的aiInformerFactory。aiscopeinformers.NewSharedInformerFactory创建了一个aiInformerFactory实例,这里的NewSharedInformerFactory为代码生成器生成的方法。通过代码生成器创建clientset客户端aiClient,作为参数来创建InformerFactory type InformerFactory interface { KubernetesSharedInformerFactory() k8sinformers.SharedInformerFactory AIScopeSharedInformerFactory() aiscopeinformers.SharedInformerFactory // Start shared informer factory one by one if they are not nil Start(stopCh <-chan struct{}) } type informerFactories struct { informerFactory k8sinformers.SharedInformerFactory aiInformerFactory aiscopeinformers.SharedInformerFactory } func NewInformerFactories(client kubernetes.Interface, aiClient versioned.Interface) InformerFactory { factory := &informerFactories{} if client != nil { factory.informerFactory = k8sinformers.

继续阅读

Kubebuilder开发MLflow实验跟踪控制器

前面文章讲了client-go, go-restful开发Kubernetes应用平台,今天给大家看看在这个应用平台中添加一个自定义资源控制器的开发。 需求场景: 在多租户机器学习平台中,开发一个Kubernetes控制器,实现CRD(自定义资源) TrackingServer的调谐,完成Kubernetes中对应的PersistentVolumeClaim, TLS Secret, Service, Ingress, Deployment资源管理。 功能描述: 1,当CR实例的参数中指定了VolumeSize和StorageClassName,则创建对应的PersistentVolumeClaim用于MLflow的local database sqllite的数据存储目录。当未指定时,不创建或者删除已经创建的PersistentVolumeClaim。 2,当CR实例的参数中指定了Cert和Key数据,则创建对应的TLS类型的Secret,用于Ingress的TLS证书。当未指定时,不创建或删除对应Secret。 3,查找对应命名空间和名称的Secret,如果有Ingress配置对应的TLS证书。 4,根据CR实例的参数管理Service和Deployment的创建和修改。 5,删除CR实例后,对应清理K8S资源。当删除资源时,判断被删除资源是否为CR实例的附属资源。 代码实现: 整个业务代码开发分为几个大的步骤: 1,Kubebuilder生成代码和部署文件 2,在Controller Manager中注册控制器 3,在控制器调谐代码中,实现业务逻辑 Kubebuilder中创建API 指定GVK,这里TrackingServer为我们需要的MLflow资源。 kubebuilder create api --group experiment --version v1alpha2 --kind TrackingServer kubebuilder create api --group experiment --version v1alpha2 --kind JupyterNotebook kubebuilder create api --group experiment --version v1alpha2 --kind CodeServer 自定义资源TrackingServer定义 定义VolumeSize, Cert, Key字段为omitempty,表示非必须字段。 +genclient表示代码生成器生成clientset,informer, lister代码。 printcolumn表示kubectl get资源时展示字段 // TrackingServerSpec defines the desired state of TrackingServer type TrackingServerSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file Size int32 `json:"size"` Image string `json:"image"` S3_ENDPOINT_URL string `json:"s3_endpoint_url"` AWS_ACCESS_KEY string `json:"aws_access_key"` AWS_SECRET_KEY string `json:"aws_secret_key"` ARTIFACT_ROOT string `json:"artifact_root"` BACKEND_URI string `json:"backend_uri"` URL string `json:"url"` VolumeSize string `json:"volumeSize,omitempty"` StorageClassName string `json:"storageClassName,omitempty"` Cert string `json:"cert,omitempty"` Key string `json:"key,omitempty"` } // TrackingServerStatus defines the observed state of TrackingServer type TrackingServerStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file } // +genclient // +kubebuilder:object:root=true // +kubebuilder:subresource:status // +kubebuilder:printcolumn:name="S3_ENDPOINT_URL",type="string",JSONPath=".

继续阅读

go-restful框架开发Kubernetes应用平台

go-restful是一个golang语言实现的RESTful库,Kubernetes APIServer使用它实现RESTful API。下面我们一起简单看一下Kubesphere如何使用go-restful的。 Container Container逻辑上是WebService的集合,功能上可以实现多终端的效果。 它包括一组restful.WebService和一个http.ServeMux对象,使用RouteSelector进行请求派发。 Webservice WebService逻辑上是Route的集合,功能上主要是为一组Route统一设置包括root path,请求响应的数据类型等一些通用的属性。 Route 路由包含两种,一种是标准JSR311接口规范的实现RouterJSR311,一种是快速路由CurlyRouter。 CurlyRouter支持正则表达式和动态参数,相比RouterJSR11更加轻量级,apiserver中使用的就是这种路由。 一种Route的设定包含:请求方法(http Method),请求路径(URL Path),输入输出类型(JSON/YAML)以及对应的回掉函数restful.RouteFunction,响应内容类型(Accept)等。 代码示例:一个API聚合服务,包含两种服务,一种是/api, /apis两个路径请求代理给Kubernetes,另一种是本地注册API提供服务。 定义APIServer结构体 type APIServer struct { ServerCount int Server *http.Server Config *apiserverconfig.Config // webservice container, where all webservice defines container *restful.Container KubernetesClient k8s.Client } APIServer构造函数 初始化kubernets Client和http server func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIServer, error) { apiServer := &apiserver.APIServer{ Config: s.Config, } kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions) if err != nil { return nil, err } apiServer.

继续阅读

Kubernetes Operator实现用户管理

在Kubernetes里User只是一个用户身份辨识的ID,没有真正用户管理,k8s一般通过第三方提供用户管理和存储,k8s通过User进行身份验证与权限认证。 Kubernetes用户验证支持X509证书认证,token认证和密码验证几种方式。 RBAC是Kubernetes进行权限控制的方式。用户与角色绑定,赋予角色权限。 今天我们来一起看一下Kubesphere如何通过Operator实现kubernetes用户管理。我们在Kubernetes里创建User自定义资源,使用LDAP存储用户帐号信息。通过Kubernets CertificateSigningRequest请求X509证书,生成Kubeconfig。通过各种自定义Role资源来创建Kubernetes Role与用户绑定,分配用户权限。最终用户通过客户端使用kubeconfig来访问Kubernetes资源。 这个场景不像Dex这种Kubernetes OpenID服务,他不需要在Kubernetes APIServer上进行配置,改变Kubernetes集群的部署配置。 代码主要流程 1, User控制器调谐,创建LDAP用户,创建用户KubeConfig的Configmap 2, 在CreateKubeConfig生成kubeconfig用户信息,创建CertificateSigningRequest 3, 在Informer中监听CertificateSigningRequest事件,Approve请求,更新Configmap中用户kubeconfig的证书 代码实现 主入口,创建Kubernetes集群Client,创建Informer,创建controller,在controller的mgr中添加user, kubeconfig自定义资源控制器。 func run(s *options.AIScopeControllerManagerOptions, ctx context.Context) error { kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions) if err != nil { klog.Errorf("Failed to create kubernetes clientset %v", err) return err } informerFactory := informers.NewInformerFactories( kubernetesClient.Kubernetes()) mgrOptions := manager.Options{ Port: 8443, } if s.LeaderElect { mgrOptions = manager.Options{ Port: 8443, LeaderElection: s.LeaderElect, LeaderElectionNamespace: "aiscope-system", LeaderElectionID: "aiscope-controller-manager-leader-election", LeaseDuration: &s.

继续阅读

Kubernetes部署存储Rook Ceph

Rook 是一个可以提供 Ceph 集群管理能力的 Operator。Rook 使用 CRD 一个控制器来对 Ceph 之类的资源进行部署和管理。Rook Ceph要求存储设备为块设备,支持分区或者整块硬盘。 | 磁盘分区 sukai@ceph-01:~$ sudo pvcreate /dev/sda WARNING: ext4 signature detected on /dev/sda at offset 1080. Wipe it? [y/n]: y Wiping ext4 signature on /dev/sda. Physical volume "/dev/sda" successfully created. sukai@ceph-01:~$ sudo vgcreate data /dev/sda Volume group "data" successfully created sukai@ceph-01:~$ sudo pvs PV VG Fmt Attr PSize PFree /dev/sda data lvm2 a-- <9.10t <9.10t sukai@ceph-01:~$ sudo vgs VG #PV #LV #SN Attr VSize VFree data 1 0 0 wz--n- <9.

继续阅读

机器学习部署模型服务

当训练好了一个模型,如何对外提供推理服务。Seldon Core是在Kubernetes上部署机器学习模型的流行组件。简单地说,Seldon Core将模型封装成生产级的REST/GRPC微服务。Seldon Core已与Istio、Jeager、Prometheus做了集成,支持灰度发布、A/B测试、链路跟踪、指标监控等。 今天给大家示例的是最简化的使用方式,仅有Seldon Core,无其他开源组件。我只想用Seldon Core来完成我的模型加载和提供API服务。 | 部署Seldon Core Operator 编辑values.yaml,禁用ambassador, istio ambassador: enabled: false istio: enabled: false 因为我的Kubernetes集群版本v1.22.2,所以要修改一下webhook.yaml里的协议版本 sideEffects: None admissionReviewVersions: - v1beta1 | 安装 helm install -n seldon-system seldon-core-operator seldon-core-operator | Prefect工作流 Prefect agent role添加seldon API操作权限 - apiGroups: - machinelearning.seldon.io resources: - seldondeployments verbs: - '*' | 修改模型训练任务 在训练模型任务返回MLflow的run_id,归档模型文件时,不注册模型版本 @task def train_model(data, mlflow_experiment_id, alpha=0.5, l1_ratio=0.5): mlflow.set_tracking_uri(f'http://mlflow.platform.sukai.com/') train, test = train_test_split(data) # The predicted column is "quality" which is a scalar from [3, 9] train_x = train.

继续阅读