从代码可以看到 processorListener 巧妙地使用了两个 channel(addCh, nextCh) 和一个 pendingNotifications(由 slice 实现的滚动 Ring) 进行 buffer 缓冲,默认的 initialBufferSize = 1024。既做到了高效传递数据,又不阻塞上下游处理,值得学习。
9. workqueue 忙起来通过上一步 processorListener 回调函数,交给内部 ResourceEventHandler 进行真正的增删改(CUD) 处理,分别调用 OnAdd/OnUpdate/OnDelete 注册函数进行处理。
为了快速处理而不阻塞 processorListener 回调函数,一般使用 workqueue 进行异步化解耦合处理,其实现如下:
从图中可以看到,workqueue.RateLimitingInterface 集成了 DelayingInterface,DelayingInterface 集成了 Interface,最终由 rateLimitingType 进行实现,提供了 rateLimit 限速、delay 延时入队(由优先级队列通过小顶堆实现)、queue 队列处理 三大核心能力。
另外,在代码中可看到 K8s 实现了三种 RateLimiter:BucketRateLimiter, ItemExponentialFailureRateLimiter, ItemFastSlowRateLimiter,Controller 默认采用了前两种如下:
// staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go func DefaultControllerRateLimiter() RateLimiter { return NewMaxOfRateLimiter( NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, ) }这样,在用户侧可以通过调用 workqueue 相关方法进行灵活的队列处理,比如失败多少次就不再重试,失败了延时入队的时间控制,队列的限速控制(QPS)等,实现非阻塞异步化逻辑处理。
10. 小结本文通过分析 K8s 中 Reflector(反射器)、DeletaFIFO(增量队列)、Indexer(索引器)、Controller(控制器)、SharedInformer(共享资源通知器)、processorListener(事件监听处理器)、workqueue(事件处理工作队列) 等组件,对 Informer 实现机制进行了解析,通过源码、图文方式说明了相关流程处理,以期更好的理解 K8s Informer 运行流程。
可以看到,K8s 为了实现高效、非阻塞的核心流程,大量采用了 goroutine 协程、channel 通道、queue 队列、index 索引、map 去重等方式;并通过良好的接口设计模式,给使用者开放了很多扩展能力;采用了统一的接口与实现的命名方式等,这些都值得深入学习与借鉴。