网站建设 南通,太原在线制作网站,qq推广文案怎么写,废料回收网站建设简介 在此之前我们先来了解下kubernetes的两个概念声明式API和控制器模式。声明式API核心原理就是当用户向kubernetes提交了一个API对象的描述后#xff0c;Kubernetes会负责为你保证整个集群里各项资源的状态#xff0c;都与你的API对象… 简介 在此之前我们先来了解下kubernetes的两个概念声明式API和控制器模式。声明式API核心原理就是当用户向kubernetes提交了一个API对象的描述后Kubernetes会负责为你保证整个集群里各项资源的状态都与你的API对象描述的需求相一致。而对于每个保存在etcd里的API对象kubernetes都通过启动一种叫做控制器模式的无限循环不断检查然后调谐最后确保整个集群的状态与这个API对象的描述一致。在kubernetes中我们所熟悉的deployment\statefulset都是其自带的一些控制器。 k8s提供了强大了扩展能力来操作它里面的资源这些资源可以是内置资源比如pod、node等也可以自定义资源CRD。 对于自定义资源稍微麻烦些我们需要做如下几步 自定义CRD通过k8s提供的代码生成器自动生成基础代码编写自己的业务逻辑也就是自定义controller来操作CRD 对于内置资源我们就只需第三步就可以了。 注CRD自定义controller又被称作operatorhttps://github.com/operator-framework/awesome-operators hrefhttps://github.com/operator-framework/awesome-operators%2a%2a%2ahttps://github.com/operator-framework/awesome-operators* 自定义CRD 比如我们想模仿pod的yaml那样自定义一个资源 apiVersion: xxxx/v1beta1
kind: TsTest
metadata:name: tstest-sample
spec:# Add fields heredeploymentName: tstest-sample-deploymentreplicas: 2 如果我们直接kubectl create 这个yaml文件会报错因为k8s不认识这个资源那怎样才能让k8s识别我们的自定义资源呢需要像下面这样创建一个自定义资源描述文件标准格式如下 apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:# 名字必需与下面的 spec 字段匹配并且格式为 名称的复数形式.组名name: tstests.stable.example.com
spec:# 组名称用于 REST API: /apis/组/版本group: stable.example.com# 列举此 CustomResourceDefinition 所支持的版本versions:- name: v1# 每个版本都可以通过 served 标志来独立启用或禁止served: true# 其中一个且只有一个版本必需被标记为存储版本storage: trueschema:openAPIV3Schema:type: objectproperties:spec:type: objectproperties:cronSpec:type: stringimage:type: stringreplicas:type: integer# 可以是 Namespaced 或 Clusterscope: Namespacednames:# 名称的复数形式用于 URL/apis/组/版本/名称的复数形式plural: tstests# 名称的单数形式作为命令行使用时和显示时的别名singular: tstest# kind 通常是单数形式的驼峰编码CamelCased形式。你的资源清单会使用这一形式。kind: TsTest# shortNames 允许你在命令行使用较短的字符串来匹配资源shortNames:- ts 上面就定义了一个叫TsTest的crd然后通过kubectl命令创建它此时我们再次执行我们最开始定义的那个资源文件就不会报错了并且也可以像pod那样CRUD操作了但是上面的那些都只是让k8s能够识别我们自定义的这个资源当我们通过yaml文件创建这个资源的时候k8s也只是将这个资源记录在了etcd中了而已没有触发任何的业务逻辑。 自定义controller k8s提供了一个专门的client-go库来简化开发者扩展k8s的代码编写编写自定义controller也是基于client-go库因此实现自定义controller大致就是 第1步是listwatch的初始化主要是返回一个针对某类资源的ListFunc和WatchFunc。第2步是informer的初始化新建Indexer并将上述Listwatcher一同放入informer结构中第3步是给informer添加AddEventHandler通常包含add update delete并根据情况引入workqueue可是默认的也可以是限速、延时等队列第4步是启动informer.Run()然后等待同步完成第5步是worker的启动监听退出信号等待退出。 其实一个controller就是一个生产者和消费者模式上面的1-4对应的就是生产者操作5步对应的就是消费者操作。 下面以简单的示例代码来看 //Create the endpoints watcher,endpointsQueue,endpointController
endpointsListWatcher : cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(),endpoints, v1.NamespaceAll, fields.Everything())
endpointsQueue : workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
endpointsIndexer, endpointsInformer : cache.NewIndexerInformer(endpointsListWatcher, amp;v1.Endpoints{}, ReSyncPeriod,cache.ResourceEventHandlerFuncs{AddFunc: c.addEndpoints,DeleteFunc: c.deleteEndpoints,UpdateFunc: c.updateEndpoints,}, cache.Indexers{})//Create the configmap watcher,endpointsQueue,endpointController
configmapListWatcher : cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(),configmaps, v1.NamespaceAll, fields.Everything())
configmapQueue : workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
configmapIndexer, configmapInformer : cache.NewIndexerInformer(configmapListWatcher, amp;v1.ConfigMap{}, ReSyncPeriod,cache.ResourceEventHandlerFuncs{AddFunc: c.addConfigMap,DeleteFunc: c.deleteConfigMap,UpdateFunc: c.updateConfigMap,}, cache.Indexers{})c.endpointsQueue endpointsQueue
c.endpointsIndexer endpointsIndexer
c.endpointsInformer endpointsInformerc.configmapQueue configmapQueue
c.configmapIndexer configmapIndexer
c.configmapInformer configmapInformer 上面的代码就是对应1-2步 go c.endpointsInformer.Run(stopCh)
go c.configmapInformer.Run(stopCh)
go c.informerFactory.Start(stopCh)
go c.stsplusInformerFactory.Start(stopCh)//Wait for all involved caches to be synced, before processing items from the endpointsQueue is started
//if !cache.WaitForCacheSync(stopCh, c.endpointsInformer.HasSynced, c.configmapInformer.HasSynced) {
if !cache.WaitForCacheSync(stopCh, c.endpointsInformer.HasSynced, c.configmapInformer.HasSynced, c.podListerSynced, c.stsplusListerSynced) {runtime.HandleError(fmt.Errorf(Timed out waiting for caches to sync))return
} 上面这段代码对应的就是3-4步。 for i : 0; i threadiness; i {go wait.Until(c.runEndPointsProcessWorker, time.Second, stopCh)go wait.Until(c.runConfigmapProcessWorker, time.Second, stopCh)
}-stopChfunc (c *Controller) processEndPointsEvents() bool {seelog.Infof([processEndPointsEvents]: start to processEndPointsEvents)endpointsKey, endpointsQuit : c.endpointsQueue.Get()//Wait until there is a new item in the working endpointsQueuefor ;!endpointsQuit; endpointsKey, endpointsQuit c.endpointsQueue.Get(){defer c.endpointsQueue.Done(endpointsKey)//Handle the error if something went wrong during the execution of the updateRoute methodgo c.handleRoute(endpointsKey.(string))//defer c.handleErr(endpointsErr, endpointsKey)//c.endpointsQueue.Done(endpointsKey)}return true
} 上面这段代码对应的就是5步。 到目前为止一个自定义controller的模式化代码就完成了剩下的就是具体的业务代码了因此可以看出依靠k8s提供的这个client-go库真正的做到了开发者只需关注业务开发基础相关的代码都已经被封装了。 源码分析 代码太多后面有时间再分析吧先上个总结 Reflector使用ListAndWatch方法先从apiserver中list某类资源的所有实例拿到对象的最新版本然后用watch方法监听该resourceversion之后的所有变化若中途出现异常reflector则会从断开的resourceversion处重新监听所有变化一旦有Add、Del、Update动作Reflector会收到更新通知该事件及它所对应的API对象这个组合被称为增量Delta,它会被放进DeltaFIFO中Informer会不断从这个DeltaFIFO中读取增量每拿出一个对象Informer就会判断这个增量的事件类型然后创建或更新本地的缓存。DeltaFIFO再pop这个事件到controller中controller会调用事先注册到ResourceEventHandler回调函数进行处理。 【Kubebuilder】 因为对于k8s内置的资源来说client-go已经提供了对应的infomer、cache、listenAndWatch等操作但是对于自定义资源来说如果要编写自定义controller这些都得自己编写这些样板代码基于此社区针对k8s的api扩展提供了一个通用的工具来自动生成脚手架这个工具就是Kubebuilder Kubebuilder 是一个基于 CRD 来构建 Kubernetes API 的框架可以使用 CRD 来构建 API、Controller 和 Admission Webhook。Kubebuilder的工作流程如下 创建一个新的工程目录创建一个或多个资源 API CRD 然后将字段添加到资源在控制器中实现协调循环reconcile loopwatch 额外的资源在集群中运行测试自动安装 CRD 并自动启动控制器更新引导集成测试测试新字段和业务逻辑使用用户提供的 Dockerfile 构建和发布容器。