package reconcile import ( "context" "errors" apierrors "k8s.io/apimachinery/pkg/api/errors" kerrors "k8s.io/apimachinery/pkg/util/errors" kuberecorder "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "edge-infra.dev/pkg/k8s/meta/status" "edge-infra.dev/pkg/k8s/runtime/conditions" "edge-infra.dev/pkg/k8s/runtime/patch" ) // Conditions contains all the conditions information needed to summarize the // target condition. type Conditions struct { // Target is the target condition, e.g.: Ready. Required. Target string // Owned conditions are the conditions owned by the reconciler for this // target condition. Owned []string // Summarize conditions are the conditions that the target condition depends // on. Required. Summarize []string // NegativePolarity conditions are the conditions in Summarize with negative // polarity. NegativePolarity []string } func (c Conditions) IsEmpty() bool { return c.Target == "" || len(c.Summarize) == 0 } // Summarizer computes the final result of a reconcile loop (resolved conditions, // reconcile result, error) and applies the computed patch to the summarized // object. type Summarizer struct { patcher *patch.SerialPatcher } // New returns a new instance of a summarization helper. func NewSummarizer(patcher *patch.SerialPatcher) *Summarizer { return &Summarizer{ patcher: patcher, } } // SummarizeOptions contains options for [SummarizeAndPatch]. type SummarizeOptions struct { // Conditions are conditions that needs to be summarized and persisted on // the object. Conditions []Conditions // Processors are chain of ResultProcessors for processing the results. This // can be used to analyze and modify the results. This enables injecting // custom middlewares in the [SummarizeAndPatch] operation. Processors []ResultProcessor // IgnoreNotFound can be used to ignores any resource not found error during // patching. IgnoreNotFound bool // Result is the abstracted result of reconciliation. Result Result // Error is the reconciliation error. Error error // FieldOwner defines the field owner configuration for the Kubernetes // patch operation. FieldOwner string // EventRecorder EventRecorder kuberecorder.EventRecorder } // Option is configuration that modifies SummarizationHelper. type SummarizeOption func(*SummarizeOptions) // WithConditions sets the Conditions for which summary is calculated in // [SummarizeAndPatch]. func WithConditions(condns ...Conditions) SummarizeOption { return func(s *SummarizeOptions) { s.Conditions = append(s.Conditions, condns...) } } // WithProcessors can be used to inject middlewares in the [SummarizeAndPatch] // process, to be executed before the result calculation and patching. func WithProcessors(rps ...ResultProcessor) SummarizeOption { return func(s *SummarizeOptions) { s.Processors = append(s.Processors, rps...) } } // WithIgnoreNotFound skips any resource not found error during patching. func WithIgnoreNotFound() SummarizeOption { return func(s *SummarizeOptions) { s.IgnoreNotFound = true } } // WithReconcileResult sets the value of input result used to calculate the // results of reconciliation in [SummarizeAndPatch]. func WithResult(rr Result) SummarizeOption { return func(s *SummarizeOptions) { s.Result = rr } } // WithReconcileError sets the value of input error used to calculate the // results reconciliation in [SummarizeAndPatch]. func WithError(re error) SummarizeOption { return func(s *SummarizeOptions) { s.Error = re } } // WithPatchFieldOwner sets the FieldOwner in the patch helper. func WithFieldOwner(fieldOwner string) SummarizeOption { return func(s *SummarizeOptions) { s.FieldOwner = fieldOwner } } // WithEventRecorder sets the EventRecorder that will be used create Events // during [SummarizeAndPatch]. func WithEventRecorder(recorder kuberecorder.EventRecorder) SummarizeOption { return func(s *SummarizeOptions) { s.EventRecorder = recorder } } // SummarizeAndPatch summarizes and patches the result to the target object. // When used at the very end of a reconciliation, the result builder must be // specified using the Option WithResultBuilder(). The returned result and error // can be returned as the return values of the reconciliation. // When used in the middle of a reconciliation, no result builder should be set // and the result can be ignored. func (h *Summarizer) SummarizeAndPatch(ctx context.Context, obj conditions.Setter, options ...SummarizeOption) (ctrl.Result, error) { // Calculate the options. opts := &SummarizeOptions{} for _, o := range options { o(opts) } // Combined the owned conditions of all the conditions for the patcher. ownedConditions := []string{} for _, c := range opts.Conditions { ownedConditions = append(ownedConditions, c.Owned...) } // Patch the object, prioritizing the conditions owned by the controller in // case of any conflicts. patchOpts := []patch.Option{ patch.WithOwnedConditions{ Conditions: ownedConditions, }, } if opts.FieldOwner != "" { patchOpts = append(patchOpts, patch.WithFieldOwner(opts.FieldOwner)) } // Compute the reconcile results, obtain patch options and reconcile error. pOpts, result, recErr := ComputeResult(obj, opts.Result, opts.Error) patchOpts = append(patchOpts, pOpts...) // Summarize conditions. This must be performed only after computing the // reconcile result, since the object status is adjusted based on the // reconcile result and error. for _, c := range opts.Conditions { conditions.SetSummary(obj, c.Target, conditions.WithConditions(c.Summarize...), conditions.WithNegativePolarityConditions(c.NegativePolarity...), ) } // If object is not stalled, result is success and runtime error is nil, // ensure that Ready=True. Else, use the Ready failure message as the // runtime error message. This ensures that the reconciliation would be // retried as the object isn't ready. // NOTE: This is applicable to Ready condition only because it is a special // condition in kstatus that reflects the overall state of an object. if isNonStalledSuccess(obj, opts.Result, opts.Error) { if !conditions.IsReady(obj) { recErr = errors.New(conditions.GetMessage(obj, status.ReadyCondition)) } } // If the object is not ready, make sure that 'opts.Error' is not 'nil' // when passing it to the [ResultProcessors] below. The runtime error // takes precedence over the computed reconcile error. if opts.Error == nil && recErr != nil { opts.Error = recErr } // Process the results of reconciliation. for _, processor := range opts.Processors { processor(ctx, opts.EventRecorder, obj, opts.Result, opts.Error) } // Finally, patch the resource. if err := h.patcher.Patch(ctx, obj, patchOpts...); err != nil { // Ignore patch error "not found" when the object is being deleted. if opts.IgnoreNotFound { if isNotFoundError(err) || obj.GetDeletionTimestamp().IsZero() { return result, recErr } } recErr = kerrors.NewAggregate([]error{recErr, err}) } return result, recErr } func isNotFoundError(err error) bool { var aggError kerrors.Aggregate if errors.As(err, &aggError) && kerrors.FilterOut(aggError, apierrors.IsNotFound) == nil { return true } if apierrors.IsNotFound(err) { return true } return false } // isNonStalledSuccess checks if the reconciliation was successful and has not // resulted in stalled situation. func isNonStalledSuccess(obj conditions.Setter, r Result, recErr error) bool { return !conditions.IsStalled(obj) && recErr == nil && r == ResultSuccess }