1
16
17
18
19
20
21
22
23
24
25
26
27
28 package replicaset
29
30 import (
31 "context"
32 "fmt"
33 "reflect"
34 "sort"
35 "strings"
36 "sync"
37 "time"
38
39 apps "k8s.io/api/apps/v1"
40 v1 "k8s.io/api/core/v1"
41 apierrors "k8s.io/apimachinery/pkg/api/errors"
42 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
43 "k8s.io/apimachinery/pkg/labels"
44 "k8s.io/apimachinery/pkg/runtime/schema"
45 "k8s.io/apimachinery/pkg/types"
46 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
47 "k8s.io/apimachinery/pkg/util/wait"
48 appsinformers "k8s.io/client-go/informers/apps/v1"
49 coreinformers "k8s.io/client-go/informers/core/v1"
50 clientset "k8s.io/client-go/kubernetes"
51 "k8s.io/client-go/kubernetes/scheme"
52 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
53 appslisters "k8s.io/client-go/listers/apps/v1"
54 corelisters "k8s.io/client-go/listers/core/v1"
55 "k8s.io/client-go/tools/cache"
56 "k8s.io/client-go/tools/record"
57 "k8s.io/client-go/util/workqueue"
58 "k8s.io/component-base/metrics/legacyregistry"
59 "k8s.io/klog/v2"
60 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
61 "k8s.io/kubernetes/pkg/controller"
62 "k8s.io/kubernetes/pkg/controller/replicaset/metrics"
63 )
64
const (
	// BurstReplicas is an exported default for the burst limit. It is not
	// referenced elsewhere in this file; the constructors take the actual
	// limit as their burstReplicas argument.
	BurstReplicas = 500

	// statusUpdateRetries is presumably the number of retries for a
	// ReplicaSet status update. It is not used in the visible part of this
	// file — confirm against the status-update helper.
	statusUpdateRetries = 1

	// controllerUIDIndex names the ReplicaSet informer index that maps the
	// UID of a controlling owner to the ReplicaSets it controls.
	controllerUIDIndex = "controllerUID"
)
77
78
79
80 type ReplicaSetController struct {
81
82
83
84 schema.GroupVersionKind
85
86 kubeClient clientset.Interface
87 podControl controller.PodControlInterface
88
89 eventBroadcaster record.EventBroadcaster
90
91
92
93 burstReplicas int
94
95 syncHandler func(ctx context.Context, rsKey string) error
96
97
98 expectations *controller.UIDTrackingControllerExpectations
99
100
101 rsLister appslisters.ReplicaSetLister
102
103
104 rsListerSynced cache.InformerSynced
105 rsIndexer cache.Indexer
106
107
108 podLister corelisters.PodLister
109
110
111 podListerSynced cache.InformerSynced
112
113
114 queue workqueue.RateLimitingInterface
115 }
116
117
118 func NewReplicaSetController(ctx context.Context, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
119 logger := klog.FromContext(ctx)
120 eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
121 if err := metrics.Register(legacyregistry.Register); err != nil {
122 logger.Error(err, "unable to register metrics")
123 }
124 return NewBaseController(logger, rsInformer, podInformer, kubeClient, burstReplicas,
125 apps.SchemeGroupVersion.WithKind("ReplicaSet"),
126 "replicaset_controller",
127 "replicaset",
128 controller.RealPodControl{
129 KubeClient: kubeClient,
130 Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
131 },
132 eventBroadcaster,
133 )
134 }
135
136
137
138 func NewBaseController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
139 gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster) *ReplicaSetController {
140
141 rsc := &ReplicaSetController{
142 GroupVersionKind: gvk,
143 kubeClient: kubeClient,
144 podControl: podControl,
145 eventBroadcaster: eventBroadcaster,
146 burstReplicas: burstReplicas,
147 expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
148 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
149 }
150
151 rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
152 AddFunc: func(obj interface{}) {
153 rsc.addRS(logger, obj)
154 },
155 UpdateFunc: func(oldObj, newObj interface{}) {
156 rsc.updateRS(logger, oldObj, newObj)
157 },
158 DeleteFunc: func(obj interface{}) {
159 rsc.deleteRS(logger, obj)
160 },
161 })
162 rsInformer.Informer().AddIndexers(cache.Indexers{
163 controllerUIDIndex: func(obj interface{}) ([]string, error) {
164 rs, ok := obj.(*apps.ReplicaSet)
165 if !ok {
166 return []string{}, nil
167 }
168 controllerRef := metav1.GetControllerOf(rs)
169 if controllerRef == nil {
170 return []string{}, nil
171 }
172 return []string{string(controllerRef.UID)}, nil
173 },
174 })
175 rsc.rsIndexer = rsInformer.Informer().GetIndexer()
176 rsc.rsLister = rsInformer.Lister()
177 rsc.rsListerSynced = rsInformer.Informer().HasSynced
178
179 podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
180 AddFunc: func(obj interface{}) {
181 rsc.addPod(logger, obj)
182 },
183
184
185
186 UpdateFunc: func(oldObj, newObj interface{}) {
187 rsc.updatePod(logger, oldObj, newObj)
188 },
189 DeleteFunc: func(obj interface{}) {
190 rsc.deletePod(logger, obj)
191 },
192 })
193 rsc.podLister = podInformer.Lister()
194 rsc.podListerSynced = podInformer.Informer().HasSynced
195
196 rsc.syncHandler = rsc.syncReplicaSet
197
198 return rsc
199 }
200
201
202 func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
203 defer utilruntime.HandleCrash()
204
205
206 rsc.eventBroadcaster.StartStructuredLogging(3)
207 rsc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: rsc.kubeClient.CoreV1().Events("")})
208 defer rsc.eventBroadcaster.Shutdown()
209
210 defer rsc.queue.ShutDown()
211
212 controllerName := strings.ToLower(rsc.Kind)
213 logger := klog.FromContext(ctx)
214 logger.Info("Starting controller", "name", controllerName)
215 defer logger.Info("Shutting down controller", "name", controllerName)
216
217 if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) {
218 return
219 }
220
221 for i := 0; i < workers; i++ {
222 go wait.UntilWithContext(ctx, rsc.worker, time.Second)
223 }
224
225 <-ctx.Done()
226 }
227
228
229
230 func (rsc *ReplicaSetController) getReplicaSetsWithSameController(logger klog.Logger, rs *apps.ReplicaSet) []*apps.ReplicaSet {
231 controllerRef := metav1.GetControllerOf(rs)
232 if controllerRef == nil {
233 utilruntime.HandleError(fmt.Errorf("ReplicaSet has no controller: %v", rs))
234 return nil
235 }
236
237 objects, err := rsc.rsIndexer.ByIndex(controllerUIDIndex, string(controllerRef.UID))
238 if err != nil {
239 utilruntime.HandleError(err)
240 return nil
241 }
242 relatedRSs := make([]*apps.ReplicaSet, 0, len(objects))
243 for _, obj := range objects {
244 relatedRSs = append(relatedRSs, obj.(*apps.ReplicaSet))
245 }
246
247 if klogV := logger.V(2); klogV.Enabled() {
248 klogV.Info("Found related ReplicaSets", "replicaSet", klog.KObj(rs), "relatedReplicaSets", klog.KObjSlice(relatedRSs))
249 }
250
251 return relatedRSs
252 }
253
254
255 func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*apps.ReplicaSet {
256 rss, err := rsc.rsLister.GetPodReplicaSets(pod)
257 if err != nil {
258 return nil
259 }
260 if len(rss) > 1 {
261
262
263 utilruntime.HandleError(fmt.Errorf("user error! more than one %v is selecting pods with labels: %+v", rsc.Kind, pod.Labels))
264 }
265 return rss
266 }
267
268
269
270
271 func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.ReplicaSet {
272
273
274 if controllerRef.Kind != rsc.Kind {
275 return nil
276 }
277 rs, err := rsc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
278 if err != nil {
279 return nil
280 }
281 if rs.UID != controllerRef.UID {
282
283
284 return nil
285 }
286 return rs
287 }
288
289 func (rsc *ReplicaSetController) enqueueRS(rs *apps.ReplicaSet) {
290 key, err := controller.KeyFunc(rs)
291 if err != nil {
292 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
293 return
294 }
295
296 rsc.queue.Add(key)
297 }
298
299 func (rsc *ReplicaSetController) enqueueRSAfter(rs *apps.ReplicaSet, duration time.Duration) {
300 key, err := controller.KeyFunc(rs)
301 if err != nil {
302 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
303 return
304 }
305
306 rsc.queue.AddAfter(key, duration)
307 }
308
309 func (rsc *ReplicaSetController) addRS(logger klog.Logger, obj interface{}) {
310 rs := obj.(*apps.ReplicaSet)
311 logger.V(4).Info("Adding", "replicaSet", klog.KObj(rs))
312 rsc.enqueueRS(rs)
313 }
314
315
316 func (rsc *ReplicaSetController) updateRS(logger klog.Logger, old, cur interface{}) {
317 oldRS := old.(*apps.ReplicaSet)
318 curRS := cur.(*apps.ReplicaSet)
319
320
321 if curRS.UID != oldRS.UID {
322 key, err := controller.KeyFunc(oldRS)
323 if err != nil {
324 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldRS, err))
325 return
326 }
327 rsc.deleteRS(logger, cache.DeletedFinalStateUnknown{
328 Key: key,
329 Obj: oldRS,
330 })
331 }
332
333
334
335
336
337
338
339
340
341
342
343
344
345 if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) {
346 logger.V(4).Info("replicaSet updated. Desired pod count change.", "replicaSet", klog.KObj(oldRS), "oldReplicas", *(oldRS.Spec.Replicas), "newReplicas", *(curRS.Spec.Replicas))
347 }
348 rsc.enqueueRS(curRS)
349 }
350
351 func (rsc *ReplicaSetController) deleteRS(logger klog.Logger, obj interface{}) {
352 rs, ok := obj.(*apps.ReplicaSet)
353 if !ok {
354 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
355 if !ok {
356 utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
357 return
358 }
359 rs, ok = tombstone.Obj.(*apps.ReplicaSet)
360 if !ok {
361 utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
362 return
363 }
364 }
365
366 key, err := controller.KeyFunc(rs)
367 if err != nil {
368 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
369 return
370 }
371
372 logger.V(4).Info("Deleting", "replicaSet", klog.KObj(rs))
373
374
375 rsc.expectations.DeleteExpectations(logger, key)
376
377 rsc.queue.Add(key)
378 }
379
380
381 func (rsc *ReplicaSetController) addPod(logger klog.Logger, obj interface{}) {
382 pod := obj.(*v1.Pod)
383
384 if pod.DeletionTimestamp != nil {
385
386
387 rsc.deletePod(logger, pod)
388 return
389 }
390
391
392 if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
393 rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
394 if rs == nil {
395 return
396 }
397 rsKey, err := controller.KeyFunc(rs)
398 if err != nil {
399 return
400 }
401 logger.V(4).Info("Pod created", "pod", klog.KObj(pod), "detail", pod)
402 rsc.expectations.CreationObserved(logger, rsKey)
403 rsc.queue.Add(rsKey)
404 return
405 }
406
407
408
409
410
411 rss := rsc.getPodReplicaSets(pod)
412 if len(rss) == 0 {
413 return
414 }
415 logger.V(4).Info("Orphan Pod created", "pod", klog.KObj(pod), "detail", pod)
416 for _, rs := range rss {
417 rsc.enqueueRS(rs)
418 }
419 }
420
421
422
423
424 func (rsc *ReplicaSetController) updatePod(logger klog.Logger, old, cur interface{}) {
425 curPod := cur.(*v1.Pod)
426 oldPod := old.(*v1.Pod)
427 if curPod.ResourceVersion == oldPod.ResourceVersion {
428
429
430 return
431 }
432
433 labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
434 if curPod.DeletionTimestamp != nil {
435
436
437
438
439
440 rsc.deletePod(logger, curPod)
441 if labelChanged {
442
443 rsc.deletePod(logger, oldPod)
444 }
445 return
446 }
447
448 curControllerRef := metav1.GetControllerOf(curPod)
449 oldControllerRef := metav1.GetControllerOf(oldPod)
450 controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
451 if controllerRefChanged && oldControllerRef != nil {
452
453 if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
454 rsc.enqueueRS(rs)
455 }
456 }
457
458
459 if curControllerRef != nil {
460 rs := rsc.resolveControllerRef(curPod.Namespace, curControllerRef)
461 if rs == nil {
462 return
463 }
464 logger.V(4).Info("Pod objectMeta updated.", "pod", klog.KObj(oldPod), "oldObjectMeta", oldPod.ObjectMeta, "curObjectMeta", curPod.ObjectMeta)
465 rsc.enqueueRS(rs)
466
467
468
469
470
471
472
473 if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
474 logger.V(2).Info("pod will be enqueued after a while for availability check", "duration", rs.Spec.MinReadySeconds, "kind", rsc.Kind, "pod", klog.KObj(oldPod))
475
476
477 rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
478 }
479 return
480 }
481
482
483
484 if labelChanged || controllerRefChanged {
485 rss := rsc.getPodReplicaSets(curPod)
486 if len(rss) == 0 {
487 return
488 }
489 logger.V(4).Info("Orphan Pod objectMeta updated.", "pod", klog.KObj(oldPod), "oldObjectMeta", oldPod.ObjectMeta, "curObjectMeta", curPod.ObjectMeta)
490 for _, rs := range rss {
491 rsc.enqueueRS(rs)
492 }
493 }
494 }
495
496
497
498 func (rsc *ReplicaSetController) deletePod(logger klog.Logger, obj interface{}) {
499 pod, ok := obj.(*v1.Pod)
500
501
502
503
504
505 if !ok {
506 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
507 if !ok {
508 utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
509 return
510 }
511 pod, ok = tombstone.Obj.(*v1.Pod)
512 if !ok {
513 utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
514 return
515 }
516 }
517
518 controllerRef := metav1.GetControllerOf(pod)
519 if controllerRef == nil {
520
521 return
522 }
523 rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
524 if rs == nil {
525 return
526 }
527 rsKey, err := controller.KeyFunc(rs)
528 if err != nil {
529 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
530 return
531 }
532 logger.V(4).Info("Pod deleted", "delete_by", utilruntime.GetCaller(), "deletion_timestamp", pod.DeletionTimestamp, "pod", klog.KObj(pod))
533 rsc.expectations.DeletionObserved(logger, rsKey, controller.PodKey(pod))
534 rsc.queue.Add(rsKey)
535 }
536
537
538
539 func (rsc *ReplicaSetController) worker(ctx context.Context) {
540 for rsc.processNextWorkItem(ctx) {
541 }
542 }
543
544 func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
545 key, quit := rsc.queue.Get()
546 if quit {
547 return false
548 }
549 defer rsc.queue.Done(key)
550
551 err := rsc.syncHandler(ctx, key.(string))
552 if err == nil {
553 rsc.queue.Forget(key)
554 return true
555 }
556
557 utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
558 rsc.queue.AddRateLimited(key)
559
560 return true
561 }
562
563
564
565
566 func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
567 diff := len(filteredPods) - int(*(rs.Spec.Replicas))
568 rsKey, err := controller.KeyFunc(rs)
569 if err != nil {
570 utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
571 return nil
572 }
573 logger := klog.FromContext(ctx)
574 if diff < 0 {
575 diff *= -1
576 if diff > rsc.burstReplicas {
577 diff = rsc.burstReplicas
578 }
579
580
581
582
583
584 rsc.expectations.ExpectCreations(logger, rsKey, diff)
585 logger.V(2).Info("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff)
586
587
588
589
590
591
592
593
594 successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
595 err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
596 if err != nil {
597 if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
598
599
600 return nil
601 }
602 }
603 return err
604 })
605
606
607
608
609 if skippedPods := diff - successfulCreations; skippedPods > 0 {
610 logger.V(2).Info("Slow-start failure. Skipping creation of pods, decrementing expectations", "podsSkipped", skippedPods, "kind", rsc.Kind, "replicaSet", klog.KObj(rs))
611 for i := 0; i < skippedPods; i++ {
612
613 rsc.expectations.CreationObserved(logger, rsKey)
614 }
615 }
616 return err
617 } else if diff > 0 {
618 if diff > rsc.burstReplicas {
619 diff = rsc.burstReplicas
620 }
621 logger.V(2).Info("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff)
622
623 relatedPods, err := rsc.getIndirectlyRelatedPods(logger, rs)
624 utilruntime.HandleError(err)
625
626
627 podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
628
629
630
631
632
633
634
635 rsc.expectations.ExpectDeletions(logger, rsKey, getPodKeys(podsToDelete))
636
637 errCh := make(chan error, diff)
638 var wg sync.WaitGroup
639 wg.Add(diff)
640 for _, pod := range podsToDelete {
641 go func(targetPod *v1.Pod) {
642 defer wg.Done()
643 if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
644
645 podKey := controller.PodKey(targetPod)
646 rsc.expectations.DeletionObserved(logger, rsKey, podKey)
647 if !apierrors.IsNotFound(err) {
648 logger.V(2).Info("Failed to delete pod, decremented expectations", "pod", podKey, "kind", rsc.Kind, "replicaSet", klog.KObj(rs))
649 errCh <- err
650 }
651 }
652 }(pod)
653 }
654 wg.Wait()
655
656 select {
657 case err := <-errCh:
658
659 if err != nil {
660 return err
661 }
662 default:
663 }
664 }
665
666 return nil
667 }
668
669
670
671
672 func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
673 logger := klog.FromContext(ctx)
674 startTime := time.Now()
675 defer func() {
676 logger.Info("Finished syncing", "kind", rsc.Kind, "key", key, "duration", time.Since(startTime))
677 }()
678
679 namespace, name, err := cache.SplitMetaNamespaceKey(key)
680 if err != nil {
681 return err
682 }
683 rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
684 if apierrors.IsNotFound(err) {
685 logger.V(4).Info("deleted", "kind", rsc.Kind, "key", key)
686 rsc.expectations.DeleteExpectations(logger, key)
687 return nil
688 }
689 if err != nil {
690 return err
691 }
692
693 rsNeedsSync := rsc.expectations.SatisfiedExpectations(logger, key)
694 selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
695 if err != nil {
696 utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector for rs %v/%v: %v", namespace, name, err))
697 return nil
698 }
699
700
701
702
703 allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
704 if err != nil {
705 return err
706 }
707
708 filteredPods := controller.FilterActivePods(logger, allPods)
709
710
711
712 filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
713 if err != nil {
714 return err
715 }
716
717 var manageReplicasErr error
718 if rsNeedsSync && rs.DeletionTimestamp == nil {
719 manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
720 }
721 rs = rs.DeepCopy()
722 newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
723
724
725 updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
726 if err != nil {
727
728
729 return err
730 }
731
732 if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
733 updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
734 updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
735 rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
736 }
737 return manageReplicasErr
738 }
739
740 func (rsc *ReplicaSetController) claimPods(ctx context.Context, rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
741
742
743 canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
744 fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(ctx, rs.Name, metav1.GetOptions{})
745 if err != nil {
746 return nil, err
747 }
748 if fresh.UID != rs.UID {
749 return nil, fmt.Errorf("original %v %v/%v is gone: got uid %v, wanted %v", rsc.Kind, rs.Namespace, rs.Name, fresh.UID, rs.UID)
750 }
751 return fresh, nil
752 })
753 cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
754 return cm.ClaimPods(ctx, filteredPods)
755 }
756
757
758
759
760
761
762
763
764
765
766
767
// slowStartBatch calls fn up to count times, in concurrent batches. The
// first batch has min(count, initialBatchSize) calls; each following batch
// is twice as large, limited by the number of calls still outstanding. The
// calls of one batch run in parallel and are all awaited before the next
// batch starts.
//
// When any call in a batch fails, no further batch is started. The function
// then returns the number of successful calls so far, together with one of
// the errors from the failing batch. When every call succeeds it returns
// (count, nil). A non-positive count or initialBatchSize results in no
// calls.
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
	var (
		pending   = count
		succeeded int
	)

	batch := min(pending, initialBatchSize)
	for batch > 0 {
		// Buffered for the whole batch so a failing call never blocks.
		failures := make(chan error, batch)

		var inFlight sync.WaitGroup
		inFlight.Add(batch)
		for launched := 0; launched < batch; launched++ {
			go func() {
				defer inFlight.Done()
				if callErr := fn(); callErr != nil {
					failures <- callErr
				}
			}()
		}
		inFlight.Wait()

		failed := len(failures)
		succeeded += batch - failed
		if failed > 0 {
			return succeeded, <-failures
		}

		pending -= batch
		batch = min(2*batch, pending)
	}
	return succeeded, nil
}
793
794
795
796 func (rsc *ReplicaSetController) getIndirectlyRelatedPods(logger klog.Logger, rs *apps.ReplicaSet) ([]*v1.Pod, error) {
797 var relatedPods []*v1.Pod
798 seen := make(map[types.UID]*apps.ReplicaSet)
799 for _, relatedRS := range rsc.getReplicaSetsWithSameController(logger, rs) {
800 selector, err := metav1.LabelSelectorAsSelector(relatedRS.Spec.Selector)
801 if err != nil {
802
803 continue
804 }
805 pods, err := rsc.podLister.Pods(relatedRS.Namespace).List(selector)
806 if err != nil {
807 return nil, err
808 }
809 for _, pod := range pods {
810 if otherRS, found := seen[pod.UID]; found {
811 logger.V(5).Info("Pod is owned by both", "pod", klog.KObj(pod), "kind", rsc.Kind, "replicaSets", klog.KObjSlice([]klog.KMetadata{otherRS, relatedRS}))
812 continue
813 }
814 seen[pod.UID] = relatedRS
815 relatedPods = append(relatedPods, pod)
816 }
817 }
818 logger.V(4).Info("Found related pods", "kind", rsc.Kind, "replicaSet", klog.KObj(rs), "pods", klog.KObjSlice(relatedPods))
819 return relatedPods, nil
820 }
821
822 func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
823
824
825 if diff < len(filteredPods) {
826 podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
827 sort.Sort(podsWithRanks)
828 reportSortingDeletionAgeRatioMetric(filteredPods, diff)
829 }
830 return filteredPods[:diff]
831 }
832
833 func reportSortingDeletionAgeRatioMetric(filteredPods []*v1.Pod, diff int) {
834 now := time.Now()
835 youngestTime := time.Time{}
836
837 for _, pod := range filteredPods {
838 if pod.CreationTimestamp.Time.After(youngestTime) && podutil.IsPodReady(pod) {
839 youngestTime = pod.CreationTimestamp.Time
840 }
841 }
842
843
844 for _, pod := range filteredPods[:diff] {
845 if !podutil.IsPodReady(pod) {
846 continue
847 }
848 ratio := float64(now.Sub(pod.CreationTimestamp.Time).Milliseconds() / now.Sub(youngestTime).Milliseconds())
849 metrics.SortingDeletionAgeRatio.Observe(ratio)
850 }
851 }
852
853
854
855
856
857 func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {
858 podsOnNode := make(map[string]int)
859 for _, pod := range relatedPods {
860 if controller.IsPodActive(pod) {
861 podsOnNode[pod.Spec.NodeName]++
862 }
863 }
864 ranks := make([]int, len(podsToRank))
865 for i, pod := range podsToRank {
866 ranks[i] = podsOnNode[pod.Spec.NodeName]
867 }
868 return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: ranks, Now: metav1.Now()}
869 }
870
871 func getPodKeys(pods []*v1.Pod) []string {
872 podKeys := make([]string, 0, len(pods))
873 for _, pod := range pods {
874 podKeys = append(podKeys, controller.PodKey(pod))
875 }
876 return podKeys
877 }
878
View as plain text