File location: client-go/tools/record/event.go
```go
func NewBroadcaster() EventBroadcaster {
	return &eventBroadcasterImpl{
		Broadcaster:   watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
		sleepDuration: defaultSleepDuration,
	}
}
```
This creates an eventBroadcasterImpl instance and sets two fields: Broadcaster and sleepDuration. Broadcaster is the core of this method, so let's look at it next:
```go
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
	m := &Broadcaster{
		watchers:            map[int64]*broadcasterWatcher{},
		incoming:            make(chan Event, incomingQueueLength),
		watchQueueLength:    queueLength,
		fullChannelBehavior: fullChannelBehavior,
	}
	m.distributing.Add(1)
	// Start the event loop
	go m.loop()
	return m
}
```
When the Broadcaster is initialized here, it sets up the watchers map, which will hold the broadcasterWatcher instances that define how events are handled (e.g., reported to the apiserver), and the incoming channel, which carries events from the EventRecorder to the EventBroadcaster.
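To make this construction concrete, here is a minimal usage sketch, not taken from the source above, showing how a controller typically wires these pieces together; the setupRecorder helper, the clientset parameter, and the "my-controller" component name are assumptions for illustration:
```go
// Minimal wiring sketch, assuming a ready *kubernetes.Clientset.
package demo

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
)

func setupRecorder(clientset *kubernetes.Clientset) record.EventRecorder {
	// NewBroadcaster starts loop() in the background, as shown above.
	broadcaster := record.NewBroadcaster()
	// Register a watcher that uploads every event to the apiserver.
	broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
		Interface: clientset.CoreV1().Events(""),
	})
	// The recorder is what controller code calls, e.g.
	// recorder.Event(obj, v1.EventTypeNormal, "SomeReason", "some message").
	return broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "my-controller"})
}
```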
loop
File location: k8s.io/apimachinery/pkg/watch/mux.go
```go
func (m *Broadcaster) loop() {
	// Drain events from the m.incoming channel
	for event := range m.incoming {
		if event.Type == internalRunFunctionMarker {
			event.Object.(functionFakeRuntimeObject)()
			continue
		}
		// Distribute the event to the watchers
		m.distribute(event)
	}
	m.closeAll()
	m.distributing.Done()
}
```
This method waits in the background for data on the m.incoming channel, then calls distribute to fan each event out to the broadcasterWatchers. The data in the incoming channel is what the EventRecorder pushes in through its Event method.
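The internalRunFunctionMarker branch deserves a note: sending a function through the same channel as the data guarantees that it runs in order with the events around it, which is how the Broadcaster serializes operations such as watcher registration with event delivery. A self-contained toy (not client-go code; item, ch, and the printed strings are invented for illustration):
```go
// Toy version of the function-marker trick used in loop().
package main

import "fmt"

type item struct {
	fn  func() // non-nil means "run me" instead of "distribute me"
	val string
}

func main() {
	ch := make(chan item, 4)
	done := make(chan struct{})
	go func() { // plays the role of loop()
		for it := range ch {
			if it.fn != nil {
				it.fn()
				continue
			}
			fmt.Println("distribute", it.val)
		}
		close(done)
	}()
	ch <- item{val: "e1"}
	ch <- item{fn: func() { fmt.Println("marker function runs after e1") }}
	ch <- item{val: "e2"}
	close(ch)
	<-done
}
```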
distribute
```go
func (m *Broadcaster) distribute(event Event) {
	m.lock.Lock()
	defer m.lock.Unlock()
	// Non-blocking distribution when DropIfChannelFull is set
	if m.fullChannelBehavior == DropIfChannelFull {
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			default: // Don't block if the event can't be queued.
			}
		}
	} else {
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			}
		}
	}
}
```
With DropIfChannelFull set (the non-blocking mode), the select has a default branch: once a watcher's result channel is full, further events for that watcher are dropped. Without the default branch, the send on w.result would block, and distribution would wait until the watcher drains its channel.
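The drop-on-full behavior is pure Go channel mechanics. A self-contained toy (not client-go code) isolating the non-blocking send:
```go
// A select with a default branch drops the value instead of blocking when
// the buffered channel is full, which is exactly what DropIfChannelFull does.
package main

import "fmt"

func main() {
	result := make(chan string, 1) // a tiny "result" channel with capacity 1
	for _, ev := range []string{"e1", "e2", "e3"} {
		select {
		case result <- ev:
			fmt.Println("queued", ev)
		default:
			fmt.Println("dropped", ev) // nobody drained the channel: e2 and e3 are lost
		}
	}
}
```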
Events are allowed to be lost here because, as a k8s cluster grows, the volume of reported events grows with it, and every report means reads and writes against etcd, which puts pressure on the etcd cluster. Since losing events does not affect the cluster's normal operation, the non-blocking distribution mode simply drops them when a channel is full.
Handling events with recordToSink: calling the StartRecordingToSink method is what reports the data to the apiserver.
StartRecordingToSink
```go
func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
	eventCorrelator := NewEventCorrelatorWithOptions(e.options)
	return e.StartEventWatcher(
		func(event *v1.Event) {
			recordToSink(sink, event, eventCorrelator, e.sleepDuration)
		})
}

func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	watcher := e.Watch()
	go func() {
		defer utilruntime.HandleCrash()
		for watchEvent := range watcher.ResultChan() {
			event, ok := watchEvent.Object.(*v1.Event)
			if !ok {
				continue
			}
			// Invoke the handler passed in by the caller
			eventHandler(event)
		}
	}()
	return watcher
}
```
StartRecordingToSink calls StartEventWatcher. Inside StartEventWatcher, e.Watch() registers a broadcasterWatcher, and a goroutine then keeps reading from watcher.ResultChan(), the watcher's result channel; the data in that channel is exactly what Broadcaster's distribute method fanned out.
Each event read from the channel is finally handed to the callback passed in, recordToSink.
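StartEventWatcher also accepts arbitrary handlers; client-go's own StartLogging helper is built on it in exactly this way. A hypothetical helper (the startConsoleLogger name and printf format are assumptions, not from client-go) that prints every event:
```go
// Attach a console logger to an existing EventBroadcaster.
// Returns the watcher so the caller can Stop() it to unregister.
package demo

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/record"
)

func startConsoleLogger(broadcaster record.EventBroadcaster) watch.Interface {
	return broadcaster.StartEventWatcher(func(e *v1.Event) {
		fmt.Printf("event %s/%s reason=%s: %s\n", e.Namespace, e.Name, e.Reason, e.Message)
	})
}
```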
recordToSink
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) { eventCopy := *event event = &eventCopy //对事件做预处理,聚合相同的事件 result, err := eventCorrelator.EventCorrelate(event) if err != nil { utilruntime.HandleError(err) } if result.Skip { return } tries := 0 for { // 把事件发送到 apiserver if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) { break } tries++ if tries >= maxTriesPerEvent { klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) break } if tries == 1 { time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) } else { time.Sleep(sleepDuration) } } }
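recordToSink first runs the event through the EventCorrelator, which aggregates similar events and can mark the result as Skip so that nothing is sent at all. Otherwise recordEvent is retried up to maxTriesPerEvent times (12 in current client-go), sleeping sleepDuration (10s by default) between attempts; the first sleep is multiplied by a random factor so that many components restarting at the same time do not retry in lockstep. recordEvent itself is not shown above; here is a rough, hedged sketch of its behavior (the real function also clears ResourceVersion, treats a patch on a missing event as a create, and inspects the error type before deciding whether a retry makes sense):
```go
// Simplified sketch only, not the actual client-go implementation.
// An aggregated event (Count > 1) is patched onto the Event already stored
// by the apiserver; everything else (or a failed patch) is created fresh.
// The boolean tells recordToSink whether it can stop retrying.
package record

import v1 "k8s.io/api/core/v1"

func recordEventSketch(sink EventSink, event *v1.Event, patch []byte, updateExisting bool) bool {
	var err error
	if updateExisting {
		_, err = sink.Patch(event, patch) // bump Count/LastTimestamp server-side
	}
	if !updateExisting || err != nil {
		_, err = sink.Create(event)
	}
	return err == nil
}
```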