上一步 ListAndWatch 到的资源已经存储到 DeltaFIFO 中,接着调用 Pop 从队列进行消费。实际使用中,Process 处理函数由 sharedIndexInformer.HandleDeltas 进行实现。HandleDeltas 函数根据上面不同的 DeltaType 分别进行 Add/Update/Delete,并同时创建、更新、删除对应的索引。
具体索引实现如下:
// staging/src/k8s.io/client-go/tools/cache/index.go // map 索引类型 => 索引函数 type Indexers map[string]IndexFunc // map 索引类型 => 索引值 map type Indices map[string]Index // 索引值 map: 由索引函数计算所得索引值(indexedValue) => [objKey1, objKey2...] type Index map[string]sets.String索引函数(IndexFunc):就是计算索引的函数,这样允许扩展多种不同的索引计算函数。默认也是最常用的索引函数是:MetaNamespaceIndexFunc。
索引值(indexedValue):有些地方叫 indexKey,表示由索引函数(IndexFunc) 计算出来的索引值(如 ns1)。
对象键(objKey):对象 obj 的 唯一 key(如 ns1/pod1),与某个资源对象一一对应。
可以看到,Indexer 由 ThreadSafeStore 接口集成,最终由 threadSafeMap 实现。
索引函数 IndexFunc(如 MetaNamespaceIndexFunc)、KeyFunc(如 MetaNamespaceKeyFunc) 区别:前者表示如何计算索引,后者表示如何获取对象键(objKey);
索引键(indexKey,有些地方是 indexedValue)、对象键(objKey) 区别:前者表示由索引函数(IndexFunc) 计算出来的索引键(如 ns1),后者则是 obj 的 唯一 key(如 ns1/pod1);
5. 总管家 ControllerController 作为核心中枢,集成了上面的组件 Reflector、DeltaFIFO、Indexer、Store,成为连接下游消费者的桥梁。
Controller 由 controller 结构体进行具体实现:
在 K8s 中约定俗成:大写定义的 interface 接口,由对应小写定义的结构体进行实现。
// staging/src/k8s.io/client-go/tools/cache/controller.go type controller struct { config Config reflector *Reflector // 上面已分析的组件 reflectorMutex sync.RWMutex clock clock.Clock } type Config struct { // 实际由 DeltaFIFO 实现 Queue // 构造 Reflector 需要 ListerWatcher // Pop 出来的 obj 处理函数 Process ProcessFunc // 目标对象类型 ObjectType runtime.Object // 全量重新同步周期 FullResyncPeriod time.Duration // 是否进行重新同步的判断函数 ShouldResync ShouldResyncFunc // 如果为 true,Process() 函数返回 err,则再次入队 re-queue RetryOnError bool // Watch 返回 err 的回调函数 WatchErrorHandler WatchErrorHandler // Watch 分页大小 WatchListPageSize int64 }Controller 中以 goroutine 协程方式启动 Run 方法,会启动 Reflector 的 ListAndWatch(),用于从 apiserver 拉取全量和监听增量资源,存储到 DeltaFIFO。接着,启动 processLoop 不断从 DeltaFIFO Pop 进行消费。在 sharedIndexInformer 中 Pop 出来进行处理的函数是 HandleDeltas,一方面维护 Indexer 的 Add/Update/Delete,另一方面调用下游 sharedProcessor 进行 handler 处理。
6. 启动 SharedInformerSharedInformer 接口由 SharedIndexInformer 进行集成,由 sharedIndexInformer(这里看到了吧,又是大写定义的 interface 接口,由对应小写定义的结构体进行实现) 进行实现。
看一下结构体定义:
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go type SharedIndexInformer interface { SharedInformer // AddIndexers add indexers to the informer before it starts. AddIndexers(indexers Indexers) error GetIndexer() Indexer } type sharedIndexInformer struct { indexer Indexer controller Controller // 处理函数,将是重点 processor *sharedProcessor // 检测 cache 是否有变化,一把用作调试,默认是关闭的 cacheMutationDetector MutationDetector // 构造 Reflector 需要 listerWatcher ListerWatcher // 目标类型,给 Reflector 判断资源类型 objectType runtime.Object // Reflector 进行重新同步周期 resyncCheckPeriod time.Duration // 如果使用者没有添加 Resync 时间,则使用这个默认的重新同步周期 defaultEventHandlerResyncPeriod time.Duration clock clock.Clock // 两个 bool 表达了三个状态:controller 启动前、已启动、已停止 started, stopped bool startedLock sync.Mutex // 当 Pop 正在消费队列,此时新增的 listener 需要加锁,防止消费混乱 blockDeltas sync.Mutex // Watch 返回 err 的回调函数 watchErrorHandler WatchErrorHandler } type sharedProcessor struct { listenersStarted bool listenersLock sync.RWMutex listeners []*processorListener syncingListeners []*processorListener // 需要 sync 的 listeners clock clock.Clock wg wait.Group }