/* Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package statefulset import ( "context" "sort" "sync" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller/history" "k8s.io/kubernetes/pkg/features" ) // Realistic value for maximum in-flight requests when processing in parallel mode. const MaxBatchSize = 500 // StatefulSetControl implements the control logic for updating StatefulSets and their children Pods. It is implemented // as an interface to allow for extensions that provide different semantics. Currently, there is only one implementation. type StatefulSetControlInterface interface { // UpdateStatefulSet implements the control logic for Pod creation, update, and deletion, and // persistent volume creation, update, and deletion. // If an implementation returns a non-nil error, the invocation will be retried using a rate-limited strategy. // Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to // exit exceptionally at any point provided they wish the update to be re-run at a later point in time. UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) // ListRevisions returns a array of the ControllerRevisions that represent the revisions of set. If the returned // error is nil, the returns slice of ControllerRevisions is valid. ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) // AdoptOrphanRevisions adopts any orphaned ControllerRevisions that match set's Selector. If all adoptions are // successful the returned error is nil. AdoptOrphanRevisions(set *apps.StatefulSet, revisions []*apps.ControllerRevision) error } // NewDefaultStatefulSetControl returns a new instance of the default implementation StatefulSetControlInterface that // implements the documented semantics for StatefulSets. podControl is the PodControlInterface used to create, update, // and delete Pods and to create PersistentVolumeClaims. statusUpdater is the StatefulSetStatusUpdaterInterface used // to update the status of StatefulSets. You should use an instance returned from NewRealStatefulPodControl() for any // scenario other than testing. func NewDefaultStatefulSetControl( podControl *StatefulPodControl, statusUpdater StatefulSetStatusUpdaterInterface, controllerHistory history.Interface, recorder record.EventRecorder) StatefulSetControlInterface { return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder} } type defaultStatefulSetControl struct { podControl *StatefulPodControl statusUpdater StatefulSetStatusUpdaterInterface controllerHistory history.Interface recorder record.EventRecorder } // UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and // consistent monotonic update strategy by default - scale up proceeds in ordinal order, no new pod // is created while any pod is unhealthy, and pods are terminated in descending order. The burst // strategy allows these constraints to be relaxed - pods will be created and deleted eagerly and // in no particular order. Clients using the burst strategy should be careful to ensure they // understand the consistency implications of having unpredictable numbers of pods available. func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) { set = set.DeepCopy() // set is modified when a new revision is created in performUpdate. Make a copy now to avoid mutation errors. // list all revisions and sort them revisions, err := ssc.ListRevisions(set) if err != nil { return nil, err } history.SortControllerRevisions(revisions) currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions) if err != nil { errs := []error{err} if agg, ok := err.(utilerrors.Aggregate); ok { errs = agg.Errors() } return nil, utilerrors.NewAggregate(append(errs, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision))) } // maintain the set's revision history limit return status, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision) } func (ssc *defaultStatefulSetControl) performUpdate( ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) { var currentStatus *apps.StatefulSetStatus logger := klog.FromContext(ctx) // get the current, and update revisions currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions) if err != nil { return currentRevision, updateRevision, currentStatus, err } // perform the main update function and get the status currentStatus, err = ssc.updateStatefulSet(ctx, set, currentRevision, updateRevision, collisionCount, pods) if err != nil && currentStatus == nil { return currentRevision, updateRevision, nil, err } // make sure to update the latest status even if there is an error with non-nil currentStatus statusErr := ssc.updateStatefulSetStatus(ctx, set, currentStatus) if statusErr == nil { logger.V(4).Info("Updated status", "statefulSet", klog.KObj(set), "replicas", currentStatus.Replicas, "readyReplicas", currentStatus.ReadyReplicas, "currentReplicas", currentStatus.CurrentReplicas, "updatedReplicas", currentStatus.UpdatedReplicas) } switch { case err != nil && statusErr != nil: logger.Error(statusErr, "Could not update status", "statefulSet", klog.KObj(set)) return currentRevision, updateRevision, currentStatus, err case err != nil: return currentRevision, updateRevision, currentStatus, err case statusErr != nil: return currentRevision, updateRevision, currentStatus, statusErr } logger.V(4).Info("StatefulSet revisions", "statefulSet", klog.KObj(set), "currentRevision", currentStatus.CurrentRevision, "updateRevision", currentStatus.UpdateRevision) return currentRevision, updateRevision, currentStatus, nil } func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) { selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { return nil, err } return ssc.controllerHistory.ListControllerRevisions(set, selector) } func (ssc *defaultStatefulSetControl) AdoptOrphanRevisions( set *apps.StatefulSet, revisions []*apps.ControllerRevision) error { for i := range revisions { adopted, err := ssc.controllerHistory.AdoptControllerRevision(set, controllerKind, revisions[i]) if err != nil { return err } revisions[i] = adopted } return nil } // truncateHistory truncates any non-live ControllerRevisions in revisions from set's history. The UpdateRevision and // CurrentRevision in set's Status are considered to be live. Any revisions associated with the Pods in pods are also // considered to be live. Non-live revisions are deleted, starting with the revision with the lowest Revision, until // only RevisionHistoryLimit revisions remain. If the returned error is nil the operation was successful. This method // expects that revisions is sorted when supplied. func (ssc *defaultStatefulSetControl) truncateHistory( set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision, current *apps.ControllerRevision, update *apps.ControllerRevision) error { history := make([]*apps.ControllerRevision, 0, len(revisions)) // mark all live revisions live := map[string]bool{} if current != nil { live[current.Name] = true } if update != nil { live[update.Name] = true } for i := range pods { live[getPodRevision(pods[i])] = true } // collect live revisions and historic revisions for i := range revisions { if !live[revisions[i].Name] { history = append(history, revisions[i]) } } historyLen := len(history) historyLimit := int(*set.Spec.RevisionHistoryLimit) if historyLen <= historyLimit { return nil } // delete any non-live history to maintain the revision limit. history = history[:(historyLen - historyLimit)] for i := 0; i < len(history); i++ { if err := ssc.controllerHistory.DeleteControllerRevision(history[i]); err != nil { return err } } return nil } // getStatefulSetRevisions returns the current and update ControllerRevisions for set. It also // returns a collision count that records the number of name collisions set saw when creating // new ControllerRevisions. This count is incremented on every name collision and is used in // building the ControllerRevision names for name collision avoidance. This method may create // a new revision, or modify the Revision of an existing revision if an update to set is detected. // This method expects that revisions is sorted when supplied. func (ssc *defaultStatefulSetControl) getStatefulSetRevisions( set *apps.StatefulSet, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, int32, error) { var currentRevision, updateRevision *apps.ControllerRevision revisionCount := len(revisions) history.SortControllerRevisions(revisions) // Use a local copy of set.Status.CollisionCount to avoid modifying set.Status directly. // This copy is returned so the value gets carried over to set.Status in updateStatefulSet. var collisionCount int32 if set.Status.CollisionCount != nil { collisionCount = *set.Status.CollisionCount } // create a new revision from the current set updateRevision, err := newRevision(set, nextRevision(revisions), &collisionCount) if err != nil { return nil, nil, collisionCount, err } // find any equivalent revisions equalRevisions := history.FindEqualRevisions(revisions, updateRevision) equalCount := len(equalRevisions) if equalCount > 0 && history.EqualRevision(revisions[revisionCount-1], equalRevisions[equalCount-1]) { // if the equivalent revision is immediately prior the update revision has not changed updateRevision = revisions[revisionCount-1] } else if equalCount > 0 { // if the equivalent revision is not immediately prior we will roll back by incrementing the // Revision of the equivalent revision updateRevision, err = ssc.controllerHistory.UpdateControllerRevision( equalRevisions[equalCount-1], updateRevision.Revision) if err != nil { return nil, nil, collisionCount, err } } else { //if there is no equivalent revision we create a new one updateRevision, err = ssc.controllerHistory.CreateControllerRevision(set, updateRevision, &collisionCount) if err != nil { return nil, nil, collisionCount, err } } // attempt to find the revision that corresponds to the current revision for i := range revisions { if revisions[i].Name == set.Status.CurrentRevision { currentRevision = revisions[i] break } } // if the current revision is nil we initialize the history by setting it to the update revision if currentRevision == nil { currentRevision = updateRevision } return currentRevision, updateRevision, collisionCount, nil } func slowStartBatch(initialBatchSize int, remaining int, fn func(int) (bool, error)) (int, error) { successes := 0 j := 0 for batchSize := min(remaining, initialBatchSize); batchSize > 0; batchSize = min(min(2*batchSize, remaining), MaxBatchSize) { errCh := make(chan error, batchSize) var wg sync.WaitGroup wg.Add(batchSize) for i := 0; i < batchSize; i++ { go func(k int) { defer wg.Done() // Ignore the first parameter - relevant for monotonic only. if _, err := fn(k); err != nil { errCh <- err } }(j) j++ } wg.Wait() successes += batchSize - len(errCh) close(errCh) if len(errCh) > 0 { errs := make([]error, 0) for err := range errCh { errs = append(errs, err) } return successes, utilerrors.NewAggregate(errs) } remaining -= batchSize } return successes, nil } type replicaStatus struct { replicas int32 readyReplicas int32 availableReplicas int32 currentReplicas int32 updatedReplicas int32 } func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision) replicaStatus { status := replicaStatus{} for _, pod := range pods { if isCreated(pod) { status.replicas++ } // count the number of running and ready replicas if isRunningAndReady(pod) { status.readyReplicas++ // count the number of running and available replicas if isRunningAndAvailable(pod, minReadySeconds) { status.availableReplicas++ } } // count the number of current and update replicas if isCreated(pod) && !isTerminating(pod) { revision := getPodRevision(pod) if revision == currentRevision.Name { status.currentReplicas++ } if revision == updateRevision.Name { status.updatedReplicas++ } } } return status } func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, podLists ...[]*v1.Pod) { status.Replicas = 0 status.ReadyReplicas = 0 status.AvailableReplicas = 0 status.CurrentReplicas = 0 status.UpdatedReplicas = 0 for _, list := range podLists { replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision) status.Replicas += replicaStatus.replicas status.ReadyReplicas += replicaStatus.readyReplicas status.AvailableReplicas += replicaStatus.availableReplicas status.CurrentReplicas += replicaStatus.currentReplicas status.UpdatedReplicas += replicaStatus.updatedReplicas } } func (ssc *defaultStatefulSetControl) processReplica( ctx context.Context, set *apps.StatefulSet, currentRevision *apps.ControllerRevision, updateRevision *apps.ControllerRevision, currentSet *apps.StatefulSet, updateSet *apps.StatefulSet, monotonic bool, replicas []*v1.Pod, i int) (bool, error) { logger := klog.FromContext(ctx) // Delete and recreate pods which finished running. // // Note that pods with phase Succeeded will also trigger this event. This is // because final pod phase of evicted or otherwise forcibly stopped pods // (e.g. terminated on node reboot) is determined by the exit code of the // container, not by the reason for pod termination. We should restart the pod // regardless of the exit code. if isFailed(replicas[i]) || isSucceeded(replicas[i]) { if isFailed(replicas[i]) { ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod", "StatefulSet %s/%s is recreating failed Pod %s", set.Namespace, set.Name, replicas[i].Name) } else { ssc.recorder.Eventf(set, v1.EventTypeNormal, "RecreatingTerminatedPod", "StatefulSet %s/%s is recreating terminated Pod %s", set.Namespace, set.Name, replicas[i].Name) } if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil { return true, err } replicaOrd := i + getStartOrdinal(set) replicas[i] = newVersionedStatefulSetPod( currentSet, updateSet, currentRevision.Name, updateRevision.Name, replicaOrd) } // If we find a Pod that has not been created we create the Pod if !isCreated(replicas[i]) { if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil { return true, err } else if isStale { // If a pod has a stale PVC, no more work can be done this round. return true, err } } if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil { return true, err } if monotonic { // if the set does not allow bursting, return immediately return true, nil } } // If the Pod is in pending state then trigger PVC creation to create missing PVCs if isPending(replicas[i]) { logger.V(4).Info( "StatefulSet is triggering PVC creation for pending Pod", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil { return true, err } } // If we find a Pod that is currently terminating, we must wait until graceful deletion // completes before we continue to make progress. if isTerminating(replicas[i]) && monotonic { logger.V(4).Info("StatefulSet is waiting for Pod to Terminate", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) return true, nil } // If we have a Pod that has been created but is not running and ready we can not make progress. // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its // ordinal, are Running and Ready. if !isRunningAndReady(replicas[i]) && monotonic { logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) return true, nil } // If we have a Pod that has been created but is not available we can not make progress. // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its // ordinal, are Available. if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic { logger.V(4).Info("StatefulSet is waiting for Pod to be Available", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i])) return true, nil } // Enforce the StatefulSet invariants retentionMatch := true if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { var err error retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, replicas[i]) // An error is expected if the pod is not yet fully updated, and so return is treated as matching. if err != nil { retentionMatch = true } } if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch { return false, nil } // Make a deep copy so we don't mutate the shared cache replica := replicas[i].DeepCopy() if err := ssc.podControl.UpdateStatefulPod(ctx, updateSet, replica); err != nil { return true, err } return false, nil } func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int) (bool, error) { logger := klog.FromContext(ctx) if isTerminating(condemned[i]) { // if we are in monotonic mode, block and wait for terminating pods to expire if monotonic { logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down", "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i])) return true, nil } return false, nil } // if we are in monotonic mode and the condemned target is not the first unhealthy Pod block if !isRunningAndReady(condemned[i]) && monotonic && condemned[i] != firstUnhealthyPod { logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down", "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod)) return true, nil } // if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block. if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds) && monotonic && condemned[i] != firstUnhealthyPod { logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down", "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod)) return true, nil } logger.V(2).Info("Pod of StatefulSet is terminating for scale down", "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i])) return true, ssc.podControl.DeleteStatefulPod(set, condemned[i]) } func runForAll(pods []*v1.Pod, fn func(i int) (bool, error), monotonic bool) (bool, error) { if monotonic { for i := range pods { if shouldExit, err := fn(i); shouldExit || err != nil { return true, err } } } else { if _, err := slowStartBatch(1, len(pods), fn); err != nil { return true, err } } return false, nil } // updateStatefulSet performs the update function for a StatefulSet. This method creates, updates, and deletes Pods in // the set in order to conform the system to the target state for the set. The target state always contains // set.Spec.Replicas Pods with a Ready Condition. If the UpdateStrategy.Type for the set is // RollingUpdateStatefulSetStrategyType then all Pods in the set must be at set.Status.CurrentRevision. // If the UpdateStrategy.Type for the set is OnDeleteStatefulSetStrategyType, the target state implies nothing about // the revisions of Pods in the set. If the UpdateStrategy.Type for the set is PartitionStatefulSetStrategyType, then // all Pods with ordinal less than UpdateStrategy.Partition.Ordinal must be at Status.CurrentRevision and all other // Pods must be at Status.UpdateRevision. If the returned error is nil, the returned StatefulSetStatus is valid and the // update must be recorded. If the error is not nil, the method should be retried until successful. func (ssc *defaultStatefulSetControl) updateStatefulSet( ctx context.Context, set *apps.StatefulSet, currentRevision *apps.ControllerRevision, updateRevision *apps.ControllerRevision, collisionCount int32, pods []*v1.Pod) (*apps.StatefulSetStatus, error) { logger := klog.FromContext(ctx) // get the current and update revisions of the set. currentSet, err := ApplyRevision(set, currentRevision) if err != nil { return nil, err } updateSet, err := ApplyRevision(set, updateRevision) if err != nil { return nil, err } // set the generation, and revisions in the returned status status := apps.StatefulSetStatus{} status.ObservedGeneration = set.Generation status.CurrentRevision = currentRevision.Name status.UpdateRevision = updateRevision.Name status.CollisionCount = new(int32) *status.CollisionCount = collisionCount updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, pods) replicaCount := int(*set.Spec.Replicas) // slice that will contain all Pods such that getStartOrdinal(set) <= getOrdinal(pod) <= getEndOrdinal(set) replicas := make([]*v1.Pod, replicaCount) // slice that will contain all Pods such that getOrdinal(pod) < getStartOrdinal(set) OR getOrdinal(pod) > getEndOrdinal(set) condemned := make([]*v1.Pod, 0, len(pods)) unhealthy := 0 var firstUnhealthyPod *v1.Pod // First we partition pods into two lists valid replicas and condemned Pods for _, pod := range pods { if podInOrdinalRange(pod, set) { // if the ordinal of the pod is within the range of the current number of replicas, // insert it at the indirection of its ordinal replicas[getOrdinal(pod)-getStartOrdinal(set)] = pod } else if getOrdinal(pod) >= 0 { // if the ordinal is valid, but not within the range add it to the condemned list condemned = append(condemned, pod) } // If the ordinal could not be parsed (ord < 0), ignore the Pod. } // for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision for ord := getStartOrdinal(set); ord <= getEndOrdinal(set); ord++ { replicaIdx := ord - getStartOrdinal(set) if replicas[replicaIdx] == nil { replicas[replicaIdx] = newVersionedStatefulSetPod( currentSet, updateSet, currentRevision.Name, updateRevision.Name, ord) } } // sort the condemned Pods by their ordinals sort.Sort(descendingOrdinal(condemned)) // find the first unhealthy Pod for i := range replicas { if !isHealthy(replicas[i]) { unhealthy++ if firstUnhealthyPod == nil { firstUnhealthyPod = replicas[i] } } } // or the first unhealthy condemned Pod (condemned are sorted in descending order for ease of use) for i := len(condemned) - 1; i >= 0; i-- { if !isHealthy(condemned[i]) { unhealthy++ if firstUnhealthyPod == nil { firstUnhealthyPod = condemned[i] } } } if unhealthy > 0 { logger.V(4).Info("StatefulSet has unhealthy Pods", "statefulSet", klog.KObj(set), "unhealthyReplicas", unhealthy, "pod", klog.KObj(firstUnhealthyPod)) } // If the StatefulSet is being deleted, don't do anything other than updating // status. if set.DeletionTimestamp != nil { return &status, nil } monotonic := !allowsBurst(set) // First, process each living replica. Exit if we run into an error or something blocking in monotonic mode. processReplicaFn := func(i int) (bool, error) { return ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i) } if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil { updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) return &status, err } // Fix pod claims for condemned pods, if necessary. if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { fixPodClaim := func(i int) (bool, error) { if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, condemned[i]); err != nil { return true, err } else if !matchPolicy { if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(ctx, updateSet, condemned[i]); err != nil { return true, err } } return false, nil } if shouldExit, err := runForAll(condemned, fixPodClaim, monotonic); shouldExit || err != nil { updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) return &status, err } } // At this point, in monotonic mode all of the current Replicas are Running, Ready and Available, // and we can consider termination. // We will wait for all predecessors to be Running and Ready prior to attempting a deletion. // We will terminate Pods in a monotonically decreasing order. // Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over // updates. processCondemnedFn := func(i int) (bool, error) { return ssc.processCondemned(ctx, set, firstUnhealthyPod, monotonic, condemned, i) } if shouldExit, err := runForAll(condemned, processCondemnedFn, monotonic); shouldExit || err != nil { updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) return &status, err } updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned) // for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted. if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType { return &status, nil } if utilfeature.DefaultFeatureGate.Enabled(features.MaxUnavailableStatefulSet) { return updateStatefulSetAfterInvariantEstablished(ctx, ssc, set, replicas, updateRevision, status, ) } // we compute the minimum ordinal of the target sequence for a destructive update based on the strategy. updateMin := 0 if set.Spec.UpdateStrategy.RollingUpdate != nil { updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition) } // we terminate the Pod with the largest ordinal that does not match the update revision. for target := len(replicas) - 1; target >= updateMin; target-- { // delete the Pod if it is not already terminating and does not match the update revision. if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) { logger.V(2).Info("Pod of StatefulSet is terminating for update", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target])) if err := ssc.podControl.DeleteStatefulPod(set, replicas[target]); err != nil { if !errors.IsNotFound(err) { return &status, err } } status.CurrentReplicas-- return &status, err } // wait for unhealthy Pods on update if !isHealthy(replicas[target]) { logger.V(4).Info("StatefulSet is waiting for Pod to update", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target])) return &status, nil } } return &status, nil } func updateStatefulSetAfterInvariantEstablished( ctx context.Context, ssc *defaultStatefulSetControl, set *apps.StatefulSet, replicas []*v1.Pod, updateRevision *apps.ControllerRevision, status apps.StatefulSetStatus, ) (*apps.StatefulSetStatus, error) { logger := klog.FromContext(ctx) replicaCount := int(*set.Spec.Replicas) // we compute the minimum ordinal of the target sequence for a destructive update based on the strategy. updateMin := 0 maxUnavailable := 1 if set.Spec.UpdateStrategy.RollingUpdate != nil { updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition) // if the feature was enabled and then later disabled, MaxUnavailable may have a value // more than 1. Ignore the passed in value and Use maxUnavailable as 1 to enforce // expected behavior when feature gate is not enabled. var err error maxUnavailable, err = getStatefulSetMaxUnavailable(set.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, replicaCount) if err != nil { return &status, err } } // Collect all targets in the range between getStartOrdinal(set) and getEndOrdinal(set). Count any targets in that range // that are unhealthy i.e. terminated or not running and ready as unavailable). Select the // (MaxUnavailable - Unavailable) Pods, in order with respect to their ordinal for termination. Delete // those pods and count the successful deletions. Update the status with the correct number of deletions. unavailablePods := 0 for target := len(replicas) - 1; target >= 0; target-- { if !isHealthy(replicas[target]) { unavailablePods++ } } if unavailablePods >= maxUnavailable { logger.V(2).Info("StatefulSet found unavailablePods, more than or equal to allowed maxUnavailable", "statefulSet", klog.KObj(set), "unavailablePods", unavailablePods, "maxUnavailable", maxUnavailable) return &status, nil } // Now we need to delete MaxUnavailable- unavailablePods // start deleting one by one starting from the highest ordinal first podsToDelete := maxUnavailable - unavailablePods deletedPods := 0 for target := len(replicas) - 1; target >= updateMin && deletedPods < podsToDelete; target-- { // delete the Pod if it is healthy and the revision doesnt match the target if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) { // delete the Pod if it is healthy and the revision doesnt match the target logger.V(2).Info("StatefulSet terminating Pod for update", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target])) if err := ssc.podControl.DeleteStatefulPod(set, replicas[target]); err != nil { if !errors.IsNotFound(err) { return &status, err } } deletedPods++ status.CurrentReplicas-- } } return &status, nil } // updateStatefulSetStatus updates set's Status to be equal to status. If status indicates a complete update, it is // mutated to indicate completion. If status is semantically equivalent to set's Status no update is performed. If the // returned error is nil, the update is successful. func (ssc *defaultStatefulSetControl) updateStatefulSetStatus( ctx context.Context, set *apps.StatefulSet, status *apps.StatefulSetStatus) error { // complete any in progress rolling update if necessary completeRollingUpdate(set, status) // if the status is not inconsistent do not perform an update if !inconsistentStatus(set, status) { return nil } // copy set and update its status set = set.DeepCopy() if err := ssc.statusUpdater.UpdateStatefulSetStatus(ctx, set, status); err != nil { return err } return nil } var _ StatefulSetControlInterface = &defaultStatefulSetControl{}