...

Source file src/sigs.k8s.io/controller-runtime/pkg/internal/recorder/recorder.go

Documentation: sigs.k8s.io/controller-runtime/pkg/internal/recorder

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package recorder
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net/http"
    23  	"sync"
    24  
    25  	"github.com/go-logr/logr"
    26  	corev1 "k8s.io/api/core/v1"
    27  	"k8s.io/apimachinery/pkg/runtime"
    28  	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
    29  	"k8s.io/client-go/rest"
    30  	"k8s.io/client-go/tools/record"
    31  )
    32  
    33  // EventBroadcasterProducer makes an event broadcaster, returning
    34  // whether or not the broadcaster should be stopped with the Provider,
    35  // or not (e.g. if it's shared, it shouldn't be stopped with the Provider).
    36  type EventBroadcasterProducer func() (caster record.EventBroadcaster, stopWithProvider bool)
    37  
    38  // Provider is a recorder.Provider that records events to the k8s API server
    39  // and to a logr Logger.
    40  type Provider struct {
    41  	lock    sync.RWMutex
    42  	stopped bool
    43  
    44  	// scheme to specify when creating a recorder
    45  	scheme *runtime.Scheme
    46  	// logger is the logger to use when logging diagnostic event info
    47  	logger          logr.Logger
    48  	evtClient       corev1client.EventInterface
    49  	makeBroadcaster EventBroadcasterProducer
    50  
    51  	broadcasterOnce sync.Once
    52  	broadcaster     record.EventBroadcaster
    53  	stopBroadcaster bool
    54  }
    55  
    56  // NB(directxman12): this manually implements Stop instead of Being a runnable because we need to
    57  // stop it *after* everything else shuts down, otherwise we'll cause panics as the leader election
    58  // code finishes up and tries to continue emitting events.
    59  
    60  // Stop attempts to stop this provider, stopping the underlying broadcaster
    61  // if the broadcaster asked to be stopped.  It kinda tries to honor the given
    62  // context, but the underlying broadcaster has an indefinite wait that doesn't
    63  // return until all queued events are flushed, so this may end up just returning
    64  // before the underlying wait has finished instead of cancelling the wait.
    65  // This is Very Frustrating™.
    66  func (p *Provider) Stop(shutdownCtx context.Context) {
    67  	doneCh := make(chan struct{})
    68  
    69  	go func() {
    70  		// technically, this could start the broadcaster, but practically, it's
    71  		// almost certainly already been started (e.g. by leader election).  We
    72  		// need to invoke this to ensure that we don't inadvertently race with
    73  		// an invocation of getBroadcaster.
    74  		broadcaster := p.getBroadcaster()
    75  		if p.stopBroadcaster {
    76  			p.lock.Lock()
    77  			broadcaster.Shutdown()
    78  			p.stopped = true
    79  			p.lock.Unlock()
    80  		}
    81  		close(doneCh)
    82  	}()
    83  
    84  	select {
    85  	case <-shutdownCtx.Done():
    86  	case <-doneCh:
    87  	}
    88  }
    89  
    90  // getBroadcaster ensures that a broadcaster is started for this
    91  // provider, and returns it.  It's threadsafe.
    92  func (p *Provider) getBroadcaster() record.EventBroadcaster {
    93  	// NB(directxman12): this can technically still leak if something calls
    94  	// "getBroadcaster" (i.e. Emits an Event) but never calls Start, but if we
    95  	// create the broadcaster in start, we could race with other things that
    96  	// are started at the same time & want to emit events.  The alternative is
    97  	// silently swallowing events and more locking, but that seems suboptimal.
    98  
    99  	p.broadcasterOnce.Do(func() {
   100  		broadcaster, stop := p.makeBroadcaster()
   101  		broadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: p.evtClient})
   102  		broadcaster.StartEventWatcher(
   103  			func(e *corev1.Event) {
   104  				p.logger.V(1).Info(e.Message, "type", e.Type, "object", e.InvolvedObject, "reason", e.Reason)
   105  			})
   106  		p.broadcaster = broadcaster
   107  		p.stopBroadcaster = stop
   108  	})
   109  
   110  	return p.broadcaster
   111  }
   112  
   113  // NewProvider create a new Provider instance.
   114  func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster EventBroadcasterProducer) (*Provider, error) {
   115  	if httpClient == nil {
   116  		panic("httpClient must not be nil")
   117  	}
   118  
   119  	corev1Client, err := corev1client.NewForConfigAndClient(config, httpClient)
   120  	if err != nil {
   121  		return nil, fmt.Errorf("failed to init client: %w", err)
   122  	}
   123  
   124  	p := &Provider{scheme: scheme, logger: logger, makeBroadcaster: makeBroadcaster, evtClient: corev1Client.Events("")}
   125  	return p, nil
   126  }
   127  
   128  // GetEventRecorderFor returns an event recorder that broadcasts to this provider's
   129  // broadcaster.  All events will be associated with a component of the given name.
   130  func (p *Provider) GetEventRecorderFor(name string) record.EventRecorder {
   131  	return &lazyRecorder{
   132  		prov: p,
   133  		name: name,
   134  	}
   135  }
   136  
   137  // lazyRecorder is a recorder that doesn't actually instantiate any underlying
   138  // recorder until the first event is emitted.
   139  type lazyRecorder struct {
   140  	prov *Provider
   141  	name string
   142  
   143  	recOnce sync.Once
   144  	rec     record.EventRecorder
   145  }
   146  
   147  // ensureRecording ensures that a concrete recorder is populated for this recorder.
   148  func (l *lazyRecorder) ensureRecording() {
   149  	l.recOnce.Do(func() {
   150  		broadcaster := l.prov.getBroadcaster()
   151  		l.rec = broadcaster.NewRecorder(l.prov.scheme, corev1.EventSource{Component: l.name})
   152  	})
   153  }
   154  
   155  func (l *lazyRecorder) Event(object runtime.Object, eventtype, reason, message string) {
   156  	l.ensureRecording()
   157  
   158  	l.prov.lock.RLock()
   159  	if !l.prov.stopped {
   160  		l.rec.Event(object, eventtype, reason, message)
   161  	}
   162  	l.prov.lock.RUnlock()
   163  }
   164  func (l *lazyRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
   165  	l.ensureRecording()
   166  
   167  	l.prov.lock.RLock()
   168  	if !l.prov.stopped {
   169  		l.rec.Eventf(object, eventtype, reason, messageFmt, args...)
   170  	}
   171  	l.prov.lock.RUnlock()
   172  }
   173  func (l *lazyRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
   174  	l.ensureRecording()
   175  
   176  	l.prov.lock.RLock()
   177  	if !l.prov.stopped {
   178  		l.rec.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...)
   179  	}
   180  	l.prov.lock.RUnlock()
   181  }
   182  

View as plain text