1
16
17 package controller
18
19 import (
20 "context"
21 "encoding/json"
22 "fmt"
23 "sync"
24
25 apps "k8s.io/api/apps/v1"
26 v1 "k8s.io/api/core/v1"
27 "k8s.io/apimachinery/pkg/api/errors"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/labels"
30 "k8s.io/apimachinery/pkg/runtime/schema"
31 "k8s.io/apimachinery/pkg/types"
32 utilerrors "k8s.io/apimachinery/pkg/util/errors"
33 "k8s.io/klog/v2"
34 )
35
36 type BaseControllerRefManager struct {
37 Controller metav1.Object
38 Selector labels.Selector
39
40 canAdoptErr error
41 canAdoptOnce sync.Once
42 CanAdoptFunc func(ctx context.Context) error
43 }
44
45 func (m *BaseControllerRefManager) CanAdopt(ctx context.Context) error {
46 m.canAdoptOnce.Do(func() {
47 if m.CanAdoptFunc != nil {
48 m.canAdoptErr = m.CanAdoptFunc(ctx)
49 }
50 })
51 return m.canAdoptErr
52 }
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69 func (m *BaseControllerRefManager) ClaimObject(ctx context.Context, obj metav1.Object, match func(metav1.Object) bool, adopt, release func(context.Context, metav1.Object) error) (bool, error) {
70 controllerRef := metav1.GetControllerOfNoCopy(obj)
71 if controllerRef != nil {
72 if controllerRef.UID != m.Controller.GetUID() {
73
74 return false, nil
75 }
76 if match(obj) {
77
78
79
80
81 return true, nil
82 }
83
84
85 if m.Controller.GetDeletionTimestamp() != nil {
86 return false, nil
87 }
88 if err := release(ctx, obj); err != nil {
89
90 if errors.IsNotFound(err) {
91 return false, nil
92 }
93
94
95 return false, err
96 }
97
98 return false, nil
99 }
100
101
102 if m.Controller.GetDeletionTimestamp() != nil || !match(obj) {
103
104 return false, nil
105 }
106 if obj.GetDeletionTimestamp() != nil {
107
108 return false, nil
109 }
110
111 if len(m.Controller.GetNamespace()) > 0 && m.Controller.GetNamespace() != obj.GetNamespace() {
112
113 return false, nil
114 }
115
116
117 if err := adopt(ctx, obj); err != nil {
118
119 if errors.IsNotFound(err) {
120 return false, nil
121 }
122
123
124 return false, err
125 }
126
127 return true, nil
128 }
129
130 type PodControllerRefManager struct {
131 BaseControllerRefManager
132 controllerKind schema.GroupVersionKind
133 podControl PodControlInterface
134 finalizers []string
135 }
136
137
138
139
140
141
142
143
144
145
146
147
148 func NewPodControllerRefManager(
149 podControl PodControlInterface,
150 controller metav1.Object,
151 selector labels.Selector,
152 controllerKind schema.GroupVersionKind,
153 canAdopt func(ctx context.Context) error,
154 finalizers ...string,
155 ) *PodControllerRefManager {
156 return &PodControllerRefManager{
157 BaseControllerRefManager: BaseControllerRefManager{
158 Controller: controller,
159 Selector: selector,
160 CanAdoptFunc: canAdopt,
161 },
162 controllerKind: controllerKind,
163 podControl: podControl,
164 finalizers: finalizers,
165 }
166 }
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183 func (m *PodControllerRefManager) ClaimPods(ctx context.Context, pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {
184 var claimed []*v1.Pod
185 var errlist []error
186
187 match := func(obj metav1.Object) bool {
188 pod := obj.(*v1.Pod)
189
190 if !m.Selector.Matches(labels.Set(pod.Labels)) {
191 return false
192 }
193 for _, filter := range filters {
194 if !filter(pod) {
195 return false
196 }
197 }
198 return true
199 }
200 adopt := func(ctx context.Context, obj metav1.Object) error {
201 return m.AdoptPod(ctx, obj.(*v1.Pod))
202 }
203 release := func(ctx context.Context, obj metav1.Object) error {
204 return m.ReleasePod(ctx, obj.(*v1.Pod))
205 }
206
207 for _, pod := range pods {
208 ok, err := m.ClaimObject(ctx, pod, match, adopt, release)
209 if err != nil {
210 errlist = append(errlist, err)
211 continue
212 }
213 if ok {
214 claimed = append(claimed, pod)
215 }
216 }
217 return claimed, utilerrors.NewAggregate(errlist)
218 }
219
220
221
222 func (m *PodControllerRefManager) AdoptPod(ctx context.Context, pod *v1.Pod) error {
223 if err := m.CanAdopt(ctx); err != nil {
224 return fmt.Errorf("can't adopt Pod %v/%v (%v): %v", pod.Namespace, pod.Name, pod.UID, err)
225 }
226
227
228
229 patchBytes, err := ownerRefControllerPatch(m.Controller, m.controllerKind, pod.UID, m.finalizers...)
230 if err != nil {
231 return err
232 }
233 return m.podControl.PatchPod(ctx, pod.Namespace, pod.Name, patchBytes)
234 }
235
236
237
238 func (m *PodControllerRefManager) ReleasePod(ctx context.Context, pod *v1.Pod) error {
239 logger := klog.FromContext(ctx)
240 logger.V(2).Info("Patching pod to remove its controllerRef", "pod", klog.KObj(pod), "gvk", m.controllerKind, "controller", m.Controller.GetName())
241 patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(pod.UID, []types.UID{m.Controller.GetUID()}, m.finalizers...)
242 if err != nil {
243 return err
244 }
245 err = m.podControl.PatchPod(ctx, pod.Namespace, pod.Name, patchBytes)
246 if err != nil {
247 if errors.IsNotFound(err) {
248
249 return nil
250 }
251 if errors.IsInvalid(err) {
252
253
254
255
256
257
258
259
260 return nil
261 }
262 }
263 return err
264 }
265
266
267
268
269
270
271 type ReplicaSetControllerRefManager struct {
272 BaseControllerRefManager
273 controllerKind schema.GroupVersionKind
274 rsControl RSControlInterface
275 }
276
277
278
279
280
281
282
283
284
285
286
287
288 func NewReplicaSetControllerRefManager(
289 rsControl RSControlInterface,
290 controller metav1.Object,
291 selector labels.Selector,
292 controllerKind schema.GroupVersionKind,
293 canAdopt func(ctx context.Context) error,
294 ) *ReplicaSetControllerRefManager {
295 return &ReplicaSetControllerRefManager{
296 BaseControllerRefManager: BaseControllerRefManager{
297 Controller: controller,
298 Selector: selector,
299 CanAdoptFunc: canAdopt,
300 },
301 controllerKind: controllerKind,
302 rsControl: rsControl,
303 }
304 }
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319 func (m *ReplicaSetControllerRefManager) ClaimReplicaSets(ctx context.Context, sets []*apps.ReplicaSet) ([]*apps.ReplicaSet, error) {
320 var claimed []*apps.ReplicaSet
321 var errlist []error
322
323 match := func(obj metav1.Object) bool {
324 return m.Selector.Matches(labels.Set(obj.GetLabels()))
325 }
326 adopt := func(ctx context.Context, obj metav1.Object) error {
327 return m.AdoptReplicaSet(ctx, obj.(*apps.ReplicaSet))
328 }
329 release := func(ctx context.Context, obj metav1.Object) error {
330 return m.ReleaseReplicaSet(ctx, obj.(*apps.ReplicaSet))
331 }
332
333 for _, rs := range sets {
334 ok, err := m.ClaimObject(ctx, rs, match, adopt, release)
335 if err != nil {
336 errlist = append(errlist, err)
337 continue
338 }
339 if ok {
340 claimed = append(claimed, rs)
341 }
342 }
343 return claimed, utilerrors.NewAggregate(errlist)
344 }
345
346
347
348 func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(ctx context.Context, rs *apps.ReplicaSet) error {
349 if err := m.CanAdopt(ctx); err != nil {
350 return fmt.Errorf("can't adopt ReplicaSet %v/%v (%v): %v", rs.Namespace, rs.Name, rs.UID, err)
351 }
352
353
354 patchBytes, err := ownerRefControllerPatch(m.Controller, m.controllerKind, rs.UID)
355 if err != nil {
356 return err
357 }
358 return m.rsControl.PatchReplicaSet(ctx, rs.Namespace, rs.Name, patchBytes)
359 }
360
361
362
363 func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(ctx context.Context, replicaSet *apps.ReplicaSet) error {
364 logger := klog.FromContext(ctx)
365 logger.V(2).Info("Patching ReplicaSet to remove its controllerRef", "replicaSet", klog.KObj(replicaSet), "gvk", m.controllerKind, "controller", m.Controller.GetName())
366 patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(replicaSet.UID, []types.UID{m.Controller.GetUID()})
367 if err != nil {
368 return err
369 }
370 err = m.rsControl.PatchReplicaSet(ctx, replicaSet.Namespace, replicaSet.Name, patchBytes)
371 if err != nil {
372 if errors.IsNotFound(err) {
373
374 return nil
375 }
376 if errors.IsInvalid(err) {
377
378
379
380
381 return nil
382 }
383 }
384 return err
385 }
386
387
388
389
390
391 func RecheckDeletionTimestamp(getObject func(context.Context) (metav1.Object, error)) func(context.Context) error {
392 return func(ctx context.Context) error {
393 obj, err := getObject(ctx)
394 if err != nil {
395 return fmt.Errorf("can't recheck DeletionTimestamp: %v", err)
396 }
397 if obj.GetDeletionTimestamp() != nil {
398 return fmt.Errorf("%v/%v has just been deleted at %v", obj.GetNamespace(), obj.GetName(), obj.GetDeletionTimestamp())
399 }
400 return nil
401 }
402 }
403
404
405
406
407
408
409 type ControllerRevisionControllerRefManager struct {
410 BaseControllerRefManager
411 controllerKind schema.GroupVersionKind
412 crControl ControllerRevisionControlInterface
413 }
414
415
416
417
418
419
420
421
422
423
424
425
426 func NewControllerRevisionControllerRefManager(
427 crControl ControllerRevisionControlInterface,
428 controller metav1.Object,
429 selector labels.Selector,
430 controllerKind schema.GroupVersionKind,
431 canAdopt func(ctx context.Context) error,
432 ) *ControllerRevisionControllerRefManager {
433 return &ControllerRevisionControllerRefManager{
434 BaseControllerRefManager: BaseControllerRefManager{
435 Controller: controller,
436 Selector: selector,
437 CanAdoptFunc: canAdopt,
438 },
439 controllerKind: controllerKind,
440 crControl: crControl,
441 }
442 }
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457 func (m *ControllerRevisionControllerRefManager) ClaimControllerRevisions(ctx context.Context, histories []*apps.ControllerRevision) ([]*apps.ControllerRevision, error) {
458 var claimed []*apps.ControllerRevision
459 var errlist []error
460
461 match := func(obj metav1.Object) bool {
462 return m.Selector.Matches(labels.Set(obj.GetLabels()))
463 }
464 adopt := func(ctx context.Context, obj metav1.Object) error {
465 return m.AdoptControllerRevision(ctx, obj.(*apps.ControllerRevision))
466 }
467 release := func(ctx context.Context, obj metav1.Object) error {
468 return m.ReleaseControllerRevision(ctx, obj.(*apps.ControllerRevision))
469 }
470
471 for _, h := range histories {
472 ok, err := m.ClaimObject(ctx, h, match, adopt, release)
473 if err != nil {
474 errlist = append(errlist, err)
475 continue
476 }
477 if ok {
478 claimed = append(claimed, h)
479 }
480 }
481 return claimed, utilerrors.NewAggregate(errlist)
482 }
483
484
485
486 func (m *ControllerRevisionControllerRefManager) AdoptControllerRevision(ctx context.Context, history *apps.ControllerRevision) error {
487 if err := m.CanAdopt(ctx); err != nil {
488 return fmt.Errorf("can't adopt ControllerRevision %v/%v (%v): %v", history.Namespace, history.Name, history.UID, err)
489 }
490
491
492 patchBytes, err := ownerRefControllerPatch(m.Controller, m.controllerKind, history.UID)
493 if err != nil {
494 return err
495 }
496 return m.crControl.PatchControllerRevision(ctx, history.Namespace, history.Name, patchBytes)
497 }
498
499
500
501 func (m *ControllerRevisionControllerRefManager) ReleaseControllerRevision(ctx context.Context, history *apps.ControllerRevision) error {
502 logger := klog.FromContext(ctx)
503 logger.V(2).Info("Patching ControllerRevision to remove its controllerRef", "controllerRevision", klog.KObj(history), "gvk", m.controllerKind, "controller", m.Controller.GetName())
504 patchBytes, err := GenerateDeleteOwnerRefStrategicMergeBytes(history.UID, []types.UID{m.Controller.GetUID()})
505 if err != nil {
506 return err
507 }
508
509 err = m.crControl.PatchControllerRevision(ctx, history.Namespace, history.Name, patchBytes)
510 if err != nil {
511 if errors.IsNotFound(err) {
512
513 return nil
514 }
515 if errors.IsInvalid(err) {
516
517
518
519
520 return nil
521 }
522 }
523 return err
524 }
525
526 type objectForAddOwnerRefPatch struct {
527 Metadata objectMetaForPatch `json:"metadata"`
528 }
529
530 type objectMetaForPatch struct {
531 OwnerReferences []metav1.OwnerReference `json:"ownerReferences"`
532 UID types.UID `json:"uid"`
533 Finalizers []string `json:"finalizers,omitempty"`
534 }
535
536 func ownerRefControllerPatch(controller metav1.Object, controllerKind schema.GroupVersionKind, uid types.UID, finalizers ...string) ([]byte, error) {
537 blockOwnerDeletion := true
538 isController := true
539 addControllerPatch := objectForAddOwnerRefPatch{
540 Metadata: objectMetaForPatch{
541 UID: uid,
542 OwnerReferences: []metav1.OwnerReference{
543 {
544 APIVersion: controllerKind.GroupVersion().String(),
545 Kind: controllerKind.Kind,
546 Name: controller.GetName(),
547 UID: controller.GetUID(),
548 Controller: &isController,
549 BlockOwnerDeletion: &blockOwnerDeletion,
550 },
551 },
552 Finalizers: finalizers,
553 },
554 }
555 patchBytes, err := json.Marshal(&addControllerPatch)
556 if err != nil {
557 return nil, err
558 }
559 return patchBytes, nil
560 }
561
562 type objectForDeleteOwnerRefStrategicMergePatch struct {
563 Metadata objectMetaForMergePatch `json:"metadata"`
564 }
565
566 type objectMetaForMergePatch struct {
567 UID types.UID `json:"uid"`
568 OwnerReferences []map[string]string `json:"ownerReferences"`
569 DeleteFinalizers []string `json:"$deleteFromPrimitiveList/finalizers,omitempty"`
570 }
571
572 func GenerateDeleteOwnerRefStrategicMergeBytes(dependentUID types.UID, ownerUIDs []types.UID, finalizers ...string) ([]byte, error) {
573 var ownerReferences []map[string]string
574 for _, ownerUID := range ownerUIDs {
575 ownerReferences = append(ownerReferences, ownerReference(ownerUID, "delete"))
576 }
577 patch := objectForDeleteOwnerRefStrategicMergePatch{
578 Metadata: objectMetaForMergePatch{
579 UID: dependentUID,
580 OwnerReferences: ownerReferences,
581 DeleteFinalizers: finalizers,
582 },
583 }
584 patchBytes, err := json.Marshal(&patch)
585 if err != nil {
586 return nil, err
587 }
588 return patchBytes, nil
589 }
590
591 func ownerReference(uid types.UID, patchType string) map[string]string {
592 return map[string]string{
593 "$patch": patchType,
594 "uid": string(uid),
595 }
596 }
597
View as plain text