建站工具搭建前台网站,西安发布信息的平台,wordpress 插件 主体,合肥网站建设百姓网1 什么是Controller#xff1f;
kubernetes采用了声明式API#xff0c;与声明式API相对应的是命令式API#xff1a;
声明式API#xff1a;用户只需要告诉期望达到的结果#xff0c;系统自动去完成用户的期望命令式API#xff1a;用户需要关注过程#xff0c;通过命令一…1 什么是Controller
kubernetes采用了声明式API与声明式API相对应的是命令式API
声明式API用户只需要告诉期望达到的结果系统自动去完成用户的期望命令式API用户需要关注过程通过命令一步一步完成用户的需求
因此用户向k8s提交的yaml文件中最重要的部分就是spec相当于就是用户期望的结果而使用-o yaml选项查看时还有一个很重要的部分就是status它表示的就是当前状态因此k8s主要任务就是完成status-spec的转变。这项工作就是Controller(控制器)完成的。
对于不同的资源控制逻辑是不一样的因此就有很多Controller例如DeploymentController负责将Deployment的status向spec进行转变ReplicaSetController负责将ReplicaSet的status向spec进行转变。
从上面可以看出Controller的工作方式如下
监听资源变化得到资源的当前状态status和期望状态spec执行逻辑使得status-spec
下面以Deployment的创建操作为例说明整个流程
通过上图重新复习下各组件的工作方式
apiserver为其他组件提供接口并且所有的组件都通过apiserver进行交互etcd存储集群的资源对象Controller Manager管理控制器Watch - Analyze - Act监听资源的变化分析出spec和status的差别执行操作使得status向spec转变Scheduler监听资源的变化如果发现未调度的Pod通过一定的策略选择出Node设置Pod的Node字段Kubelet监听调度给当前Node的Pod并执行对应的操作
可以发现除了apiserver和etcd其他组件都可以称为Controller。
2 Controller的实现
知道了Controller的工作方式如果是我们自己实现Controller可以会这样来实现 Controller直接通过Apiserver的接口监控对应资源的变化当资源发生变化时直接执行对应的业务逻辑也就是调协循环。
这样会有啥问题呢
当集群中Node很多时就会有很多kubelet监控Pod的状态变化而所有的监听操作都需要通过apiserver那么apiserver的压力就会很大就会造成集群的不稳定。
当然其他资源(例如Pod或者服务)很多时同样会造成集群不稳定。
因此k8s的client-go(client-go)库采用了另外的设计 client-go components
Reflector对特定类型的资源执行ListAndWatch当监听到资源变更时通过API获取最新的资源对象然后将它放到Delta Fifo queue队列中Informer从Delta Fifo queue队列中弹出对象然后调用Indexer放到Store里面同时调用用户提交的回调函数(ResourceEventHandler)Indexer用于操作Store中的对象
Custom Controller components
Informer Reference和Indexer Reference都是对client-go中对象的引用用户控制器可以通过cache库直接创建或者使用Factory工厂函数创建ResourceEventHandler用户控制器接收对象的回调函数一般来说里面的逻辑就是获取对象的key然后将key写入WorkQueueWorkQueue用户控制器创建的队列负责存储用户控制器需要处理的对象的keyProcess Item从WorkQueue中读取key通过key获取对应的对象
上图是通常会给出的关于Controller的实际实现的逻辑初看还是挺复杂的大致的模块和功能如下 于是Controller实现的步骤如下
获取Informer和Indexer的引用指定要监控变更的资源类型注册ResourceEventHandler并创建WorkQueue用上述的3个对象初始化我们自己的Controller编写Process Item Loop从WorkQueue中读取key然后执行我们自己的业务逻辑
因此整个Controller我们需要注入的逻辑只有2个部分其他都是相对固定的
ResourceEventHandlerProcess Item
3 Controller的使用
上面介绍了k8s中的Controller的实现而要使用
下面对client-go中的workqueue的例子进行分析
workqueue example by client-go
type Controller struct {indexer cache.Indexer // Indexer缓存的索引queue workqueue.RateLimitingInterface // 带限速功能的WorkQueueinformer cache.Controller // Informer
}// 创建控制器
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {return Controller{informer: informer,indexer: indexer,queue: queue,}
}// worker的具体执行逻辑
func (c *Controller) processNextItem() bool {// 从workqueue中获取keykey, quit : c.queue.Get()if quit {return false}// 告诉队列已经处理完毕defer c.queue.Done(key)err : c.syncToStdout(key.(string))// 错误处理c.handleErr(err, key)return true
}// 控制器的业务逻辑这里就执行status-spec的转变
func (c *Controller) syncToStdout(key string) error {obj, exists, err : c.indexer.GetByKey(key)if err ! nil {klog.Errorf(Fetching object with key %s from store failed with %v, key, err)return err}if !exists {// Pod已经不存在fmt.Printf(Pod %s does not exist anymore\n, key)} else {// 这里执行status-spec的转变逻辑fmt.Printf(Sync/Add/Update for Pod %s\n, obj.(*v1.Pod).GetName())}return nil
}// 错误处理包含重试处理
func (c *Controller) handleErr(err error, key interface{}) {if err nil {// 处理完毕c.queue.Forget(key)return}// 如果出现问题会进行重试也就是重新入workqueue// 但是入workqueue不超过5次if c.queue.NumRequeues(key) 5 {klog.Infof(Error syncing pod %v: %v, key, err)// 重新入workqueuec.queue.AddRateLimited(key)return}c.queue.Forget(key)runtime.HandleError(err)klog.Infof(Dropping pod %q out of the queue: %v, key, err)
}// 启动我们自己的控制器
func (c *Controller) Run(workers int, stopCh chan struct{}) {defer runtime.HandleCrash()defer c.queue.ShutDown()// 启动Informer开始监听资源变化go c.informer.Run(stopCh)// 等待cache同步if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {runtime.HandleError(fmt.Errorf(Timed out waiting for caches to sync))return}// 运行若干个worker// wait.Until()每隔1秒执行runWorker()函数直到stopCh收到结束信号for i : 0; i workers; i {go wait.Until(c.runWorker, time.Second, stopCh)}// 读取结束信号结束控制器-stopChklog.Info(Stopping Pod controller)
}func (c *Controller) runWorker() {for c.processNextItem() {}
}func main() {var kubeconfig stringvar master stringflag.StringVar(kubeconfig, kubeconfig, , absolute path to the kubeconfig file)flag.StringVar(master, master, , master url)flag.Parse()// 通过master和kubeconfig生成配置对象config, err : clientcmd.BuildConfigFromFlags(master, kubeconfig)if err ! nil {klog.Fatal(err)}// 根据配置对象生成clientset用于连接k8sclientset, err : kubernetes.NewForConfig(config)if err ! nil {klog.Fatal(err)}// 创建Pod的watcherpodListWatcher : cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), pods, v1.NamespaceDefault, fields.Everything())// 创建workqueuequeue : workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())// 创建Indexer和Informer其中重要的是两个参数Pod的watcher和回调函数// 告知Informer我们只监听Pod的资源变化并且给Infomer注册回调函数indexer, informer : cache.NewIndexerInformer(podListWatcher, v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {key, err : cache.MetaNamespaceKeyFunc(obj)if err nil {queue.Add(key)}},UpdateFunc: func(old interface{}, new interface{}) {key, err : cache.MetaNamespaceKeyFunc(new)if err nil {queue.Add(key)}},DeleteFunc: func(obj interface{}) {key, err : cache.DeletionHandlingMetaNamespaceKeyFunc(obj)if err nil {queue.Add(key)}},}, cache.Indexers{})// 创建我们自己的控制器controller : NewController(queue, indexer, informer)// 启动控制器stop : make(chan struct{})defer close(stop)go controller.Run(1, stop)// Wait foreverselect {}
}参考资料
1 client-go under the hood
2 client-go Examples
3 k8s-client-go demo
4 writing controllers