...

Source file src/k8s.io/client-go/tools/record/events_cache.go

Documentation: k8s.io/client-go/tools/record

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package record

import (
	"encoding/json"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/golang/groupcache/lru"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/utils/clock"
)

const (
	maxLruCacheEntries = 4096

	// if we see the same event that varies only by message
	// more than 10 times in a 10-minute period, aggregate the event
	defaultAggregateMaxEvents         = 10
	defaultAggregateIntervalInSeconds = 600

	// by default, allow a source to send 25 events about an object
	// but control the refill rate to 1 new event every 5 minutes
	// this helps control the long tail of events for things that are always
	// unhealthy
	defaultSpamBurst = 25
	defaultSpamQPS   = 1. / 300.
)
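
// A quick sanity check of the refill interval implied by defaultSpamQPS
// (a sketch, not code used by this package):
//
//	refill := time.Duration(float64(time.Second) / defaultSpamQPS)
//	fmt.Println(refill) // 5m0s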

// getEventKey builds a unique event key from the event's source,
// involvedObject, type, reason, and message
func getEventKey(event *v1.Event) string {
	return strings.Join([]string{
		event.Source.Component,
		event.Source.Host,
		event.InvolvedObject.Kind,
		event.InvolvedObject.Namespace,
		event.InvolvedObject.Name,
		event.InvolvedObject.FieldPath,
		string(event.InvolvedObject.UID),
		event.InvolvedObject.APIVersion,
		event.Type,
		event.Reason,
		event.Message,
	},
		"")
}

// getSpamKey builds a unique event key from the event's source and
// involvedObject only; it omits type, reason, and message so that all events
// about the same object share a single rate-limit budget
func getSpamKey(event *v1.Event) string {
	return strings.Join([]string{
		event.Source.Component,
		event.Source.Host,
		event.InvolvedObject.Kind,
		event.InvolvedObject.Namespace,
		event.InvolvedObject.Name,
		string(event.InvolvedObject.UID),
		event.InvolvedObject.APIVersion,
	},
		"")
}

// EventSpamKeyFunc is a function that returns a unique key for the provided event
type EventSpamKeyFunc func(event *v1.Event) string

// EventFilterFunc is a function that returns true if the event should be skipped
type EventFilterFunc func(event *v1.Event) bool

// EventSourceObjectSpamFilter is responsible for throttling
// the number of events a source and object can produce.
type EventSourceObjectSpamFilter struct {
	sync.RWMutex

	// the cache that manages last synced state
	cache *lru.Cache

	// burst is the number of events we allow per source + object
	burst int

	// qps is the refill rate of the token bucket in queries per second
	qps float32

	// clock is used to allow for testing over a time interval
	clock clock.PassiveClock

	// spamKeyFunc is a func used to create a key based on an event, which is later used to filter spam events.
	spamKeyFunc EventSpamKeyFunc
}

// NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.PassiveClock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter {
	return &EventSourceObjectSpamFilter{
		cache:       lru.New(lruCacheSize),
		burst:       burst,
		qps:         qps,
		clock:       clock,
		spamKeyFunc: spamKeyFunc,
	}
}

// spamRecord holds data used to perform spam filtering decisions.
type spamRecord struct {
	// rateLimiter controls the rate of events about this object
	rateLimiter flowcontrol.PassiveRateLimiter
}

// Filter returns true (i.e. the event should be skipped) when the given
// source+object pair has exceeded its allowed event rate.
func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
	var record spamRecord

	// controls our cached information about this event
	eventKey := f.spamKeyFunc(event)

	// do we have a record of similar events in our cache?
	f.Lock()
	defer f.Unlock()
	value, found := f.cache.Get(eventKey)
	if found {
		record = value.(spamRecord)
	}

	// verify we have a rate limiter for this record
	if record.rateLimiter == nil {
		record.rateLimiter = flowcontrol.NewTokenBucketPassiveRateLimiterWithClock(f.qps, f.burst, f.clock)
	}

	// ensure we have available rate
	filter := !record.rateLimiter.TryAccept()

	// update the cache
	f.cache.Add(eventKey, record)

	return filter
}
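
// A minimal usage sketch, assuming the fake clock from
// k8s.io/utils/clock/testing (imported as testingclock): the first `burst`
// events about an object are accepted, later ones are reported as spam until
// the token bucket refills.
//
//	fakeClock := testingclock.NewFakePassiveClock(time.Now())
//	f := NewEventSourceObjectSpamFilter(4096, 2, 1./300., fakeClock, getSpamKey)
//	ev := &v1.Event{InvolvedObject: v1.ObjectReference{Kind: "Pod", Namespace: "ns", Name: "p"}}
//	fmt.Println(f.Filter(ev)) // false (accepted)
//	fmt.Println(f.Filter(ev)) // false (accepted, burst = 2)
//	fmt.Println(f.Filter(ev)) // true  (throttled)
//	fakeClock.SetTime(fakeClock.Now().Add(5 * time.Minute))
//	fmt.Println(f.Filter(ev)) // false (one token refilled after 5 minutes)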

// EventAggregatorKeyFunc is responsible for grouping events for aggregation.
// It returns a tuple of the following:
// aggregateKey - key that identifies the aggregate group to bucket this event
// localKey - key that distinguishes this event within the aggregate group
type EventAggregatorKeyFunc func(event *v1.Event) (aggregateKey string, localKey string)

// EventAggregatorByReasonFunc aggregates events by exact match on event.Source, event.InvolvedObject, event.Type,
// event.Reason, event.ReportingController and event.ReportingInstance
func EventAggregatorByReasonFunc(event *v1.Event) (string, string) {
	return strings.Join([]string{
		event.Source.Component,
		event.Source.Host,
		event.InvolvedObject.Kind,
		event.InvolvedObject.Namespace,
		event.InvolvedObject.Name,
		string(event.InvolvedObject.UID),
		event.InvolvedObject.APIVersion,
		event.Type,
		event.Reason,
		event.ReportingController,
		event.ReportingInstance,
	},
		""), event.Message
}

// EventAggregatorMessageFunc is responsible for producing an aggregation message
type EventAggregatorMessageFunc func(event *v1.Event) string

// EventAggregatorByReasonMessageFunc returns an aggregate message by prefixing the incoming message
func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
	return "(combined from similar events): " + event.Message
}

// EventAggregator identifies similar events and aggregates them into a single event
type EventAggregator struct {
	sync.RWMutex

	// The cache that manages aggregation state
	cache *lru.Cache

	// The function that groups events for aggregation
	keyFunc EventAggregatorKeyFunc

	// The function that generates a message for an aggregate event
	messageFunc EventAggregatorMessageFunc

	// The maximum number of events in the specified interval before aggregation occurs
	maxEvents uint

	// The amount of time in seconds that must transpire since the last occurrence of a similar event before it's considered new
	maxIntervalInSeconds uint

	// clock is used to allow for testing over a time interval
	clock clock.PassiveClock
}

// NewEventAggregator returns a new instance of an EventAggregator
func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
	maxEvents int, maxIntervalInSeconds int, clock clock.PassiveClock) *EventAggregator {
	return &EventAggregator{
		cache:                lru.New(lruCacheSize),
		keyFunc:              keyFunc,
		messageFunc:          messageFunc,
		maxEvents:            uint(maxEvents),
		maxIntervalInSeconds: uint(maxIntervalInSeconds),
		clock:                clock,
	}
}

// aggregateRecord holds data used to perform aggregation decisions
type aggregateRecord struct {
	// we track the number of unique local keys we have seen in the aggregate set to know when to actually aggregate
	// if the size of this set exceeds the max, we know we need to aggregate
	localKeys sets.String
	// The last time at which the aggregate was recorded
	lastTimestamp metav1.Time
}

// EventAggregate checks if a similar event has been seen according to the
// aggregation configuration (max events, max interval, etc) and returns:
//
//   - The (potentially modified) event that should be created
//   - The cache key for the event, for correlation purposes. This will be set to
//     the full key for normal events, and to the aggregate key (the first value
//     returned by the EventAggregatorKeyFunc) for aggregate events.
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
	now := metav1.NewTime(e.clock.Now())
	var record aggregateRecord
	// eventKey is the full cache key for this event
	eventKey := getEventKey(newEvent)
	// aggregateKey is for the aggregate event, if one is needed.
	aggregateKey, localKey := e.keyFunc(newEvent)

	// Do we have a record of similar events in our cache?
	e.Lock()
	defer e.Unlock()
	value, found := e.cache.Get(aggregateKey)
	if found {
		record = value.(aggregateRecord)
	}

	// Is the previous record too old? If so, make a fresh one. Note: if we didn't
	// find a similar record, its lastTimestamp will be the zero value, so we
	// create a new one in that case.
	maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
	interval := now.Time.Sub(record.lastTimestamp.Time)
	if interval > maxInterval {
		record = aggregateRecord{localKeys: sets.NewString()}
	}

	// Write the new event into the aggregation record and put it on the cache
	record.localKeys.Insert(localKey)
	record.lastTimestamp = now
	e.cache.Add(aggregateKey, record)

	// If we are not yet over the threshold for unique events, don't correlate them
	if uint(record.localKeys.Len()) < e.maxEvents {
		return newEvent, eventKey
	}

	// do not grow our local key set any larger than max
	record.localKeys.PopAny()

	// create a new aggregate event, and return the aggregateKey as the cache key
	// (so that it can be overwritten.)
	eventCopy := &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
			Namespace: newEvent.Namespace,
		},
		Count:          1,
		FirstTimestamp: now,
		InvolvedObject: newEvent.InvolvedObject,
		LastTimestamp:  now,
		Message:        e.messageFunc(newEvent),
		Type:           newEvent.Type,
		Reason:         newEvent.Reason,
		Source:         newEvent.Source,
	}
	return eventCopy, aggregateKey
}
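
// A behavior sketch with the default thresholds (maxEvents = 10): events that
// differ only by Message share one aggregation bucket, the first nine unique
// messages pass through unchanged, and the tenth triggers aggregation.
//
//	agg := NewEventAggregator(4096, EventAggregatorByReasonFunc,
//		EventAggregatorByReasonMessageFunc, 10, 600, clock.RealClock{})
//	for i := 0; i < 9; i++ {
//		ev := &v1.Event{Reason: "FailedScheduling", Message: fmt.Sprintf("attempt %d", i)}
//		out, key := agg.EventAggregate(ev) // returned unchanged, keyed by the full event key
//		_, _ = out, key
//	}
//	out, key := agg.EventAggregate(&v1.Event{Reason: "FailedScheduling", Message: "attempt 9"})
//	fmt.Println(out.Message) // "(combined from similar events): attempt 9"
//	_ = key                  // now the aggregate key, shared by later similar events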

// eventLog records data about when an event was observed
type eventLog struct {
	// The number of times the event has occurred since first occurrence.
	count uint

	// The time at which the event was first recorded.
	firstTimestamp metav1.Time

	// The unique name of the first occurrence of this event
	name string

	// Resource version returned from previous interaction with server
	resourceVersion string
}

// eventLogger logs occurrences of an event
type eventLogger struct {
	sync.RWMutex
	cache *lru.Cache
	clock clock.PassiveClock
}

// newEventLogger returns an eventLogger that observes events and counts their frequencies
func newEventLogger(lruCacheEntries int, clock clock.PassiveClock) *eventLogger {
	return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
}

// eventObserve records an event, or updates an existing one if the key is a cache hit
func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
	var (
		patch []byte
		err   error
	)
	eventCopy := *newEvent
	event := &eventCopy

	e.Lock()
	defer e.Unlock()

	// Check if there is an existing event we should update
	lastObservation := e.lastEventObservationFromCache(key)

	// If we found a result, prepare a patch
	if lastObservation.count > 0 {
		// update the event based on the last observation so patch will work as desired
		event.Name = lastObservation.name
		event.ResourceVersion = lastObservation.resourceVersion
		event.FirstTimestamp = lastObservation.firstTimestamp
		event.Count = int32(lastObservation.count) + 1

		// blank the fields that must always appear in the patch (count,
		// lastTimestamp, message) in the "old" copy, so the two-way diff
		// below is guaranteed to include them
		eventCopy2 := *event
		eventCopy2.Count = 0
		eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
		eventCopy2.Message = ""

		newData, _ := json.Marshal(event)
		oldData, _ := json.Marshal(eventCopy2)
		patch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, event)
	}

	// record our new observation
	e.cache.Add(
		key,
		eventLog{
			count:           uint(event.Count),
			firstTimestamp:  event.FirstTimestamp,
			name:            event.Name,
			resourceVersion: event.ResourceVersion,
		},
	)
	return event, patch, err
}
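
// Because count, lastTimestamp, and message are blanked in the "old" copy,
// the patch produced above always carries at least those three fields; for a
// second observation of an otherwise identical event it would look roughly
// like this (illustrative values):
//
//	{"count":2,"lastTimestamp":"2024-01-01T00:00:00Z","message":"..."}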

// updateState updates its internal tracking information based on latest server state
func (e *eventLogger) updateState(event *v1.Event) {
	key := getEventKey(event)
	e.Lock()
	defer e.Unlock()
	// record our new observation
	e.cache.Add(
		key,
		eventLog{
			count:           uint(event.Count),
			firstTimestamp:  event.FirstTimestamp,
			name:            event.Name,
			resourceVersion: event.ResourceVersion,
		},
	)
}

// lastEventObservationFromCache returns the event from the cache, reads must be protected via external lock
func (e *eventLogger) lastEventObservationFromCache(key string) eventLog {
	value, ok := e.cache.Get(key)
	if ok {
		observationValue, ok := value.(eventLog)
		if ok {
			return observationValue
		}
	}
	return eventLog{}
}

// EventCorrelator processes all incoming events and performs analysis to avoid overwhelming the system. It filters
// incoming events to decide whether they should be dropped, aggregates similar events that occur frequently to protect
// the system from spamming events that are difficult for users to distinguish, and performs de-duplication so that
// events observed multiple times are compacted into a single event with an increasing count.
type EventCorrelator struct {
	// the function to filter the event
	filterFunc EventFilterFunc
	// the object that performs event aggregation
	aggregator *EventAggregator
	// the object that observes events as they come through
	logger *eventLogger
}

// EventCorrelateResult is the result of a call to EventCorrelate
type EventCorrelateResult struct {
	// the event after correlation
	Event *v1.Event
	// if provided, perform a strategic patch when updating the record on the server
	Patch []byte
	// if true, do no further processing of the event
	Skip bool
}

// NewEventCorrelator returns an EventCorrelator configured with default values.
//
// The EventCorrelator is responsible for event filtering, aggregating, and counting
// prior to interacting with the API server to record the event.
//
// The default behavior is as follows:
//   - Aggregation is performed if a similar event is recorded 10 times
//     in a 10 minute rolling interval.  A similar event is an event that varies only by
//     the Event.Message field.  Rather than recording the precise event, aggregation
//     will create a new event whose message reports that it has combined events with
//     the same reason.
//   - Events are incrementally counted if the exact same event is encountered multiple
//     times.
//   - A source may burst 25 events about an object, but has a refill rate budget
//     per object of 1 event every 5 minutes to control the long tail of spam.
func NewEventCorrelator(clock clock.PassiveClock) *EventCorrelator {
	cacheSize := maxLruCacheEntries
	spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock, getSpamKey)
	return &EventCorrelator{
		filterFunc: spamFilter.Filter,
		aggregator: NewEventAggregator(
			cacheSize,
			EventAggregatorByReasonFunc,
			EventAggregatorByReasonMessageFunc,
			defaultAggregateMaxEvents,
			defaultAggregateIntervalInSeconds,
			clock),

		logger: newEventLogger(cacheSize, clock),
	}
}
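
// A minimal end-to-end sketch: correlate an incoming event, skip it if the
// spam filter says so, otherwise create or patch it server-side and feed the
// server's response back via UpdateState (event and served are hypothetical
// variables):
//
//	correlator := NewEventCorrelator(clock.RealClock{})
//	result, err := correlator.EventCorrelate(event)
//	if err != nil || result.Skip {
//		return
//	}
//	if result.Patch != nil {
//		// PATCH the existing server-side event using result.Patch
//	} else {
//		// POST result.Event to the server
//	}
//	correlator.UpdateState(served) // the event as returned by the server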

// NewEventCorrelatorWithOptions returns an EventCorrelator configured with the
// given options; any zero-valued field falls back to the defaults (see
// populateDefaults).
func NewEventCorrelatorWithOptions(options CorrelatorOptions) *EventCorrelator {
	optionsWithDefaults := populateDefaults(options)
	spamFilter := NewEventSourceObjectSpamFilter(
		optionsWithDefaults.LRUCacheSize,
		optionsWithDefaults.BurstSize,
		optionsWithDefaults.QPS,
		optionsWithDefaults.Clock,
		optionsWithDefaults.SpamKeyFunc)
	return &EventCorrelator{
		filterFunc: spamFilter.Filter,
		aggregator: NewEventAggregator(
			optionsWithDefaults.LRUCacheSize,
			optionsWithDefaults.KeyFunc,
			optionsWithDefaults.MessageFunc,
			optionsWithDefaults.MaxEvents,
			optionsWithDefaults.MaxIntervalInSeconds,
			optionsWithDefaults.Clock),
		logger: newEventLogger(optionsWithDefaults.LRUCacheSize, optionsWithDefaults.Clock),
	}
}

// populateDefaults populates the zero value options with defaults
func populateDefaults(options CorrelatorOptions) CorrelatorOptions {
	if options.LRUCacheSize == 0 {
		options.LRUCacheSize = maxLruCacheEntries
	}
	if options.BurstSize == 0 {
		options.BurstSize = defaultSpamBurst
	}
	if options.QPS == 0 {
		options.QPS = defaultSpamQPS
	}
	if options.KeyFunc == nil {
		options.KeyFunc = EventAggregatorByReasonFunc
	}
	if options.MessageFunc == nil {
		options.MessageFunc = EventAggregatorByReasonMessageFunc
	}
	if options.MaxEvents == 0 {
		options.MaxEvents = defaultAggregateMaxEvents
	}
	if options.MaxIntervalInSeconds == 0 {
		options.MaxIntervalInSeconds = defaultAggregateIntervalInSeconds
	}
	if options.Clock == nil {
		options.Clock = clock.RealClock{}
	}
	if options.SpamKeyFunc == nil {
		options.SpamKeyFunc = getSpamKey
	}
	return options
}
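
// A sketch of overriding a subset of options; fields left at their zero
// values fall back to the defaults above (the custom SpamKeyFunc is a
// hypothetical example):
//
//	correlator := NewEventCorrelatorWithOptions(CorrelatorOptions{
//		BurstSize: 100,      // allow a larger burst per source+object
//		QPS:       1. / 60., // refill one event per minute
//		SpamKeyFunc: func(e *v1.Event) string {
//			return getSpamKey(e) + e.Reason // budget per object and reason
//		},
//	})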

// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
	if newEvent == nil {
		return nil, fmt.Errorf("event is nil")
	}
	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
	if c.filterFunc(observedEvent) {
		return &EventCorrelateResult{Skip: true}, nil
	}
	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}

// UpdateState updates the correlator's internal state based on the latest observed state from the server
func (c *EventCorrelator) UpdateState(event *v1.Event) {
	c.logger.updateState(event)
}
