sharedIndexInformer里面会创建sharedProcessor,设置List&Watch的回调函数,创建了一个indexer,我们这里看一下NewIndexer是怎么创建indexer的:
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } }NewIndexer方法创建了一个cache,它的keyFunc是DeletionHandlingMetaNamespaceKeyFunc,即接受一个object,生成它的namepace/name的字符串。cache里面的数据会存放到cacheStorage中,它是一个threadSafeMap用来存储资源对象并自带索引功能的本地存储。
注册EventHandler事件EventHandler事件的注册是通过informer的AddEventHandler方法进行的。在调用AddEventHandler方法的时候,传入一个cache.ResourceEventHandlerFuncs结构体:
文件位置:tools/cache/shared_informer.go
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) { s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) } func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) { s.startedLock.Lock() defer s.startedLock.Unlock() ... //初始化监听器 listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) //如果informer还没启动,那么直接将监听器加入到processor监听器列表中 if !s.started { s.processor.addListener(listener) return } //如果informer已经启动,那么需要加锁 s.blockDeltas.Lock() defer s.blockDeltas.Unlock() s.processor.addListener(listener) //然后将indexer中缓存的数据写入到listener中 for _, item := range s.indexer.List() { listener.add(addNotification{newObj: item}) } }AddEventHandler方法会调用到AddEventHandlerWithResyncPeriod方法中,然后调用newProcessListener初始化listener。
接着会校验informer是否已经启动,如果没有启动,那么直接将监听器加入到processor监听器列表中并返回;如果informer已经启动,那么需要加锁将监听器加入到processor监听器列表中,然后将indexer中缓存的数据写入到listener中。
需要注意的是listener.add方法会调用processorListener的add方法,这个方法会将数据写入到addCh管道中:
func (p *processorListener) add(notification interface{}) { p.addCh <- notification }addCh管道里面数据是用来处理事件回调的,后面我会说到。
大致的流程如下:
启动Informer模块最后我们在上面的demo中会使用sharedIndexInformer的Run方法来启动Informer模块。
文件位置:tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() //初始化DeltaFIFO队列 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: s.indexer, EmitDeltaTypeReplaced: true, }) cfg := &Config{ //设置Queue为DeltaFIFO队列 Queue: fifo, //设置List&Watch的回调函数 ListerWatcher: s.listerWatcher, ObjectType: s.objectType, //设置Resync周期 FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false, //判断有哪些监听器到期需要被Resync ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, WatchErrorHandler: s.watchErrorHandler, } func() { s.startedLock.Lock() defer s.startedLock.Unlock() //异步创建controller s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }() processorStopCh := make(chan struct{}) var wg wait.Group defer wg.Wait() // Wait for Processor to stop defer close(processorStopCh) // Tell Processor to stop wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) //调用run方法启动processor wg.StartWithChannel(processorStopCh, s.processor.run) defer func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.stopped = true }() //启动controller s.controller.Run(stopCh) }这段代码主要做了以下几件事:
调用NewDeltaFIFOWithOptions方法初始化DeltaFIFO队列;
初始化Config结果体,作为创建controller的参数;
异步创建controller;
调用run方法启动processor;
调用run方法启动controller;
下面我们看看sharedProcessor的run方法做了什么:
func (p *sharedProcessor) run(stopCh <-chan struct{}) { func() { ... //遍历监听器 for _, listener := range p.listeners { //下面两个方法是核心的事件call back的方法 p.wg.Start(listener.run) p.wg.Start(listener.pop) } p.listenersStarted = true }() ... }