...

Source file src/k8s.io/kubectl/pkg/cmd/apply/patcher.go

Documentation: k8s.io/kubectl/pkg/cmd/apply

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apply
    18  
    19  import (
    20  	"encoding/json"
    21  	"fmt"
    22  	"io"
    23  	"time"
    24  
    25  	"github.com/pkg/errors"
    26  
    27  	"github.com/jonboulle/clockwork"
    28  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    29  	"k8s.io/apimachinery/pkg/api/meta"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    32  	"k8s.io/apimachinery/pkg/runtime"
    33  	"k8s.io/apimachinery/pkg/runtime/schema"
    34  	"k8s.io/apimachinery/pkg/types"
    35  	"k8s.io/apimachinery/pkg/util/jsonmergepatch"
    36  	"k8s.io/apimachinery/pkg/util/mergepatch"
    37  	"k8s.io/apimachinery/pkg/util/strategicpatch"
    38  	"k8s.io/apimachinery/pkg/util/wait"
    39  	"k8s.io/cli-runtime/pkg/resource"
    40  	"k8s.io/client-go/openapi3"
    41  	"k8s.io/klog/v2"
    42  	"k8s.io/kube-openapi/pkg/validation/spec"
    43  	cmdutil "k8s.io/kubectl/pkg/cmd/util"
    44  	"k8s.io/kubectl/pkg/scheme"
    45  	"k8s.io/kubectl/pkg/util"
    46  	"k8s.io/kubectl/pkg/util/openapi"
    47  )
    48  
    49  const (
    50  	// maxPatchRetry is the maximum number of conflicts retry for during a patch operation before returning failure
    51  	maxPatchRetry = 5
    52  	// backOffPeriod is the period to back off when apply patch results in error.
    53  	backOffPeriod = 1 * time.Second
    54  	// how many times we can retry before back off
    55  	triesBeforeBackOff = 1
    56  	// groupVersionKindExtensionKey is the key used to lookup the
    57  	// GroupVersionKind value for an object definition from the
    58  	// definition's "extensions" map.
    59  	groupVersionKindExtensionKey = "x-kubernetes-group-version-kind"
    60  )
    61  
    62  var createPatchErrFormat = "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfor:"
    63  
    64  // Patcher defines options to patch OpenAPI objects.
    65  type Patcher struct {
    66  	Mapping *meta.RESTMapping
    67  	Helper  *resource.Helper
    68  
    69  	Overwrite bool
    70  	BackOff   clockwork.Clock
    71  
    72  	Force             bool
    73  	CascadingStrategy metav1.DeletionPropagation
    74  	Timeout           time.Duration
    75  	GracePeriod       int
    76  
    77  	// If set, forces the patch against a specific resourceVersion
    78  	ResourceVersion *string
    79  
    80  	// Number of retries to make if the patch fails with conflict
    81  	Retries int
    82  
    83  	OpenAPIGetter openapi.OpenAPIResourcesGetter
    84  	OpenAPIV3Root openapi3.Root
    85  }
    86  
    87  func newPatcher(o *ApplyOptions, info *resource.Info, helper *resource.Helper) (*Patcher, error) {
    88  	var openAPIGetter openapi.OpenAPIResourcesGetter
    89  	var openAPIV3Root openapi3.Root
    90  
    91  	if o.OpenAPIPatch {
    92  		openAPIGetter = o.OpenAPIGetter
    93  		openAPIV3Root = o.OpenAPIV3Root
    94  	}
    95  
    96  	return &Patcher{
    97  		Mapping:           info.Mapping,
    98  		Helper:            helper,
    99  		Overwrite:         o.Overwrite,
   100  		BackOff:           clockwork.NewRealClock(),
   101  		Force:             o.DeleteOptions.ForceDeletion,
   102  		CascadingStrategy: o.DeleteOptions.CascadingStrategy,
   103  		Timeout:           o.DeleteOptions.Timeout,
   104  		GracePeriod:       o.DeleteOptions.GracePeriod,
   105  		OpenAPIGetter:     openAPIGetter,
   106  		OpenAPIV3Root:     openAPIV3Root,
   107  		Retries:           maxPatchRetry,
   108  	}, nil
   109  }
   110  
   111  func (p *Patcher) delete(namespace, name string) error {
   112  	options := asDeleteOptions(p.CascadingStrategy, p.GracePeriod)
   113  	_, err := p.Helper.DeleteWithOptions(namespace, name, &options)
   114  	return err
   115  }
   116  
   117  func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
   118  	// Serialize the current configuration of the object from the server.
   119  	current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
   120  	if err != nil {
   121  		return nil, nil, errors.Wrapf(err, "serializing current configuration from:\n%v\nfor:", obj)
   122  	}
   123  
   124  	// Retrieve the original configuration of the object from the annotation.
   125  	original, err := util.GetOriginalConfiguration(obj)
   126  	if err != nil {
   127  		return nil, nil, errors.Wrapf(err, "retrieving original configuration from:\n%v\nfor:", obj)
   128  	}
   129  
   130  	var patchType types.PatchType
   131  	var patch []byte
   132  
   133  	if p.OpenAPIV3Root != nil {
   134  		gvkSupported, err := p.gvkSupportsPatchOpenAPIV3(p.Mapping.GroupVersionKind)
   135  		if err != nil {
   136  			// Realistically this error logging is not needed (not present in V2),
   137  			// but would help us in debugging if users encounter a problem
   138  			// with OpenAPI V3 not present in V2.
   139  			klog.V(5).Infof("warning: OpenAPI V3 path does not exist - group: %s, version %s, kind %s\n",
   140  				p.Mapping.GroupVersionKind.Group, p.Mapping.GroupVersionKind.Version, p.Mapping.GroupVersionKind.Kind)
   141  		} else if gvkSupported {
   142  			patch, err = p.buildStrategicMergePatchFromOpenAPIV3(original, modified, current)
   143  			if err != nil {
   144  				// Fall back to OpenAPI V2 if there is a problem
   145  				// We should remove the fallback in the future,
   146  				// but for the first release it might be beneficial
   147  				// to fall back to OpenAPI V2 while logging the error
   148  				// and seeing if we get any bug reports.
   149  				fmt.Fprintf(errOut, "warning: error calculating patch from openapi v3 spec: %v\n", err)
   150  			} else {
   151  				patchType = types.StrategicMergePatchType
   152  			}
   153  		} else {
   154  			klog.V(5).Infof("warning: OpenAPI V3 path does not support strategic merge patch - group: %s, version %s, kind %s\n",
   155  				p.Mapping.GroupVersionKind.Group, p.Mapping.GroupVersionKind.Version, p.Mapping.GroupVersionKind.Kind)
   156  		}
   157  	}
   158  
   159  	if patch == nil && p.OpenAPIGetter != nil {
   160  		if openAPISchema, err := p.OpenAPIGetter.OpenAPISchema(); err == nil && openAPISchema != nil {
   161  			// if openapischema is used, we'll try to get required patch type for this GVK from Open API.
   162  			// if it fails or could not find any patch type, fall back to baked-in patch type determination.
   163  			if patchType, err = p.getPatchTypeFromOpenAPI(openAPISchema, p.Mapping.GroupVersionKind); err == nil && patchType == types.StrategicMergePatchType {
   164  				patch, err = p.buildStrategicMergeFromOpenAPI(openAPISchema, original, modified, current)
   165  				if err != nil {
   166  					// Warn user about problem and continue strategic merge patching using builtin types.
   167  					fmt.Fprintf(errOut, "warning: error calculating patch from openapi spec: %v\n", err)
   168  				}
   169  			}
   170  		}
   171  	}
   172  
   173  	if patch == nil {
   174  		versionedObj, err := scheme.Scheme.New(p.Mapping.GroupVersionKind)
   175  		if err == nil {
   176  			patchType = types.StrategicMergePatchType
   177  			patch, err = p.buildStrategicMergeFromBuiltins(versionedObj, original, modified, current)
   178  			if err != nil {
   179  				return nil, nil, errors.Wrapf(err, createPatchErrFormat, original, modified, current)
   180  			}
   181  		} else {
   182  			if !runtime.IsNotRegisteredError(err) {
   183  				return nil, nil, errors.Wrapf(err, "getting instance of versioned object for %v:", p.Mapping.GroupVersionKind)
   184  			}
   185  
   186  			patchType = types.MergePatchType
   187  			patch, err = p.buildMergePatch(original, modified, current)
   188  			if err != nil {
   189  				return nil, nil, errors.Wrapf(err, createPatchErrFormat, original, modified, current)
   190  			}
   191  		}
   192  	}
   193  
   194  	if string(patch) == "{}" {
   195  		return patch, obj, nil
   196  	}
   197  
   198  	if p.ResourceVersion != nil {
   199  		patch, err = addResourceVersion(patch, *p.ResourceVersion)
   200  		if err != nil {
   201  			return nil, nil, errors.Wrap(err, "Failed to insert resourceVersion in patch")
   202  		}
   203  	}
   204  
   205  	patchedObj, err := p.Helper.Patch(namespace, name, patchType, patch, nil)
   206  	return patch, patchedObj, err
   207  }
   208  
   209  // buildMergePatch builds patch according to the JSONMergePatch which is used for
   210  // custom resource definitions.
   211  func (p *Patcher) buildMergePatch(original, modified, current []byte) ([]byte, error) {
   212  	preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"),
   213  		mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")}
   214  	patch, err := jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...)
   215  	if err != nil {
   216  		if mergepatch.IsPreconditionFailed(err) {
   217  			return nil, fmt.Errorf("%s", "At least one of apiVersion, kind and name was changed")
   218  		}
   219  		return nil, err
   220  	}
   221  
   222  	return patch, nil
   223  }
   224  
   225  // gvkSupportsPatchOpenAPIV3 checks if a particular GVK supports the patch operation.
   226  // It returns an error if the OpenAPI V3 could not be downloaded.
   227  func (p *Patcher) gvkSupportsPatchOpenAPIV3(gvk schema.GroupVersionKind) (bool, error) {
   228  	gvSpec, err := p.OpenAPIV3Root.GVSpec(schema.GroupVersion{
   229  		Group:   p.Mapping.GroupVersionKind.Group,
   230  		Version: p.Mapping.GroupVersionKind.Version,
   231  	})
   232  	if err != nil {
   233  		return false, err
   234  	}
   235  	if gvSpec == nil || gvSpec.Paths == nil || gvSpec.Paths.Paths == nil {
   236  		return false, fmt.Errorf("gvk group: %s, version: %s, kind: %s does not exist for OpenAPI V3", gvk.Group, gvk.Version, gvk.Kind)
   237  	}
   238  	for _, path := range gvSpec.Paths.Paths {
   239  		if path.Patch != nil {
   240  			if gvkMatchesSingle(p.Mapping.GroupVersionKind, path.Patch.Extensions) {
   241  				if path.Patch.RequestBody == nil || path.Patch.RequestBody.Content == nil {
   242  					// GVK exists but does not support requestBody. Indication of malformed OpenAPI.
   243  					return false, nil
   244  				}
   245  				if _, ok := path.Patch.RequestBody.Content["application/strategic-merge-patch+json"]; ok {
   246  					return true, nil
   247  				}
   248  				// GVK exists but strategic-merge-patch is not supported. Likely to be a CRD or aggregated resource.
   249  				return false, nil
   250  			}
   251  		}
   252  	}
   253  	return false, nil
   254  }
   255  
   256  func gvkMatchesArray(targetGVK schema.GroupVersionKind, ext spec.Extensions) bool {
   257  	var gvkList []map[string]string
   258  	err := ext.GetObject(groupVersionKindExtensionKey, &gvkList)
   259  	if err != nil {
   260  		return false
   261  	}
   262  	for _, gvkMap := range gvkList {
   263  		if gvkMap["group"] == targetGVK.Group &&
   264  			gvkMap["version"] == targetGVK.Version &&
   265  			gvkMap["kind"] == targetGVK.Kind {
   266  			return true
   267  		}
   268  	}
   269  	return false
   270  }
   271  
   272  func gvkMatchesSingle(targetGVK schema.GroupVersionKind, ext spec.Extensions) bool {
   273  	var gvkMap map[string]string
   274  	err := ext.GetObject(groupVersionKindExtensionKey, &gvkMap)
   275  	if err != nil {
   276  		return false
   277  	}
   278  	return gvkMap["group"] == targetGVK.Group &&
   279  		gvkMap["version"] == targetGVK.Version &&
   280  		gvkMap["kind"] == targetGVK.Kind
   281  }
   282  
   283  func (p *Patcher) buildStrategicMergePatchFromOpenAPIV3(original, modified, current []byte) ([]byte, error) {
   284  	gvSpec, err := p.OpenAPIV3Root.GVSpec(schema.GroupVersion{
   285  		Group:   p.Mapping.GroupVersionKind.Group,
   286  		Version: p.Mapping.GroupVersionKind.Version,
   287  	})
   288  	if err != nil {
   289  		return nil, err
   290  	}
   291  	if gvSpec == nil || gvSpec.Components == nil {
   292  		return nil, fmt.Errorf("OpenAPI V3 Components is nil")
   293  	}
   294  	for _, c := range gvSpec.Components.Schemas {
   295  		if !gvkMatchesArray(p.Mapping.GroupVersionKind, c.Extensions) {
   296  			continue
   297  		}
   298  		lookupPatchMeta := strategicpatch.PatchMetaFromOpenAPIV3{Schema: c, SchemaList: gvSpec.Components.Schemas}
   299  		if openapiv3Patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite); err != nil {
   300  			return nil, err
   301  		} else {
   302  			return openapiv3Patch, nil
   303  		}
   304  
   305  	}
   306  	return nil, nil
   307  }
   308  
   309  // buildStrategicMergeFromOpenAPI builds patch from OpenAPI if it is enabled.
   310  // This is used for core types which is published in openapi.
   311  func (p *Patcher) buildStrategicMergeFromOpenAPI(openAPISchema openapi.Resources, original, modified, current []byte) ([]byte, error) {
   312  	schema := openAPISchema.LookupResource(p.Mapping.GroupVersionKind)
   313  	if schema == nil {
   314  		// Missing schema returns nil patch; also no error.
   315  		return nil, nil
   316  	}
   317  	lookupPatchMeta := strategicpatch.PatchMetaFromOpenAPI{Schema: schema}
   318  	if openapiPatch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite); err != nil {
   319  		return nil, err
   320  	} else {
   321  		return openapiPatch, nil
   322  	}
   323  }
   324  
   325  // getPatchTypeFromOpenAPI looks up patch types supported by given GroupVersionKind in Open API.
   326  func (p *Patcher) getPatchTypeFromOpenAPI(openAPISchema openapi.Resources, gvk schema.GroupVersionKind) (types.PatchType, error) {
   327  	if pc := openAPISchema.GetConsumes(p.Mapping.GroupVersionKind, "PATCH"); pc != nil {
   328  		for _, c := range pc {
   329  			if c == string(types.StrategicMergePatchType) {
   330  				return types.StrategicMergePatchType, nil
   331  			}
   332  		}
   333  
   334  		return types.MergePatchType, nil
   335  	}
   336  
   337  	return types.MergePatchType, fmt.Errorf("unable to find any patch type for %s in Open API", gvk)
   338  }
   339  
   340  // buildStrategicMergeFromStruct builds patch from struct. This is used when
   341  // openapi endpoint is not working or user disables it by setting openapi-patch flag
   342  // to false.
   343  func (p *Patcher) buildStrategicMergeFromBuiltins(versionedObj runtime.Object, original, modified, current []byte) ([]byte, error) {
   344  	lookupPatchMeta, err := strategicpatch.NewPatchMetaFromStruct(versionedObj)
   345  	if err != nil {
   346  		return nil, err
   347  	}
   348  	patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite)
   349  	if err != nil {
   350  		return nil, err
   351  	}
   352  
   353  	return patch, nil
   354  }
   355  
   356  // Patch tries to patch an OpenAPI resource. On success, returns the merge patch as well
   357  // the final patched object. On failure, returns an error.
   358  func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
   359  	var getErr error
   360  	patchBytes, patchObject, err := p.patchSimple(current, modified, namespace, name, errOut)
   361  	if p.Retries == 0 {
   362  		p.Retries = maxPatchRetry
   363  	}
   364  	for i := 1; i <= p.Retries && apierrors.IsConflict(err); i++ {
   365  		if i > triesBeforeBackOff {
   366  			p.BackOff.Sleep(backOffPeriod)
   367  		}
   368  		current, getErr = p.Helper.Get(namespace, name)
   369  		if getErr != nil {
   370  			return nil, nil, getErr
   371  		}
   372  		patchBytes, patchObject, err = p.patchSimple(current, modified, namespace, name, errOut)
   373  	}
   374  	if err != nil {
   375  		if (apierrors.IsConflict(err) || apierrors.IsInvalid(err)) && p.Force {
   376  			patchBytes, patchObject, err = p.deleteAndCreate(current, modified, namespace, name)
   377  		} else {
   378  			err = cmdutil.AddSourceToErr("patching", source, err)
   379  		}
   380  	}
   381  	return patchBytes, patchObject, err
   382  }
   383  
   384  func (p *Patcher) deleteAndCreate(original runtime.Object, modified []byte, namespace, name string) ([]byte, runtime.Object, error) {
   385  	if err := p.delete(namespace, name); err != nil {
   386  		return modified, nil, err
   387  	}
   388  	// TODO: use wait
   389  	if err := wait.PollImmediate(1*time.Second, p.Timeout, func() (bool, error) {
   390  		if _, err := p.Helper.Get(namespace, name); !apierrors.IsNotFound(err) {
   391  			return false, err
   392  		}
   393  		return true, nil
   394  	}); err != nil {
   395  		return modified, nil, err
   396  	}
   397  	versionedObject, _, err := unstructured.UnstructuredJSONScheme.Decode(modified, nil, nil)
   398  	if err != nil {
   399  		return modified, nil, err
   400  	}
   401  	createdObject, err := p.Helper.Create(namespace, true, versionedObject)
   402  	if err != nil {
   403  		// restore the original object if we fail to create the new one
   404  		// but still propagate and advertise error to user
   405  		recreated, recreateErr := p.Helper.Create(namespace, true, original)
   406  		if recreateErr != nil {
   407  			err = fmt.Errorf("An error occurred force-replacing the existing object with the newly provided one:\n\n%v.\n\nAdditionally, an error occurred attempting to restore the original object:\n\n%v", err, recreateErr)
   408  		} else {
   409  			createdObject = recreated
   410  		}
   411  	}
   412  	return modified, createdObject, err
   413  }
   414  
   415  func addResourceVersion(patch []byte, rv string) ([]byte, error) {
   416  	var patchMap map[string]interface{}
   417  	err := json.Unmarshal(patch, &patchMap)
   418  	if err != nil {
   419  		return nil, err
   420  	}
   421  	u := unstructured.Unstructured{Object: patchMap}
   422  	a, err := meta.Accessor(&u)
   423  	if err != nil {
   424  		return nil, err
   425  	}
   426  	a.SetResourceVersion(rv)
   427  
   428  	return json.Marshal(patchMap)
   429  }
   430  

View as plain text