K8s 系列(四) - 浅谈 Informer (3)

从结构体定义可以看到,通过集成的 controller(上面已分析) 进行 Reflector ListAndWatch,并存储到 DeltaFIFO,并启动 Pop 消费队列,在 sharedIndexInformer 中 Pop 出来进行处理的函数是 HandleDeltas。

所有的 listeners 通过 sharedIndexInformer.AddEventHandler 加入到 processorListener 数组切片中,并通过判断当前 controller 是否已启动做不同处理如下:

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) { ... // 如果还没有启动,则直接 addListener 加入即可返回 if !s.started { s.processor.addListener(listener) return } // 加锁控制 s.blockDeltas.Lock() defer s.blockDeltas.Unlock() s.processor.addListener(listener) // 遍历所有对象,发送到刚刚新加入的 listener for _, item := range s.indexer.List() { listener.add(addNotification{newObj: item}) } }

接着,在 HandleDeltas 中,根据 obj 的 Delta 类型(Added/Updated/Deleted/Replaced/Sync) 调用 sharedProcessor.distribute 给所有监听 listeners 处理。

7. 注册 SharedInformerFactory

SharedInformerFactory 作为使用 SharedInformer 的工厂类,提供了高内聚低耦合的工厂类设计模式,其结构体定义如下:

// staging/src/k8s.io/client-go/informers/factory.go type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory // 重点内部接口 ForResource(resource schema.GroupVersionResource) (GenericInformer, error) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool Admissionregistration() admissionregistration.Interface Internal() apiserverinternal.Interface Apps() apps.Interface Autoscaling() autoscaling.Interface Batch() batch.Interface Certificates() certificates.Interface Coordination() coordination.Interface Core() core.Interface Discovery() discovery.Interface Events() events.Interface Extensions() extensions.Interface Flowcontrol() flowcontrol.Interface Networking() networking.Interface Node() node.Interface Policy() policy.Interface Rbac() rbac.Interface Scheduling() scheduling.Interface Storage() storage.Interface } // staging/src/k8s.io/client-go/informers/internalinterfaces/factory_interfaces.go type SharedInformerFactory interface { Start(stopCh <-chan struct{}) // 启动 SharedIndexInformer.Run InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer // 目标类型初始化 }

以 PodInformer 为例,说明使用者如何构建自己的 Informer,PodInformer 定义如下:

// staging/src/k8s.io/client-go/informers/core/v1/pod.go type PodInformer interface { Informer() cache.SharedIndexInformer Lister() v1.PodLister } 由小写的 podInformer 实现(又看到了吧,大写接口小写实现的 K8s 风格): type podInformer struct { factory internalinterfaces.SharedInformerFactory tweakListOptions internalinterfaces.TweakListOptionsFunc namespace string } func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } func (f *podInformer) Informer() cache.SharedIndexInformer { return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer) } func (f *podInformer) Lister() v1.PodLister { return v1.NewPodLister(f.Informer().GetIndexer()) }

由使用者传入目标类型(&corev1.Pod{})、构造函数(defaultInformer),调用 SharedInformerFactory.InformerFor 实现目标 Informer 的注册,然后调用 SharedInformerFactory.Start 进行 Run,就启动了上面分析的 SharedIndexedInformer -> Controller -> Reflector -> DeltaFIFO 流程。

通过使用者自己传入目标类型、构造函数进行 Informer 注册,实现了 SharedInformerFactory 高内聚低耦合的设计模式。

8. 回调 processorListener

所有的 listerners 由 processorListener 实现,分为两组:listeners, syncingListeners,分别遍历所属组全部 listeners,将数据投递到 processorListener 进行处理。

因为各 listeners 设置的 resyncPeriod 可能不一致,所以将没有设置(resyncPeriod = 0) 的归为 listeners 组,将设置了 resyncPeriod 的归到 syncingListeners 组;

如果某个 listener 在多个地方(sharedIndexInformer.resyncCheckPeriod, sharedIndexInformer.AddEventHandlerWithResyncPeriod)都设置了 resyncPeriod,则取最小值 minimumResyncPeriod;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() if sync { for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwwdgd.html