golang

kube-ovn实现Kubernetes多租户网络管理

Kubernetes容器平台正在成为越来越多的数据中心基础平台,我们希望Kubernetes能够满足虚拟化平台的一些基本要求,比如实现了多租户的灵活的软件定义网络SDN。工作中一个项目在使用Kubernetes平台,所以考虑通过KubeVirt来管理虚拟机,同时使用kube-ovn来实现多租户网络隔离。下面我们一起来看看如何使用kube-ovn来管理网络。 基本概念 Underlay/Overlay网络 Underlay网络是指传统IT基础设施网络,是由交换机、路由器、负载均衡等设备组成的底层物理网络。Overlay网络是通过网络虚拟化技术,在Underlay网络上构建出的虚拟的逻辑网络。 OVS/OVN Open vSwitch(OVS)是一个多层软件交换机,OVS只里一个单机软件,没有集群的信息。Open Virtual Nework(OVN)提供了一个集中式的OVS控制器,从集群的角度对整个网络设施进行编排。 使用Kubernetes后会发现,Kubernetes网络功能缺少软件定义网络SDN能力,缺少VPC, Subnet, Nat, Route, SecurityGroup等常用功能。Kube-OVN基于OVN为Kubernetes网络提供了网络编排能力。 CNI 容器网络接口(Container Network Interface),由CoreOS提出的一种容器网络规范,主要内容是容器创建时的网络分配,和容器被删除时释放网络资源。CNI让网络层变得可插拔,只要遵循CNI的协议规范,容器管理平台就可以调用CNI插件可执行文件提供网络功能。Kubernetes网络模型采用了CNI容器网络接口规范。 macvlan macvlan是一种Linux内核的网络虚拟化技术,从一个主机接口虚拟出多个虚拟网络接口。macvlan可以在物理网卡构成的父接口上添加子接口,每个子接口都拥有独立的MAC地址和IP地址。容器可以通过绑定子接口,拥有与物理网络通信的能力。这解决了容器接入物理网络需求,比如我们需要通过docker运行gitlab服务,gitlab服务需要用到80,443,22端口,这些常用端口经常会产生冲突,那么我们可以通过docker命令创建一个macvlan驱动类型的网络来拥有独立MAC和IP地址。Kubernetes内置CNI插件包含了macvlan,配置使用macvlan CNI,可以让Kubernetes的Pod使用Underlay网络 。 Kube-OVN Kube-OVN插件将Kubernetes容器网络接入ovs网络。提供了vpc, router, switch, subnet管理能力。 Multus Multus CNI插件提供了Kubernetes Pod添加多块网卡的能力。容器同时接入多个不同的网络,解决了类似Ceph这种区分多个网络应用场景。 IPAM IP地址管理(IP Address Management),分配和维护IP地址,DNS,网关,路由等信息。CNI插件在执行过程中调用相应的IPAM插件,IPAM插件将IP相关信息返回到主CNI插件。IPAM插件减少了CNI插件重复编写相同代码管理IP的工作,而且解决了多个CNI插件统一集中IP管理的需求。 场景需求 1,通过VPC实现网络租户隔离 2,通过NAT网关SNAT访问外网 3,通过NAT网关DNAT暴露端口给外网访问 安装部署 安装Kube-OVN curl -O https://raw.githubusercontent.com/kubeovn/kube-ovn/release-1.10/yamls/crd.yaml curl -O https://raw.githubusercontent.com/kubeovn/kube-ovn/release-1.10/yamls/ovn.yaml curl -O https://raw.githubusercontent.com/kubeovn/kube-ovn/release-1.10/yamls/kube-ovn.yaml curl -O https://raw.githubusercontent.com/kubeovn/kube-ovn/master/charts/templates/kubeovn-crd.yaml sed -i 's/\$addresses/<Node IP>/g' ovn.yaml kubectl label node ubuntuserver1 kube-ovn/role=master kubectl apply -f crd.yaml kubectl apply -f kubeovn-crd.

继续阅读

Clusterpedia安装与使用

今天我们一起来安装一下Clusterpedia,看看Clusterpedia的功能。 安装 Clusterpedia代码目录deploy下提供了kubectl安装Clusterpedia的YAML文件。部署数据库,安装CRD自定义资源,安装apiserver和clustersynchro,注册apiservice。 cd ./deploy/internalstorage/postgres export STORAGE_NODE_NAME=sukai sed "s|__NODE_NAME__|$STORAGE_NODE_NAME|g" `grep __NODE_NAME__ -rl ./templates` > clusterpedia_internalstorage_pv.yaml kubectl create -f . cd ../../.. kubectl apply -f ./deploy/crds kubectl apply -f ./deploy 两个组件的deployment里可以修改镜像地址为: m.daocloud.io/ghcr.io/clusterpedia-io/clusterpedia/clustersynchro-manager:v0.1.0 m.daocloud.io/ghcr.io/clusterpedia-io/clusterpedia/apiserver:v0.1.0 导入集群 创建Clusterpedia的CRD PediaCluster资源实例,syncResources指定Clusterpedia需要同步的资源。 apiVersion: cluster.clusterpedia.io/v1alpha2 kind: PediaCluster metadata: name: cluster-sukai spec: apiserver: "https://192.168.0.111:6443" caData: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUMvakNDQWVhZ0F3SUJBZ0lCQURBTkJna3Foa2lHOXcwQkFRc0ZBREFWTVJNd0VRWURWUVFERXdwcmRXSmwKY201bGRHVnpNQjRYRFRJeU1ESXdOakE1TXpjek1Wb1hEVE15TURJd05EQTVNemN6TVZvd0ZURVRNQkVHQTFVRQpBeE1LYTNWaVpYSnVaWFJsY3pDQ0FTSXdEUVlKS29aSWh2Y05BUUVCQlFBRGdnRVBBRENDQVFvQ2dnRUJBSnUzCjlvS0MwUjVQYkgyUTZCa2ZCSzIwYzRVNkJpQmJpdW14akI5VVBZaExja2VhTkIwVHFLY2xXTjNzV2Y4QVBGM0sKYlFyb1F5UzdhNEVGWVU3WFJRcWFWTHdlSGhWZWZnMU1nZ3gzMXpsNS8wR3NqdFRlc1hyTmR4RXBvTnBvVWtrWAp1Uy8wWXZoNjZSUklPMks5WmNLKytvK1YycDl3UFZLTHYvUm45WDZpRVRlVjJSeCt0RDFXbEpDRSs0UGhublJNCm1jYUJyWkxkazVONGtCb1JDMTV0alhWRkpNSVJIejNaUVhwS1RydzVEZ3pvS0MyUUpCcnl1L2kxS0pSWGNIUm8KcXd6dzExWkRMemFXU3VnQ1hMT3pCeXVvblhoOGVpWW5jbldJa0NKZzlUblczamRXU3hZTHBtZUJFaXd5TDNNegpEWUIxWTVpaVVZV0pvbERCQjNNQ0F3RUFBYU5aTUZjd0RnWURWUjBQQVFIL0JBUURBZ0trTUE4R0ExVWRFd0VCCi93UUZNQU1CQWY4d0hRWURWUjBPQkJZRUZJK2NhY0psbW1oUElPZGJaRjV1LzMzaDVNRUtNQlVHQTFVZEVRUU8KTUF5Q0NtdDFZbVZ5Ym1WMFpYTXdEUVlKS29aSWh2Y05BUUVMQlFBRGdnRUJBQkdVanNacEZuKy9LR2VpRnpUNApKaEtITno3Yzl5NjF3eEUvNzYvcFd2eGU1RVRyT1FWbjdTQk54SUQ1OXNBKzUrcEtyNnk5TUxrUUxDZmhhZFdKCmd4WklEVmlHTmV4ejFsSlNKZ2h0L0lTOUtKbnl5YmQ1QUV4cTZ2ZkxrT1VKZkxaYnJMWUNGYUVIWXg1Q055TXEKUGdtdG1Tb1NlOVVKMGpEZlFvanJGakVIbWJpT0hHV0o3R2dHaStKUzhSbFZwc3pyK215c05FcitiMCtiR2hWWApNR3NDNTVCQndOTVZhSHFXZWZmRjB0ZGlSaVY4SW1xa0loSVdNSXA0RFdvbjBJc2xFYTZ0RWNrTGhaM0dsR2lrCmRvcCtGTmJZRmVZWXpPU05yRkpCZitKSnd4U1lSWGlTTXVCc29nd2NYcFZ1S0lza2hoVjU0dUEwUTcxSUNLN3IKN3VnPQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg== tokenData: certData: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURJVENDQWdtZ0F3SUJBZ0lJV2YrWkJSK1I0SFF3RFFZSktvWklodmNOQVFFTEJRQXdGVEVUTUJFR0ExVUUKQXhNS2EzVmlaWEp1WlhSbGN6QWVGdzB5TWpBeU1EWXdPVE0zTXpGYUZ3MHlNekF5TURZd09UTTNNelJhTURReApGekFWQmdOVkJBb1REbk41YzNSbGJUcHRZWE4wWlhKek1Sa3dGd1lEVlFRREV4QnJkV0psY201bGRHVnpMV0ZrCmJXbHVNSUlCSWpBTkJna3Foa2lHOXcwQkFRRUZBQU9DQVE4QU1JSUJDZ0tDQVFFQXVhOHAvUnBITmUzemhGOWkKZEFZUFlBalJIaTlBYTlGc1FhdzY4NzJoOUhBVVpneUNnU3JxUmIwc1hBOUZ4aSsybHJsSWIzUE9sRGd3bnJwZApacmI1VnQyZHRuVGhXZitJYVV4bGNraGs1VWdNQnUwa1VFVGpUTFZtY1RvczVZWGJWejk5aUEwQ3h4LzViYzcwCnJKRWRyWkJQYngrdFN1Mk5kN1paeTE5SExWYVh6a256R3NRZ1o0Z1piT09oeFVNd201OFd4a2hDeDdyczhLQnIKdmNOWXFVMC9UNXVydFJPcEJabCtYMTFnNGcyeXkwSTdBc2R1cnVCMmhiOG1RUW1MRzg5VmdqZHQzOEdrUTJhMApZMGNLay9aeVQzUVUxeGZkUTlvb2FYbWN1SnBkUmJpeTU5a1VBNzVSc3V2L1V3UXZucDgrUzNLR2dFVkJBTkpqCjFCNTljd0lEQVFBQm8xWXdWREFPQmdOVkhROEJBZjhFQkFNQ0JhQXdFd1lEVlIwbEJBd3dDZ1lJS3dZQkJRVUgKQXdJd0RBWURWUjBUQVFIL0JBSXdBREFmQmdOVkhTTUVHREFXZ0JTUG5HbkNaWnBvVHlEblcyUmVidjk5NGVUQgpDakFOQmdrcWhraUc5dzBCQVFzRkFBT0NBUUVBUk8xVldObW40N1A4ajl3THkrZUdiMVB6WjBnUTg1WlQvOVU1Ck5yVHZ1YXFCVW80bUxvNzBISHRtVDAreVlMSmhWV0kxSW9IT3dqc3gwcUhJWXc3U0lIWS90K3VqOHh4TDJnakgKSVdWT0NwSW5xeDF1TTQwQ0xORGtmbUM1b2JYbEtUbCt0d0hETDBuQzV3am8wdE5TV0x0SERqWjVoRVF2VWlieQoxdmpHZnhDS3YxNXdqVFlvTjZ3OS9nS1ZHQm5scjJjNm1kTmUwSktmUEdHeHRKMmN3R0daQXRYQ1EvcUNRZmpoCnVVM3ZSSTlXSFp5THhJWWF2RjdpRlVlaGdqYVBKc3lwQ0xHejRPVjA3dlFEbHdBQlp3Mjk1WGI3c1FVK1FOMVoKMEwzWHIyVzIydE52ZHBiNlRNSjRMaFN2Z2RLOHFWUDREK0RscWY2ajFUUHFwOFVucGc9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg== keyData: LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdWE4cC9ScEhOZTN6aEY5aWRBWVBZQWpSSGk5QWE5RnNRYXc2ODcyaDlIQVVaZ3lDCmdTcnFSYjBzWEE5RnhpKzJscmxJYjNQT2xEZ3ducnBkWnJiNVZ0MmR0blRoV2YrSWFVeGxja2hrNVVnTUJ1MGsKVUVUalRMVm1jVG9zNVlYYlZ6OTlpQTBDeHgvNWJjNzBySkVkclpCUGJ4K3RTdTJOZDdaWnkxOUhMVmFYemtuegpHc1FnWjRnWmJPT2h4VU13bTU4V3hraEN4N3JzOEtCcnZjTllxVTAvVDV1cnRST3BCWmwrWDExZzRnMnl5MEk3CkFzZHVydUIyaGI4bVFRbUxHODlWZ2pkdDM4R2tRMmEwWTBjS2svWnlUM1FVMXhmZFE5b29hWG1jdUpwZFJiaXkKNTlrVUE3NVJzdXYvVXdRdm5wOCtTM0tHZ0VWQkFOSmoxQjU5Y3dJREFRQUJBb0lCQUVCcUdxZmFDT0FWaHdmaAp5eGF5ejN5aU1tRkZSUlRpRnFzRm80SFF4REUyL0d5V1pHT0l6cktZdUozTEVvcDVITjlXc1dFd2pIWndzN1VzCnM2QWhVNGdsNDBOYmNwMjAvczZBbVNTM0pvRS9xQ1J5K2NqNnpOdGNob2c3QlQ0dVhIUDg2NEJaK3grMjRPR08KRE9VY2htNGloTnZvNGtYKytMZVJ3NzdBYzhHdkUwYWU0akt6ZHZRb0hXNmNkcGxBWGdqRG1EVGVGOVZ6Q2o1SgplaC90OWQvN0V4by83SXJHdWROWnpwcTRZaktrTUdhdVdwcEt0N0FmUXpVU2pTOEh1R0ROL2VpUDhzRXlLdlZmCmwvVmdMb3FzUlRUUzZZNjFkU1FVaHlWSE80Z1VKamV2VVFyMzJFaDF3R2VxMXVDUE9iUjdzd2pnWjhET2kwcDgKam81eFR2a0NnWUVBOUM4Mm1zeDJjdUhLUjZoS2VYbTdhSFJKR0d4MXY1czVMY2lYN0ljMTltM1NROUZHODFBcAp3VjFBOUxydVhpVnoyTFRHV1ZDY2V1ZzZCc0FpTVNXSkF5eGx4TUhEbFVBbXZzT0dLT2N2ZXdlOHRPaUFlQkU0Clo5WWlUcUovZDdxdmdrN1RFdElDTHRDVStKcXRWZmUxVVFnK3FvUHY3Q09GRHplZ1ZrSTF5cDBDZ1lFQXdxdEsKZ3JTQktjaGQvUnlIdG9sbnlkbzYvZk9aWHRIbHkvcGdXQm9ZekZDcFRra3V3TnFjL2J6cE96dFhzL3VZNTlPaQprWUZJcFVLM0cvVnRNRno0UkY1R25mbnFMK0I4cndjcGNJRjJLSXFWYStsR2h5cy9mNWNveWFURHRDWnlSRTRpCmdEb2gyZmhUcEl0ZHN5QkNNdVZGMFRnNE15N1c5aUVTVlNhZm8wOENnWUFaYmVWSTM2d2lNS05wTFB4OGhCSGgKUWVMdTJUUzEvSXRLMms0QUF1QzZ4aHNVbHZIRm12Nk9OWkR6SzVoeFU0TXArVUdDd2FOYUpWOE5udXF3cFpFTQpOSTV3bkNFckpPQWtFNmFnRWR0ZSs2SktVTUE0UU1yWC9YUGJMbzhKdi9aUklyWldpbXBSeDhVTDBzZmtZUVNQCjZNVGw2eEdNVFBLcGNBaVJreG1ZL1FLQmdRQ3ZqSUNZOWVZMG83ZitkU2Y5ZUZQY042eFRMc1gwT0J5ZW9aOFkKVkJCZ3o2eWVLR2k5Q1dmaGVlWnB2ODRMUkt4VEF3cnJaRWI2b1BzM2YwK0QrWkw1Tkh0Q0l3a0pPOHUwbXlUSApqRGZkdjN1WDRMbjFVdzdrSkpCbnB1bkZINWFUK2xJcWlFSFdxcFhqSUxyU3VoaDRoVUU4dHhJWE5mb3I0dzhCCk10OXJDUUtCZ0VLNVZrM21sK1Bsbm1wVmpHMU5wOUFtSDY1c2N0eGhYb0ZkU2VVSUZJSTZLU3plRUZnTlllc0YKajRzRnBQeFhkNk5IaS8wYlZTckJBejFveXI2cXlXK3Iwa2hyV3krb1NRakFqV3ZQdlNiWEwwNE5YcDdDci81MAp6L29OTDRXUVI0eGJyWHlUU1FhbFp4R3EvOEV0MjM0amdnbTl3UWRhTkR6anNtaEZPQ2hRCi0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg== syncResources: - group: apps resources: - deployments - group: "" resources: - pods 查看集群状态,APIService,api资源包括:CRD和Aggregator两种方式注册的资源。 sukai@sukai:~$ kubectl get pediacluster NAME APISERVER VERSION STATUS cluster-sukai https://192.

继续阅读

开发Kubernetes自定义APIServer

前面文章介绍了DaoCloud开源的Clusterpedia的业务流程和资源同步机制,Clusterpedia将多个Kubernetes集群的指定资源对象同步保存到MySQL数据库并对外提供查询和检索。今天我们一起看一下Clusterpedia如何开发一个Kubernetes自定义的APIServer,将保存在数据库中的多个Kubernetes业务集群的资源对外提供查询和检索的。 本文内容分以下部分: 1,Kubernetes APIServer基本知识 2,自定义APIServer开发 Kubernetes APIServer基本知识 Kubernetes APIServer是一个实现了RESTful API的WebServer。 API类型 core group,/api/v1为前缀的API接口PATH,core group不需要在apiVersion字段中指定,示例:apiVersion: v1。 named groups,REST path为/apis/$GROUP_NAME/$VERSION,指定apiVersion: $GROUP_NAME/$VERSION,示例apiVersion: batch/v1。 暴露系统状态的API,比如/metrics、/healthz等。 扩展机制 为了增加Kubernetes API的扩展性,Kubernetes提供了两种机制:1,APIExtensions,创建自定义资源CRD,处理CRD/CR的REST请求。2,Aggretgator聚合机制,注册APIService资源,APIServer将对应的API GroupVersion请求代理转发到注册的Service上。 委托调用链 APIServer使用委托模式,通过DelegationTarget接口,把Aggretgator、API Server、APIExtensions链式串联起来,对外提供服务。 当请求Kubernetes对象时,如果在Aggregator中找不到,就去KubeAPIServer中找,最后到APIExtensions中找。 Clusterpedia通过这种委托模式,注册Aggregator服务,优先于所在Kubernetes集群的APIServer提供api和apis两个group的服务,来访问存储在数据库中的多个业务集群的Kubernetes资源对象,kubectl也可以直接执行命令进行下游集群的资源对象查询检索操作。 资源对象操作 在APIServer中,API请求资源对象时,先通过RESTStorage进行REST API处理,在RESTStorage中调用etcd store进行数据存储操作。 Clusterpedia中,与Kubernetes APIServer类似,先通过RESTStorage进行REST API处理,在RESTStorage中调用ResourceStorage进行数据库操作。 自定义APIServer开发 Clusterpedia APIServer基本流程 1,config-complete-new模式构造GenericAPIServer实例kubeResourceAPIServer NewDefaultConfig初始化默认配置。BuildHandlerChain中WithRequestInfo处理API请求信息保存到context中,WithPanicRecovery处理崩溃日志并恢复,RemoveFieldSelectorFromRequest处理URL Query。 complete完善配置,wrapRequestInfoResolverForNamespace封装处理RequestInfo解析器。 New从空委托者构造GenericAPIServer。创建discoveryManager提供自动发现API处理,NonGoRestfulMux.Handle注册/api和/apis的自动发现路由。创建resourceHandler提供资源对象请求API处理,NonGoRestfulMux.HandlePrefix注册/api和/apis为前缀PATH的路由。NewClusterResourceController创建ClusterPedia控制器,监听ClusterPedia CRD事件,处理下游集群的资源,discoveryManager读取controller更新的资源信息提供自动发现API服务。 2,BuildHandlerChain 将WithRequestQuery添加到HandlerChain,WithRequestQuery保存URL Query到context 3,构造GenericAPIServer实例genericServer config.GenericConfig.New从kubeResourceAPIServer委托者构造一个新的GenericAPIServer实例genericServer。所以API资源请求调用链是,先在kubeResourceAPIServer找,再到genericServer中找。 3,InstallAPIGroup注册API对象 genericServer中注册API对象:GroupVersion:clusterpedia.io/v1beta1,Resources资源为resources,collectionresources对象。每个资源对应一个REST storage,用于处理对应资源的请求。 4,AddPostStartHookOrDie添加APIServer启动后调用函数 APIServer启动后,启动Informer,同步Cache。 5,PrepareRun() 准备健康状态检查,存活检查 6,Run() 启动运行APIServer clusterpedia-apiserver命令 在opts.Config()中: 1,根据配置的数据库类型和配置项,创建storage接口,用于操作数据库 2,根据服务器配置,生成SSL自签名证书 3,生成genericapiserver默认配置 通过Config->Complete->New模式构造一个ClusterPediaServer实例server server.Run运行ClusterPediaServer func NewClusterPediaServerCommand(ctx context.Context) *cobra.

继续阅读

开发Kubernetes自已的Informer

我们在Kubernetes自定义资源CRD控制器开发中都使用过Informer,Informer主要提供了两个功能:1,同步数据到本地缓存。2,监听Kubernetes资源的事件,根据对应的事件操作类型,触发事先注册好的ResourceEventHandle。 前面文章介绍了DaoCloud开源的Clusterpedia通过Informer机制实现了同步多个Kubernetes集群资源到MySQL等数据库。那么如何开发一个简化的自定义的Informer呢?下面我们一起看一下DaoCloud开源的Clusterpedia如何实现Kubernetes资源版本Informer的。 文章内容分以下几部分: 1,Kubernetes原生的Informer机制与实现 2,Clusterpedia的ResourceVersionInformer与原生的Informer比较 3,ResourceVersionInformer代码实现 Kubernetes原生的Informer机制 Informer组件 Reflector:反射器,通过Kubernetes的List/Watch API监控指定类型的资源对象。 DeltaFIFO Queue:将Reflector监控到的变化的对象存放在这个FIFO队列中。 LocalStore:Informer的本地缓存,缓存Kubernetes资源对象,可以被Lister的List/Get方法访问,减少对APIServer的访问压力。 WorkQueue:DeltaFIFO中的事件更新完Store后保存到WorkQueue中,Controller处理WorkQueue中的资源对象事件调用对应的回调函数。 SharedInformer实现 这里可以看到: 1,NewSharedIndexInformer中,初始化了processor,这个processor用于回调用户注册的事件处理回调函数,NewIndexer初始化了一个indexer,这个indexer是作为informer的store本地缓存。listerWatcher指定ListerWatcher接口lw,这个是Kubernetes的client连接用于监控Kubernetes资源对象。objectType指定监控的Kubernetes资源类型。 2,SharedInformer的Run中,NewDeltaFIFOWithOptions构造了一个DeltaFIFO实例fifo,构造了一个controller,这个controller使用fifo队列,使用listerWatcher监控资源对象,ObjectType指定资源类型,Process指定controller处理队列的函数。s.processor.run启动监听通知,调用用户注册的回调函数处理。 type SharedInformer interface { AddEventHandler(handler ResourceEventHandler) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) GetStore() Store GetController() Controller Run(stopCh <-chan struct{}) HasSynced() bool LastSyncResourceVersion() string SetWatchErrorHandler(handler WatchErrorHandler) error } func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: exampleObject, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.

继续阅读

Golang事件队列开发

前面文章介绍了DaoCloud开源的Clusterpedia通过Informer机制监控Kubernetes资源事件,保存事件对象到队列中,等待事件处理函数将Kubernetes资源对象保存到数据库中。今天一起学习一下DaoCloud开源的Clusterpedia如何实现事件队列的。 下面我们文章首先介绍Clusterpedia如何使用EventQueue的,再来介绍EventQueue如何实现的。主要分以下部分: 1,Clusterpedia事件处理函数HandleDeltas,调用informer.handler,这个handler实现了ResourceEventHandler接口,有OnAdd,OnUpdate,OnDelete方法。 2,实现了ResourceEventHandler接口的ResourceSynchro,ResourceSynchro调用EventQueue接口将对象写入队列 3,实现了EventQueue接口的pressurequeue,pressurequeue负责队列的操作。 事件处理函数HandleDeltas Clusterpedia直接基于client-go底层的cache.Controller开发了自己的resourceVersionInformer,cache.Controller的Process函数调用HandleDeltas来处理资源事件。根据cache.Deltas的操作类型,调用informer.storage和informer.handler的对应操作。这里的handler指的是ResourceEventHandler。 func (informer *resourceVersionInformer) HandleDeltas(deltas cache.Deltas) error { for _, d := range deltas { switch d.Type { case cache.Replaced, cache.Added, cache.Updated: version, exists, err := informer.storage.Get(d.Object) if err != nil { return err } if !exists { if err := informer.storage.Add(d.Object); err != nil { return err } informer.handler.OnAdd(d.Object) break } if d.Type == cache.Replaced { if v := compareResourceVersion(d.Object, version); v <= 0 { if v == 0 { informer.

继续阅读

DaoCloud Clusterpedia多Kubernetes集群资源同步与检索

Clusterpedia是DaoCloud开源的支持同步多个集群的指定资源,将资源对象保存到MySQL等数据库,可以通过Clusterpedia检索排序分页资源。 今天我们一起来看一下Clusterpedia的资源同步控制器代码, 了解他的业务流程。 Clusterpedia架构 Clusterpedia在一个Kubernetes集群中,运行Clusterpedia APIServer和ClusterSynchro Manager,提供聚合API,管理和同步多个Kubernetes集群的资源。Clusterpedia所在的集群可理解为管理集群或者控制集群。Clusterpeida将同步的业务集群的资源保存到MySQL/PostgreSQL数据库中,这部分由代码中的StorageFactory实现。 主要业务流程: 1,在管理集群中通过自定义资源PediaCluster CRD来保存、管理和调谐下调业务集群。 2,每一个PediaCluster资源,对应一个ClusterSynchro,也就是一个业务集群会有一个ClusterSynchro实例,用来保存下游业务集群的RestConfig,storage数据库工厂接口,resourceSynchros同步资源GVR对应的ResourceSynchro。 3,每个集群的每个GVR资源对应一个ResourceSynchro,用来保存GVR/GVK信息,事件队列等信息。 4,在PediaCluster资源调谐过程中,主要负责 a,创建对应的ClusterSynchro,将ClusterSynchro中的同步资源对应的Informer启动,将Informer事件调用ResourceSynchro的事件处理函数进行处理。ResourceSynchro实现了ResourceEventHandler接口。 b,ResourceSynchro的事件处理函数,主要实现将对象放入ResourceSynchro队列queue中。 c,开启worker线程池,等待处理队列中的事件,将对象同步到数据库中。 代码 clustersynchro-manager配置项 1,storage.NewStorageFactory生成存储工厂,支持数据库:MySQL, PostgreSQL 2,生成管理集群的Kubeconfig,创建kubernetes以及自定义资源PediaCluster的clientset连接。 func (o *Options) Config() (*config.Config, error) { if err := o.Validate(); err != nil { return nil, err } storagefactory, err := storage.NewStorageFactory(o.Storage.Name, o.Storage.ConfigPath) if err != nil { return nil, err } kubeconfig, err := clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig) if err != nil { return nil, err } client, err := clientset.

继续阅读

Golang实现WorkerPool文件上传

在前面文章中介绍了golang开发rpc命令行工具,今天继续后续功能的介绍。 场景描述: 1,客户端运行一个daemon程序,执行文件上传任务,使用boltdb数据库记录任务执行状态。支持继续上传未完成文件。 2,客户端命令行通过rpc调用客户端daemon程序API,进行文件上传任务管理,包括任务创建,查看,启停。 通过这个功能开发,我们将golang并发编程的goroutine协程,channel通道,context上下文, Mutex互斥锁, WaitGroup协程同步等技术得到应用。 基本知识: WaiGroup 等待一组协程执行完成后继续向下执行,WaitGroup内部有一个计数器,从0开始计数,有3个方法:Add(),Done(), Wait()。Add()添加计数,Done()减掉一个计数,Wait()执行阻塞,直到WaitGroup数量变成0。 Select select和channel配合使用,通过select可以监听多个channel的I/O读写事件。 select { case <-ctx.Done(): p.log.Error("the context error \n") return context.Canceled default: } 如果没有default分支,select会阻塞在多个channel上,对多个channel进行监控。如果有default分支,多个channel都没有满足,则执行default分支。 Context Golang的Context称之为上下文,用来跟踪goroutine关系链,传递通知,达到控制他们的目的。主要用法是,传递取消信号,传递数据。 下面是一个传递取消信号的使用过程,首先context.Background()返回一个空的Context,一般用于整个context tree的根节点。context.WithCancel()返回一个ctx可取消的Sub Context,作为Run的参数传入goroutine,这样可以使用ctx跟踪这个Goroutine。cancel()调用Sub Context的取消函数,向关联的Goroutine发送一个"取消"通知。在Run函数中,接收ctx.Done的cancel通知,做相关清理后退出。 ctx, cancel := context.WithCancel(context.Background()) go pool.Run(ctx) cancel() for { select { case <-ctx.Done(): return } } Mutext互斥锁 Mutext互斥锁在同一时间只被一个goroutine访问,不区分读写。有两个方法:Lock()和Unlock()。当一个goroutine申请了Lock(),那么另一个goroutine申请Lock()时会阻塞等待直到Unlock()释放锁。 multipart/form请求 multipart/form请求是http Post方法,可以发送文件和消息,在请求的Header中包含一个特殊头信息Content-Type: multipart/form-data; boundary=,boundary的值为随机计算生成的值,用于分隔上传多个form-data的间隔。 POST /raw/v1alpha2/rawdatas/6214cb78ff3f2903536f5751/images HTTP/1.1 Host: 127.0.0.1:9090 Content-Length: 546 Content-Type: multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW ----WebKitFormBoundary7MA4YWxkTrZu0gW Content-Disposition: form-data; name="files"; filename="/C:/Users/ycsk0/Pictures/ubuntu-install.png" Content-Type: image/png (data) ----WebKitFormBoundary7MA4YWxkTrZu0gW Content-Disposition: form-data; name="files"; filename="/C:/Users/ycsk0/Pictures/windows store.

继续阅读

Golang操作BoltDB数据库

boltdb是一个go语言开发的嵌入式kv数据库。 今天我们一起看看golang如何操作BoltDB,再看看Etcd是如何操作BoltDB的,Etcd对BoltDB做了很多优化,复杂度提高很多。 Etcd是一个基于Raft开发的分布式key-value存储。Kubernetes使用Etcd来作为集群数据的存储。Etcd存储层包含预写日志(WAL)、快照(Snapshot)、boltdb。其中WAL与Snapshot实现了故障恢复和数据回滚或重做,让数据尽量不丢失,boltdb则保存了集群元数据和用户写入的数据。 使用场景 在自动驾驶汽车算法模型开发中,需要大量的2D图片和3D点云图片,我们将这些图片上传到打标系统,进行数据标注,标注的数据用于算法模型的数据集。 这些图片文件的上传,需要有一个有状态的客户端来完成文件上传工作,客户端可以在本地创建上传任务,记录上传位置,任务启停等功能。 这里我们使用go来开发客户端,选择boltdb这种简单的嵌入式kv数据库作为存储。 BoltDB基本概念 DB数据库文件 BoltDB的数据库只有一个文件,一个进程打开此文件后,其他进程无法使用,只有等这个进程释放文件锁。对应关系型数据库中database。 Bucket 对应关系型数据库中的table,操作有CreateBucket, CreateBucketIfNotExists, DeleteBucket。 Key/Value键值对 boltdb中键值对都是使用字节数组值进行存储,keys以字节排序的顺序存储在一个bucket中。 Transaction事务 boltdb在某一时刻,只允许一个读写事务或者允许多个只读事务。用户可以通过db的Begin方法启动一个事务,通过Rollback和Commit方法自己控制提交和回滚,Close关闭事务。同时boltdb还提供了内置隐式事务Update, View, Batch方法。 读写事务 db.Update()启动一个读写事务,可重复读的事务。return error,回滚整个修改,return nil提交修改。 只读事务 db.View()打开一个只读事务,无法做写入操作。 批处理读写事务 db.Batch()多次读写事务合并为一次事务,使用上和Update读写事务相同,boltdb自动将其分批,分批写入磁盘减少并发读写事务等待磁盘I/O的开销。 代码学习 普通使用案例 定义Client接口,实现CreateBucketIfNotExists创建Bucket,Put写入更新数据,Delete删除数据,Range查询单条数据或数据列表,ForEach遍历数据。可以看到只使用了两种不同的事务Update和View来完成操作。 主要操作有三种Get, Put, Delete,这里只用到了Put,Delete操作,通过Range获取单条或者多条数据,另外通过Cursor游标,Seek偏移,Next来遍历数据。 package boltdb import ( "bytes" "math" "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt" "go.uber.org/zap" ) type client struct { db *bbolt.DB log *zap.Logger } type Client interface { CreateBucketIfNotExists(bucket Bucket) Put(bucketType Bucket, key, value []byte) Delete(bucketType Bucket, key []byte) Range(bucketType Bucket, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) ForEach(bucketType Bucket, visitor func(k, v []byte) error) Close() error } func NewClient(path string, log *zap.

继续阅读