...

Source file src/sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector/collector.go

Documentation: sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector

     1  // Copyright 2020 The Kubernetes Authors.
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package collector
     5  
     6  import (
     7  	"sort"
     8  	"sync"
     9  
    10  	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
    11  	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
    12  	"sigs.k8s.io/cli-utils/pkg/object"
    13  )
    14  
    15  func NewResourceStatusCollector(identifiers object.ObjMetadataSet) *ResourceStatusCollector {
    16  	resourceStatuses := make(map[object.ObjMetadata]*event.ResourceStatus)
    17  	for _, id := range identifiers {
    18  		resourceStatuses[id] = &event.ResourceStatus{
    19  			Identifier: id,
    20  			Status:     status.UnknownStatus,
    21  		}
    22  	}
    23  	return &ResourceStatusCollector{
    24  		ResourceStatuses: resourceStatuses,
    25  	}
    26  }
    27  
    28  // Observer is an interface that can be implemented to have the
    29  // ResourceStatusCollector invoke the function on every event that
    30  // comes through the eventChannel.
    31  // The callback happens in the processing goroutine and while the
    32  // goroutine holds the lock, so any processing in the callback
    33  // must be done quickly.
    34  type Observer interface {
    35  	Notify(*ResourceStatusCollector, event.Event)
    36  }
    37  
    38  // ObserverFunc is a function implementation of the Observer
    39  // interface.
    40  type ObserverFunc func(*ResourceStatusCollector, event.Event)
    41  
    42  func (o ObserverFunc) Notify(rsc *ResourceStatusCollector, e event.Event) {
    43  	o(rsc, e)
    44  }
    45  
    46  // ResourceStatusCollector is for use by clients of the polling library and provides
    47  // a way to keep track of the latest status/state for all the polled resources. The collector
    48  // is set up to listen to the eventChannel and keep the latest event for each resource. It also
    49  // provides a way to fetch the latest state for all resources and the aggregated status at any point.
    50  // The functions already handles synchronization so it can be used by multiple goroutines.
    51  type ResourceStatusCollector struct {
    52  	mux sync.RWMutex
    53  
    54  	LastEventType event.Type
    55  
    56  	ResourceStatuses map[object.ObjMetadata]*event.ResourceStatus
    57  
    58  	Error error
    59  }
    60  
    61  // ListenerResult is the type of the object passed back to the caller to
    62  // Listen and ListenWithObserver if a fatal error has been encountered.
    63  type ListenerResult struct {
    64  	Err error
    65  }
    66  
    67  // Listen kicks off the goroutine that will listen for the events on the
    68  // eventChannel.  It returns a channel that will be closed the collector stops
    69  // listening to the eventChannel.
    70  func (o *ResourceStatusCollector) Listen(eventChannel <-chan event.Event) <-chan ListenerResult {
    71  	return o.ListenWithObserver(eventChannel, nil)
    72  }
    73  
    74  // ListenWithObserver kicks off the goroutine that will listen for the events on the
    75  // eventChannel.  It returns a channel that will be closed the collector stops
    76  // listening to the eventChannel.
    77  // The provided observer will be invoked on every event, after the event
    78  // has been processed.
    79  func (o *ResourceStatusCollector) ListenWithObserver(eventChannel <-chan event.Event,
    80  	observer Observer) <-chan ListenerResult {
    81  	completed := make(chan ListenerResult)
    82  	go func() {
    83  		defer close(completed)
    84  		for e := range eventChannel {
    85  			err := o.processEvent(e)
    86  			if err != nil {
    87  				completed <- ListenerResult{
    88  					Err: err,
    89  				}
    90  			}
    91  			if observer != nil {
    92  				observer.Notify(o, e)
    93  			}
    94  		}
    95  	}()
    96  	return completed
    97  }
    98  
    99  func (o *ResourceStatusCollector) processEvent(e event.Event) error {
   100  	o.mux.Lock()
   101  	defer o.mux.Unlock()
   102  	o.LastEventType = e.Type
   103  	if e.Type == event.ErrorEvent {
   104  		o.Error = e.Error
   105  		return e.Error
   106  	}
   107  	if e.Type == event.ResourceUpdateEvent {
   108  		resourceStatus := e.Resource
   109  		o.ResourceStatuses[resourceStatus.Identifier] = resourceStatus
   110  	}
   111  	return nil
   112  }
   113  
   114  // Observation contains the latest state known by the collector as returned
   115  // by a call to the LatestObservation function.
   116  type Observation struct {
   117  	LastEventType event.Type
   118  
   119  	ResourceStatuses []*event.ResourceStatus
   120  
   121  	Error error
   122  }
   123  
   124  // LatestObservation returns an Observation instance, which contains the
   125  // latest information about the resources known by the collector.
   126  func (o *ResourceStatusCollector) LatestObservation() *Observation {
   127  	o.mux.RLock()
   128  	defer o.mux.RUnlock()
   129  
   130  	var resourceStatuses event.ResourceStatuses
   131  	for _, resourceStatus := range o.ResourceStatuses {
   132  		resourceStatuses = append(resourceStatuses, resourceStatus)
   133  	}
   134  	sort.Sort(resourceStatuses)
   135  
   136  	return &Observation{
   137  		LastEventType:    o.LastEventType,
   138  		ResourceStatuses: resourceStatuses,
   139  		Error:            o.Error,
   140  	}
   141  }
   142  

View as plain text