golang

Kubernetes资源类型发现

在基于Kubernetes平台开发过程中,经常需要访问不同Kuberntes版本的集群资源,这时就需要去获取所访问集群支持的资源版本信息。下面我们一起来看一下,如何来实现Kubernetes资源类型发现? 文章分为下面几部分为大家介绍: 1,kubectl命令查看Kubernetes集群资源 2,client-go库获取Kubernetes集群资源 3,controller-runtime动态获取Kubernetes集群资源 kubectl命令 kubectl命令行工具提供api-resources,api-versions命令来查看kubernetes集群当前支持的资源类型。 kubectl api-resources查看k8s所有资源和版本列表。 sukai@sukai:~$ kubectl api-resources NAME SHORTNAMES APIVERSION NAMESPACED KIND bindings v1 true Binding componentstatuses cs v1 false ComponentStatus configmaps cm v1 true ConfigMap endpoints ep v1 true Endpoints events ev v1 true Event limitranges limits v1 true LimitRange namespaces ns v1 false Namespace nodes no v1 false Node persistentvolumeclaims pvc v1 true PersistentVolumeClaim persistentvolumes pv v1 false PersistentVolume pods po v1 true Pod .

继续阅读

Clusterpedia安装与使用

今天我们一起来安装一下Clusterpedia,看看Clusterpedia的功能。 安装 Clusterpedia代码目录deploy下提供了kubectl安装Clusterpedia的YAML文件。部署数据库,安装CRD自定义资源,安装apiserver和clustersynchro,注册apiservice。 cd ./deploy/internalstorage/postgres export STORAGE_NODE_NAME=sukai sed "s|__NODE_NAME__|$STORAGE_NODE_NAME|g" `grep __NODE_NAME__ -rl ./templates` > clusterpedia_internalstorage_pv.yaml kubectl create -f . cd ../../.. kubectl apply -f ./deploy/crds kubectl apply -f ./deploy 两个组件的deployment里可以修改镜像地址为: m.daocloud.io/ghcr.io/clusterpedia-io/clusterpedia/clustersynchro-manager:v0.1.0 m.daocloud.io/ghcr.io/clusterpedia-io/clusterpedia/apiserver:v0.1.0 导入集群 创建Clusterpedia的CRD PediaCluster资源实例,syncResources指定Clusterpedia需要同步的资源。 apiVersion: cluster.clusterpedia.io/v1alpha2 kind: PediaCluster metadata: name: cluster-sukai spec: apiserver: "https://192.168.0.111:6443" caData: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUMvakNDQWVhZ0F3SUJBZ0lCQURBTkJna3Foa2lHOXcwQkFRc0ZBREFWTVJNd0VRWURWUVFERXdwcmRXSmwKY201bGRHVnpNQjRYRFRJeU1ESXdOakE1TXpjek1Wb1hEVE15TURJd05EQTVNemN6TVZvd0ZURVRNQkVHQTFVRQpBeE1LYTNWaVpYSnVaWFJsY3pDQ0FTSXdEUVlKS29aSWh2Y05BUUVCQlFBRGdnRVBBRENDQVFvQ2dnRUJBSnUzCjlvS0MwUjVQYkgyUTZCa2ZCSzIwYzRVNkJpQmJpdW14akI5VVBZaExja2VhTkIwVHFLY2xXTjNzV2Y4QVBGM0sKYlFyb1F5UzdhNEVGWVU3WFJRcWFWTHdlSGhWZWZnMU1nZ3gzMXpsNS8wR3NqdFRlc1hyTmR4RXBvTnBvVWtrWAp1Uy8wWXZoNjZSUklPMks5WmNLKytvK1YycDl3UFZLTHYvUm45WDZpRVRlVjJSeCt0RDFXbEpDRSs0UGhublJNCm1jYUJyWkxkazVONGtCb1JDMTV0alhWRkpNSVJIejNaUVhwS1RydzVEZ3pvS0MyUUpCcnl1L2kxS0pSWGNIUm8KcXd6dzExWkRMemFXU3VnQ1hMT3pCeXVvblhoOGVpWW5jbldJa0NKZzlUblczamRXU3hZTHBtZUJFaXd5TDNNegpEWUIxWTVpaVVZV0pvbERCQjNNQ0F3RUFBYU5aTUZjd0RnWURWUjBQQVFIL0JBUURBZ0trTUE4R0ExVWRFd0VCCi93UUZNQU1CQWY4d0hRWURWUjBPQkJZRUZJK2NhY0psbW1oUElPZGJaRjV1LzMzaDVNRUtNQlVHQTFVZEVRUU8KTUF5Q0NtdDFZbVZ5Ym1WMFpYTXdEUVlKS29aSWh2Y05BUUVMQlFBRGdnRUJBQkdVanNacEZuKy9LR2VpRnpUNApKaEtITno3Yzl5NjF3eEUvNzYvcFd2eGU1RVRyT1FWbjdTQk54SUQ1OXNBKzUrcEtyNnk5TUxrUUxDZmhhZFdKCmd4WklEVmlHTmV4ejFsSlNKZ2h0L0lTOUtKbnl5YmQ1QUV4cTZ2ZkxrT1VKZkxaYnJMWUNGYUVIWXg1Q055TXEKUGdtdG1Tb1NlOVVKMGpEZlFvanJGakVIbWJpT0hHV0o3R2dHaStKUzhSbFZwc3pyK215c05FcitiMCtiR2hWWApNR3NDNTVCQndOTVZhSHFXZWZmRjB0ZGlSaVY4SW1xa0loSVdNSXA0RFdvbjBJc2xFYTZ0RWNrTGhaM0dsR2lrCmRvcCtGTmJZRmVZWXpPU05yRkpCZitKSnd4U1lSWGlTTXVCc29nd2NYcFZ1S0lza2hoVjU0dUEwUTcxSUNLN3IKN3VnPQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg== tokenData: certData: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURJVENDQWdtZ0F3SUJBZ0lJV2YrWkJSK1I0SFF3RFFZSktvWklodmNOQVFFTEJRQXdGVEVUTUJFR0ExVUUKQXhNS2EzVmlaWEp1WlhSbGN6QWVGdzB5TWpBeU1EWXdPVE0zTXpGYUZ3MHlNekF5TURZd09UTTNNelJhTURReApGekFWQmdOVkJBb1REbk41YzNSbGJUcHRZWE4wWlhKek1Sa3dGd1lEVlFRREV4QnJkV0psY201bGRHVnpMV0ZrCmJXbHVNSUlCSWpBTkJna3Foa2lHOXcwQkFRRUZBQU9DQVE4QU1JSUJDZ0tDQVFFQXVhOHAvUnBITmUzemhGOWkKZEFZUFlBalJIaTlBYTlGc1FhdzY4NzJoOUhBVVpneUNnU3JxUmIwc1hBOUZ4aSsybHJsSWIzUE9sRGd3bnJwZApacmI1VnQyZHRuVGhXZitJYVV4bGNraGs1VWdNQnUwa1VFVGpUTFZtY1RvczVZWGJWejk5aUEwQ3h4LzViYzcwCnJKRWRyWkJQYngrdFN1Mk5kN1paeTE5SExWYVh6a256R3NRZ1o0Z1piT09oeFVNd201OFd4a2hDeDdyczhLQnIKdmNOWXFVMC9UNXVydFJPcEJabCtYMTFnNGcyeXkwSTdBc2R1cnVCMmhiOG1RUW1MRzg5VmdqZHQzOEdrUTJhMApZMGNLay9aeVQzUVUxeGZkUTlvb2FYbWN1SnBkUmJpeTU5a1VBNzVSc3V2L1V3UXZucDgrUzNLR2dFVkJBTkpqCjFCNTljd0lEQVFBQm8xWXdWREFPQmdOVkhROEJBZjhFQkFNQ0JhQXdFd1lEVlIwbEJBd3dDZ1lJS3dZQkJRVUgKQXdJd0RBWURWUjBUQVFIL0JBSXdBREFmQmdOVkhTTUVHREFXZ0JTUG5HbkNaWnBvVHlEblcyUmVidjk5NGVUQgpDakFOQmdrcWhraUc5dzBCQVFzRkFBT0NBUUVBUk8xVldObW40N1A4ajl3THkrZUdiMVB6WjBnUTg1WlQvOVU1Ck5yVHZ1YXFCVW80bUxvNzBISHRtVDAreVlMSmhWV0kxSW9IT3dqc3gwcUhJWXc3U0lIWS90K3VqOHh4TDJnakgKSVdWT0NwSW5xeDF1TTQwQ0xORGtmbUM1b2JYbEtUbCt0d0hETDBuQzV3am8wdE5TV0x0SERqWjVoRVF2VWlieQoxdmpHZnhDS3YxNXdqVFlvTjZ3OS9nS1ZHQm5scjJjNm1kTmUwSktmUEdHeHRKMmN3R0daQXRYQ1EvcUNRZmpoCnVVM3ZSSTlXSFp5THhJWWF2RjdpRlVlaGdqYVBKc3lwQ0xHejRPVjA3dlFEbHdBQlp3Mjk1WGI3c1FVK1FOMVoKMEwzWHIyVzIydE52ZHBiNlRNSjRMaFN2Z2RLOHFWUDREK0RscWY2ajFUUHFwOFVucGc9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg== keyData: LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdWE4cC9ScEhOZTN6aEY5aWRBWVBZQWpSSGk5QWE5RnNRYXc2ODcyaDlIQVVaZ3lDCmdTcnFSYjBzWEE5RnhpKzJscmxJYjNQT2xEZ3ducnBkWnJiNVZ0MmR0blRoV2YrSWFVeGxja2hrNVVnTUJ1MGsKVUVUalRMVm1jVG9zNVlYYlZ6OTlpQTBDeHgvNWJjNzBySkVkclpCUGJ4K3RTdTJOZDdaWnkxOUhMVmFYemtuegpHc1FnWjRnWmJPT2h4VU13bTU4V3hraEN4N3JzOEtCcnZjTllxVTAvVDV1cnRST3BCWmwrWDExZzRnMnl5MEk3CkFzZHVydUIyaGI4bVFRbUxHODlWZ2pkdDM4R2tRMmEwWTBjS2svWnlUM1FVMXhmZFE5b29hWG1jdUpwZFJiaXkKNTlrVUE3NVJzdXYvVXdRdm5wOCtTM0tHZ0VWQkFOSmoxQjU5Y3dJREFRQUJBb0lCQUVCcUdxZmFDT0FWaHdmaAp5eGF5ejN5aU1tRkZSUlRpRnFzRm80SFF4REUyL0d5V1pHT0l6cktZdUozTEVvcDVITjlXc1dFd2pIWndzN1VzCnM2QWhVNGdsNDBOYmNwMjAvczZBbVNTM0pvRS9xQ1J5K2NqNnpOdGNob2c3QlQ0dVhIUDg2NEJaK3grMjRPR08KRE9VY2htNGloTnZvNGtYKytMZVJ3NzdBYzhHdkUwYWU0akt6ZHZRb0hXNmNkcGxBWGdqRG1EVGVGOVZ6Q2o1SgplaC90OWQvN0V4by83SXJHdWROWnpwcTRZaktrTUdhdVdwcEt0N0FmUXpVU2pTOEh1R0ROL2VpUDhzRXlLdlZmCmwvVmdMb3FzUlRUUzZZNjFkU1FVaHlWSE80Z1VKamV2VVFyMzJFaDF3R2VxMXVDUE9iUjdzd2pnWjhET2kwcDgKam81eFR2a0NnWUVBOUM4Mm1zeDJjdUhLUjZoS2VYbTdhSFJKR0d4MXY1czVMY2lYN0ljMTltM1NROUZHODFBcAp3VjFBOUxydVhpVnoyTFRHV1ZDY2V1ZzZCc0FpTVNXSkF5eGx4TUhEbFVBbXZzT0dLT2N2ZXdlOHRPaUFlQkU0Clo5WWlUcUovZDdxdmdrN1RFdElDTHRDVStKcXRWZmUxVVFnK3FvUHY3Q09GRHplZ1ZrSTF5cDBDZ1lFQXdxdEsKZ3JTQktjaGQvUnlIdG9sbnlkbzYvZk9aWHRIbHkvcGdXQm9ZekZDcFRra3V3TnFjL2J6cE96dFhzL3VZNTlPaQprWUZJcFVLM0cvVnRNRno0UkY1R25mbnFMK0I4cndjcGNJRjJLSXFWYStsR2h5cy9mNWNveWFURHRDWnlSRTRpCmdEb2gyZmhUcEl0ZHN5QkNNdVZGMFRnNE15N1c5aUVTVlNhZm8wOENnWUFaYmVWSTM2d2lNS05wTFB4OGhCSGgKUWVMdTJUUzEvSXRLMms0QUF1QzZ4aHNVbHZIRm12Nk9OWkR6SzVoeFU0TXArVUdDd2FOYUpWOE5udXF3cFpFTQpOSTV3bkNFckpPQWtFNmFnRWR0ZSs2SktVTUE0UU1yWC9YUGJMbzhKdi9aUklyWldpbXBSeDhVTDBzZmtZUVNQCjZNVGw2eEdNVFBLcGNBaVJreG1ZL1FLQmdRQ3ZqSUNZOWVZMG83ZitkU2Y5ZUZQY042eFRMc1gwT0J5ZW9aOFkKVkJCZ3o2eWVLR2k5Q1dmaGVlWnB2ODRMUkt4VEF3cnJaRWI2b1BzM2YwK0QrWkw1Tkh0Q0l3a0pPOHUwbXlUSApqRGZkdjN1WDRMbjFVdzdrSkpCbnB1bkZINWFUK2xJcWlFSFdxcFhqSUxyU3VoaDRoVUU4dHhJWE5mb3I0dzhCCk10OXJDUUtCZ0VLNVZrM21sK1Bsbm1wVmpHMU5wOUFtSDY1c2N0eGhYb0ZkU2VVSUZJSTZLU3plRUZnTlllc0YKajRzRnBQeFhkNk5IaS8wYlZTckJBejFveXI2cXlXK3Iwa2hyV3krb1NRakFqV3ZQdlNiWEwwNE5YcDdDci81MAp6L29OTDRXUVI0eGJyWHlUU1FhbFp4R3EvOEV0MjM0amdnbTl3UWRhTkR6anNtaEZPQ2hRCi0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg== syncResources: - group: apps resources: - deployments - group: "" resources: - pods 查看集群状态,APIService,api资源包括:CRD和Aggregator两种方式注册的资源。 sukai@sukai:~$ kubectl get pediacluster NAME APISERVER VERSION STATUS cluster-sukai https://192.

继续阅读

开发Kubernetes自定义APIServer

前面文章介绍了DaoCloud开源的Clusterpedia的业务流程和资源同步机制,Clusterpedia将多个Kubernetes集群的指定资源对象同步保存到MySQL数据库并对外提供查询和检索。今天我们一起看一下Clusterpedia如何开发一个Kubernetes自定义的APIServer,将保存在数据库中的多个Kubernetes业务集群的资源对外提供查询和检索的。 本文内容分以下部分: 1,Kubernetes APIServer基本知识 2,自定义APIServer开发 Kubernetes APIServer基本知识 Kubernetes APIServer是一个实现了RESTful API的WebServer。 API类型 core group,/api/v1为前缀的API接口PATH,core group不需要在apiVersion字段中指定,示例:apiVersion: v1。 named groups,REST path为/apis/$GROUP_NAME/$VERSION,指定apiVersion: $GROUP_NAME/$VERSION,示例apiVersion: batch/v1。 暴露系统状态的API,比如/metrics、/healthz等。 扩展机制 为了增加Kubernetes API的扩展性,Kubernetes提供了两种机制:1,APIExtensions,创建自定义资源CRD,处理CRD/CR的REST请求。2,Aggretgator聚合机制,注册APIService资源,APIServer将对应的API GroupVersion请求代理转发到注册的Service上。 委托调用链 APIServer使用委托模式,通过DelegationTarget接口,把Aggretgator、API Server、APIExtensions链式串联起来,对外提供服务。 当请求Kubernetes对象时,如果在Aggregator中找不到,就去KubeAPIServer中找,最后到APIExtensions中找。 Clusterpedia通过这种委托模式,注册Aggregator服务,优先于所在Kubernetes集群的APIServer提供api和apis两个group的服务,来访问存储在数据库中的多个业务集群的Kubernetes资源对象,kubectl也可以直接执行命令进行下游集群的资源对象查询检索操作。 资源对象操作 在APIServer中,API请求资源对象时,先通过RESTStorage进行REST API处理,在RESTStorage中调用etcd store进行数据存储操作。 Clusterpedia中,与Kubernetes APIServer类似,先通过RESTStorage进行REST API处理,在RESTStorage中调用ResourceStorage进行数据库操作。 自定义APIServer开发 Clusterpedia APIServer基本流程 1,config-complete-new模式构造GenericAPIServer实例kubeResourceAPIServer NewDefaultConfig初始化默认配置。BuildHandlerChain中WithRequestInfo处理API请求信息保存到context中,WithPanicRecovery处理崩溃日志并恢复,RemoveFieldSelectorFromRequest处理URL Query。 complete完善配置,wrapRequestInfoResolverForNamespace封装处理RequestInfo解析器。 New从空委托者构造GenericAPIServer。创建discoveryManager提供自动发现API处理,NonGoRestfulMux.Handle注册/api和/apis的自动发现路由。创建resourceHandler提供资源对象请求API处理,NonGoRestfulMux.HandlePrefix注册/api和/apis为前缀PATH的路由。NewClusterResourceController创建ClusterPedia控制器,监听ClusterPedia CRD事件,处理下游集群的资源,discoveryManager读取controller更新的资源信息提供自动发现API服务。 2,BuildHandlerChain 将WithRequestQuery添加到HandlerChain,WithRequestQuery保存URL Query到context 3,构造GenericAPIServer实例genericServer config.GenericConfig.New从kubeResourceAPIServer委托者构造一个新的GenericAPIServer实例genericServer。所以API资源请求调用链是,先在kubeResourceAPIServer找,再到genericServer中找。 3,InstallAPIGroup注册API对象 genericServer中注册API对象:GroupVersion:clusterpedia.io/v1beta1,Resources资源为resources,collectionresources对象。每个资源对应一个REST storage,用于处理对应资源的请求。 4,AddPostStartHookOrDie添加APIServer启动后调用函数 APIServer启动后,启动Informer,同步Cache。 5,PrepareRun() 准备健康状态检查,存活检查 6,Run() 启动运行APIServer clusterpedia-apiserver命令 在opts.Config()中: 1,根据配置的数据库类型和配置项,创建storage接口,用于操作数据库 2,根据服务器配置,生成SSL自签名证书 3,生成genericapiserver默认配置 通过Config->Complete->New模式构造一个ClusterPediaServer实例server server.Run运行ClusterPediaServer func NewClusterPediaServerCommand(ctx context.Context) *cobra.

继续阅读

开发Kubernetes自已的Informer

我们在Kubernetes自定义资源CRD控制器开发中都使用过Informer,Informer主要提供了两个功能:1,同步数据到本地缓存。2,监听Kubernetes资源的事件,根据对应的事件操作类型,触发事先注册好的ResourceEventHandle。 前面文章介绍了DaoCloud开源的Clusterpedia通过Informer机制实现了同步多个Kubernetes集群资源到MySQL等数据库。那么如何开发一个简化的自定义的Informer呢?下面我们一起看一下DaoCloud开源的Clusterpedia如何实现Kubernetes资源版本Informer的。 文章内容分以下几部分: 1,Kubernetes原生的Informer机制与实现 2,Clusterpedia的ResourceVersionInformer与原生的Informer比较 3,ResourceVersionInformer代码实现 Kubernetes原生的Informer机制 Informer组件 Reflector:反射器,通过Kubernetes的List/Watch API监控指定类型的资源对象。 DeltaFIFO Queue:将Reflector监控到的变化的对象存放在这个FIFO队列中。 LocalStore:Informer的本地缓存,缓存Kubernetes资源对象,可以被Lister的List/Get方法访问,减少对APIServer的访问压力。 WorkQueue:DeltaFIFO中的事件更新完Store后保存到WorkQueue中,Controller处理WorkQueue中的资源对象事件调用对应的回调函数。 SharedInformer实现 这里可以看到: 1,NewSharedIndexInformer中,初始化了processor,这个processor用于回调用户注册的事件处理回调函数,NewIndexer初始化了一个indexer,这个indexer是作为informer的store本地缓存。listerWatcher指定ListerWatcher接口lw,这个是Kubernetes的client连接用于监控Kubernetes资源对象。objectType指定监控的Kubernetes资源类型。 2,SharedInformer的Run中,NewDeltaFIFOWithOptions构造了一个DeltaFIFO实例fifo,构造了一个controller,这个controller使用fifo队列,使用listerWatcher监控资源对象,ObjectType指定资源类型,Process指定controller处理队列的函数。s.processor.run启动监听通知,调用用户注册的回调函数处理。 type SharedInformer interface { AddEventHandler(handler ResourceEventHandler) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) GetStore() Store GetController() Controller Run(stopCh <-chan struct{}) HasSynced() bool LastSyncResourceVersion() string SetWatchErrorHandler(handler WatchErrorHandler) error } func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: exampleObject, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.

继续阅读

Golang事件队列开发

前面文章介绍了DaoCloud开源的Clusterpedia通过Informer机制监控Kubernetes资源事件,保存事件对象到队列中,等待事件处理函数将Kubernetes资源对象保存到数据库中。今天一起学习一下DaoCloud开源的Clusterpedia如何实现事件队列的。 下面我们文章首先介绍Clusterpedia如何使用EventQueue的,再来介绍EventQueue如何实现的。主要分以下部分: 1,Clusterpedia事件处理函数HandleDeltas,调用informer.handler,这个handler实现了ResourceEventHandler接口,有OnAdd,OnUpdate,OnDelete方法。 2,实现了ResourceEventHandler接口的ResourceSynchro,ResourceSynchro调用EventQueue接口将对象写入队列 3,实现了EventQueue接口的pressurequeue,pressurequeue负责队列的操作。 事件处理函数HandleDeltas Clusterpedia直接基于client-go底层的cache.Controller开发了自己的resourceVersionInformer,cache.Controller的Process函数调用HandleDeltas来处理资源事件。根据cache.Deltas的操作类型,调用informer.storage和informer.handler的对应操作。这里的handler指的是ResourceEventHandler。 func (informer *resourceVersionInformer) HandleDeltas(deltas cache.Deltas) error { for _, d := range deltas { switch d.Type { case cache.Replaced, cache.Added, cache.Updated: version, exists, err := informer.storage.Get(d.Object) if err != nil { return err } if !exists { if err := informer.storage.Add(d.Object); err != nil { return err } informer.handler.OnAdd(d.Object) break } if d.Type == cache.Replaced { if v := compareResourceVersion(d.Object, version); v <= 0 { if v == 0 { informer.

继续阅读

DaoCloud Clusterpedia多Kubernetes集群资源同步与检索

Clusterpedia是DaoCloud开源的支持同步多个集群的指定资源,将资源对象保存到MySQL等数据库,可以通过Clusterpedia检索排序分页资源。 今天我们一起来看一下Clusterpedia的资源同步控制器代码, 了解他的业务流程。 Clusterpedia架构 Clusterpedia在一个Kubernetes集群中,运行Clusterpedia APIServer和ClusterSynchro Manager,提供聚合API,管理和同步多个Kubernetes集群的资源。Clusterpedia所在的集群可理解为管理集群或者控制集群。Clusterpeida将同步的业务集群的资源保存到MySQL/PostgreSQL数据库中,这部分由代码中的StorageFactory实现。 主要业务流程: 1,在管理集群中通过自定义资源PediaCluster CRD来保存、管理和调谐下调业务集群。 2,每一个PediaCluster资源,对应一个ClusterSynchro,也就是一个业务集群会有一个ClusterSynchro实例,用来保存下游业务集群的RestConfig,storage数据库工厂接口,resourceSynchros同步资源GVR对应的ResourceSynchro。 3,每个集群的每个GVR资源对应一个ResourceSynchro,用来保存GVR/GVK信息,事件队列等信息。 4,在PediaCluster资源调谐过程中,主要负责 a,创建对应的ClusterSynchro,将ClusterSynchro中的同步资源对应的Informer启动,将Informer事件调用ResourceSynchro的事件处理函数进行处理。ResourceSynchro实现了ResourceEventHandler接口。 b,ResourceSynchro的事件处理函数,主要实现将对象放入ResourceSynchro队列queue中。 c,开启worker线程池,等待处理队列中的事件,将对象同步到数据库中。 代码 clustersynchro-manager配置项 1,storage.NewStorageFactory生成存储工厂,支持数据库:MySQL, PostgreSQL 2,生成管理集群的Kubeconfig,创建kubernetes以及自定义资源PediaCluster的clientset连接。 func (o *Options) Config() (*config.Config, error) { if err := o.Validate(); err != nil { return nil, err } storagefactory, err := storage.NewStorageFactory(o.Storage.Name, o.Storage.ConfigPath) if err != nil { return nil, err } kubeconfig, err := clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig) if err != nil { return nil, err } client, err := clientset.

继续阅读

Golang实现WorkerPool文件上传

在前面文章中介绍了golang开发rpc命令行工具,今天继续后续功能的介绍。 场景描述: 1,客户端运行一个daemon程序,执行文件上传任务,使用boltdb数据库记录任务执行状态。支持继续上传未完成文件。 2,客户端命令行通过rpc调用客户端daemon程序API,进行文件上传任务管理,包括任务创建,查看,启停。 通过这个功能开发,我们将golang并发编程的goroutine协程,channel通道,context上下文, Mutex互斥锁, WaitGroup协程同步等技术得到应用。 基本知识: WaiGroup 等待一组协程执行完成后继续向下执行,WaitGroup内部有一个计数器,从0开始计数,有3个方法:Add(),Done(), Wait()。Add()添加计数,Done()减掉一个计数,Wait()执行阻塞,直到WaitGroup数量变成0。 Select select和channel配合使用,通过select可以监听多个channel的I/O读写事件。 select { case <-ctx.Done(): p.log.Error("the context error \n") return context.Canceled default: } 如果没有default分支,select会阻塞在多个channel上,对多个channel进行监控。如果有default分支,多个channel都没有满足,则执行default分支。 Context Golang的Context称之为上下文,用来跟踪goroutine关系链,传递通知,达到控制他们的目的。主要用法是,传递取消信号,传递数据。 下面是一个传递取消信号的使用过程,首先context.Background()返回一个空的Context,一般用于整个context tree的根节点。context.WithCancel()返回一个ctx可取消的Sub Context,作为Run的参数传入goroutine,这样可以使用ctx跟踪这个Goroutine。cancel()调用Sub Context的取消函数,向关联的Goroutine发送一个"取消"通知。在Run函数中,接收ctx.Done的cancel通知,做相关清理后退出。 ctx, cancel := context.WithCancel(context.Background()) go pool.Run(ctx) cancel() for { select { case <-ctx.Done(): return } } Mutext互斥锁 Mutext互斥锁在同一时间只被一个goroutine访问,不区分读写。有两个方法:Lock()和Unlock()。当一个goroutine申请了Lock(),那么另一个goroutine申请Lock()时会阻塞等待直到Unlock()释放锁。 multipart/form请求 multipart/form请求是http Post方法,可以发送文件和消息,在请求的Header中包含一个特殊头信息Content-Type: multipart/form-data; boundary=,boundary的值为随机计算生成的值,用于分隔上传多个form-data的间隔。 POST /raw/v1alpha2/rawdatas/6214cb78ff3f2903536f5751/images HTTP/1.1 Host: 127.0.0.1:9090 Content-Length: 546 Content-Type: multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW ----WebKitFormBoundary7MA4YWxkTrZu0gW Content-Disposition: form-data; name="files"; filename="/C:/Users/ycsk0/Pictures/ubuntu-install.png" Content-Type: image/png (data) ----WebKitFormBoundary7MA4YWxkTrZu0gW Content-Disposition: form-data; name="files"; filename="/C:/Users/ycsk0/Pictures/windows store.

继续阅读

Golang操作BoltDB数据库

boltdb是一个go语言开发的嵌入式kv数据库。 今天我们一起看看golang如何操作BoltDB,再看看Etcd是如何操作BoltDB的,Etcd对BoltDB做了很多优化,复杂度提高很多。 Etcd是一个基于Raft开发的分布式key-value存储。Kubernetes使用Etcd来作为集群数据的存储。Etcd存储层包含预写日志(WAL)、快照(Snapshot)、boltdb。其中WAL与Snapshot实现了故障恢复和数据回滚或重做,让数据尽量不丢失,boltdb则保存了集群元数据和用户写入的数据。 使用场景 在自动驾驶汽车算法模型开发中,需要大量的2D图片和3D点云图片,我们将这些图片上传到打标系统,进行数据标注,标注的数据用于算法模型的数据集。 这些图片文件的上传,需要有一个有状态的客户端来完成文件上传工作,客户端可以在本地创建上传任务,记录上传位置,任务启停等功能。 这里我们使用go来开发客户端,选择boltdb这种简单的嵌入式kv数据库作为存储。 BoltDB基本概念 DB数据库文件 BoltDB的数据库只有一个文件,一个进程打开此文件后,其他进程无法使用,只有等这个进程释放文件锁。对应关系型数据库中database。 Bucket 对应关系型数据库中的table,操作有CreateBucket, CreateBucketIfNotExists, DeleteBucket。 Key/Value键值对 boltdb中键值对都是使用字节数组值进行存储,keys以字节排序的顺序存储在一个bucket中。 Transaction事务 boltdb在某一时刻,只允许一个读写事务或者允许多个只读事务。用户可以通过db的Begin方法启动一个事务,通过Rollback和Commit方法自己控制提交和回滚,Close关闭事务。同时boltdb还提供了内置隐式事务Update, View, Batch方法。 读写事务 db.Update()启动一个读写事务,可重复读的事务。return error,回滚整个修改,return nil提交修改。 只读事务 db.View()打开一个只读事务,无法做写入操作。 批处理读写事务 db.Batch()多次读写事务合并为一次事务,使用上和Update读写事务相同,boltdb自动将其分批,分批写入磁盘减少并发读写事务等待磁盘I/O的开销。 代码学习 普通使用案例 定义Client接口,实现CreateBucketIfNotExists创建Bucket,Put写入更新数据,Delete删除数据,Range查询单条数据或数据列表,ForEach遍历数据。可以看到只使用了两种不同的事务Update和View来完成操作。 主要操作有三种Get, Put, Delete,这里只用到了Put,Delete操作,通过Range获取单条或者多条数据,另外通过Cursor游标,Seek偏移,Next来遍历数据。 package boltdb import ( "bytes" "math" "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt" "go.uber.org/zap" ) type client struct { db *bbolt.DB log *zap.Logger } type Client interface { CreateBucketIfNotExists(bucket Bucket) Put(bucketType Bucket, key, value []byte) Delete(bucketType Bucket, key []byte) Range(bucketType Bucket, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) ForEach(bucketType Bucket, visitor func(k, v []byte) error) Close() error } func NewClient(path string, log *zap.

继续阅读