1
16
17 package statefulset
18
19 import (
20 "context"
21 "sort"
22 "sync"
23
24 apps "k8s.io/api/apps/v1"
25 v1 "k8s.io/api/core/v1"
26 "k8s.io/apimachinery/pkg/api/errors"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 utilerrors "k8s.io/apimachinery/pkg/util/errors"
29 utilfeature "k8s.io/apiserver/pkg/util/feature"
30 "k8s.io/client-go/tools/record"
31 "k8s.io/klog/v2"
32 "k8s.io/kubernetes/pkg/controller/history"
33 "k8s.io/kubernetes/pkg/features"
34 )
35
36
37 const MaxBatchSize = 500
38
39
40
41 type StatefulSetControlInterface interface {
42
43
44
45
46
47 UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error)
48
49
50 ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error)
51
52
53 AdoptOrphanRevisions(set *apps.StatefulSet, revisions []*apps.ControllerRevision) error
54 }
55
56
57
58
59
60
61 func NewDefaultStatefulSetControl(
62 podControl *StatefulPodControl,
63 statusUpdater StatefulSetStatusUpdaterInterface,
64 controllerHistory history.Interface,
65 recorder record.EventRecorder) StatefulSetControlInterface {
66 return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
67 }
68
69 type defaultStatefulSetControl struct {
70 podControl *StatefulPodControl
71 statusUpdater StatefulSetStatusUpdaterInterface
72 controllerHistory history.Interface
73 recorder record.EventRecorder
74 }
75
76
77
78
79
80
81
82 func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
83 set = set.DeepCopy()
84
85
86 revisions, err := ssc.ListRevisions(set)
87 if err != nil {
88 return nil, err
89 }
90 history.SortControllerRevisions(revisions)
91
92 currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions)
93 if err != nil {
94 errs := []error{err}
95 if agg, ok := err.(utilerrors.Aggregate); ok {
96 errs = agg.Errors()
97 }
98 return nil, utilerrors.NewAggregate(append(errs, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)))
99 }
100
101
102 return status, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
103 }
104
105 func (ssc *defaultStatefulSetControl) performUpdate(
106 ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
107 var currentStatus *apps.StatefulSetStatus
108 logger := klog.FromContext(ctx)
109
110 currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
111 if err != nil {
112 return currentRevision, updateRevision, currentStatus, err
113 }
114
115
116 currentStatus, err = ssc.updateStatefulSet(ctx, set, currentRevision, updateRevision, collisionCount, pods)
117 if err != nil && currentStatus == nil {
118 return currentRevision, updateRevision, nil, err
119 }
120
121
122 statusErr := ssc.updateStatefulSetStatus(ctx, set, currentStatus)
123 if statusErr == nil {
124 logger.V(4).Info("Updated status", "statefulSet", klog.KObj(set),
125 "replicas", currentStatus.Replicas,
126 "readyReplicas", currentStatus.ReadyReplicas,
127 "currentReplicas", currentStatus.CurrentReplicas,
128 "updatedReplicas", currentStatus.UpdatedReplicas)
129 }
130
131 switch {
132 case err != nil && statusErr != nil:
133 logger.Error(statusErr, "Could not update status", "statefulSet", klog.KObj(set))
134 return currentRevision, updateRevision, currentStatus, err
135 case err != nil:
136 return currentRevision, updateRevision, currentStatus, err
137 case statusErr != nil:
138 return currentRevision, updateRevision, currentStatus, statusErr
139 }
140
141 logger.V(4).Info("StatefulSet revisions", "statefulSet", klog.KObj(set),
142 "currentRevision", currentStatus.CurrentRevision,
143 "updateRevision", currentStatus.UpdateRevision)
144
145 return currentRevision, updateRevision, currentStatus, nil
146 }
147
148 func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) {
149 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
150 if err != nil {
151 return nil, err
152 }
153 return ssc.controllerHistory.ListControllerRevisions(set, selector)
154 }
155
156 func (ssc *defaultStatefulSetControl) AdoptOrphanRevisions(
157 set *apps.StatefulSet,
158 revisions []*apps.ControllerRevision) error {
159 for i := range revisions {
160 adopted, err := ssc.controllerHistory.AdoptControllerRevision(set, controllerKind, revisions[i])
161 if err != nil {
162 return err
163 }
164 revisions[i] = adopted
165 }
166 return nil
167 }
168
169
170
171
172
173
174 func (ssc *defaultStatefulSetControl) truncateHistory(
175 set *apps.StatefulSet,
176 pods []*v1.Pod,
177 revisions []*apps.ControllerRevision,
178 current *apps.ControllerRevision,
179 update *apps.ControllerRevision) error {
180 history := make([]*apps.ControllerRevision, 0, len(revisions))
181
182 live := map[string]bool{}
183 if current != nil {
184 live[current.Name] = true
185 }
186 if update != nil {
187 live[update.Name] = true
188 }
189 for i := range pods {
190 live[getPodRevision(pods[i])] = true
191 }
192
193 for i := range revisions {
194 if !live[revisions[i].Name] {
195 history = append(history, revisions[i])
196 }
197 }
198 historyLen := len(history)
199 historyLimit := int(*set.Spec.RevisionHistoryLimit)
200 if historyLen <= historyLimit {
201 return nil
202 }
203
204 history = history[:(historyLen - historyLimit)]
205 for i := 0; i < len(history); i++ {
206 if err := ssc.controllerHistory.DeleteControllerRevision(history[i]); err != nil {
207 return err
208 }
209 }
210 return nil
211 }
212
213
214
215
216
217
218
219 func (ssc *defaultStatefulSetControl) getStatefulSetRevisions(
220 set *apps.StatefulSet,
221 revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, int32, error) {
222 var currentRevision, updateRevision *apps.ControllerRevision
223
224 revisionCount := len(revisions)
225 history.SortControllerRevisions(revisions)
226
227
228
229 var collisionCount int32
230 if set.Status.CollisionCount != nil {
231 collisionCount = *set.Status.CollisionCount
232 }
233
234
235 updateRevision, err := newRevision(set, nextRevision(revisions), &collisionCount)
236 if err != nil {
237 return nil, nil, collisionCount, err
238 }
239
240
241 equalRevisions := history.FindEqualRevisions(revisions, updateRevision)
242 equalCount := len(equalRevisions)
243
244 if equalCount > 0 && history.EqualRevision(revisions[revisionCount-1], equalRevisions[equalCount-1]) {
245
246 updateRevision = revisions[revisionCount-1]
247 } else if equalCount > 0 {
248
249
250 updateRevision, err = ssc.controllerHistory.UpdateControllerRevision(
251 equalRevisions[equalCount-1],
252 updateRevision.Revision)
253 if err != nil {
254 return nil, nil, collisionCount, err
255 }
256 } else {
257
258 updateRevision, err = ssc.controllerHistory.CreateControllerRevision(set, updateRevision, &collisionCount)
259 if err != nil {
260 return nil, nil, collisionCount, err
261 }
262 }
263
264
265 for i := range revisions {
266 if revisions[i].Name == set.Status.CurrentRevision {
267 currentRevision = revisions[i]
268 break
269 }
270 }
271
272
273 if currentRevision == nil {
274 currentRevision = updateRevision
275 }
276
277 return currentRevision, updateRevision, collisionCount, nil
278 }
279
280 func slowStartBatch(initialBatchSize int, remaining int, fn func(int) (bool, error)) (int, error) {
281 successes := 0
282 j := 0
283 for batchSize := min(remaining, initialBatchSize); batchSize > 0; batchSize = min(min(2*batchSize, remaining), MaxBatchSize) {
284 errCh := make(chan error, batchSize)
285 var wg sync.WaitGroup
286 wg.Add(batchSize)
287 for i := 0; i < batchSize; i++ {
288 go func(k int) {
289 defer wg.Done()
290
291 if _, err := fn(k); err != nil {
292 errCh <- err
293 }
294 }(j)
295 j++
296 }
297 wg.Wait()
298 successes += batchSize - len(errCh)
299 close(errCh)
300 if len(errCh) > 0 {
301 errs := make([]error, 0)
302 for err := range errCh {
303 errs = append(errs, err)
304 }
305 return successes, utilerrors.NewAggregate(errs)
306 }
307 remaining -= batchSize
308 }
309 return successes, nil
310 }
311
312 type replicaStatus struct {
313 replicas int32
314 readyReplicas int32
315 availableReplicas int32
316 currentReplicas int32
317 updatedReplicas int32
318 }
319
320 func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision) replicaStatus {
321 status := replicaStatus{}
322 for _, pod := range pods {
323 if isCreated(pod) {
324 status.replicas++
325 }
326
327
328 if isRunningAndReady(pod) {
329 status.readyReplicas++
330
331 if isRunningAndAvailable(pod, minReadySeconds) {
332 status.availableReplicas++
333 }
334
335 }
336
337
338 if isCreated(pod) && !isTerminating(pod) {
339 revision := getPodRevision(pod)
340 if revision == currentRevision.Name {
341 status.currentReplicas++
342 }
343 if revision == updateRevision.Name {
344 status.updatedReplicas++
345 }
346 }
347 }
348 return status
349 }
350
351 func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, podLists ...[]*v1.Pod) {
352 status.Replicas = 0
353 status.ReadyReplicas = 0
354 status.AvailableReplicas = 0
355 status.CurrentReplicas = 0
356 status.UpdatedReplicas = 0
357 for _, list := range podLists {
358 replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision)
359 status.Replicas += replicaStatus.replicas
360 status.ReadyReplicas += replicaStatus.readyReplicas
361 status.AvailableReplicas += replicaStatus.availableReplicas
362 status.CurrentReplicas += replicaStatus.currentReplicas
363 status.UpdatedReplicas += replicaStatus.updatedReplicas
364 }
365 }
366
367 func (ssc *defaultStatefulSetControl) processReplica(
368 ctx context.Context,
369 set *apps.StatefulSet,
370 currentRevision *apps.ControllerRevision,
371 updateRevision *apps.ControllerRevision,
372 currentSet *apps.StatefulSet,
373 updateSet *apps.StatefulSet,
374 monotonic bool,
375 replicas []*v1.Pod,
376 i int) (bool, error) {
377 logger := klog.FromContext(ctx)
378
379
380
381
382
383
384
385 if isFailed(replicas[i]) || isSucceeded(replicas[i]) {
386 if isFailed(replicas[i]) {
387 ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
388 "StatefulSet %s/%s is recreating failed Pod %s",
389 set.Namespace,
390 set.Name,
391 replicas[i].Name)
392 } else {
393 ssc.recorder.Eventf(set, v1.EventTypeNormal, "RecreatingTerminatedPod",
394 "StatefulSet %s/%s is recreating terminated Pod %s",
395 set.Namespace,
396 set.Name,
397 replicas[i].Name)
398 }
399 if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
400 return true, err
401 }
402 replicaOrd := i + getStartOrdinal(set)
403 replicas[i] = newVersionedStatefulSetPod(
404 currentSet,
405 updateSet,
406 currentRevision.Name,
407 updateRevision.Name,
408 replicaOrd)
409 }
410
411 if !isCreated(replicas[i]) {
412 if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
413 if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil {
414 return true, err
415 } else if isStale {
416
417 return true, err
418 }
419 }
420 if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil {
421 return true, err
422 }
423 if monotonic {
424
425 return true, nil
426 }
427 }
428
429
430 if isPending(replicas[i]) {
431 logger.V(4).Info(
432 "StatefulSet is triggering PVC creation for pending Pod",
433 "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
434 if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil {
435 return true, err
436 }
437 }
438
439
440
441 if isTerminating(replicas[i]) && monotonic {
442 logger.V(4).Info("StatefulSet is waiting for Pod to Terminate",
443 "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
444 return true, nil
445 }
446
447
448
449
450 if !isRunningAndReady(replicas[i]) && monotonic {
451 logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready",
452 "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
453 return true, nil
454 }
455
456
457
458
459 if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
460 logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
461 "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
462 return true, nil
463 }
464
465
466 retentionMatch := true
467 if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
468 var err error
469 retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, replicas[i])
470
471 if err != nil {
472 retentionMatch = true
473 }
474 }
475
476 if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch {
477 return false, nil
478 }
479
480
481 replica := replicas[i].DeepCopy()
482 if err := ssc.podControl.UpdateStatefulPod(ctx, updateSet, replica); err != nil {
483 return true, err
484 }
485
486 return false, nil
487 }
488
489 func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int) (bool, error) {
490 logger := klog.FromContext(ctx)
491 if isTerminating(condemned[i]) {
492
493 if monotonic {
494 logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down",
495 "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))
496 return true, nil
497 }
498 return false, nil
499 }
500
501 if !isRunningAndReady(condemned[i]) && monotonic && condemned[i] != firstUnhealthyPod {
502 logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down",
503 "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
504 return true, nil
505 }
506
507 if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds) && monotonic && condemned[i] != firstUnhealthyPod {
508 logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down",
509 "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
510 return true, nil
511 }
512
513 logger.V(2).Info("Pod of StatefulSet is terminating for scale down",
514 "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))
515 return true, ssc.podControl.DeleteStatefulPod(set, condemned[i])
516 }
517
518 func runForAll(pods []*v1.Pod, fn func(i int) (bool, error), monotonic bool) (bool, error) {
519 if monotonic {
520 for i := range pods {
521 if shouldExit, err := fn(i); shouldExit || err != nil {
522 return true, err
523 }
524 }
525 } else {
526 if _, err := slowStartBatch(1, len(pods), fn); err != nil {
527 return true, err
528 }
529 }
530 return false, nil
531 }
532
533
534
535
536
537
538
539
540
541
542 func (ssc *defaultStatefulSetControl) updateStatefulSet(
543 ctx context.Context,
544 set *apps.StatefulSet,
545 currentRevision *apps.ControllerRevision,
546 updateRevision *apps.ControllerRevision,
547 collisionCount int32,
548 pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
549 logger := klog.FromContext(ctx)
550
551 currentSet, err := ApplyRevision(set, currentRevision)
552 if err != nil {
553 return nil, err
554 }
555 updateSet, err := ApplyRevision(set, updateRevision)
556 if err != nil {
557 return nil, err
558 }
559
560
561 status := apps.StatefulSetStatus{}
562 status.ObservedGeneration = set.Generation
563 status.CurrentRevision = currentRevision.Name
564 status.UpdateRevision = updateRevision.Name
565 status.CollisionCount = new(int32)
566 *status.CollisionCount = collisionCount
567
568 updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, pods)
569
570 replicaCount := int(*set.Spec.Replicas)
571
572 replicas := make([]*v1.Pod, replicaCount)
573
574 condemned := make([]*v1.Pod, 0, len(pods))
575 unhealthy := 0
576 var firstUnhealthyPod *v1.Pod
577
578
579 for _, pod := range pods {
580 if podInOrdinalRange(pod, set) {
581
582
583 replicas[getOrdinal(pod)-getStartOrdinal(set)] = pod
584 } else if getOrdinal(pod) >= 0 {
585
586 condemned = append(condemned, pod)
587 }
588
589 }
590
591
592 for ord := getStartOrdinal(set); ord <= getEndOrdinal(set); ord++ {
593 replicaIdx := ord - getStartOrdinal(set)
594 if replicas[replicaIdx] == nil {
595 replicas[replicaIdx] = newVersionedStatefulSetPod(
596 currentSet,
597 updateSet,
598 currentRevision.Name,
599 updateRevision.Name, ord)
600 }
601 }
602
603
604 sort.Sort(descendingOrdinal(condemned))
605
606
607 for i := range replicas {
608 if !isHealthy(replicas[i]) {
609 unhealthy++
610 if firstUnhealthyPod == nil {
611 firstUnhealthyPod = replicas[i]
612 }
613 }
614 }
615
616
617 for i := len(condemned) - 1; i >= 0; i-- {
618 if !isHealthy(condemned[i]) {
619 unhealthy++
620 if firstUnhealthyPod == nil {
621 firstUnhealthyPod = condemned[i]
622 }
623 }
624 }
625
626 if unhealthy > 0 {
627 logger.V(4).Info("StatefulSet has unhealthy Pods", "statefulSet", klog.KObj(set), "unhealthyReplicas", unhealthy, "pod", klog.KObj(firstUnhealthyPod))
628 }
629
630
631
632 if set.DeletionTimestamp != nil {
633 return &status, nil
634 }
635
636 monotonic := !allowsBurst(set)
637
638
639 processReplicaFn := func(i int) (bool, error) {
640 return ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i)
641 }
642 if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil {
643 updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
644 return &status, err
645 }
646
647
648 if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
649 fixPodClaim := func(i int) (bool, error) {
650 if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
651 return true, err
652 } else if !matchPolicy {
653 if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
654 return true, err
655 }
656 }
657 return false, nil
658 }
659 if shouldExit, err := runForAll(condemned, fixPodClaim, monotonic); shouldExit || err != nil {
660 updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
661 return &status, err
662 }
663 }
664
665
666
667
668
669
670
671 processCondemnedFn := func(i int) (bool, error) {
672 return ssc.processCondemned(ctx, set, firstUnhealthyPod, monotonic, condemned, i)
673 }
674 if shouldExit, err := runForAll(condemned, processCondemnedFn, monotonic); shouldExit || err != nil {
675 updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
676 return &status, err
677 }
678
679 updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
680
681
682 if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
683 return &status, nil
684 }
685
686 if utilfeature.DefaultFeatureGate.Enabled(features.MaxUnavailableStatefulSet) {
687 return updateStatefulSetAfterInvariantEstablished(ctx,
688 ssc,
689 set,
690 replicas,
691 updateRevision,
692 status,
693 )
694 }
695
696
697 updateMin := 0
698 if set.Spec.UpdateStrategy.RollingUpdate != nil {
699 updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
700 }
701
702 for target := len(replicas) - 1; target >= updateMin; target-- {
703
704
705 if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
706 logger.V(2).Info("Pod of StatefulSet is terminating for update",
707 "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
708 if err := ssc.podControl.DeleteStatefulPod(set, replicas[target]); err != nil {
709 if !errors.IsNotFound(err) {
710 return &status, err
711 }
712 }
713 status.CurrentReplicas--
714 return &status, err
715 }
716
717
718 if !isHealthy(replicas[target]) {
719 logger.V(4).Info("StatefulSet is waiting for Pod to update",
720 "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
721 return &status, nil
722 }
723
724 }
725 return &status, nil
726 }
727
728 func updateStatefulSetAfterInvariantEstablished(
729 ctx context.Context,
730 ssc *defaultStatefulSetControl,
731 set *apps.StatefulSet,
732 replicas []*v1.Pod,
733 updateRevision *apps.ControllerRevision,
734 status apps.StatefulSetStatus,
735 ) (*apps.StatefulSetStatus, error) {
736
737 logger := klog.FromContext(ctx)
738 replicaCount := int(*set.Spec.Replicas)
739
740
741 updateMin := 0
742 maxUnavailable := 1
743 if set.Spec.UpdateStrategy.RollingUpdate != nil {
744 updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
745
746
747
748
749 var err error
750 maxUnavailable, err = getStatefulSetMaxUnavailable(set.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, replicaCount)
751 if err != nil {
752 return &status, err
753 }
754 }
755
756
757
758
759
760 unavailablePods := 0
761 for target := len(replicas) - 1; target >= 0; target-- {
762 if !isHealthy(replicas[target]) {
763 unavailablePods++
764 }
765 }
766
767 if unavailablePods >= maxUnavailable {
768 logger.V(2).Info("StatefulSet found unavailablePods, more than or equal to allowed maxUnavailable",
769 "statefulSet", klog.KObj(set),
770 "unavailablePods", unavailablePods,
771 "maxUnavailable", maxUnavailable)
772 return &status, nil
773 }
774
775
776
777 podsToDelete := maxUnavailable - unavailablePods
778
779 deletedPods := 0
780 for target := len(replicas) - 1; target >= updateMin && deletedPods < podsToDelete; target-- {
781
782
783 if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
784
785 logger.V(2).Info("StatefulSet terminating Pod for update",
786 "statefulSet", klog.KObj(set),
787 "pod", klog.KObj(replicas[target]))
788 if err := ssc.podControl.DeleteStatefulPod(set, replicas[target]); err != nil {
789 if !errors.IsNotFound(err) {
790 return &status, err
791 }
792 }
793 deletedPods++
794 status.CurrentReplicas--
795 }
796 }
797 return &status, nil
798 }
799
800
801
802
803 func (ssc *defaultStatefulSetControl) updateStatefulSetStatus(
804 ctx context.Context,
805 set *apps.StatefulSet,
806 status *apps.StatefulSetStatus) error {
807
808 completeRollingUpdate(set, status)
809
810
811 if !inconsistentStatus(set, status) {
812 return nil
813 }
814
815
816 set = set.DeepCopy()
817 if err := ssc.statusUpdater.UpdateStatefulSetStatus(ctx, set, status); err != nil {
818 return err
819 }
820
821 return nil
822 }
823
824 var _ StatefulSetControlInterface = &defaultStatefulSetControl{}
825
View as plain text