1
16
17 package record
18
19 import (
20 "context"
21 "fmt"
22 "math/rand"
23 "time"
24
25 v1 "k8s.io/api/core/v1"
26 "k8s.io/apimachinery/pkg/api/errors"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/runtime"
29 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
30 "k8s.io/apimachinery/pkg/watch"
31 restclient "k8s.io/client-go/rest"
32 internalevents "k8s.io/client-go/tools/internal/events"
33 "k8s.io/client-go/tools/record/util"
34 ref "k8s.io/client-go/tools/reference"
35 "k8s.io/klog/v2"
36 "k8s.io/utils/clock"
37 )
38
const (
	// maxTriesPerEvent caps how many times recordToSink attempts to write a
	// single event before giving up on it.
	maxTriesPerEvent = 12

	// maxQueuedEvents is the queue length handed to the underlying
	// watch broadcaster created by NewBroadcaster.
	maxQueuedEvents = 1000
)

// defaultSleepDuration is the pause between write attempts used by
// NewBroadcaster unless WithSleepDuration overrides it.
var defaultSleepDuration = 10 * time.Second
44
45
46
47
48
// EventSink is the destination events are written to by recordEvent.
//
// Within this file only Create and Patch are invoked: Patch is tried first
// for events that update an existing one, and Create is used for new events
// or when the patch target could not be found. Update is part of the
// contract but is not called by the code visible here.
type EventSink interface {
	// Create stores a new event and returns the stored object.
	Create(event *v1.Event) (*v1.Event, error)
	// Update replaces an existing event and returns the stored object.
	Update(event *v1.Event) (*v1.Event, error)
	// Patch applies data to oldEvent and returns the stored object.
	Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
}
54
55
56
// CorrelatorOptions bundles the tunables that StartRecordingToSink hands to
// NewEventCorrelatorWithOptions.
//
// NOTE(review): the fields are consumed by the correlator, which is defined
// outside this file; the per-field notes below are inferred from names and
// types only and should be confirmed against that implementation.
type CorrelatorOptions struct {
	// LRUCacheSize presumably bounds the correlator's internal LRU caches.
	// TODO confirm, including what the zero value means.
	LRUCacheSize int

	// BurstSize presumably is the burst allowance of the spam filter's
	// rate limiter. TODO confirm.
	BurstSize int

	// QPS presumably is the sustained rate of the spam filter's rate
	// limiter. TODO confirm.
	QPS float32

	// KeyFunc computes the aggregation key for an event (see
	// EventAggregatorKeyFunc).
	KeyFunc EventAggregatorKeyFunc

	// MessageFunc produces the message of an aggregated event (see
	// EventAggregatorMessageFunc).
	MessageFunc EventAggregatorMessageFunc

	// MaxEvents presumably is the number of similar events tolerated within
	// the interval before aggregation kicks in. TODO confirm.
	MaxEvents int

	// MaxIntervalInSeconds presumably is the aggregation window, in
	// seconds. TODO confirm.
	MaxIntervalInSeconds int

	// Clock is the time source used by the correlator.
	Clock clock.PassiveClock

	// SpamKeyFunc computes the key used to group events for spam filtering
	// (see EventSpamKeyFunc).
	SpamKeyFunc EventSpamKeyFunc
}
91
92
// EventRecorder records events about API objects.
//
// The implementations in this file build a v1.Event referring to object and
// queue it on a broadcaster; events whose object reference cannot be built or
// whose eventtype fails util.ValidateEventType are logged and discarded.
type EventRecorder interface {
	// Event records an event about object.
	//
	// eventtype is the event's Type, reason its Reason and message its
	// Message; they are copied verbatim into the resulting v1.Event.
	Event(object runtime.Object, eventtype, reason, message string)

	// Eventf is like Event, but builds the message with fmt.Sprintf from
	// messageFmt and args.
	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

	// AnnotatedEventf is like Eventf, but additionally attaches annotations
	// to the event's metadata.
	AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
113
114
115
116
117
// EventRecorderLogger is an EventRecorder whose diagnostic logger can be
// replaced.
type EventRecorderLogger interface {
	EventRecorder

	// WithLogger returns a recorder that uses logger for its own diagnostic
	// output (for example when an event cannot be constructed or queued).
	// The implementation in this file returns a new value and leaves the
	// receiver unchanged.
	WithLogger(logger klog.Logger) EventRecorderLogger
}
128
129
// EventBroadcaster fans events produced by its recorders out to any number of
// consumers (handlers, sinks, loggers).
//
// The descriptions below reflect the implementation in this file
// (eventBroadcasterImpl).
type EventBroadcaster interface {
	// StartEventWatcher starts a goroutine that calls eventHandler for each
	// *v1.Event it receives. The returned watch.Interface is the underlying
	// watch.
	StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface

	// StartRecordingToSink starts writing received events to sink, after
	// passing them through an event correlator. The returned
	// watch.Interface is the underlying watch.
	StartRecordingToSink(sink EventSink) watch.Interface

	// StartLogging starts passing a formatted line for each received event
	// to logf. The returned watch.Interface is the underlying watch.
	StartLogging(logf func(format string, args ...interface{})) watch.Interface

	// StartStructuredLogging starts logging each received event as a
	// structured klog entry at the given verbosity. The returned
	// watch.Interface is the underlying watch.
	StartStructuredLogging(verbosity klog.Level) watch.Interface

	// NewRecorder returns a recorder that queues events on this broadcaster
	// and stamps them with source. scheme is used to build the reference to
	// the object an event is about.
	NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger

	// Shutdown shuts down the broadcaster and cancels the context used by
	// its watcher goroutines and retry loops.
	Shutdown()
}
157
158
159
// EventRecorderAdapter wraps an EventRecorderLogger so that it can be used
// where an internalevents.EventRecorder is expected.
type EventRecorderAdapter struct {
	// recorder is the wrapped recorder every call is forwarded to.
	recorder EventRecorderLogger
}

// Compile-time check that *EventRecorderAdapter implements
// internalevents.EventRecorder.
var _ internalevents.EventRecorder = &EventRecorderAdapter{}
165
166
167
168 func NewEventRecorderAdapter(recorder EventRecorderLogger) *EventRecorderAdapter {
169 return &EventRecorderAdapter{
170 recorder: recorder,
171 }
172 }
173
174
175 func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
176 a.recorder.Eventf(regarding, eventtype, reason, note, args...)
177 }
178
179 func (a *EventRecorderAdapter) WithLogger(logger klog.Logger) internalevents.EventRecorderLogger {
180 return &EventRecorderAdapter{
181 recorder: a.recorder.WithLogger(logger),
182 }
183 }
184
185
186 func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster {
187 c := config{
188 sleepDuration: defaultSleepDuration,
189 }
190 for _, opt := range opts {
191 opt(&c)
192 }
193 eventBroadcaster := &eventBroadcasterImpl{
194 Broadcaster: watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
195 sleepDuration: c.sleepDuration,
196 options: c.CorrelatorOptions,
197 }
198 ctx := c.Context
199 if ctx == nil {
200 ctx = context.Background()
201 }
202
203
204
205
206
207 haveCtxCancelation := ctx.Done() == nil
208
209 eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx)
210
211 if haveCtxCancelation {
212
213
214
215
216
217
218 go func() {
219 <-eventBroadcaster.cancelationCtx.Done()
220 eventBroadcaster.Broadcaster.Shutdown()
221 }()
222 }
223
224 return eventBroadcaster
225 }
226
227 func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
228 return NewBroadcaster(WithSleepDuration(sleepDuration))
229 }
230
231 func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster {
232 return NewBroadcaster(WithCorrelatorOptions(options))
233 }
234
235 func WithCorrelatorOptions(options CorrelatorOptions) BroadcasterOption {
236 return func(c *config) {
237 c.CorrelatorOptions = options
238 }
239 }
240
241
242
243
244 func WithContext(ctx context.Context) BroadcasterOption {
245 return func(c *config) {
246 c.Context = ctx
247 }
248 }
249
250 func WithSleepDuration(sleepDuration time.Duration) BroadcasterOption {
251 return func(c *config) {
252 c.sleepDuration = sleepDuration
253 }
254 }
255
// BroadcasterOption mutates the configuration NewBroadcaster builds a
// broadcaster from.
type BroadcasterOption func(*config)
257
// config collects the settings BroadcasterOptions can change before
// NewBroadcaster creates the broadcaster.
type config struct {
	// CorrelatorOptions is copied into the broadcaster and later passed to
	// the event correlator.
	CorrelatorOptions
	// Context is the parent of the broadcaster's cancelation context; nil
	// is replaced by context.Background() in NewBroadcaster.
	context.Context
	// sleepDuration is the pause between attempts to write an event.
	sleepDuration time.Duration
}
263
// eventBroadcasterImpl is the EventBroadcaster returned by NewBroadcaster.
type eventBroadcasterImpl struct {
	// Broadcaster distributes queued events to the registered watchers.
	*watch.Broadcaster
	// sleepDuration is the pause between attempts to write an event to a
	// sink (the first pause is randomized, see recordToSink).
	sleepDuration time.Duration
	// options is passed to NewEventCorrelatorWithOptions for every sink.
	options CorrelatorOptions
	// cancelationCtx is canceled by cancel or when the context given to
	// NewBroadcaster is canceled; watcher goroutines and retry loops stop
	// when it is done. It also supplies the logger for diagnostics.
	cancelationCtx context.Context
	// cancel cancels cancelationCtx.
	cancel func()
}
271
272
273
274
275 func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
276 eventCorrelator := NewEventCorrelatorWithOptions(e.options)
277 return e.StartEventWatcher(
278 func(event *v1.Event) {
279 e.recordToSink(sink, event, eventCorrelator)
280 })
281 }
282
// Shutdown shuts down the underlying watch broadcaster and then cancels the
// broadcaster's context, which stops watcher goroutines and pending retries.
func (e *eventBroadcasterImpl) Shutdown() {
	e.Broadcaster.Shutdown()
	// Canceling afterwards also releases the goroutine NewBroadcaster may
	// have started to wait on this context.
	e.cancel()
}
287
288 func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator) {
289
290
291 eventCopy := *event
292 event = &eventCopy
293 result, err := eventCorrelator.EventCorrelate(event)
294 if err != nil {
295 utilruntime.HandleError(err)
296 }
297 if result.Skip {
298 return
299 }
300 tries := 0
301 for {
302 if recordEvent(e.cancelationCtx, sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
303 break
304 }
305 tries++
306 if tries >= maxTriesPerEvent {
307 klog.FromContext(e.cancelationCtx).Error(nil, "Unable to write event (retry limit exceeded!)", "event", event)
308 break
309 }
310
311
312
313 delay := e.sleepDuration
314 if tries == 1 {
315 delay = time.Duration(float64(delay) * rand.Float64())
316 }
317 select {
318 case <-e.cancelationCtx.Done():
319 klog.FromContext(e.cancelationCtx).Error(nil, "Unable to write event (broadcaster is shut down)", "event", event)
320 return
321 case <-time.After(delay):
322 }
323 }
324 }
325
326
327
328
329
330 func recordEvent(ctx context.Context, sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
331 var newEvent *v1.Event
332 var err error
333 if updateExistingEvent {
334 newEvent, err = sink.Patch(event, patch)
335 }
336
337 if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) {
338
339 event.ResourceVersion = ""
340 newEvent, err = sink.Create(event)
341 }
342 if err == nil {
343
344 eventCorrelator.UpdateState(newEvent)
345 return true
346 }
347
348
349
350 switch err.(type) {
351 case *restclient.RequestConstructionError:
352
353 klog.FromContext(ctx).Error(err, "Unable to construct event (will not retry!)", "event", event)
354 return true
355 case *errors.StatusError:
356 if errors.IsAlreadyExists(err) || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
357 klog.FromContext(ctx).V(5).Info("Server rejected event (will not retry!)", "event", event, "err", err)
358 } else {
359 klog.FromContext(ctx).Error(err, "Server rejected event (will not retry!)", "event", event)
360 }
361 return true
362 case *errors.UnexpectedObjectError:
363
364
365 default:
366
367 }
368 klog.FromContext(ctx).Error(err, "Unable to write event (may retry after sleeping)", "event", event)
369 return false
370 }
371
372
373
374 func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
375 return e.StartEventWatcher(
376 func(e *v1.Event) {
377 logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
378 })
379 }
380
381
382
383
384
385 func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watch.Interface {
386 loggerV := klog.FromContext(e.cancelationCtx).V(int(verbosity))
387 return e.StartEventWatcher(
388 func(e *v1.Event) {
389 loggerV.Info("Event occurred", "object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "fieldPath", e.InvolvedObject.FieldPath, "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message)
390 })
391 }
392
393
394
395 func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
396 watcher, err := e.Watch()
397 if err != nil {
398 klog.FromContext(e.cancelationCtx).Error(err, "Unable start event watcher (will not retry!)")
399 }
400 go func() {
401 defer utilruntime.HandleCrash()
402 for {
403 select {
404 case <-e.cancelationCtx.Done():
405 watcher.Stop()
406 return
407 case watchEvent := <-watcher.ResultChan():
408 event, ok := watchEvent.Object.(*v1.Event)
409 if !ok {
410
411
412 continue
413 }
414 eventHandler(event)
415 }
416 }
417 }()
418 return watcher
419 }
420
421
422 func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger {
423 return &recorderImplLogger{recorderImpl: &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}, logger: klog.Background()}
424 }
425
// recorderImpl is the EventRecorder that builds events and queues them on a
// watch broadcaster.
type recorderImpl struct {
	// scheme is used to build the reference to the object an event is
	// about.
	scheme *runtime.Scheme
	// source is copied into every event, and also fills the event's
	// ReportingController and ReportingInstance.
	source v1.EventSource
	// Broadcaster receives the generated events.
	*watch.Broadcaster
	// clock supplies the event timestamps.
	clock clock.PassiveClock
}

// Compile-time check that *recorderImpl implements EventRecorder.
var _ EventRecorder = &recorderImpl{}
434
435 func (recorder *recorderImpl) generateEvent(logger klog.Logger, object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
436 ref, err := ref.GetReference(recorder.scheme, object)
437 if err != nil {
438 logger.Error(err, "Could not construct reference, will not report event", "object", object, "eventType", eventtype, "reason", reason, "message", message)
439 return
440 }
441
442 if !util.ValidateEventType(eventtype) {
443 logger.Error(nil, "Unsupported event type", "eventType", eventtype)
444 return
445 }
446
447 event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
448 event.Source = recorder.source
449
450 event.ReportingInstance = recorder.source.Host
451 event.ReportingController = recorder.source.Component
452
453
454
455
456
457
458 sent, err := recorder.ActionOrDrop(watch.Added, event)
459 if err != nil {
460 logger.Error(err, "Unable to record event (will not retry!)")
461 return
462 }
463 if !sent {
464 logger.Error(nil, "Unable to record event: too many queued events, dropped event", "event", event)
465 }
466 }
467
468 func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
469 recorder.generateEvent(klog.Background(), object, nil, eventtype, reason, message)
470 }
471
472 func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
473 recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
474 }
475
476 func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
477 recorder.generateEvent(klog.Background(), object, annotations, eventtype, reason, fmt.Sprintf(messageFmt, args...))
478 }
479
480 func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
481 t := metav1.Time{Time: recorder.clock.Now()}
482 namespace := ref.Namespace
483 if namespace == "" {
484 namespace = metav1.NamespaceDefault
485 }
486 return &v1.Event{
487 ObjectMeta: metav1.ObjectMeta{
488 Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
489 Namespace: namespace,
490 Annotations: annotations,
491 },
492 InvolvedObject: *ref,
493 Reason: reason,
494 Message: message,
495 FirstTimestamp: t,
496 LastTimestamp: t,
497 Count: 1,
498 Type: eventtype,
499 }
500 }
501
// recorderImplLogger is a recorderImpl paired with the logger its
// diagnostics are written to.
type recorderImplLogger struct {
	// recorderImpl builds and queues the events; it is shared between all
	// values derived through WithLogger.
	*recorderImpl
	// logger receives the diagnostics produced while generating events.
	logger klog.Logger
}

// Compile-time check that *recorderImplLogger implements
// EventRecorderLogger.
var _ EventRecorderLogger = &recorderImplLogger{}
508
509 func (recorder recorderImplLogger) Event(object runtime.Object, eventtype, reason, message string) {
510 recorder.recorderImpl.generateEvent(recorder.logger, object, nil, eventtype, reason, message)
511 }
512
513 func (recorder recorderImplLogger) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
514 recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
515 }
516
517 func (recorder recorderImplLogger) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
518 recorder.generateEvent(recorder.logger, object, annotations, eventtype, reason, fmt.Sprintf(messageFmt, args...))
519 }
520
521 func (recorder recorderImplLogger) WithLogger(logger klog.Logger) EventRecorderLogger {
522 return recorderImplLogger{recorderImpl: recorder.recorderImpl, logger: logger}
523 }
524
View as plain text