15.深入k8s:Event事件处理及其源码分析 (4)

文件位置:client-go/tools/record/event.go

func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool { var newEvent *v1.Event var err error // 更新已经存在的事件 if updateExistingEvent { newEvent, err = sink.Patch(event, patch) } // 创建一个新的事件 if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) { event.ResourceVersion = "" newEvent, err = sink.Create(event) } if err == nil { eventCorrelator.UpdateState(newEvent) return true } // 如果是已知错误,就不要再重试了;否则,返回 false,让上层进行重试 switch err.(type) { case *restclient.RequestConstructionError: klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err) return true case *errors.StatusError: if errors.IsAlreadyExists(err) { klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err) } else { klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err) } return true case *errors.UnexpectedObjectError: default: } klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err) return false }

recordEvent方法会根据eventCorrelator返回的结果来决定是新建一个事件还是更新已经存在的事件,并根据请求的结果决定是否需要重试。

整个recordToSink调用比较绕,这里我把图画一下:

image-20201011222338424

到这里整个方法算时讲解完毕了。

总结

了解完 events 的整个处理流程后,再梳理一下整个流程:

首先是初始化 EventBroadcaster 对象,同时会初始化一个 Broadcaster 对象,并开启一个loop循环接收所有的 events 并进行广播;

然后通过 EventBroadcaster 对象的 NewRecorder() 方法初始化 EventRecorder 对象,EventRecorder 对象会生成 events 并通过 Action() 方法发送 events 到 Broadcaster 的 channel 队列中;

EventBroadcaster 会调用StartStructuredLogging、StartRecordingToSink方法调用封装好的StartEventWatcher方法,并执行自己的逻辑;

StartRecordingToSink封装的StartEventWatcher方法里面会将所有的 events 广播给每一个 watcher,并调用recordToSink方法对收到 events 后会进行缓存、过滤、聚合而后发送到 apiserver,apiserver 会将 events 保存到 etcd 中。

Reference

https://www.bluematador.com/blog/kubernetes-events-explained

https://kubernetes.io/docs/tasks/debug-application-cluster/debug-application-introspection/

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zypzwy.html