...

Source file src/k8s.io/client-go/tools/record/event.go

Documentation: k8s.io/client-go/tools/record

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package record
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math/rand"
    23  	"time"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	"k8s.io/apimachinery/pkg/api/errors"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/runtime"
    29  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    30  	"k8s.io/apimachinery/pkg/watch"
    31  	restclient "k8s.io/client-go/rest"
    32  	internalevents "k8s.io/client-go/tools/internal/events"
    33  	"k8s.io/client-go/tools/record/util"
    34  	ref "k8s.io/client-go/tools/reference"
    35  	"k8s.io/klog/v2"
    36  	"k8s.io/utils/clock"
    37  )
    38  
    39  const maxTriesPerEvent = 12
    40  
    41  var defaultSleepDuration = 10 * time.Second
    42  
    43  const maxQueuedEvents = 1000
    44  
    45  // EventSink knows how to store events (client.Client implements it.)
    46  // EventSink must respect the namespace that will be embedded in 'event'.
    47  // It is assumed that EventSink will return the same sorts of errors as
    48  // pkg/client's REST client.
    49  type EventSink interface {
    50  	Create(event *v1.Event) (*v1.Event, error)
    51  	Update(event *v1.Event) (*v1.Event, error)
    52  	Patch(oldEvent *v1.Event, data []byte) (*v1.Event, error)
    53  }
    54  
    55  // CorrelatorOptions allows you to change the default of the EventSourceObjectSpamFilter
    56  // and EventAggregator in EventCorrelator
    57  type CorrelatorOptions struct {
    58  	// The lru cache size used for both EventSourceObjectSpamFilter and the EventAggregator
    59  	// If not specified (zero value), the default specified in events_cache.go will be picked
    60  	// This means that the LRUCacheSize has to be greater than 0.
    61  	LRUCacheSize int
    62  	// The burst size used by the token bucket rate filtering in EventSourceObjectSpamFilter
    63  	// If not specified (zero value), the default specified in events_cache.go will be picked
    64  	// This means that the BurstSize has to be greater than 0.
    65  	BurstSize int
    66  	// The fill rate of the token bucket in queries per second in EventSourceObjectSpamFilter
    67  	// If not specified (zero value), the default specified in events_cache.go will be picked
    68  	// This means that the QPS has to be greater than 0.
    69  	QPS float32
    70  	// The func used by the EventAggregator to group event keys for aggregation
    71  	// If not specified (zero value), EventAggregatorByReasonFunc will be used
    72  	KeyFunc EventAggregatorKeyFunc
    73  	// The func used by the EventAggregator to produced aggregated message
    74  	// If not specified (zero value), EventAggregatorByReasonMessageFunc will be used
    75  	MessageFunc EventAggregatorMessageFunc
    76  	// The number of events in an interval before aggregation happens by the EventAggregator
    77  	// If not specified (zero value), the default specified in events_cache.go will be picked
    78  	// This means that the MaxEvents has to be greater than 0
    79  	MaxEvents int
    80  	// The amount of time in seconds that must transpire since the last occurrence of a similar event before it is considered new by the EventAggregator
    81  	// If not specified (zero value), the default specified in events_cache.go will be picked
    82  	// This means that the MaxIntervalInSeconds has to be greater than 0
    83  	MaxIntervalInSeconds int
    84  	// The clock used by the EventAggregator to allow for testing
    85  	// If not specified (zero value), clock.RealClock{} will be used
    86  	Clock clock.PassiveClock
    87  	// The func used by EventFilterFunc, which returns a key for given event, based on which filtering will take place
    88  	// If not specified (zero value), getSpamKey will be used
    89  	SpamKeyFunc EventSpamKeyFunc
    90  }
    91  
    92  // EventRecorder knows how to record events on behalf of an EventSource.
    93  type EventRecorder interface {
    94  	// Event constructs an event from the given information and puts it in the queue for sending.
    95  	// 'object' is the object this event is about. Event will make a reference-- or you may also
    96  	// pass a reference to the object directly.
    97  	// 'eventtype' of this event, and can be one of Normal, Warning. New types could be added in future
    98  	// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
    99  	// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
   100  	// to automate handling of events, so imagine people writing switch statements to handle them.
   101  	// You want to make that easy.
   102  	// 'message' is intended to be human readable.
   103  	//
   104  	// The resulting event will be created in the same namespace as the reference object.
   105  	Event(object runtime.Object, eventtype, reason, message string)
   106  
   107  	// Eventf is just like Event, but with Sprintf for the message field.
   108  	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
   109  
   110  	// AnnotatedEventf is just like eventf, but with annotations attached
   111  	AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
   112  }
   113  
   114  // EventRecorderLogger extends EventRecorder such that a logger can
   115  // be set for methods in EventRecorder. Normally, those methods
   116  // uses the global default logger to record errors and debug messages.
   117  // If that is not desired, use WithLogger to provide a logger instance.
   118  type EventRecorderLogger interface {
   119  	EventRecorder
   120  
   121  	// WithLogger replaces the context used for logging. This is a cheap call
   122  	// and meant to be used for contextual logging:
   123  	//    recorder := ...
   124  	//    logger := klog.FromContext(ctx)
   125  	//    recorder.WithLogger(logger).Eventf(...)
   126  	WithLogger(logger klog.Logger) EventRecorderLogger
   127  }
   128  
   129  // EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
   130  type EventBroadcaster interface {
   131  	// StartEventWatcher starts sending events received from this EventBroadcaster to the given
   132  	// event handler function. The return value can be ignored or used to stop recording, if
   133  	// desired.
   134  	StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
   135  
   136  	// StartRecordingToSink starts sending events received from this EventBroadcaster to the given
   137  	// sink. The return value can be ignored or used to stop recording, if desired.
   138  	StartRecordingToSink(sink EventSink) watch.Interface
   139  
   140  	// StartLogging starts sending events received from this EventBroadcaster to the given logging
   141  	// function. The return value can be ignored or used to stop recording, if desired.
   142  	StartLogging(logf func(format string, args ...interface{})) watch.Interface
   143  
   144  	// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured
   145  	// logging function. The return value can be ignored or used to stop recording, if desired.
   146  	StartStructuredLogging(verbosity klog.Level) watch.Interface
   147  
   148  	// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
   149  	// with the event source set to the given event source.
   150  	NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger
   151  
   152  	// Shutdown shuts down the broadcaster. Once the broadcaster is shut
   153  	// down, it will only try to record an event in a sink once before
   154  	// giving up on it with an error message.
   155  	Shutdown()
   156  }
   157  
   158  // EventRecorderAdapter is a wrapper around a "k8s.io/client-go/tools/record".EventRecorder
   159  // implementing the new "k8s.io/client-go/tools/events".EventRecorder interface.
   160  type EventRecorderAdapter struct {
   161  	recorder EventRecorderLogger
   162  }
   163  
   164  var _ internalevents.EventRecorder = &EventRecorderAdapter{}
   165  
   166  // NewEventRecorderAdapter returns an adapter implementing the new
   167  // "k8s.io/client-go/tools/events".EventRecorder interface.
   168  func NewEventRecorderAdapter(recorder EventRecorderLogger) *EventRecorderAdapter {
   169  	return &EventRecorderAdapter{
   170  		recorder: recorder,
   171  	}
   172  }
   173  
   174  // Eventf is a wrapper around v1 Eventf
   175  func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
   176  	a.recorder.Eventf(regarding, eventtype, reason, note, args...)
   177  }
   178  
   179  func (a *EventRecorderAdapter) WithLogger(logger klog.Logger) internalevents.EventRecorderLogger {
   180  	return &EventRecorderAdapter{
   181  		recorder: a.recorder.WithLogger(logger),
   182  	}
   183  }
   184  
   185  // Creates a new event broadcaster.
   186  func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster {
   187  	c := config{
   188  		sleepDuration: defaultSleepDuration,
   189  	}
   190  	for _, opt := range opts {
   191  		opt(&c)
   192  	}
   193  	eventBroadcaster := &eventBroadcasterImpl{
   194  		Broadcaster:   watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
   195  		sleepDuration: c.sleepDuration,
   196  		options:       c.CorrelatorOptions,
   197  	}
   198  	ctx := c.Context
   199  	if ctx == nil {
   200  		ctx = context.Background()
   201  	}
   202  	// The are two scenarios where it makes no sense to wait for context cancelation:
   203  	// - The context was nil.
   204  	// - The context was context.Background() to begin with.
   205  	//
   206  	// Both cases get checked here.
   207  	haveCtxCancelation := ctx.Done() == nil
   208  
   209  	eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx)
   210  
   211  	if haveCtxCancelation {
   212  		// Calling Shutdown is not required when a context was provided:
   213  		// when the context is canceled, this goroutine will shut down
   214  		// the broadcaster.
   215  		//
   216  		// If Shutdown is called first, then this goroutine will
   217  		// also stop.
   218  		go func() {
   219  			<-eventBroadcaster.cancelationCtx.Done()
   220  			eventBroadcaster.Broadcaster.Shutdown()
   221  		}()
   222  	}
   223  
   224  	return eventBroadcaster
   225  }
   226  
   227  func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster {
   228  	return NewBroadcaster(WithSleepDuration(sleepDuration))
   229  }
   230  
   231  func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster {
   232  	return NewBroadcaster(WithCorrelatorOptions(options))
   233  }
   234  
   235  func WithCorrelatorOptions(options CorrelatorOptions) BroadcasterOption {
   236  	return func(c *config) {
   237  		c.CorrelatorOptions = options
   238  	}
   239  }
   240  
   241  // WithContext sets a context for the broadcaster. Canceling the context will
   242  // shut down the broadcaster, Shutdown doesn't need to be called. The context
   243  // can also be used to provide a logger.
   244  func WithContext(ctx context.Context) BroadcasterOption {
   245  	return func(c *config) {
   246  		c.Context = ctx
   247  	}
   248  }
   249  
   250  func WithSleepDuration(sleepDuration time.Duration) BroadcasterOption {
   251  	return func(c *config) {
   252  		c.sleepDuration = sleepDuration
   253  	}
   254  }
   255  
   256  type BroadcasterOption func(*config)
   257  
   258  type config struct {
   259  	CorrelatorOptions
   260  	context.Context
   261  	sleepDuration time.Duration
   262  }
   263  
   264  type eventBroadcasterImpl struct {
   265  	*watch.Broadcaster
   266  	sleepDuration  time.Duration
   267  	options        CorrelatorOptions
   268  	cancelationCtx context.Context
   269  	cancel         func()
   270  }
   271  
   272  // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
   273  // The return value can be ignored or used to stop recording, if desired.
   274  // TODO: make me an object with parameterizable queue length and retry interval
   275  func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
   276  	eventCorrelator := NewEventCorrelatorWithOptions(e.options)
   277  	return e.StartEventWatcher(
   278  		func(event *v1.Event) {
   279  			e.recordToSink(sink, event, eventCorrelator)
   280  		})
   281  }
   282  
   283  func (e *eventBroadcasterImpl) Shutdown() {
   284  	e.Broadcaster.Shutdown()
   285  	e.cancel()
   286  }
   287  
   288  func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator) {
   289  	// Make a copy before modification, because there could be multiple listeners.
   290  	// Events are safe to copy like this.
   291  	eventCopy := *event
   292  	event = &eventCopy
   293  	result, err := eventCorrelator.EventCorrelate(event)
   294  	if err != nil {
   295  		utilruntime.HandleError(err)
   296  	}
   297  	if result.Skip {
   298  		return
   299  	}
   300  	tries := 0
   301  	for {
   302  		if recordEvent(e.cancelationCtx, sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
   303  			break
   304  		}
   305  		tries++
   306  		if tries >= maxTriesPerEvent {
   307  			klog.FromContext(e.cancelationCtx).Error(nil, "Unable to write event (retry limit exceeded!)", "event", event)
   308  			break
   309  		}
   310  
   311  		// Randomize the first sleep so that various clients won't all be
   312  		// synced up if the master goes down.
   313  		delay := e.sleepDuration
   314  		if tries == 1 {
   315  			delay = time.Duration(float64(delay) * rand.Float64())
   316  		}
   317  		select {
   318  		case <-e.cancelationCtx.Done():
   319  			klog.FromContext(e.cancelationCtx).Error(nil, "Unable to write event (broadcaster is shut down)", "event", event)
   320  			return
   321  		case <-time.After(delay):
   322  		}
   323  	}
   324  }
   325  
   326  // recordEvent attempts to write event to a sink. It returns true if the event
   327  // was successfully recorded or discarded, false if it should be retried.
   328  // If updateExistingEvent is false, it creates a new event, otherwise it updates
   329  // existing event.
   330  func recordEvent(ctx context.Context, sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
   331  	var newEvent *v1.Event
   332  	var err error
   333  	if updateExistingEvent {
   334  		newEvent, err = sink.Patch(event, patch)
   335  	}
   336  	// Update can fail because the event may have been removed and it no longer exists.
   337  	if !updateExistingEvent || (updateExistingEvent && util.IsKeyNotFoundError(err)) {
   338  		// Making sure that ResourceVersion is empty on creation
   339  		event.ResourceVersion = ""
   340  		newEvent, err = sink.Create(event)
   341  	}
   342  	if err == nil {
   343  		// we need to update our event correlator with the server returned state to handle name/resourceversion
   344  		eventCorrelator.UpdateState(newEvent)
   345  		return true
   346  	}
   347  
   348  	// If we can't contact the server, then hold everything while we keep trying.
   349  	// Otherwise, something about the event is malformed and we should abandon it.
   350  	switch err.(type) {
   351  	case *restclient.RequestConstructionError:
   352  		// We will construct the request the same next time, so don't keep trying.
   353  		klog.FromContext(ctx).Error(err, "Unable to construct event (will not retry!)", "event", event)
   354  		return true
   355  	case *errors.StatusError:
   356  		if errors.IsAlreadyExists(err) || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
   357  			klog.FromContext(ctx).V(5).Info("Server rejected event (will not retry!)", "event", event, "err", err)
   358  		} else {
   359  			klog.FromContext(ctx).Error(err, "Server rejected event (will not retry!)", "event", event)
   360  		}
   361  		return true
   362  	case *errors.UnexpectedObjectError:
   363  		// We don't expect this; it implies the server's response didn't match a
   364  		// known pattern. Go ahead and retry.
   365  	default:
   366  		// This case includes actual http transport errors. Go ahead and retry.
   367  	}
   368  	klog.FromContext(ctx).Error(err, "Unable to write event (may retry after sleeping)", "event", event)
   369  	return false
   370  }
   371  
   372  // StartLogging starts sending events received from this EventBroadcaster to the given logging function.
   373  // The return value can be ignored or used to stop recording, if desired.
   374  func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
   375  	return e.StartEventWatcher(
   376  		func(e *v1.Event) {
   377  			logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
   378  		})
   379  }
   380  
   381  // StartStructuredLogging starts sending events received from this EventBroadcaster to a structured logger.
   382  // The logger is retrieved from a context if the broadcaster was constructed with a context, otherwise
   383  // the global default is used.
   384  // The return value can be ignored or used to stop recording, if desired.
   385  func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watch.Interface {
   386  	loggerV := klog.FromContext(e.cancelationCtx).V(int(verbosity))
   387  	return e.StartEventWatcher(
   388  		func(e *v1.Event) {
   389  			loggerV.Info("Event occurred", "object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "fieldPath", e.InvolvedObject.FieldPath, "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message)
   390  		})
   391  }
   392  
   393  // StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
   394  // The return value can be ignored or used to stop recording, if desired.
   395  func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
   396  	watcher, err := e.Watch()
   397  	if err != nil {
   398  		klog.FromContext(e.cancelationCtx).Error(err, "Unable start event watcher (will not retry!)")
   399  	}
   400  	go func() {
   401  		defer utilruntime.HandleCrash()
   402  		for {
   403  			select {
   404  			case <-e.cancelationCtx.Done():
   405  				watcher.Stop()
   406  				return
   407  			case watchEvent := <-watcher.ResultChan():
   408  				event, ok := watchEvent.Object.(*v1.Event)
   409  				if !ok {
   410  					// This is all local, so there's no reason this should
   411  					// ever happen.
   412  					continue
   413  				}
   414  				eventHandler(event)
   415  			}
   416  		}
   417  	}()
   418  	return watcher
   419  }
   420  
   421  // NewRecorder returns an EventRecorder that records events with the given event source.
   422  func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger {
   423  	return &recorderImplLogger{recorderImpl: &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}, logger: klog.Background()}
   424  }
   425  
   426  type recorderImpl struct {
   427  	scheme *runtime.Scheme
   428  	source v1.EventSource
   429  	*watch.Broadcaster
   430  	clock clock.PassiveClock
   431  }
   432  
   433  var _ EventRecorder = &recorderImpl{}
   434  
   435  func (recorder *recorderImpl) generateEvent(logger klog.Logger, object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
   436  	ref, err := ref.GetReference(recorder.scheme, object)
   437  	if err != nil {
   438  		logger.Error(err, "Could not construct reference, will not report event", "object", object, "eventType", eventtype, "reason", reason, "message", message)
   439  		return
   440  	}
   441  
   442  	if !util.ValidateEventType(eventtype) {
   443  		logger.Error(nil, "Unsupported event type", "eventType", eventtype)
   444  		return
   445  	}
   446  
   447  	event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
   448  	event.Source = recorder.source
   449  
   450  	event.ReportingInstance = recorder.source.Host
   451  	event.ReportingController = recorder.source.Component
   452  
   453  	// NOTE: events should be a non-blocking operation, but we also need to not
   454  	// put this in a goroutine, otherwise we'll race to write to a closed channel
   455  	// when we go to shut down this broadcaster.  Just drop events if we get overloaded,
   456  	// and log an error if that happens (we've configured the broadcaster to drop
   457  	// outgoing events anyway).
   458  	sent, err := recorder.ActionOrDrop(watch.Added, event)
   459  	if err != nil {
   460  		logger.Error(err, "Unable to record event (will not retry!)")
   461  		return
   462  	}
   463  	if !sent {
   464  		logger.Error(nil, "Unable to record event: too many queued events, dropped event", "event", event)
   465  	}
   466  }
   467  
   468  func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
   469  	recorder.generateEvent(klog.Background(), object, nil, eventtype, reason, message)
   470  }
   471  
   472  func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
   473  	recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
   474  }
   475  
   476  func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
   477  	recorder.generateEvent(klog.Background(), object, annotations, eventtype, reason, fmt.Sprintf(messageFmt, args...))
   478  }
   479  
   480  func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
   481  	t := metav1.Time{Time: recorder.clock.Now()}
   482  	namespace := ref.Namespace
   483  	if namespace == "" {
   484  		namespace = metav1.NamespaceDefault
   485  	}
   486  	return &v1.Event{
   487  		ObjectMeta: metav1.ObjectMeta{
   488  			Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
   489  			Namespace:   namespace,
   490  			Annotations: annotations,
   491  		},
   492  		InvolvedObject: *ref,
   493  		Reason:         reason,
   494  		Message:        message,
   495  		FirstTimestamp: t,
   496  		LastTimestamp:  t,
   497  		Count:          1,
   498  		Type:           eventtype,
   499  	}
   500  }
   501  
   502  type recorderImplLogger struct {
   503  	*recorderImpl
   504  	logger klog.Logger
   505  }
   506  
   507  var _ EventRecorderLogger = &recorderImplLogger{}
   508  
   509  func (recorder recorderImplLogger) Event(object runtime.Object, eventtype, reason, message string) {
   510  	recorder.recorderImpl.generateEvent(recorder.logger, object, nil, eventtype, reason, message)
   511  }
   512  
   513  func (recorder recorderImplLogger) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
   514  	recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
   515  }
   516  
   517  func (recorder recorderImplLogger) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
   518  	recorder.generateEvent(recorder.logger, object, annotations, eventtype, reason, fmt.Sprintf(messageFmt, args...))
   519  }
   520  
   521  func (recorder recorderImplLogger) WithLogger(logger klog.Logger) EventRecorderLogger {
   522  	return recorderImplLogger{recorderImpl: recorder.recorderImpl, logger: logger}
   523  }
   524  

View as plain text