K8s 系列(四) - 浅谈 Informer

进入 K8s 的世界,会发现有很多的 Controller,它们都是为了完成某类资源(如 pod 是通过 DeploymentController, ReplicaSetController 进行管理)的调谐,目标是保持用户期望的状态。

K8s 中有几十种类型的资源,如何能让 K8s 内部以及外部用户方便、高效的获取某类资源的变化,就是本文 Informer 要实现的。本文将从 Reflector(反射器)、DeletaFIFO(增量队列)、Indexer(索引器)、Controller(控制器)、SharedInformer(共享资源通知器)、processorListener(事件监听处理器)、workqueue(事件处理工作队列) 等方面进行解析。

本文及后续相关文章都基于 K8s v1.22

2. 从 Reflector 说起

Reflector 的主要职责是从 apiserver 拉取并持续监听(ListAndWatch) 相关资源类型的增删改(Add/Update/Delete)事件,存储在由 DeltaFIFO 实现的本地缓存(local Store) 中。

首先看一下 Reflector 结构体定义:

// staging/src/k8s.io/client-go/tools/cache/reflector.go type Reflector struct { // 通过 file:line 唯一标识的 name name string // 下面三个为了确认类型 expectedTypeName string expectedType reflect.Type expectedGVK *schema.GroupVersionKind // 存储 interface: 具体由 DeltaFIFO 实现存储 store Store // 用来从 apiserver 拉取全量和增量资源 listerWatcher ListerWatcher // 下面两个用来做失败重试 backoffManager wait.BackoffManager initConnBackoffManager wait.BackoffManager // informer 使用者重新同步的周期 resyncPeriod time.Duration // 判断是否满足可以重新同步的条件 ShouldResync func() bool clock clock.Clock // 是否要进行分页 List paginatedResult bool // 最后同步的资源版本号,以此为依据,watch 只会监听大于此值的资源 lastSyncResourceVersion string // 最后同步的资源版本号是否可用 isLastSyncResourceVersionUnavailable bool // 加把锁控制版本号 lastSyncResourceVersionMutex sync.RWMutex // 每页大小 WatchListPageSize int64 // watch 失败回调 handler watchErrorHandler WatchErrorHandler }

从结构体定义可以看到,通过指定目标资源类型进行 ListAndWatch,并可进行分页相关设置。

第一次拉取全量资源(目标资源类型) 后通过 syncWith 函数全量替换(Replace) 到 DeltaFIFO queue/items 中,之后通过持续监听 Watch(目标资源类型) 增量事件,并去重更新到 DeltaFIFO queue/items 中,等待被消费。

watch 目标类型通过 Go reflect 反射实现如下:

// staging/src/k8s.io/client-go/tools/cache/reflector.go // watchHandler watches w and keeps *resourceVersion up to date. func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { ... if r.expectedType != nil { if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a { utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a)) continue } } if r.expectedGVK != nil { if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a { utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a)) continue } } ... }

通过反射确认目标资源类型,所以命名为 Reflector 还是比较贴切的;

List/Watch 的目标资源类型在 NewSharedIndexInformer.ListerWatcher 进行了确定,但 Watch 还会在 watchHandler 中再次比较一下目标类型;

3. 认识 DeltaFIFO

还是先看下 DeltaFIFO 结构体定义:

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go type DeltaFIFO struct { // 读写锁、条件变量 lock sync.RWMutex cond sync.Cond // kv 存储:objKey1->Deltas[obj1-Added, obj1-Updated...] items map[string]Deltas // 只存储所有 objKeys queue []string // 是否已经填充:通过 Replace() 接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true populated bool // 通过 Replace() 接口将第一批对象放入队列的数量 initialPopulationCount int // keyFunc 用来从某个 obj 中获取其对应的 objKey keyFunc KeyFunc // 已知对象,其实就是 Indexer knownObjects KeyListerGetter // 队列是否已经关闭 closed bool // 以 Replaced 类型发送(为了兼容老版本的 Sync) emitDeltaTypeReplaced bool }

DeltaType 可分为以下类型:

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go type DeltaType string const ( Added DeltaType = "Added" Updated DeltaType = "Updated" Deleted DeltaType = "Deleted" Replaced DeltaType = "Replaced" // 第一次或重新同步 Sync DeltaType = "Sync" // 老版本重新同步叫 Sync )

通过上面的 Reflector 分析可以知道,DeltaFIFO 的职责是通过队列加锁处理(queueActionLocked)、去重(dedupDeltas)、存储在由 DeltaFIFO 实现的本地缓存(local Store) 中,包括 queue(仅存 objKeys) 和 items(存 objKeys 和对应的 Deltas 增量变化),并通过 Pop 不断消费,通过 Process(item) 处理相关逻辑。

K8s-DeltaFIFO

4. 索引 Indexer

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwwdgd.html