1
16
17 package apply
18
19 import (
20 "encoding/json"
21 "fmt"
22 "io"
23 "time"
24
25 "github.com/pkg/errors"
26
27 "github.com/jonboulle/clockwork"
28 apierrors "k8s.io/apimachinery/pkg/api/errors"
29 "k8s.io/apimachinery/pkg/api/meta"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
32 "k8s.io/apimachinery/pkg/runtime"
33 "k8s.io/apimachinery/pkg/runtime/schema"
34 "k8s.io/apimachinery/pkg/types"
35 "k8s.io/apimachinery/pkg/util/jsonmergepatch"
36 "k8s.io/apimachinery/pkg/util/mergepatch"
37 "k8s.io/apimachinery/pkg/util/strategicpatch"
38 "k8s.io/apimachinery/pkg/util/wait"
39 "k8s.io/cli-runtime/pkg/resource"
40 "k8s.io/client-go/openapi3"
41 "k8s.io/klog/v2"
42 "k8s.io/kube-openapi/pkg/validation/spec"
43 cmdutil "k8s.io/kubectl/pkg/cmd/util"
44 "k8s.io/kubectl/pkg/scheme"
45 "k8s.io/kubectl/pkg/util"
46 "k8s.io/kubectl/pkg/util/openapi"
47 )
48
49 const (
50
51 maxPatchRetry = 5
52
53 backOffPeriod = 1 * time.Second
54
55 triesBeforeBackOff = 1
56
57
58
59 groupVersionKindExtensionKey = "x-kubernetes-group-version-kind"
60 )
61
62 var createPatchErrFormat = "creating patch with:\noriginal:\n%s\nmodified:\n%s\ncurrent:\n%s\nfor:"
63
64
65 type Patcher struct {
66 Mapping *meta.RESTMapping
67 Helper *resource.Helper
68
69 Overwrite bool
70 BackOff clockwork.Clock
71
72 Force bool
73 CascadingStrategy metav1.DeletionPropagation
74 Timeout time.Duration
75 GracePeriod int
76
77
78 ResourceVersion *string
79
80
81 Retries int
82
83 OpenAPIGetter openapi.OpenAPIResourcesGetter
84 OpenAPIV3Root openapi3.Root
85 }
86
87 func newPatcher(o *ApplyOptions, info *resource.Info, helper *resource.Helper) (*Patcher, error) {
88 var openAPIGetter openapi.OpenAPIResourcesGetter
89 var openAPIV3Root openapi3.Root
90
91 if o.OpenAPIPatch {
92 openAPIGetter = o.OpenAPIGetter
93 openAPIV3Root = o.OpenAPIV3Root
94 }
95
96 return &Patcher{
97 Mapping: info.Mapping,
98 Helper: helper,
99 Overwrite: o.Overwrite,
100 BackOff: clockwork.NewRealClock(),
101 Force: o.DeleteOptions.ForceDeletion,
102 CascadingStrategy: o.DeleteOptions.CascadingStrategy,
103 Timeout: o.DeleteOptions.Timeout,
104 GracePeriod: o.DeleteOptions.GracePeriod,
105 OpenAPIGetter: openAPIGetter,
106 OpenAPIV3Root: openAPIV3Root,
107 Retries: maxPatchRetry,
108 }, nil
109 }
110
111 func (p *Patcher) delete(namespace, name string) error {
112 options := asDeleteOptions(p.CascadingStrategy, p.GracePeriod)
113 _, err := p.Helper.DeleteWithOptions(namespace, name, &options)
114 return err
115 }
116
117 func (p *Patcher) patchSimple(obj runtime.Object, modified []byte, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
118
119 current, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
120 if err != nil {
121 return nil, nil, errors.Wrapf(err, "serializing current configuration from:\n%v\nfor:", obj)
122 }
123
124
125 original, err := util.GetOriginalConfiguration(obj)
126 if err != nil {
127 return nil, nil, errors.Wrapf(err, "retrieving original configuration from:\n%v\nfor:", obj)
128 }
129
130 var patchType types.PatchType
131 var patch []byte
132
133 if p.OpenAPIV3Root != nil {
134 gvkSupported, err := p.gvkSupportsPatchOpenAPIV3(p.Mapping.GroupVersionKind)
135 if err != nil {
136
137
138
139 klog.V(5).Infof("warning: OpenAPI V3 path does not exist - group: %s, version %s, kind %s\n",
140 p.Mapping.GroupVersionKind.Group, p.Mapping.GroupVersionKind.Version, p.Mapping.GroupVersionKind.Kind)
141 } else if gvkSupported {
142 patch, err = p.buildStrategicMergePatchFromOpenAPIV3(original, modified, current)
143 if err != nil {
144
145
146
147
148
149 fmt.Fprintf(errOut, "warning: error calculating patch from openapi v3 spec: %v\n", err)
150 } else {
151 patchType = types.StrategicMergePatchType
152 }
153 } else {
154 klog.V(5).Infof("warning: OpenAPI V3 path does not support strategic merge patch - group: %s, version %s, kind %s\n",
155 p.Mapping.GroupVersionKind.Group, p.Mapping.GroupVersionKind.Version, p.Mapping.GroupVersionKind.Kind)
156 }
157 }
158
159 if patch == nil && p.OpenAPIGetter != nil {
160 if openAPISchema, err := p.OpenAPIGetter.OpenAPISchema(); err == nil && openAPISchema != nil {
161
162
163 if patchType, err = p.getPatchTypeFromOpenAPI(openAPISchema, p.Mapping.GroupVersionKind); err == nil && patchType == types.StrategicMergePatchType {
164 patch, err = p.buildStrategicMergeFromOpenAPI(openAPISchema, original, modified, current)
165 if err != nil {
166
167 fmt.Fprintf(errOut, "warning: error calculating patch from openapi spec: %v\n", err)
168 }
169 }
170 }
171 }
172
173 if patch == nil {
174 versionedObj, err := scheme.Scheme.New(p.Mapping.GroupVersionKind)
175 if err == nil {
176 patchType = types.StrategicMergePatchType
177 patch, err = p.buildStrategicMergeFromBuiltins(versionedObj, original, modified, current)
178 if err != nil {
179 return nil, nil, errors.Wrapf(err, createPatchErrFormat, original, modified, current)
180 }
181 } else {
182 if !runtime.IsNotRegisteredError(err) {
183 return nil, nil, errors.Wrapf(err, "getting instance of versioned object for %v:", p.Mapping.GroupVersionKind)
184 }
185
186 patchType = types.MergePatchType
187 patch, err = p.buildMergePatch(original, modified, current)
188 if err != nil {
189 return nil, nil, errors.Wrapf(err, createPatchErrFormat, original, modified, current)
190 }
191 }
192 }
193
194 if string(patch) == "{}" {
195 return patch, obj, nil
196 }
197
198 if p.ResourceVersion != nil {
199 patch, err = addResourceVersion(patch, *p.ResourceVersion)
200 if err != nil {
201 return nil, nil, errors.Wrap(err, "Failed to insert resourceVersion in patch")
202 }
203 }
204
205 patchedObj, err := p.Helper.Patch(namespace, name, patchType, patch, nil)
206 return patch, patchedObj, err
207 }
208
209
210
211 func (p *Patcher) buildMergePatch(original, modified, current []byte) ([]byte, error) {
212 preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"),
213 mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")}
214 patch, err := jsonmergepatch.CreateThreeWayJSONMergePatch(original, modified, current, preconditions...)
215 if err != nil {
216 if mergepatch.IsPreconditionFailed(err) {
217 return nil, fmt.Errorf("%s", "At least one of apiVersion, kind and name was changed")
218 }
219 return nil, err
220 }
221
222 return patch, nil
223 }
224
225
226
227 func (p *Patcher) gvkSupportsPatchOpenAPIV3(gvk schema.GroupVersionKind) (bool, error) {
228 gvSpec, err := p.OpenAPIV3Root.GVSpec(schema.GroupVersion{
229 Group: p.Mapping.GroupVersionKind.Group,
230 Version: p.Mapping.GroupVersionKind.Version,
231 })
232 if err != nil {
233 return false, err
234 }
235 if gvSpec == nil || gvSpec.Paths == nil || gvSpec.Paths.Paths == nil {
236 return false, fmt.Errorf("gvk group: %s, version: %s, kind: %s does not exist for OpenAPI V3", gvk.Group, gvk.Version, gvk.Kind)
237 }
238 for _, path := range gvSpec.Paths.Paths {
239 if path.Patch != nil {
240 if gvkMatchesSingle(p.Mapping.GroupVersionKind, path.Patch.Extensions) {
241 if path.Patch.RequestBody == nil || path.Patch.RequestBody.Content == nil {
242
243 return false, nil
244 }
245 if _, ok := path.Patch.RequestBody.Content["application/strategic-merge-patch+json"]; ok {
246 return true, nil
247 }
248
249 return false, nil
250 }
251 }
252 }
253 return false, nil
254 }
255
256 func gvkMatchesArray(targetGVK schema.GroupVersionKind, ext spec.Extensions) bool {
257 var gvkList []map[string]string
258 err := ext.GetObject(groupVersionKindExtensionKey, &gvkList)
259 if err != nil {
260 return false
261 }
262 for _, gvkMap := range gvkList {
263 if gvkMap["group"] == targetGVK.Group &&
264 gvkMap["version"] == targetGVK.Version &&
265 gvkMap["kind"] == targetGVK.Kind {
266 return true
267 }
268 }
269 return false
270 }
271
272 func gvkMatchesSingle(targetGVK schema.GroupVersionKind, ext spec.Extensions) bool {
273 var gvkMap map[string]string
274 err := ext.GetObject(groupVersionKindExtensionKey, &gvkMap)
275 if err != nil {
276 return false
277 }
278 return gvkMap["group"] == targetGVK.Group &&
279 gvkMap["version"] == targetGVK.Version &&
280 gvkMap["kind"] == targetGVK.Kind
281 }
282
283 func (p *Patcher) buildStrategicMergePatchFromOpenAPIV3(original, modified, current []byte) ([]byte, error) {
284 gvSpec, err := p.OpenAPIV3Root.GVSpec(schema.GroupVersion{
285 Group: p.Mapping.GroupVersionKind.Group,
286 Version: p.Mapping.GroupVersionKind.Version,
287 })
288 if err != nil {
289 return nil, err
290 }
291 if gvSpec == nil || gvSpec.Components == nil {
292 return nil, fmt.Errorf("OpenAPI V3 Components is nil")
293 }
294 for _, c := range gvSpec.Components.Schemas {
295 if !gvkMatchesArray(p.Mapping.GroupVersionKind, c.Extensions) {
296 continue
297 }
298 lookupPatchMeta := strategicpatch.PatchMetaFromOpenAPIV3{Schema: c, SchemaList: gvSpec.Components.Schemas}
299 if openapiv3Patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite); err != nil {
300 return nil, err
301 } else {
302 return openapiv3Patch, nil
303 }
304
305 }
306 return nil, nil
307 }
308
309
310
311 func (p *Patcher) buildStrategicMergeFromOpenAPI(openAPISchema openapi.Resources, original, modified, current []byte) ([]byte, error) {
312 schema := openAPISchema.LookupResource(p.Mapping.GroupVersionKind)
313 if schema == nil {
314
315 return nil, nil
316 }
317 lookupPatchMeta := strategicpatch.PatchMetaFromOpenAPI{Schema: schema}
318 if openapiPatch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite); err != nil {
319 return nil, err
320 } else {
321 return openapiPatch, nil
322 }
323 }
324
325
326 func (p *Patcher) getPatchTypeFromOpenAPI(openAPISchema openapi.Resources, gvk schema.GroupVersionKind) (types.PatchType, error) {
327 if pc := openAPISchema.GetConsumes(p.Mapping.GroupVersionKind, "PATCH"); pc != nil {
328 for _, c := range pc {
329 if c == string(types.StrategicMergePatchType) {
330 return types.StrategicMergePatchType, nil
331 }
332 }
333
334 return types.MergePatchType, nil
335 }
336
337 return types.MergePatchType, fmt.Errorf("unable to find any patch type for %s in Open API", gvk)
338 }
339
340
341
342
343 func (p *Patcher) buildStrategicMergeFromBuiltins(versionedObj runtime.Object, original, modified, current []byte) ([]byte, error) {
344 lookupPatchMeta, err := strategicpatch.NewPatchMetaFromStruct(versionedObj)
345 if err != nil {
346 return nil, err
347 }
348 patch, err := strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.Overwrite)
349 if err != nil {
350 return nil, err
351 }
352
353 return patch, nil
354 }
355
356
357
358 func (p *Patcher) Patch(current runtime.Object, modified []byte, source, namespace, name string, errOut io.Writer) ([]byte, runtime.Object, error) {
359 var getErr error
360 patchBytes, patchObject, err := p.patchSimple(current, modified, namespace, name, errOut)
361 if p.Retries == 0 {
362 p.Retries = maxPatchRetry
363 }
364 for i := 1; i <= p.Retries && apierrors.IsConflict(err); i++ {
365 if i > triesBeforeBackOff {
366 p.BackOff.Sleep(backOffPeriod)
367 }
368 current, getErr = p.Helper.Get(namespace, name)
369 if getErr != nil {
370 return nil, nil, getErr
371 }
372 patchBytes, patchObject, err = p.patchSimple(current, modified, namespace, name, errOut)
373 }
374 if err != nil {
375 if (apierrors.IsConflict(err) || apierrors.IsInvalid(err)) && p.Force {
376 patchBytes, patchObject, err = p.deleteAndCreate(current, modified, namespace, name)
377 } else {
378 err = cmdutil.AddSourceToErr("patching", source, err)
379 }
380 }
381 return patchBytes, patchObject, err
382 }
383
384 func (p *Patcher) deleteAndCreate(original runtime.Object, modified []byte, namespace, name string) ([]byte, runtime.Object, error) {
385 if err := p.delete(namespace, name); err != nil {
386 return modified, nil, err
387 }
388
389 if err := wait.PollImmediate(1*time.Second, p.Timeout, func() (bool, error) {
390 if _, err := p.Helper.Get(namespace, name); !apierrors.IsNotFound(err) {
391 return false, err
392 }
393 return true, nil
394 }); err != nil {
395 return modified, nil, err
396 }
397 versionedObject, _, err := unstructured.UnstructuredJSONScheme.Decode(modified, nil, nil)
398 if err != nil {
399 return modified, nil, err
400 }
401 createdObject, err := p.Helper.Create(namespace, true, versionedObject)
402 if err != nil {
403
404
405 recreated, recreateErr := p.Helper.Create(namespace, true, original)
406 if recreateErr != nil {
407 err = fmt.Errorf("An error occurred force-replacing the existing object with the newly provided one:\n\n%v.\n\nAdditionally, an error occurred attempting to restore the original object:\n\n%v", err, recreateErr)
408 } else {
409 createdObject = recreated
410 }
411 }
412 return modified, createdObject, err
413 }
414
415 func addResourceVersion(patch []byte, rv string) ([]byte, error) {
416 var patchMap map[string]interface{}
417 err := json.Unmarshal(patch, &patchMap)
418 if err != nil {
419 return nil, err
420 }
421 u := unstructured.Unstructured{Object: patchMap}
422 a, err := meta.Accessor(&u)
423 if err != nil {
424 return nil, err
425 }
426 a.SetResourceVersion(rv)
427
428 return json.Marshal(patchMap)
429 }
430
View as plain text