...

Source file src/edge-infra.dev/pkg/k8s/runtime/patch/patch.go

Documentation: edge-infra.dev/pkg/k8s/runtime/patch

     1  package patch
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"time"
     7  
     8  	"github.com/pkg/errors"
     9  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    10  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    11  	"k8s.io/apimachinery/pkg/runtime"
    12  	"k8s.io/apimachinery/pkg/runtime/schema"
    13  	kerrors "k8s.io/apimachinery/pkg/util/errors"
    14  	"k8s.io/apimachinery/pkg/util/wait"
    15  	"sigs.k8s.io/controller-runtime/pkg/client"
    16  	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
    17  
    18  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    19  	edgeUnstructured "edge-infra.dev/pkg/k8s/unstructured"
    20  )
    21  
    22  // Helper is a utility for ensuring the proper patching of objects.
    23  //
    24  // The Helper MUST be initialised before a set of modifications within the scope of an envisioned patch are made
    25  // to an object, so that the difference in state can be utilised to calculate a patch that can be used on a new revision
    26  // of the resource in case of conflicts.
    27  //
    28  // A common pattern for reconcilers is to initialise a NewHelper at the beginning of their Reconcile method, after
    29  // having fetched the latest revision for the resource from the API server, and then defer the call of Helper.Patch.
    30  // This ensures any modifications made to the spec and the status (conditions) object of the resource are always
    31  // persisted at the end of a reconcile run.
    32  //
    33  // The example below assumes that you will use the Reconciling condition to signal that progress can be made; if it is
    34  // not present, and the Ready condition is not true, the resource will be marked as stalled.
    35  //
    36  //	func (r *FooReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
    37  //		// Retrieve the object from the API server
    38  //		obj := &v1.Foo{}
    39  //		if err := r.Get(ctx, req.NamespacedName, obj); err != nil {
    40  //			return ctrl.Result{}, client.IgnoreNotFound(err)
    41  //		}
    42  //
    43  //		// Initialise the patch helper
    44  //		patchHelper, err := patch.NewHelper(obj, r.Client)
    45  //		if err != nil {
    46  //			return ctrl.Result{}, err
    47  //		}
    48  //
    49  //		// Always attempt to patch the object and status after each reconciliation
    50  //		defer func() {
    51  //			// Patch the object, ignoring conflicts on the conditions owned by this controller
    52  //			patchOpts := []patch.Option{
    53  //				patch.WithOwnedConditions{
    54  //					Conditions: []string{
    55  //						meta.ReadyCondition,
    56  //						meta.ReconcilingCondition,
    57  //						meta.StalledCondition,
    58  //						// any other "owned conditions"
    59  //					},
    60  //				},
    61  //			}
    62  //
    63  //			// On a clean exit, determine if the resource is still being reconciled, or if it has stalled, and record this observation
    64  //			if retErr == nil && (result.IsZero() || !result.Requeue) {
    65  //				// We have now observed this generation
    66  //				patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
    67  //
    68  //				readyCondition := conditions.Get(obj, meta.ReadyCondition)
    69  //				switch {
    70  //				case readyCondition.Status == metav1.ConditionTrue:
    71  //					// As we are no longer reconciling and the end-state is ready, the reconciliation is no longer stalled or progressing, so clear these
    72  //					conditions.Delete(obj, meta.StalledCondition)
    73  //					conditions.Delete(obj, meta.ReconcilingCondition)
    74  //				case conditions.IsReconciling(obj):
    75  //					// This implies stalling is not set; nothing to do
    76  //					break
    77  //				case readyCondition.Status == metav1.ConditionFalse:
    78  //					// As we are no longer reconciling and the end-state is not ready, the reconciliation has stalled
    79  //					conditions.MarkTrue(obj, meta.StalledCondition, readyCondition.Reason, readyCondition.Message)
    80  //				}
    81  //			}
    82  //
    83  //			// Finally, patch the resource
    84  //			if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
    85  //				retErr = kerrors.NewAggregate([]error{retErr, err})
    86  //			}
    87  //		}()
    88  //
    89  //		// ...start with actual reconciliation logic
    90  //	}
    91  //
    92  // Using this pattern, one-off or scoped patches for a subset of a reconcile operation can be made by initialising a new
    93  // Helper using NewHelper with the current state of the resource, making the modifications, and then directly applying
    94  // the patch using Helper.Patch, for example:
    95  //
    96  //	func (r *FooReconciler) subsetReconcile(ctx context.Context, obj *v1.Foo) (ctrl.Result, error) {
    97  //		patchHelper, err := patch.NewHelper(obj, r.Client)
    98  //		if err != nil {
    99  //			return ctrl.Result{}, err
   100  //		}
   101  //
   102  //		// Set CustomField in status object of resource
   103  //		obj.Status.CustomField = "value"
   104  //
   105  //		// Patch now only attempts to persist CustomField
   106  //		patchHelper.Patch(ctx, obj, nil)
   107  //	}
   108  type Helper struct {
   109  	client       client.Client
   110  	gvk          schema.GroupVersionKind
   111  	beforeObject client.Object
   112  	before       *unstructured.Unstructured
   113  	after        *unstructured.Unstructured
   114  	changes      map[string]bool
   115  
   116  	isConditionsSetter bool
   117  }
   118  
   119  // NewHelper returns an initialised Helper.
   120  func NewHelper(obj client.Object, crClient client.Client) (*Helper, error) {
   121  	// Get the GroupVersionKind of the object,
   122  	// used to validate against later on.
   123  	gvk, err := apiutil.GVKForObject(obj, crClient.Scheme())
   124  	if err != nil {
   125  		return nil, err
   126  	}
   127  
   128  	// Convert the object to unstructured to compare against our before copy.
   129  	unstructuredObj, err := edgeUnstructured.ToUnstructured(obj)
   130  	if err != nil {
   131  		return nil, err
   132  	}
   133  
   134  	// Check if the object satisfies the GitOps Toolkit API conditions contract.
   135  	_, canInterfaceConditions := obj.(conditions.Setter)
   136  
   137  	return &Helper{
   138  		client:             crClient,
   139  		gvk:                gvk,
   140  		before:             unstructuredObj,
   141  		beforeObject:       obj.DeepCopyObject().(client.Object),
   142  		isConditionsSetter: canInterfaceConditions,
   143  	}, nil
   144  }
   145  
   146  // Patch will attempt to patch the given object, including its status.
   147  func (h *Helper) Patch(ctx context.Context, obj client.Object, opts ...Option) error {
   148  	// Get the GroupVersionKind of the object that we want to patch.
   149  	gvk, err := apiutil.GVKForObject(obj, h.client.Scheme())
   150  	if err != nil {
   151  		return err
   152  	}
   153  	if gvk != h.gvk {
   154  		return errors.Errorf("unmatched GroupVersionKind, expected %q got %q", h.gvk, gvk)
   155  	}
   156  
   157  	// Calculate the options.
   158  	options := &HelperOptions{}
   159  	for _, opt := range opts {
   160  		opt.ApplyToHelper(options)
   161  	}
   162  
   163  	// Convert the object to unstructured to compare against our before copy.
   164  	h.after, err = edgeUnstructured.ToUnstructured(obj)
   165  	if err != nil {
   166  		return err
   167  	}
   168  
   169  	// Determine if the object has status.
   170  	if unstructuredHasStatus(h.after) && options.IncludeStatusObservedGeneration {
   171  		// Set status.observedGeneration if we're asked to do so.
   172  		if err := unstructured.SetNestedField(h.after.Object, h.after.GetGeneration(), "status", "observedGeneration"); err != nil {
   173  			return err
   174  		}
   175  
   176  		// Restore the changes back to the original object.
   177  		if err := runtime.DefaultUnstructuredConverter.FromUnstructured(h.after.Object, obj); err != nil {
   178  			return err
   179  		}
   180  	}
   181  
   182  	// Calculate and store the top-level field changes (e.g. "metadata", "spec", "status") we have before/after.
   183  	h.changes, err = h.calculateChanges(obj)
   184  	if err != nil {
   185  		return err
   186  	}
   187  
   188  	// Define K8s client options
   189  	var clientOpts []client.PatchOption
   190  	if options.FieldOwner != "" {
   191  		clientOpts = append(clientOpts, client.FieldOwner(options.FieldOwner))
   192  	}
   193  
   194  	// Issue patches and return errors in an aggregate.
   195  	return kerrors.NewAggregate([]error{
   196  		// Patch the conditions first.
   197  		//
   198  		// Given that we pass in metadata.resourceVersion to perform a 3-way-merge conflict resolution,
   199  		// patching conditions first avoids an extra loop if spec or status patch succeeds first
   200  		// given that causes the resourceVersion to mutate.
   201  		h.patchStatusConditions(ctx, obj, options.ForceOverwriteConditions, options.OwnedConditions, clientOpts...),
   202  
   203  		// Then proceed to patch the rest of the object.
   204  		h.patch(ctx, obj, clientOpts...),
   205  		h.patchStatus(ctx, obj, clientOpts...),
   206  	})
   207  }
   208  
   209  // patch issues a patch for metadata and spec.
   210  func (h *Helper) patch(ctx context.Context, obj client.Object, opts ...client.PatchOption) error {
   211  	if !h.shouldPatch("metadata") && !h.shouldPatch("spec") {
   212  		return nil
   213  	}
   214  	beforeObject, afterObject, err := h.calculatePatch(obj, specPatch)
   215  	if err != nil {
   216  		return err
   217  	}
   218  	return h.client.Patch(ctx, afterObject, client.MergeFromWithOptions(beforeObject), opts...)
   219  }
   220  
   221  // patchStatus issues a patch if the status has changed.
   222  func (h *Helper) patchStatus(ctx context.Context, obj client.Object, opts ...client.PatchOption) error {
   223  	if !h.shouldPatch("status") {
   224  		return nil
   225  	}
   226  	beforeObject, afterObject, err := h.calculatePatch(obj, statusPatch)
   227  	if err != nil {
   228  		return err
   229  	}
   230  	patchOpts := &client.PatchOptions{}
   231  	patchOpts = patchOpts.ApplyOptions(opts)
   232  	return h.client.Status().Patch(ctx, afterObject, client.MergeFrom(beforeObject), &client.SubResourcePatchOptions{PatchOptions: *patchOpts})
   233  }
   234  
   235  // patchStatusConditions issues a patch if there are any changes to the conditions slice under the status subresource.
   236  // This is a special case and it's handled separately given that we allow different controllers to act on conditions of
   237  // the same object.
   238  //
   239  // This method has an internal backoff loop. When a conflict is detected, the method asks the Client for the new
   240  // version of the object we're trying to patch.
   241  //
   242  // Condition changes are then applied to the latest version of the object, and if there are no unresolvable conflicts,
   243  // the patch is sent again.
   244  func (h *Helper) patchStatusConditions(ctx context.Context, obj client.Object, forceOverwrite bool, ownedConditions []string, opts ...client.PatchOption) error {
   245  	// Nothing to do if the object isn't a condition patcher.
   246  	if !h.isConditionsSetter {
   247  		return nil
   248  	}
   249  
   250  	// Make sure our before/after objects satisfy the proper interface before continuing.
   251  	//
   252  	// NOTE: The checks and error below are done so that we don't panic if any of the objects don't satisfy the
   253  	// interface any longer, although this shouldn't happen because we already check when creating the patcher.
   254  	before, ok := h.beforeObject.(conditions.Getter)
   255  	if !ok {
   256  		return errors.Errorf("object %s doesn't satisfy conditions.Getter, cannot patch", before.GetObjectKind())
   257  	}
   258  	after, ok := obj.(conditions.Getter)
   259  	if !ok {
   260  		return errors.Errorf("object %s doesn't satisfy conditions.Getter, cannot patch", after.GetObjectKind())
   261  	}
   262  
   263  	// Store the diff from the before/after object, and return early if there are no changes.
   264  	diff := conditions.NewPatch(
   265  		before,
   266  		after,
   267  	)
   268  	if diff.IsZero() {
   269  		return nil
   270  	}
   271  
   272  	// Make a copy of the object and store the key used if we have conflicts.
   273  	key := client.ObjectKeyFromObject(after)
   274  
   275  	// Define and start a backoff loop to handle conflicts
   276  	// between controllers working on the same object.
   277  	//
   278  	// This has been copied from https://github.com/kubernetes/kubernetes/blob/release-1.16/pkg/controller/controller_utils.go#L86-L88.
   279  	backoff := wait.Backoff{
   280  		Steps:    5,
   281  		Duration: 100 * time.Millisecond,
   282  		Jitter:   1.0,
   283  	}
   284  
   285  	// Start the backoff loop and return errors if any.
   286  	return wait.ExponentialBackoff(backoff, func() (bool, error) {
   287  		latest, ok := before.DeepCopyObject().(conditions.Setter)
   288  		if !ok {
   289  			return false, errors.Errorf("object %s doesn't satisfy conditions.Setter, cannot patch", latest.GetObjectKind())
   290  		}
   291  
   292  		// Get a new copy of the object.
   293  		if err := h.client.Get(ctx, key, latest); err != nil {
   294  			return false, err
   295  		}
   296  		// Create the condition patch before merging conditions.
   297  		conditionsPatch := client.MergeFromWithOptions(latest.DeepCopyObject().(conditions.Setter), client.MergeFromWithOptimisticLock{})
   298  
   299  		// Set the condition patch previously created on the new object.
   300  		if err := diff.Apply(latest, conditions.WithForceOverwrite(forceOverwrite), conditions.WithOwnedConditions(ownedConditions...)); err != nil {
   301  			return false, err
   302  		}
   303  
   304  		// Issue the patch.
   305  		patchOpts := &client.PatchOptions{}
   306  		patchOpts = patchOpts.ApplyOptions(opts)
   307  		err := h.client.Status().Patch(ctx, latest, conditionsPatch, &client.SubResourcePatchOptions{PatchOptions: *patchOpts})
   308  		switch {
   309  		case apierrors.IsConflict(err):
   310  			// Requeue.
   311  			return false, nil
   312  		case err != nil:
   313  			return false, err
   314  		default:
   315  			return true, nil
   316  		}
   317  	})
   318  }
   319  
   320  // calculatePatch returns the before/after objects to be given in a controller-runtime patch, scoped down to the
   321  // absolute necessary.
   322  func (h *Helper) calculatePatch(afterObj client.Object, focus patchType) (client.Object, client.Object, error) {
   323  	// Get a shallow unsafe copy of the before/after object in unstructured form.
   324  	before := unsafeUnstructuredCopy(h.before, focus, h.isConditionsSetter)
   325  	after := unsafeUnstructuredCopy(h.after, focus, h.isConditionsSetter)
   326  
   327  	// We've now applied all modifications to local unstructured objects,
   328  	// make copies of the original objects and convert them back.
   329  	beforeObj := h.beforeObject.DeepCopyObject().(client.Object)
   330  	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(before.Object, beforeObj); err != nil {
   331  		return nil, nil, err
   332  	}
   333  	afterObj = afterObj.DeepCopyObject().(client.Object)
   334  	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(after.Object, afterObj); err != nil {
   335  		return nil, nil, err
   336  	}
   337  	return beforeObj, afterObj, nil
   338  }
   339  
   340  func (h *Helper) shouldPatch(in string) bool {
   341  	return h.changes[in]
   342  }
   343  
   344  // calculate changes tries to build a patch from the before/after objects we have and store in a map which top-level
   345  // fields (e.g. `metadata`, `spec`, `status`, etc.) have changed.
   346  func (h *Helper) calculateChanges(after client.Object) (map[string]bool, error) {
   347  	// Calculate patch data.
   348  	patch := client.MergeFrom(h.beforeObject)
   349  	diff, err := patch.Data(after)
   350  	if err != nil {
   351  		return nil, errors.Wrapf(err, "failed to calculate patch data")
   352  	}
   353  
   354  	// Unmarshal patch data into a local map.
   355  	patchDiff := map[string]interface{}{}
   356  	if err := json.Unmarshal(diff, &patchDiff); err != nil {
   357  		return nil, errors.Wrapf(err, "failed to unmarshal patch data into a map")
   358  	}
   359  
   360  	// Return the map.
   361  	res := make(map[string]bool, len(patchDiff))
   362  	for key := range patchDiff {
   363  		res[key] = true
   364  	}
   365  	return res, nil
   366  }
   367  

View as plain text