1
16
17 package record
18
19 import (
20 "encoding/json"
21 "fmt"
22 "strings"
23 "sync"
24 "time"
25
26 "github.com/golang/groupcache/lru"
27
28 v1 "k8s.io/api/core/v1"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/util/sets"
31 "k8s.io/apimachinery/pkg/util/strategicpatch"
32 "k8s.io/client-go/util/flowcontrol"
33 "k8s.io/utils/clock"
34 )
35
// Default tuning for the event correlator. Each value is used directly by
// NewEventCorrelator and is substituted by populateDefaults whenever the
// corresponding CorrelatorOptions field is left at zero.
const (
	// maxLruCacheEntries is the size of each LRU cache the correlator builds
	// (spam filter, aggregator and event logger).
	maxLruCacheEntries = 1 << 12 // 4096

	// An aggregate event is emitted once defaultAggregateMaxEvents distinct
	// local keys (with the default key func: distinct messages) have been seen
	// for one aggregate key; the tally restarts when more than
	// defaultAggregateIntervalInSeconds elapse between matching events.
	defaultAggregateMaxEvents         = 10
	defaultAggregateIntervalInSeconds = 10 * 60 // 10 minutes

	// The spam filter keeps one token bucket per spam key: defaultSpamBurst
	// events may pass immediately, after which tokens refill at defaultSpamQPS,
	// i.e. one event every 300 seconds (5 minutes).
	defaultSpamBurst = 25
	defaultSpamQPS   = 1.0 / (5 * 60)
)
51
52
53 func getEventKey(event *v1.Event) string {
54 return strings.Join([]string{
55 event.Source.Component,
56 event.Source.Host,
57 event.InvolvedObject.Kind,
58 event.InvolvedObject.Namespace,
59 event.InvolvedObject.Name,
60 event.InvolvedObject.FieldPath,
61 string(event.InvolvedObject.UID),
62 event.InvolvedObject.APIVersion,
63 event.Type,
64 event.Reason,
65 event.Message,
66 },
67 "")
68 }
69
70
71 func getSpamKey(event *v1.Event) string {
72 return strings.Join([]string{
73 event.Source.Component,
74 event.Source.Host,
75 event.InvolvedObject.Kind,
76 event.InvolvedObject.Namespace,
77 event.InvolvedObject.Name,
78 string(event.InvolvedObject.UID),
79 event.InvolvedObject.APIVersion,
80 },
81 "")
82 }
83
84
85 type EventSpamKeyFunc func(event *v1.Event) string
86
87
88 type EventFilterFunc func(event *v1.Event) bool
89
90
91
// EventSourceObjectSpamFilter rate-limits events per spam key. Each key gets
// its own token-bucket rate limiter, and Filter reports an event as filtered
// once that limiter refuses it.
type EventSourceObjectSpamFilter struct {
	// RWMutex guards cache; Filter takes the exclusive lock on every call.
	sync.RWMutex

	// cache maps a spam key to its spamRecord; it is an LRU cache sized by the
	// constructor's lruCacheSize.
	cache *lru.Cache

	// burst is the burst size given to each newly created rate limiter.
	burst int

	// qps is the refill rate given to each newly created rate limiter.
	qps float32

	// clock is handed to every rate limiter; supplied by the caller.
	clock clock.PassiveClock

	// spamKeyFunc derives the cache key for an event.
	spamKeyFunc EventSpamKeyFunc
}
110
111
112 func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.PassiveClock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter {
113 return &EventSourceObjectSpamFilter{
114 cache: lru.New(lruCacheSize),
115 burst: burst,
116 qps: qps,
117 clock: clock,
118 spamKeyFunc: spamKeyFunc,
119 }
120 }
121
122
// spamRecord is the value cached per spam key by EventSourceObjectSpamFilter.
type spamRecord struct {
	// rateLimiter is the token bucket for this key; Filter creates it the
	// first time the key is seen (when the record has no limiter yet).
	rateLimiter flowcontrol.PassiveRateLimiter
}
127
128
129 func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
130 var record spamRecord
131
132
133 eventKey := f.spamKeyFunc(event)
134
135
136 f.Lock()
137 defer f.Unlock()
138 value, found := f.cache.Get(eventKey)
139 if found {
140 record = value.(spamRecord)
141 }
142
143
144 if record.rateLimiter == nil {
145 record.rateLimiter = flowcontrol.NewTokenBucketPassiveRateLimiterWithClock(f.qps, f.burst, f.clock)
146 }
147
148
149 filter := !record.rateLimiter.TryAccept()
150
151
152 f.cache.Add(eventKey, record)
153
154 return filter
155 }
156
157
158
159
160
161 type EventAggregatorKeyFunc func(event *v1.Event) (aggregateKey string, localKey string)
162
163
164
165 func EventAggregatorByReasonFunc(event *v1.Event) (string, string) {
166 return strings.Join([]string{
167 event.Source.Component,
168 event.Source.Host,
169 event.InvolvedObject.Kind,
170 event.InvolvedObject.Namespace,
171 event.InvolvedObject.Name,
172 string(event.InvolvedObject.UID),
173 event.InvolvedObject.APIVersion,
174 event.Type,
175 event.Reason,
176 event.ReportingController,
177 event.ReportingInstance,
178 },
179 ""), event.Message
180 }
181
182
// EventAggregatorMessageFunc produces the message of an aggregate event from
// the event that triggered the aggregation.
type EventAggregatorMessageFunc func(event *v1.Event) string
184
185
186 func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
187 return "(combined from similar events): " + event.Message
188 }
189
190
// EventAggregator tracks, per aggregate key, the set of distinct local keys
// seen recently. Once that set grows to maxEvents, it replaces incoming events
// with a newly built aggregate event.
type EventAggregator struct {
	// RWMutex guards cache; EventAggregate takes the exclusive lock.
	sync.RWMutex

	// cache maps an aggregate key to its aggregateRecord (LRU-bounded).
	cache *lru.Cache

	// keyFunc splits an event into its aggregate key and local key.
	keyFunc EventAggregatorKeyFunc

	// messageFunc builds the message of an aggregate event.
	messageFunc EventAggregatorMessageFunc

	// maxEvents is the number of distinct local keys per aggregate key at
	// which events start being aggregated.
	maxEvents uint

	// maxIntervalInSeconds is how long, in seconds, a record may go without a
	// matching event before it is discarded and counting starts over.
	maxIntervalInSeconds uint

	// clock supplies the current time for timestamps and interval checks.
	clock clock.PassiveClock
}
212
213
214 func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
215 maxEvents int, maxIntervalInSeconds int, clock clock.PassiveClock) *EventAggregator {
216 return &EventAggregator{
217 cache: lru.New(lruCacheSize),
218 keyFunc: keyFunc,
219 messageFunc: messageFunc,
220 maxEvents: uint(maxEvents),
221 maxIntervalInSeconds: uint(maxIntervalInSeconds),
222 clock: clock,
223 }
224 }
225
226
// aggregateRecord is the value EventAggregator caches per aggregate key.
type aggregateRecord struct {
	// localKeys is the set of distinct local keys observed for this aggregate
	// key since the record was (re)started.
	localKeys sets.String
	// lastTimestamp is when the most recent matching event was observed.
	lastTimestamp metav1.Time
}
234
235
236
237
238
239
240
241
242 func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
243 now := metav1.NewTime(e.clock.Now())
244 var record aggregateRecord
245
246 eventKey := getEventKey(newEvent)
247
248 aggregateKey, localKey := e.keyFunc(newEvent)
249
250
251 e.Lock()
252 defer e.Unlock()
253 value, found := e.cache.Get(aggregateKey)
254 if found {
255 record = value.(aggregateRecord)
256 }
257
258
259
260
261 maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
262 interval := now.Time.Sub(record.lastTimestamp.Time)
263 if interval > maxInterval {
264 record = aggregateRecord{localKeys: sets.NewString()}
265 }
266
267
268 record.localKeys.Insert(localKey)
269 record.lastTimestamp = now
270 e.cache.Add(aggregateKey, record)
271
272
273 if uint(record.localKeys.Len()) < e.maxEvents {
274 return newEvent, eventKey
275 }
276
277
278 record.localKeys.PopAny()
279
280
281
282 eventCopy := &v1.Event{
283 ObjectMeta: metav1.ObjectMeta{
284 Name: fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
285 Namespace: newEvent.Namespace,
286 },
287 Count: 1,
288 FirstTimestamp: now,
289 InvolvedObject: newEvent.InvolvedObject,
290 LastTimestamp: now,
291 Message: e.messageFunc(newEvent),
292 Type: newEvent.Type,
293 Reason: newEvent.Reason,
294 Source: newEvent.Source,
295 }
296 return eventCopy, aggregateKey
297 }
298
299
// eventLog is what eventLogger caches per key about the most recent event it
// saw. It is used to turn a repeated event into a patch of the earlier one.
type eventLog struct {
	// count is the event's Count at the last observation or update.
	count uint

	// firstTimestamp is carried forward onto repeats of the event.
	firstTimestamp metav1.Time

	// name is the object name of the event that a repeat will patch.
	name string

	// resourceVersion is the event's resource version as last recorded.
	resourceVersion string
}
313
314
// eventLogger remembers, per cache key, the last observed event so that
// repeats can be sent as patches with an incremented count. The embedded
// RWMutex guards cache. clock is stored but not read by any method in this
// file.
type eventLogger struct {
	sync.RWMutex
	cache *lru.Cache
	clock clock.PassiveClock
}
320
321
322 func newEventLogger(lruCacheEntries int, clock clock.PassiveClock) *eventLogger {
323 return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
324 }
325
326
327 func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
328 var (
329 patch []byte
330 err error
331 )
332 eventCopy := *newEvent
333 event := &eventCopy
334
335 e.Lock()
336 defer e.Unlock()
337
338
339 lastObservation := e.lastEventObservationFromCache(key)
340
341
342 if lastObservation.count > 0 {
343
344 event.Name = lastObservation.name
345 event.ResourceVersion = lastObservation.resourceVersion
346 event.FirstTimestamp = lastObservation.firstTimestamp
347 event.Count = int32(lastObservation.count) + 1
348
349 eventCopy2 := *event
350 eventCopy2.Count = 0
351 eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
352 eventCopy2.Message = ""
353
354 newData, _ := json.Marshal(event)
355 oldData, _ := json.Marshal(eventCopy2)
356 patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
357 }
358
359
360 e.cache.Add(
361 key,
362 eventLog{
363 count: uint(event.Count),
364 firstTimestamp: event.FirstTimestamp,
365 name: event.Name,
366 resourceVersion: event.ResourceVersion,
367 },
368 )
369 return event, patch, err
370 }
371
372
373 func (e *eventLogger) updateState(event *v1.Event) {
374 key := getEventKey(event)
375 e.Lock()
376 defer e.Unlock()
377
378 e.cache.Add(
379 key,
380 eventLog{
381 count: uint(event.Count),
382 firstTimestamp: event.FirstTimestamp,
383 name: event.Name,
384 resourceVersion: event.ResourceVersion,
385 },
386 )
387 }
388
389
390 func (e *eventLogger) lastEventObservationFromCache(key string) eventLog {
391 value, ok := e.cache.Get(key)
392 if ok {
393 observationValue, ok := value.(eventLog)
394 if ok {
395 return observationValue
396 }
397 }
398 return eventLog{}
399 }
400
401
402
403
404
// EventCorrelator combines the stages that EventCorrelate applies, in order:
//  1. the aggregator, which may replace an event with a combined one;
//  2. the logger, which turns repeats into patches;
//  3. filterFunc, which may mark the result to be skipped.
type EventCorrelator struct {
	// filterFunc decides whether an event is dropped (true means skip).
	filterFunc EventFilterFunc
	// aggregator groups similar events into aggregate events.
	aggregator *EventAggregator
	// logger tracks previously observed events to build patches.
	logger *eventLogger
}
413
414
// EventCorrelateResult is the outcome of EventCorrelator.EventCorrelate.
type EventCorrelateResult struct {
	// Event is the event to record; nil when Skip is true.
	Event *v1.Event
	// Patch, when non-nil, is a strategic merge patch for a previously
	// recorded event.
	Patch []byte
	// Skip is true when the event was filtered out and should not be recorded.
	Skip bool
}
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439 func NewEventCorrelator(clock clock.PassiveClock) *EventCorrelator {
440 cacheSize := maxLruCacheEntries
441 spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock, getSpamKey)
442 return &EventCorrelator{
443 filterFunc: spamFilter.Filter,
444 aggregator: NewEventAggregator(
445 cacheSize,
446 EventAggregatorByReasonFunc,
447 EventAggregatorByReasonMessageFunc,
448 defaultAggregateMaxEvents,
449 defaultAggregateIntervalInSeconds,
450 clock),
451
452 logger: newEventLogger(cacheSize, clock),
453 }
454 }
455
456 func NewEventCorrelatorWithOptions(options CorrelatorOptions) *EventCorrelator {
457 optionsWithDefaults := populateDefaults(options)
458 spamFilter := NewEventSourceObjectSpamFilter(
459 optionsWithDefaults.LRUCacheSize,
460 optionsWithDefaults.BurstSize,
461 optionsWithDefaults.QPS,
462 optionsWithDefaults.Clock,
463 optionsWithDefaults.SpamKeyFunc)
464 return &EventCorrelator{
465 filterFunc: spamFilter.Filter,
466 aggregator: NewEventAggregator(
467 optionsWithDefaults.LRUCacheSize,
468 optionsWithDefaults.KeyFunc,
469 optionsWithDefaults.MessageFunc,
470 optionsWithDefaults.MaxEvents,
471 optionsWithDefaults.MaxIntervalInSeconds,
472 optionsWithDefaults.Clock),
473 logger: newEventLogger(optionsWithDefaults.LRUCacheSize, optionsWithDefaults.Clock),
474 }
475 }
476
477
478 func populateDefaults(options CorrelatorOptions) CorrelatorOptions {
479 if options.LRUCacheSize == 0 {
480 options.LRUCacheSize = maxLruCacheEntries
481 }
482 if options.BurstSize == 0 {
483 options.BurstSize = defaultSpamBurst
484 }
485 if options.QPS == 0 {
486 options.QPS = defaultSpamQPS
487 }
488 if options.KeyFunc == nil {
489 options.KeyFunc = EventAggregatorByReasonFunc
490 }
491 if options.MessageFunc == nil {
492 options.MessageFunc = EventAggregatorByReasonMessageFunc
493 }
494 if options.MaxEvents == 0 {
495 options.MaxEvents = defaultAggregateMaxEvents
496 }
497 if options.MaxIntervalInSeconds == 0 {
498 options.MaxIntervalInSeconds = defaultAggregateIntervalInSeconds
499 }
500 if options.Clock == nil {
501 options.Clock = clock.RealClock{}
502 }
503 if options.SpamKeyFunc == nil {
504 options.SpamKeyFunc = getSpamKey
505 }
506 return options
507 }
508
509
510 func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
511 if newEvent == nil {
512 return nil, fmt.Errorf("event is nil")
513 }
514 aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
515 observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
516 if c.filterFunc(observedEvent) {
517 return &EventCorrelateResult{Skip: true}, nil
518 }
519 return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
520 }
521
522
523 func (c *EventCorrelator) UpdateState(event *v1.Event) {
524 c.logger.updateState(event)
525 }
526
View as plain text