福州网站建设平台,完美动力培训机构,技术支持 盈岚网站建设,网站建设发布平台Reflector 和 Informer 是 Kubernetes 客户端库中两个密切相关但职责不同的组件。Reflector 是一个较低级别的组件#xff0c;主要负责与 Kubernetes API 服务器进行交互#xff0c;执行资源的初始列表操作和持续的监视操作#xff0c;将获取到的数据放入队列中。而 Informe…Reflector 和 Informer 是 Kubernetes 客户端库中两个密切相关但职责不同的组件。Reflector 是一个较低级别的组件主要负责与 Kubernetes API 服务器进行交互执行资源的初始列表操作和持续的监视操作将获取到的数据放入队列中。而 Informer 是一个更高级别的抽象它内部使用了 Reflector但提供了更全面的功能。Informer 不仅负责数据同步还维护了资源对象的本地缓存并提供了事件处理机制允许开发者注册自定义的事件处理函数。
Informer 实际上通过一个称为 Controller 的内部组件来管理 Reflector。在这个架构中Reflector 负责从 Kubernetes API 服务器获取数据并将其放入一个 DeltaFIFO queue。Controller 则从这个 queue 中取出数据更新 Informer 的 LocalStore这是一个持久化的缓存并触发相应的事件处理器。这种设计允许 Informer 提供一个高级抽象隐藏了底层的复杂性。虽然 Informer 不直接调用 Reflector 的方法但它们通过 Controller 紧密集成。这种架构确保了各个组件职责单一Reflector 专注于 API 交互DeltaFIFO 管理变更队列Controller 协调数据流而 Informer 则提供高级 API 和缓存管理。这种设计使得开发者可以方便地使用 Informer而无需直接处理 Reflector 或了解内部 queue 和 store 的细节。
以kubernetes源码中的reflector的一个测试为例 执行这个test-reflector.go文件这种test文件可以封装一些测试的代码更好的帮助我们理解reflector的工作原理。 func TestReflectorListAndWatch(t *testing.T) {createdFakes : make(chan *watch.FakeWatcher)// The ListFunc says that its at revision 1. Therefore, we expect our WatchFunc// to get called at the beginning of the watch with 1, and again with 3 when we// inject an error.expectedRVs : []string{1, 3}lw : testLW{WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {rv : options.ResourceVersionfw : watch.NewFake()if e, a : expectedRVs[0], rv; e ! a {t.Errorf(Expected rv %v, but got %v, e, a)}expectedRVs expectedRVs[1:]// channel is not buffered because the for loop below needs to block. But// we dont want to block here, so report the new fake via a go routine.go func() { createdFakes - fw }()return fw, nil},ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {return v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: 1}}, nil},}s : NewFIFO(MetaNamespaceKeyFunc)r : NewReflector(lw, v1.Pod{}, s, 0)go r.ListAndWatch(wait.NeverStop)ids : []string{foo, bar, baz, qux, zoo}var fw *watch.FakeWatcherfor i, id : range ids {if fw nil {fw -createdFakes}sendingRV : strconv.FormatUint(uint64(i2), 10)fw.Add(v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: sendingRV}})if sendingRV 3 {// Inject a failure.fw.Stop()fw nil}}// Verify we received the right ids with the right resource versions.for i, id : range ids {pod : Pop(s).(*v1.Pod)if e, a : id, pod.Name; e ! a {t.Errorf(%v: Expected %v, got %v, i, e, a)}if e, a : strconv.FormatUint(uint64(i2), 10), pod.ResourceVersion; e ! a {t.Errorf(%v: Expected %v, got %v, i, e, a)}}if len(expectedRVs) ! 0 {t.Error(called watchStarter an unexpected number of times)}
}第一部分初始化一个通道createdFakes初始化一个watcher有两个方法为watch和list。watch方法会创建watcher并传递到createdFakes通道里面。这里的watchFunc是一个闭包
闭包Closure是指一个函数可以捕获并“记住”其作用域外部的变量即使这个变量在闭包函数的作用域之外。这种特性使得闭包能够访问和操作其外部函数中的变量。捕获外部变量闭包可以捕获外部函数的局部变量并且在闭包函数内使用这些变量即使外部函数已经执行完毕。持久性闭包会“记住”它捕获的变量即使外部函数已经返回。状态共享通过闭包可以在不同的函数调用之间共享状态。
这里的fw是闭包里面创建的watcher并不和外面的fw冲突如果不理解这里的fw也可以改为别的名字。 fw : watch.NewFake() go func() { createdFakes - fw }() 然后初始化reflector s : NewFIFO(MetaNamespaceKeyFunc) r : NewReflector(lw, v1.Pod{}, s, 0) go r.ListAndWatch(wait.NeverStop) 然后下面是测试部分首先为fw赋值为watcher一开始为nil而在上面rNewReflector初始化的时候就已经自动调用了watchFunc创建了一个watcher。随后这个watcher被拿出来给了fw。sendingRV随机生成一个资源的版本随着从ids拿出来的值作为pod的属性用来创建pod在测试中fw.Add(...) 模拟了 Kubernetes API 服务器添加新 Pod 的行为。在实际环境中当新 Pod 被创建时Watch 操作会接收到这个事件。测试使用 fw.Add(...) 来模拟这个过程向 FakeWatcher 发送一个新 Pod 被添加的事件。 ids : []string{foo, bar, baz, qux, zoo} var fw *watch.FakeWatcher for i, id : range ids { if fw nil { fw -createdFakes } sendingRV : strconv.FormatUint(uint64(i2), 10) fw.Add(v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: sendingRV}}) if sendingRV 3 { // Inject a failure. fw.Stop() fw nil } } 如果sendingRV3那么就会将fw停止并设为nil。这模拟了如果resourceVersion错误会发生什么但是reflector已经设为neverStop那么此时ListAndWatch会重新调用watchFunc然后创建新的watcher放入createdFakes中从而fw可以重新拿取。