...

Source file src/edge-infra.dev/pkg/k8s/runtime/controller/reconcile/summarize.go

Documentation: edge-infra.dev/pkg/k8s/runtime/controller/reconcile

     1  package reconcile
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  
     7  	apierrors "k8s.io/apimachinery/pkg/api/errors"
     8  	kerrors "k8s.io/apimachinery/pkg/util/errors"
     9  	kuberecorder "k8s.io/client-go/tools/record"
    10  	ctrl "sigs.k8s.io/controller-runtime"
    11  
    12  	"edge-infra.dev/pkg/k8s/meta/status"
    13  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    14  	"edge-infra.dev/pkg/k8s/runtime/patch"
    15  )
    16  
    17  // Conditions contains all the conditions information needed to summarize the
    18  // target condition.
    19  type Conditions struct {
    20  	// Target is the target condition, e.g.: Ready. Required.
    21  	Target string
    22  	// Owned conditions are the conditions owned by the reconciler for this
    23  	// target condition.
    24  	Owned []string
    25  	// Summarize conditions are the conditions that the target condition depends
    26  	// on. Required.
    27  	Summarize []string
    28  	// NegativePolarity conditions are the conditions in Summarize with negative
    29  	// polarity.
    30  	NegativePolarity []string
    31  }
    32  
    33  func (c Conditions) IsEmpty() bool {
    34  	return c.Target == "" || len(c.Summarize) == 0
    35  }
    36  
// Summarizer computes the final result of a reconcile loop (resolved
// conditions, reconcile result, error) and applies the computed patch to the
// summarized object.
type Summarizer struct {
	// patcher is the serial patcher that SummarizeAndPatch uses to persist
	// the summarized object.
	patcher *patch.SerialPatcher
}
    43  
    44  // New returns a new instance of a summarization helper.
    45  func NewSummarizer(patcher *patch.SerialPatcher) *Summarizer {
    46  	return &Summarizer{
    47  		patcher: patcher,
    48  	}
    49  }
    50  
// SummarizeOptions contains options for [SummarizeAndPatch].
type SummarizeOptions struct {
	// Conditions are the conditions that need to be summarized and persisted
	// on the object.
	Conditions []Conditions
	// Processors is a chain of ResultProcessors for processing the results.
	// This can be used to analyze and modify the results. This enables
	// injecting custom middlewares in the [SummarizeAndPatch] operation.
	Processors []ResultProcessor
	// IgnoreNotFound can be used to ignore any resource not found error
	// during patching.
	IgnoreNotFound bool
	// Result is the abstracted result of reconciliation.
	Result Result
	// Error is the reconciliation error.
	Error error
	// FieldOwner defines the field owner configuration for the Kubernetes
	// patch operation. It is only forwarded to the patcher when non-empty.
	FieldOwner string
	// EventRecorder is the recorder handed to every ResultProcessor in
	// Processors.
	EventRecorder kuberecorder.EventRecorder
}
    73  
// SummarizeOption is a functional option that modifies the SummarizeOptions
// used by [SummarizeAndPatch].
type SummarizeOption func(*SummarizeOptions)
    76  
    77  // WithConditions sets the Conditions for which summary is calculated in
    78  // [SummarizeAndPatch].
    79  func WithConditions(condns ...Conditions) SummarizeOption {
    80  	return func(s *SummarizeOptions) {
    81  		s.Conditions = append(s.Conditions, condns...)
    82  	}
    83  }
    84  
    85  // WithProcessors can be used to inject middlewares in the [SummarizeAndPatch]
    86  // process, to be executed before the result calculation and patching.
    87  func WithProcessors(rps ...ResultProcessor) SummarizeOption {
    88  	return func(s *SummarizeOptions) {
    89  		s.Processors = append(s.Processors, rps...)
    90  	}
    91  }
    92  
    93  // WithIgnoreNotFound skips any resource not found error during patching.
    94  func WithIgnoreNotFound() SummarizeOption {
    95  	return func(s *SummarizeOptions) {
    96  		s.IgnoreNotFound = true
    97  	}
    98  }
    99  
   100  // WithReconcileResult sets the value of input result used to calculate the
   101  // results of reconciliation in [SummarizeAndPatch].
   102  func WithResult(rr Result) SummarizeOption {
   103  	return func(s *SummarizeOptions) {
   104  		s.Result = rr
   105  	}
   106  }
   107  
   108  // WithReconcileError sets the value of input error used to calculate the
   109  // results reconciliation in [SummarizeAndPatch].
   110  func WithError(re error) SummarizeOption {
   111  	return func(s *SummarizeOptions) {
   112  		s.Error = re
   113  	}
   114  }
   115  
   116  // WithPatchFieldOwner sets the FieldOwner in the patch helper.
   117  func WithFieldOwner(fieldOwner string) SummarizeOption {
   118  	return func(s *SummarizeOptions) {
   119  		s.FieldOwner = fieldOwner
   120  	}
   121  }
   122  
   123  // WithEventRecorder sets the EventRecorder that will be used create Events
   124  // during [SummarizeAndPatch].
   125  func WithEventRecorder(recorder kuberecorder.EventRecorder) SummarizeOption {
   126  	return func(s *SummarizeOptions) {
   127  		s.EventRecorder = recorder
   128  	}
   129  }
   130  
   131  // SummarizeAndPatch summarizes and patches the result to the target object.
   132  // When used at the very end of a reconciliation, the result builder must be
   133  // specified using the Option WithResultBuilder(). The returned result and error
   134  // can be returned as the return values of the reconciliation.
   135  // When used in the middle of a reconciliation, no result builder should be set
   136  // and the result can be ignored.
   137  func (h *Summarizer) SummarizeAndPatch(ctx context.Context, obj conditions.Setter, options ...SummarizeOption) (ctrl.Result, error) {
   138  	// Calculate the options.
   139  	opts := &SummarizeOptions{}
   140  	for _, o := range options {
   141  		o(opts)
   142  	}
   143  	// Combined the owned conditions of all the conditions for the patcher.
   144  	ownedConditions := []string{}
   145  	for _, c := range opts.Conditions {
   146  		ownedConditions = append(ownedConditions, c.Owned...)
   147  	}
   148  	// Patch the object, prioritizing the conditions owned by the controller in
   149  	// case of any conflicts.
   150  	patchOpts := []patch.Option{
   151  		patch.WithOwnedConditions{
   152  			Conditions: ownedConditions,
   153  		},
   154  	}
   155  	if opts.FieldOwner != "" {
   156  		patchOpts = append(patchOpts, patch.WithFieldOwner(opts.FieldOwner))
   157  	}
   158  
   159  	// Compute the reconcile results, obtain patch options and reconcile error.
   160  	pOpts, result, recErr := ComputeResult(obj, opts.Result, opts.Error)
   161  	patchOpts = append(patchOpts, pOpts...)
   162  
   163  	// Summarize conditions. This must be performed only after computing the
   164  	// reconcile result, since the object status is adjusted based on the
   165  	// reconcile result and error.
   166  	for _, c := range opts.Conditions {
   167  		conditions.SetSummary(obj,
   168  			c.Target,
   169  			conditions.WithConditions(c.Summarize...),
   170  			conditions.WithNegativePolarityConditions(c.NegativePolarity...),
   171  		)
   172  	}
   173  
   174  	// If object is not stalled, result is success and runtime error is nil,
   175  	// ensure that Ready=True. Else, use the Ready failure message as the
   176  	// runtime error message. This ensures that the reconciliation would be
   177  	// retried as the object isn't ready.
   178  	// NOTE: This is applicable to Ready condition only because it is a special
   179  	// condition in kstatus that reflects the overall state of an object.
   180  	if isNonStalledSuccess(obj, opts.Result, opts.Error) {
   181  		if !conditions.IsReady(obj) {
   182  			recErr = errors.New(conditions.GetMessage(obj, status.ReadyCondition))
   183  		}
   184  	}
   185  
   186  	// If the object is not ready, make sure that 'opts.Error' is not 'nil'
   187  	// when passing it to the [ResultProcessors] below. The runtime error
   188  	// takes precedence over the computed reconcile error.
   189  	if opts.Error == nil && recErr != nil {
   190  		opts.Error = recErr
   191  	}
   192  
   193  	// Process the results of reconciliation.
   194  	for _, processor := range opts.Processors {
   195  		processor(ctx, opts.EventRecorder, obj, opts.Result, opts.Error)
   196  	}
   197  
   198  	// Finally, patch the resource.
   199  	if err := h.patcher.Patch(ctx, obj, patchOpts...); err != nil {
   200  		// Ignore patch error "not found" when the object is being deleted.
   201  		if opts.IgnoreNotFound {
   202  			if isNotFoundError(err) || obj.GetDeletionTimestamp().IsZero() {
   203  				return result, recErr
   204  			}
   205  		}
   206  
   207  		recErr = kerrors.NewAggregate([]error{recErr, err})
   208  	}
   209  
   210  	return result, recErr
   211  }
   212  
   213  func isNotFoundError(err error) bool {
   214  	var aggError kerrors.Aggregate
   215  	if errors.As(err, &aggError) && kerrors.FilterOut(aggError, apierrors.IsNotFound) == nil {
   216  		return true
   217  	}
   218  	if apierrors.IsNotFound(err) {
   219  		return true
   220  	}
   221  	return false
   222  }
   223  
   224  // isNonStalledSuccess checks if the reconciliation was successful and has not
   225  // resulted in stalled situation.
   226  func isNonStalledSuccess(obj conditions.Setter, r Result, recErr error) bool {
   227  	return !conditions.IsStalled(obj) && recErr == nil && r == ResultSuccess
   228  }
   229  

View as plain text