recordToSink方法首先会调用EventCorrelate方法对event做预处理,聚合相同的事件,避免产生的事件过多,增加 etcd 和 apiserver 的压力,如果传入的Event太多了,那么result.Skip 就会返回false;
接下来会调用recordEvent方法把事件发送到 apiserver,它会重试很多次(默认是 12 次),并且每次重试都有一定时间间隔(默认是 10 秒钟)。
下面我们分别来看看EventCorrelate方法和recordEvent方法。
EventCorrelate
文件位置:client-go/tools/record/events_cache.go
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) { if newEvent == nil { return nil, fmt.Errorf("event is nil") } aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent) observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey) if c.filterFunc(observedEvent) { return &EventCorrelateResult{Skip: true}, nil } return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err }EventCorrelate方法会调用EventAggregate、eventObserve进行聚合,调用filterFunc会调用到spamFilter.Filter方法进行过滤。
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) { now := metav1.NewTime(e.clock.Now()) var record aggregateRecord eventKey := getEventKey(newEvent) aggregateKey, localKey := e.keyFunc(newEvent) e.Lock() defer e.Unlock() // 查找缓存里面是否也存在这样的记录 value, found := e.cache.Get(aggregateKey) if found { record = value.(aggregateRecord) } // maxIntervalInSeconds默认时间是600s,这里校验缓存里面的记录是否太老了 // 如果是那么就创建一个新的 // 如果record在缓存里面找不到,那么lastTimestamp是零,那么也创建一个新的 maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second interval := now.Time.Sub(record.lastTimestamp.Time) if interval > maxInterval { record = aggregateRecord{localKeys: sets.NewString()} } record.localKeys.Insert(localKey) record.lastTimestamp = now // 重新加入到LRU缓存中 e.cache.Add(aggregateKey, record) // 如果没有达到阈值,那么不进行聚合 if uint(record.localKeys.Len()) < e.maxEvents { return newEvent, eventKey } record.localKeys.PopAny() eventCopy := &v1.Event{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()), Namespace: newEvent.Namespace, }, Count: 1, FirstTimestamp: now, InvolvedObject: newEvent.InvolvedObject, LastTimestamp: now, // 将Message进行聚合 Message: e.messageFunc(newEvent), Type: newEvent.Type, Reason: newEvent.Reason, Source: newEvent.Source, } return eventCopy, aggregateKey }EventAggregate方法也考虑了很多,首先是去缓存里面查找有没有相同的聚合记录aggregateRecord,如果没有的话,那么会在校验时间间隔的时候顺便创建聚合记录aggregateRecord;
由于缓存时lru缓存,所以再将聚合记录重新Add到缓存的头部;
接下来会判断缓存是否已经超过了阈值,如果没有达到阈值,那么直接返回不进行聚合;
如果达到阈值了,那么会重新copy传入的Event,并调用messageFunc方法聚合Message;
eventObserve
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) { var ( patch []byte err error ) eventCopy := *newEvent event := &eventCopy e.Lock() defer e.Unlock() // 检查是否在缓存中 lastObservation := e.lastEventObservationFromCache(key) // 如果大于0说明存在,并且对Count进行自增 if lastObservation.count > 0 { event.Name = lastObservation.name event.ResourceVersion = lastObservation.resourceVersion event.FirstTimestamp = lastObservation.firstTimestamp event.Count = int32(lastObservation.count) + 1 eventCopy2 := *event eventCopy2.Count = 0 eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0)) eventCopy2.Message = "" newData, _ := json.Marshal(event) oldData, _ := json.Marshal(eventCopy2) patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event) } // 最后重新更新缓存记录 e.cache.Add( key, eventLog{ count: uint(event.Count), firstTimestamp: event.FirstTimestamp, name: event.Name, resourceVersion: event.ResourceVersion, }, ) return event, patch, err }eventObserve方法里面会去查找缓存中的记录,然后对count进行自增后更新到缓存中。
Filter
文件位置:client-go/tools/record/events_cache.go
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool { var record spamRecord eventKey := getSpamKey(event) f.Lock() defer f.Unlock() value, found := f.cache.Get(eventKey) if found { record = value.(spamRecord) } if record.rateLimiter == nil { record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock) } // 使用令牌桶进行过滤 filter := !record.rateLimiter.TryAccept() // update the cache f.cache.Add(eventKey, record) return filter }Filter主要时起到了一个限速的作用,通过令牌桶来进行过滤操作。
recordEvent