// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
funcmakeEventRecorder(kubeDeps*kubelet.Dependencies,nodeNametypes.NodeName){ifkubeDeps.Recorder!=nil{return}//事件广播
eventBroadcaster:=record.NewBroadcaster()//创建EventRecorder
kubeDeps.Recorder=eventBroadcaster.NewRecorder(legacyscheme.Scheme,v1.EventSource{Component:componentKubelet,Host:string(nodeName)})//发送event至log输出
eventBroadcaster.StartLogging(klog.V(3).Infof)ifkubeDeps.EventClient!=nil{klog.V(4).Infof("Sending events to api server.")//发送event至apiserver
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface:kubeDeps.EventClient.Events("")})}else{klog.Warning("No api server defined - no events will be sent to API server.")}}
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
typeEventBroadcasterinterface{//
StartEventWatcher(eventHandlerfunc(*v1.Event))watch.InterfaceStartRecordingToSink(sinkEventSink)watch.InterfaceStartLogging(logffunc(formatstring,args...interface{}))watch.InterfaceNewRecorder(scheme*runtime.Scheme,sourcev1.EventSource)EventRecorderShutdown()}
// EventRecorder knows how to record events on behalf of an EventSource.
typeEventRecorderinterface{// Event constructs an event from the given information and puts it in the queue for sending.
// 'object' is the object this event is about. Event will make a reference-- or you may also
// pass a reference to the object directly.
// 'type' of this event, and can be one of Normal, Warning. New types could be added in future
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
// to automate handling of events, so imagine people writing switch statements to handle them.
// You want to make that easy.
// 'message' is intended to be human readable.
//
// The resulting event will be created in the same namespace as the reference object.
Event(objectruntime.Object,eventtype,reason,messagestring)// Eventf is just like Event, but with Sprintf for the message field.
Eventf(objectruntime.Object,eventtype,reason,messageFmtstring,args...interface{})// PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
PastEventf(objectruntime.Object,timestampmetav1.Time,eventtype,reason,messageFmtstring,args...interface{})// AnnotatedEventf is just like eventf, but with annotations attached
AnnotatedEventf(objectruntime.Object,annotationsmap[string]string,eventtype,reason,messageFmtstring,args...interface{})}
// generateEvent builds a v1.Event for the given object, stamps it with this
// recorder's configured Source, and hands it to the broadcaster's watch
// machinery on a fresh goroutine so the calling code path is never blocked
// on event delivery.
// NOTE(review): the "....." below is an elision marker in this excerpt — the
// reference-resolution and error handling that precede makeEvent (which
// produce 'ref') are not shown here.
func(recorder*recorderImpl)generateEvent(objectruntime.Object,annotationsmap[string]string,timestampmetav1.Time,eventtype,reason,messagestring){.....event:=recorder.makeEvent(ref,annotations,eventtype,reason,message)event.Source=recorder.sourcegofunc(){// NOTE: events should be a non-blocking operation
// HandleCrash guards the goroutine; Action(watch.Added, event) enqueues the
// event for every registered watcher.
deferutilruntime.HandleCrash()recorder.Action(watch.Added,event)}()}
func(e*eventBroadcasterImpl)StartEventWatcher(eventHandlerfunc(*v1.Event))watch.Interface{watcher:=e.Watch()gofunc(){deferutilruntime.HandleCrash()forwatchEvent:=rangewatcher.ResultChan(){event,ok:=watchEvent.Object.(*v1.Event)if!ok{// This is all local, so there's no reason this should
// ever happen.
continue}eventHandler(event)}}()returnwatcher}
funcrecordToSink(sinkEventSink,event*v1.Event,eventCorrelator*EventCorrelator,sleepDurationtime.Duration){// Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this.
eventCopy:=*eventevent=&eventCopyresult,err:=eventCorrelator.EventCorrelate(event)iferr!=nil{utilruntime.HandleError(err)}ifresult.Skip{return}tries:=0for{ifrecordEvent(sink,result.Event,result.Patch,result.Event.Count>1,eventCorrelator){break}tries++iftries>=maxTriesPerEvent{klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)",event)break}// Randomize the first sleep so that various clients won't all be
// synced up if the master goes down.
// 第一次重试增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件
iftries==1{time.Sleep(time.Duration(float64(sleepDuration)*rand.Float64()))}else{time.Sleep(sleepDuration)}}}
其中 event 会先经过 eventCorrelator.EventCorrelate(event) 方法做预处理,主要是聚合相同的事件(避免产生的事件过多,增加 etcd 和 apiserver 的压力,也会导致查看 pod 事件时很不清晰)。
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func(c*EventCorrelator)EventCorrelate(newEvent*v1.Event)(*EventCorrelateResult,error){ifnewEvent==nil{returnnil,fmt.Errorf("event is nil")}aggregateEvent,ckey:=c.aggregator.EventAggregate(newEvent)observedEvent,patch,err:=c.logger.eventObserve(aggregateEvent,ckey)ifc.filterFunc(observedEvent){return&EventCorrelateResult{Skip:true},nil}return&EventCorrelateResult{Event:observedEvent,Patch:patch},err}
func(e*EventAggregator)EventAggregate(newEvent*v1.Event)(*v1.Event,string){now:=metav1.NewTime(e.clock.Now())varrecordaggregateRecord// eventKey is the full cache key for this event
//eventKey 是将除了时间戳外所有字段结合在一起
eventKey:=getEventKey(newEvent)// aggregateKey is for the aggregate event, if one is needed.
//aggregateKey 是除了message和时间戳外的字段结合在一起,localKey 是message
aggregateKey,localKey:=e.keyFunc(newEvent)// Do we have a record of similar events in our cache?
e.Lock()defere.Unlock()//从cache中根据aggregateKey查询是否存在,如果是相同或者相类似的事件会被放入cache中
value,found:=e.cache.Get(aggregateKey)iffound{record=value.(aggregateRecord)}//判断上次事件产生的时间是否超过10分钟,如何操作则重新生成一个localKeys集合(集合中存放message)
maxInterval:=time.Duration(e.maxIntervalInSeconds)*time.Secondinterval:=now.Time.Sub(record.lastTimestamp.Time)ifinterval>maxInterval{record=aggregateRecord{localKeys:sets.NewString()}}// Write the new event into the aggregation record and put it on the cache
//将locakKey也就是message放入集合中,如果message相同就是覆盖了
record.localKeys.Insert(localKey)record.lastTimestamp=nowe.cache.Add(aggregateKey,record)// If we are not yet over the threshold for unique events, don't correlate them
//判断localKeys集合中存放的类似事件是否超过10个,
ifuint(record.localKeys.Len())<e.maxEvents{returnnewEvent,eventKey}// do not grow our local key set any larger than max
record.localKeys.PopAny()// create a new aggregate event, and return the aggregateKey as the cache key
// (so that it can be overwritten.)
eventCopy:=&v1.Event{ObjectMeta:metav1.ObjectMeta{Name:fmt.Sprintf("%v.%x",newEvent.InvolvedObject.Name,now.UnixNano()),Namespace:newEvent.Namespace,},Count:1,FirstTimestamp:now,InvolvedObject:newEvent.InvolvedObject,LastTimestamp:now,//这里会对message加个前缀:(combined from similar events):
Message:e.messageFunc(newEvent),Type:newEvent.Type,Reason:newEvent.Reason,Source:newEvent.Source,}returneventCopy,aggregateKey}