
Source file src/sigs.k8s.io/cli-utils/pkg/printers/table/collector.go

Documentation: sigs.k8s.io/cli-utils/pkg/printers/table

     1  // Copyright 2020 The Kubernetes Authors.
     2  // SPDX-License-Identifier: Apache-2.0
     4  package table
     6  import (
     7  	"fmt"
     8  	"sort"
     9  	"sync"
    11  	"k8s.io/klog/v2"
    12  	"sigs.k8s.io/cli-utils/pkg/apply/event"
    13  	pe "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
    14  	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
    15  	"sigs.k8s.io/cli-utils/pkg/object"
    16  	"sigs.k8s.io/cli-utils/pkg/object/validation"
    17  	"sigs.k8s.io/cli-utils/pkg/print/stats"
    18  	"sigs.k8s.io/cli-utils/pkg/print/table"
    19  )
    21  const InvalidStatus status.Status = "Invalid"
    23  func newResourceStateCollector(resourceGroups []event.ActionGroup) *resourceStateCollector {
    24  	resourceInfos := make(map[object.ObjMetadata]*resourceInfo)
    25  	for _, group := range resourceGroups {
    26  		action := group.Action
    27  		// Keep the action that describes the operation for the resource
    28  		// rather than that we will wait for it.
    29  		if action == event.WaitAction {
    30  			continue
    31  		}
    32  		for _, identifier := range group.Identifiers {
    33  			resourceInfos[identifier] = &resourceInfo{
    34  				identifier: identifier,
    35  				resourceStatus: &pe.ResourceStatus{
    36  					Identifier: identifier,
    37  					Status:     status.UnknownStatus,
    38  				},
    39  				ResourceAction: action,
    40  			}
    41  		}
    42  	}
    43  	return &resourceStateCollector{
    44  		resourceInfos: resourceInfos,
    45  	}
    46  }
    48  // resourceStateCollector consumes the events from the applier
    49  // eventChannel and keeps track of the latest state for all resources.
    50  // It also provides functionality for fetching the latest seen
    51  // state and return it in format that can be used by the
    52  // BaseTablePrinter.
    53  type resourceStateCollector struct {
    54  	mux sync.RWMutex
    56  	// resourceInfos contains a mapping from the unique
    57  	// resource identifier to a ResourceInfo object that captures
    58  	// the latest state for the given resource.
    59  	resourceInfos map[object.ObjMetadata]*resourceInfo
    61  	// stats collect statistics from handled events
    62  	stats stats.Stats
    64  	err error
    65  }
    67  // resourceInfo captures the latest seen state of a single resource.
    68  // This is used for top-level resources that have a ResourceAction
    69  // associated with them.
    70  type resourceInfo struct {
    71  	// identifier contains the information that identifies a
    72  	// single resource.
    73  	identifier object.ObjMetadata
    75  	// resourceStatus contains the latest status information
    76  	// about the resource.
    77  	resourceStatus *pe.ResourceStatus
    79  	// ResourceAction defines the action we are performing
    80  	// on this particular resource. This can be either Apply
    81  	// or Prune.
    82  	ResourceAction event.ResourceAction
    84  	// Error is set if an error occurred trying to perform
    85  	// the desired action on the resource.
    86  	Error error
    88  	// ApplyStatus contains the result after
    89  	// a resource has been applied to the cluster.
    90  	ApplyStatus event.ApplyEventStatus
    92  	// PruneStatus contains the result after
    93  	// a prune operation on a resource
    94  	PruneStatus event.PruneEventStatus
    96  	// DeleteStatus contains the result after
    97  	// a delete operation on a resource
    98  	DeleteStatus event.DeleteEventStatus
   100  	// WaitStatus contains the result after
   101  	// a wait operation on a resource
   102  	WaitStatus event.WaitEventStatus
   103  }
   105  // Identifier returns the identifier for the given resource.
   106  func (r *resourceInfo) Identifier() object.ObjMetadata {
   107  	return r.identifier
   108  }
   110  // ResourceStatus returns the latest seen status for the
   111  // resource.
   112  func (r *resourceInfo) ResourceStatus() *pe.ResourceStatus {
   113  	return r.resourceStatus
   114  }
   116  // SubResources returns a slice of Resource which contains
   117  // any resources created and managed by this resource.
   118  func (r *resourceInfo) SubResources() []table.Resource {
   119  	var resources []table.Resource
   120  	for _, res := range r.resourceStatus.GeneratedResources {
   121  		resources = append(resources, &subResourceInfo{
   122  			resourceStatus: res,
   123  		})
   124  	}
   125  	return resources
   126  }
   128  // subResourceInfo captures the latest seen state of a
   129  // single subResource, i.e. resources that are created and
   130  // managed by one of the top-level resources we either apply
   131  // or prune.
   132  type subResourceInfo struct {
   133  	// resourceStatus contains the latest status information
   134  	// about the subResource.
   135  	resourceStatus *pe.ResourceStatus
   136  }
   138  // Identifier returns the identifier for the given subResource.
   139  func (r *subResourceInfo) Identifier() object.ObjMetadata {
   140  	return r.resourceStatus.Identifier
   141  }
   143  // ResourceStatus returns the latest seen status for the
   144  // subResource.
   145  func (r *subResourceInfo) ResourceStatus() *pe.ResourceStatus {
   146  	return r.resourceStatus
   147  }
   149  // SubResources returns a slice of Resource which contains
   150  // any resources created and managed by this resource.
   151  func (r *subResourceInfo) SubResources() []table.Resource {
   152  	var resources []table.Resource
   153  	for _, res := range r.resourceStatus.GeneratedResources {
   154  		resources = append(resources, &subResourceInfo{
   155  			resourceStatus: res,
   156  		})
   157  	}
   158  	return resources
   159  }
   161  // Listen starts a new goroutine that will listen for events on the
   162  // provided eventChannel and keep track of the latest state for
   163  // the resources. The goroutine will exit when the provided
   164  // eventChannel is closed.
   165  // The function returns a channel. When this channel is closed, the
   166  // goroutine has processed all events in the eventChannel and
   167  // exited.
   168  func (r *resourceStateCollector) Listen(eventChannel <-chan event.Event) <-chan listenerResult {
   169  	completed := make(chan listenerResult)
   170  	go func() {
   171  		defer close(completed)
   172  		for ev := range eventChannel {
   173  			if err := r.processEvent(ev); err != nil {
   174  				completed <- listenerResult{err: err}
   175  				return
   176  			}
   177  		}
   178  	}()
   179  	return completed
   180  }
   182  type listenerResult struct {
   183  	err error
   184  }
   186  // processEvent processes an event and updates the state.
   187  func (r *resourceStateCollector) processEvent(ev event.Event) error {
   188  	r.mux.Lock()
   189  	defer r.mux.Unlock()
   190  	switch ev.Type {
   191  	case event.ValidationType:
   192  		return r.processValidationEvent(ev.ValidationEvent)
   193  	case event.StatusType:
   194  		r.processStatusEvent(ev.StatusEvent)
   195  	case event.ApplyType:
   196  		r.processApplyEvent(ev.ApplyEvent)
   197  	case event.PruneType:
   198  		r.processPruneEvent(ev.PruneEvent)
   199  	case event.DeleteType:
   200  		r.processDeleteEvent(ev.DeleteEvent)
   201  	case event.WaitType:
   202  		r.processWaitEvent(ev.WaitEvent)
   203  	case event.ErrorType:
   204  		return ev.ErrorEvent.Err
   205  	}
   206  	return nil
   207  }
   209  // processValidationEvent handles events pertaining to a validation error
   210  // for a resource.
   211  func (r *resourceStateCollector) processValidationEvent(e event.ValidationEvent) error {
   212  	klog.V(7).Infoln("processing validation event")
   213  	// unwrap validation errors
   214  	err := e.Error
   215  	if vErr, ok := err.(*validation.Error); ok {
   216  		err = vErr.Unwrap()
   217  	}
   218  	if len(e.Identifiers) == 0 {
   219  		// no objects, invalid event
   220  		return fmt.Errorf("invalid validation event: no identifiers: %w", err)
   221  	}
   222  	for _, id := range e.Identifiers {
   223  		previous, found := r.resourceInfos[id]
   224  		if !found {
   225  			klog.V(4).Infof("%s status event not found in ResourceInfos; no processing", id)
   226  			continue
   227  		}
   228  		previous.resourceStatus = &pe.ResourceStatus{
   229  			Identifier: id,
   230  			Status:     InvalidStatus,
   231  			Message:    e.Error.Error(),
   232  		}
   233  	}
   234  	return nil
   235  }
   237  // processStatusEvent handles events pertaining to a status
   238  // update for a resource.
   239  func (r *resourceStateCollector) processStatusEvent(e event.StatusEvent) {
   240  	klog.V(7).Infoln("processing status event")
   241  	previous, found := r.resourceInfos[e.Identifier]
   242  	if !found {
   243  		klog.V(4).Infof("%s status event not found in ResourceInfos; no processing", e.Identifier)
   244  		return
   245  	}
   246  	previous.resourceStatus = e.PollResourceInfo
   247  }
   249  // processApplyEvent handles events relating to apply operations
   250  func (r *resourceStateCollector) processApplyEvent(e event.ApplyEvent) {
   251  	identifier := e.Identifier
   252  	klog.V(7).Infof("processing apply event for %s", identifier)
   253  	previous, found := r.resourceInfos[identifier]
   254  	if !found {
   255  		klog.V(4).Infof("%s apply event not found in ResourceInfos; no processing", identifier)
   256  		return
   257  	}
   258  	if e.Error != nil {
   259  		previous.Error = e.Error
   260  	}
   261  	previous.ApplyStatus = e.Status
   262  	r.stats.ApplyStats.Inc(e.Status)
   263  }
   265  // processPruneEvent handles event related to prune operations.
   266  func (r *resourceStateCollector) processPruneEvent(e event.PruneEvent) {
   267  	identifier := e.Identifier
   268  	klog.V(7).Infof("processing prune event for %s", identifier)
   269  	previous, found := r.resourceInfos[identifier]
   270  	if !found {
   271  		klog.V(4).Infof("%s prune event not found in ResourceInfos; no processing", identifier)
   272  		return
   273  	}
   274  	if e.Error != nil {
   275  		previous.Error = e.Error
   276  	}
   277  	previous.PruneStatus = e.Status
   278  	r.stats.PruneStats.Inc(e.Status)
   279  }
   281  // processDeleteEvent handles event related to delete operations.
   282  func (r *resourceStateCollector) processDeleteEvent(e event.DeleteEvent) {
   283  	identifier := e.Identifier
   284  	klog.V(7).Infof("processing delete event for %s", identifier)
   285  	previous, found := r.resourceInfos[identifier]
   286  	if !found {
   287  		klog.V(4).Infof("%s delete event not found in ResourceInfos; no processing", identifier)
   288  		return
   289  	}
   290  	if e.Error != nil {
   291  		previous.Error = e.Error
   292  	}
   293  	previous.DeleteStatus = e.Status
   294  	r.stats.DeleteStats.Inc(e.Status)
   295  }
   297  // processPruneEvent handles event related to prune operations.
   298  func (r *resourceStateCollector) processWaitEvent(e event.WaitEvent) {
   299  	identifier := e.Identifier
   300  	klog.V(7).Infof("processing wait event for %s", identifier)
   301  	previous, found := r.resourceInfos[identifier]
   302  	if !found {
   303  		klog.V(4).Infof("%s wait event not found in ResourceInfos; no processing", identifier)
   304  		return
   305  	}
   306  	previous.WaitStatus = e.Status
   307  	r.stats.WaitStats.Inc(e.Status)
   308  }
   310  // ResourceState contains the latest state for all the resources.
   311  type ResourceState struct {
   312  	resourceInfos ResourceInfos
   314  	err error
   315  }
   317  // Resources returns a slice containing the latest state
   318  // for each individual resource.
   319  func (r *ResourceState) Resources() []table.Resource {
   320  	var resources []table.Resource
   321  	for _, res := range r.resourceInfos {
   322  		resources = append(resources, res)
   323  	}
   324  	return resources
   325  }
   327  func (r *ResourceState) Error() error {
   328  	return r.err
   329  }
   331  // LatestState returns a ResourceState object that contains
   332  // a copy of the latest state for all resources.
   333  func (r *resourceStateCollector) LatestState() *ResourceState {
   334  	r.mux.RLock()
   335  	defer r.mux.RUnlock()
   337  	var resourceInfos ResourceInfos
   338  	for _, ri := range r.resourceInfos {
   339  		resourceInfos = append(resourceInfos, &resourceInfo{
   340  			identifier:     ri.identifier,
   341  			resourceStatus: ri.resourceStatus,
   342  			ResourceAction: ri.ResourceAction,
   343  			ApplyStatus:    ri.ApplyStatus,
   344  			PruneStatus:    ri.PruneStatus,
   345  			DeleteStatus:   ri.DeleteStatus,
   346  			WaitStatus:     ri.WaitStatus,
   347  		})
   348  	}
   349  	sort.Sort(resourceInfos)
   351  	return &ResourceState{
   352  		resourceInfos: resourceInfos,
   353  		err:           r.err,
   354  	}
   355  }
   357  type ResourceInfos []*resourceInfo
   359  func (g ResourceInfos) Len() int {
   360  	return len(g)
   361  }
   363  func (g ResourceInfos) Less(i, j int) bool {
   364  	idI := g[i].identifier
   365  	idJ := g[j].identifier
   367  	if idI.Namespace != idJ.Namespace {
   368  		return idI.Namespace < idJ.Namespace
   369  	}
   370  	if idI.GroupKind.Group != idJ.GroupKind.Group {
   371  		return idI.GroupKind.Group < idJ.GroupKind.Group
   372  	}
   373  	if idI.GroupKind.Kind != idJ.GroupKind.Kind {
   374  		return idI.GroupKind.Kind < idJ.GroupKind.Kind
   375  	}
   376  	return idI.Name < idJ.Name
   377  }
   379  func (g ResourceInfos) Swap(i, j int) {
   380  	g[i], g[j] = g[j], g[i]
   381  }

View as plain text