...

Source file src/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set_control.go

Documentation: k8s.io/kubernetes/pkg/controller/statefulset

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package statefulset
    18  
    19  import (
    20  	"context"
    21  	"sort"
    22  	"sync"
    23  
    24  	apps "k8s.io/api/apps/v1"
    25  	v1 "k8s.io/api/core/v1"
    26  	"k8s.io/apimachinery/pkg/api/errors"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    29  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    30  	"k8s.io/client-go/tools/record"
    31  	"k8s.io/klog/v2"
    32  	"k8s.io/kubernetes/pkg/controller/history"
    33  	"k8s.io/kubernetes/pkg/features"
    34  )
    35  
// MaxBatchSize is a realistic value for the maximum number of in-flight requests when processing
// in parallel mode. slowStartBatch caps the growth of its batches at this size.
const MaxBatchSize = 500
    38  
// StatefulSetControlInterface implements the control logic for updating StatefulSets and their children Pods. It is
// implemented as an interface to allow for extensions that provide different semantics. Currently, there is only one
// implementation.
type StatefulSetControlInterface interface {
	// UpdateStatefulSet implements the control logic for Pod creation, update, and deletion, and
	// persistent volume creation, update, and deletion.
	// If an implementation returns a non-nil error, the invocation will be retried using a rate-limited strategy.
	// Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to
	// exit exceptionally at any point provided they wish the update to be re-run at a later point in time.
	UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error)
	// ListRevisions returns a slice of the ControllerRevisions that represent the revisions of set. If the returned
	// error is nil, the returned slice of ControllerRevisions is valid.
	ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error)
	// AdoptOrphanRevisions adopts any orphaned ControllerRevisions that match set's Selector. If all adoptions are
	// successful the returned error is nil.
	AdoptOrphanRevisions(set *apps.StatefulSet, revisions []*apps.ControllerRevision) error
}
    55  
    56  // NewDefaultStatefulSetControl returns a new instance of the default implementation StatefulSetControlInterface that
    57  // implements the documented semantics for StatefulSets. podControl is the PodControlInterface used to create, update,
    58  // and delete Pods and to create PersistentVolumeClaims. statusUpdater is the StatefulSetStatusUpdaterInterface used
    59  // to update the status of StatefulSets. You should use an instance returned from NewRealStatefulPodControl() for any
    60  // scenario other than testing.
    61  func NewDefaultStatefulSetControl(
    62  	podControl *StatefulPodControl,
    63  	statusUpdater StatefulSetStatusUpdaterInterface,
    64  	controllerHistory history.Interface,
    65  	recorder record.EventRecorder) StatefulSetControlInterface {
    66  	return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
    67  }
    68  
// defaultStatefulSetControl is the default StatefulSetControlInterface implementation returned by
// NewDefaultStatefulSetControl.
type defaultStatefulSetControl struct {
	// podControl creates, updates, and deletes the set's Pods and manages their PersistentVolumeClaims.
	podControl *StatefulPodControl
	// statusUpdater is used to update the status of StatefulSets.
	statusUpdater StatefulSetStatusUpdaterInterface
	// controllerHistory lists, adopts, creates, updates, and deletes the set's ControllerRevisions.
	controllerHistory history.Interface
	// recorder emits events about the set, e.g. when a failed Pod is recreated.
	recorder record.EventRecorder
}
    75  
    76  // UpdateStatefulSet executes the core logic loop for a stateful set, applying the predictable and
    77  // consistent monotonic update strategy by default - scale up proceeds in ordinal order, no new pod
    78  // is created while any pod is unhealthy, and pods are terminated in descending order. The burst
    79  // strategy allows these constraints to be relaxed - pods will be created and deleted eagerly and
    80  // in no particular order. Clients using the burst strategy should be careful to ensure they
    81  // understand the consistency implications of having unpredictable numbers of pods available.
    82  func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
    83  	set = set.DeepCopy() // set is modified when a new revision is created in performUpdate. Make a copy now to avoid mutation errors.
    84  
    85  	// list all revisions and sort them
    86  	revisions, err := ssc.ListRevisions(set)
    87  	if err != nil {
    88  		return nil, err
    89  	}
    90  	history.SortControllerRevisions(revisions)
    91  
    92  	currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions)
    93  	if err != nil {
    94  		errs := []error{err}
    95  		if agg, ok := err.(utilerrors.Aggregate); ok {
    96  			errs = agg.Errors()
    97  		}
    98  		return nil, utilerrors.NewAggregate(append(errs, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)))
    99  	}
   100  
   101  	// maintain the set's revision history limit
   102  	return status, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
   103  }
   104  
   105  func (ssc *defaultStatefulSetControl) performUpdate(
   106  	ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
   107  	var currentStatus *apps.StatefulSetStatus
   108  	logger := klog.FromContext(ctx)
   109  	// get the current, and update revisions
   110  	currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
   111  	if err != nil {
   112  		return currentRevision, updateRevision, currentStatus, err
   113  	}
   114  
   115  	// perform the main update function and get the status
   116  	currentStatus, err = ssc.updateStatefulSet(ctx, set, currentRevision, updateRevision, collisionCount, pods)
   117  	if err != nil && currentStatus == nil {
   118  		return currentRevision, updateRevision, nil, err
   119  	}
   120  
   121  	// make sure to update the latest status even if there is an error with non-nil currentStatus
   122  	statusErr := ssc.updateStatefulSetStatus(ctx, set, currentStatus)
   123  	if statusErr == nil {
   124  		logger.V(4).Info("Updated status", "statefulSet", klog.KObj(set),
   125  			"replicas", currentStatus.Replicas,
   126  			"readyReplicas", currentStatus.ReadyReplicas,
   127  			"currentReplicas", currentStatus.CurrentReplicas,
   128  			"updatedReplicas", currentStatus.UpdatedReplicas)
   129  	}
   130  
   131  	switch {
   132  	case err != nil && statusErr != nil:
   133  		logger.Error(statusErr, "Could not update status", "statefulSet", klog.KObj(set))
   134  		return currentRevision, updateRevision, currentStatus, err
   135  	case err != nil:
   136  		return currentRevision, updateRevision, currentStatus, err
   137  	case statusErr != nil:
   138  		return currentRevision, updateRevision, currentStatus, statusErr
   139  	}
   140  
   141  	logger.V(4).Info("StatefulSet revisions", "statefulSet", klog.KObj(set),
   142  		"currentRevision", currentStatus.CurrentRevision,
   143  		"updateRevision", currentStatus.UpdateRevision)
   144  
   145  	return currentRevision, updateRevision, currentStatus, nil
   146  }
   147  
   148  func (ssc *defaultStatefulSetControl) ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error) {
   149  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
   150  	if err != nil {
   151  		return nil, err
   152  	}
   153  	return ssc.controllerHistory.ListControllerRevisions(set, selector)
   154  }
   155  
   156  func (ssc *defaultStatefulSetControl) AdoptOrphanRevisions(
   157  	set *apps.StatefulSet,
   158  	revisions []*apps.ControllerRevision) error {
   159  	for i := range revisions {
   160  		adopted, err := ssc.controllerHistory.AdoptControllerRevision(set, controllerKind, revisions[i])
   161  		if err != nil {
   162  			return err
   163  		}
   164  		revisions[i] = adopted
   165  	}
   166  	return nil
   167  }
   168  
   169  // truncateHistory truncates any non-live ControllerRevisions in revisions from set's history. The UpdateRevision and
   170  // CurrentRevision in set's Status are considered to be live. Any revisions associated with the Pods in pods are also
   171  // considered to be live. Non-live revisions are deleted, starting with the revision with the lowest Revision, until
   172  // only RevisionHistoryLimit revisions remain. If the returned error is nil the operation was successful. This method
   173  // expects that revisions is sorted when supplied.
   174  func (ssc *defaultStatefulSetControl) truncateHistory(
   175  	set *apps.StatefulSet,
   176  	pods []*v1.Pod,
   177  	revisions []*apps.ControllerRevision,
   178  	current *apps.ControllerRevision,
   179  	update *apps.ControllerRevision) error {
   180  	history := make([]*apps.ControllerRevision, 0, len(revisions))
   181  	// mark all live revisions
   182  	live := map[string]bool{}
   183  	if current != nil {
   184  		live[current.Name] = true
   185  	}
   186  	if update != nil {
   187  		live[update.Name] = true
   188  	}
   189  	for i := range pods {
   190  		live[getPodRevision(pods[i])] = true
   191  	}
   192  	// collect live revisions and historic revisions
   193  	for i := range revisions {
   194  		if !live[revisions[i].Name] {
   195  			history = append(history, revisions[i])
   196  		}
   197  	}
   198  	historyLen := len(history)
   199  	historyLimit := int(*set.Spec.RevisionHistoryLimit)
   200  	if historyLen <= historyLimit {
   201  		return nil
   202  	}
   203  	// delete any non-live history to maintain the revision limit.
   204  	history = history[:(historyLen - historyLimit)]
   205  	for i := 0; i < len(history); i++ {
   206  		if err := ssc.controllerHistory.DeleteControllerRevision(history[i]); err != nil {
   207  			return err
   208  		}
   209  	}
   210  	return nil
   211  }
   212  
   213  // getStatefulSetRevisions returns the current and update ControllerRevisions for set. It also
   214  // returns a collision count that records the number of name collisions set saw when creating
   215  // new ControllerRevisions. This count is incremented on every name collision and is used in
   216  // building the ControllerRevision names for name collision avoidance. This method may create
   217  // a new revision, or modify the Revision of an existing revision if an update to set is detected.
   218  // This method expects that revisions is sorted when supplied.
   219  func (ssc *defaultStatefulSetControl) getStatefulSetRevisions(
   220  	set *apps.StatefulSet,
   221  	revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, int32, error) {
   222  	var currentRevision, updateRevision *apps.ControllerRevision
   223  
   224  	revisionCount := len(revisions)
   225  	history.SortControllerRevisions(revisions)
   226  
   227  	// Use a local copy of set.Status.CollisionCount to avoid modifying set.Status directly.
   228  	// This copy is returned so the value gets carried over to set.Status in updateStatefulSet.
   229  	var collisionCount int32
   230  	if set.Status.CollisionCount != nil {
   231  		collisionCount = *set.Status.CollisionCount
   232  	}
   233  
   234  	// create a new revision from the current set
   235  	updateRevision, err := newRevision(set, nextRevision(revisions), &collisionCount)
   236  	if err != nil {
   237  		return nil, nil, collisionCount, err
   238  	}
   239  
   240  	// find any equivalent revisions
   241  	equalRevisions := history.FindEqualRevisions(revisions, updateRevision)
   242  	equalCount := len(equalRevisions)
   243  
   244  	if equalCount > 0 && history.EqualRevision(revisions[revisionCount-1], equalRevisions[equalCount-1]) {
   245  		// if the equivalent revision is immediately prior the update revision has not changed
   246  		updateRevision = revisions[revisionCount-1]
   247  	} else if equalCount > 0 {
   248  		// if the equivalent revision is not immediately prior we will roll back by incrementing the
   249  		// Revision of the equivalent revision
   250  		updateRevision, err = ssc.controllerHistory.UpdateControllerRevision(
   251  			equalRevisions[equalCount-1],
   252  			updateRevision.Revision)
   253  		if err != nil {
   254  			return nil, nil, collisionCount, err
   255  		}
   256  	} else {
   257  		//if there is no equivalent revision we create a new one
   258  		updateRevision, err = ssc.controllerHistory.CreateControllerRevision(set, updateRevision, &collisionCount)
   259  		if err != nil {
   260  			return nil, nil, collisionCount, err
   261  		}
   262  	}
   263  
   264  	// attempt to find the revision that corresponds to the current revision
   265  	for i := range revisions {
   266  		if revisions[i].Name == set.Status.CurrentRevision {
   267  			currentRevision = revisions[i]
   268  			break
   269  		}
   270  	}
   271  
   272  	// if the current revision is nil we initialize the history by setting it to the update revision
   273  	if currentRevision == nil {
   274  		currentRevision = updateRevision
   275  	}
   276  
   277  	return currentRevision, updateRevision, collisionCount, nil
   278  }
   279  
   280  func slowStartBatch(initialBatchSize int, remaining int, fn func(int) (bool, error)) (int, error) {
   281  	successes := 0
   282  	j := 0
   283  	for batchSize := min(remaining, initialBatchSize); batchSize > 0; batchSize = min(min(2*batchSize, remaining), MaxBatchSize) {
   284  		errCh := make(chan error, batchSize)
   285  		var wg sync.WaitGroup
   286  		wg.Add(batchSize)
   287  		for i := 0; i < batchSize; i++ {
   288  			go func(k int) {
   289  				defer wg.Done()
   290  				// Ignore the first parameter - relevant for monotonic only.
   291  				if _, err := fn(k); err != nil {
   292  					errCh <- err
   293  				}
   294  			}(j)
   295  			j++
   296  		}
   297  		wg.Wait()
   298  		successes += batchSize - len(errCh)
   299  		close(errCh)
   300  		if len(errCh) > 0 {
   301  			errs := make([]error, 0)
   302  			for err := range errCh {
   303  				errs = append(errs, err)
   304  			}
   305  			return successes, utilerrors.NewAggregate(errs)
   306  		}
   307  		remaining -= batchSize
   308  	}
   309  	return successes, nil
   310  }
   311  
// replicaStatus holds the per-category Pod counts produced by computeReplicaStatus for one list
// of Pods.
type replicaStatus struct {
	// replicas counts Pods that have been created.
	replicas int32
	// readyReplicas counts Pods that are running and ready.
	readyReplicas int32
	// availableReplicas counts running-and-ready Pods that are also available with respect to
	// minReadySeconds.
	availableReplicas int32
	// currentReplicas counts created, non-terminating Pods at the current revision.
	currentReplicas int32
	// updatedReplicas counts created, non-terminating Pods at the update revision.
	updatedReplicas int32
}
   319  
   320  func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision) replicaStatus {
   321  	status := replicaStatus{}
   322  	for _, pod := range pods {
   323  		if isCreated(pod) {
   324  			status.replicas++
   325  		}
   326  
   327  		// count the number of running and ready replicas
   328  		if isRunningAndReady(pod) {
   329  			status.readyReplicas++
   330  			// count the number of running and available replicas
   331  			if isRunningAndAvailable(pod, minReadySeconds) {
   332  				status.availableReplicas++
   333  			}
   334  
   335  		}
   336  
   337  		// count the number of current and update replicas
   338  		if isCreated(pod) && !isTerminating(pod) {
   339  			revision := getPodRevision(pod)
   340  			if revision == currentRevision.Name {
   341  				status.currentReplicas++
   342  			}
   343  			if revision == updateRevision.Name {
   344  				status.updatedReplicas++
   345  			}
   346  		}
   347  	}
   348  	return status
   349  }
   350  
   351  func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, podLists ...[]*v1.Pod) {
   352  	status.Replicas = 0
   353  	status.ReadyReplicas = 0
   354  	status.AvailableReplicas = 0
   355  	status.CurrentReplicas = 0
   356  	status.UpdatedReplicas = 0
   357  	for _, list := range podLists {
   358  		replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision)
   359  		status.Replicas += replicaStatus.replicas
   360  		status.ReadyReplicas += replicaStatus.readyReplicas
   361  		status.AvailableReplicas += replicaStatus.availableReplicas
   362  		status.CurrentReplicas += replicaStatus.currentReplicas
   363  		status.UpdatedReplicas += replicaStatus.updatedReplicas
   364  	}
   365  }
   366  
// processReplica reconciles the replica at index i of replicas for set. In order, it:
// recreates a Pod that has failed or succeeded, creates a Pod that does not exist yet, triggers
// creation of missing PVCs for a pending Pod, and finally updates the Pod if its identity,
// storage, or PVC retention policy does not match the set.
//
// The returned boolean tells the caller to stop processing further replicas. It is true whenever
// an error is returned, when a stale PVC blocks creation, and, in monotonic mode, when a Pod was
// just created or is still terminating, not Running and Ready, or not Available. It is false when
// the replica needed no further action or was updated successfully. Callers that process replicas
// in parallel (burst mode) ignore the boolean.
//
// replicas[i] may be replaced in place with a freshly built Pod object when the existing Pod is
// deleted for recreation.
func (ssc *defaultStatefulSetControl) processReplica(
	ctx context.Context,
	set *apps.StatefulSet,
	currentRevision *apps.ControllerRevision,
	updateRevision *apps.ControllerRevision,
	currentSet *apps.StatefulSet,
	updateSet *apps.StatefulSet,
	monotonic bool,
	replicas []*v1.Pod,
	i int) (bool, error) {
	logger := klog.FromContext(ctx)
	// Delete and recreate pods which finished running.
	//
	// Note that pods with phase Succeeded will also trigger this event. This is
	// because final pod phase of evicted or otherwise forcibly stopped pods
	// (e.g. terminated on node reboot) is determined by the exit code of the
	// container, not by the reason for pod termination. We should restart the pod
	// regardless of the exit code.
	if isFailed(replicas[i]) || isSucceeded(replicas[i]) {
		if isFailed(replicas[i]) {
			ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
				"StatefulSet %s/%s is recreating failed Pod %s",
				set.Namespace,
				set.Name,
				replicas[i].Name)
		} else {
			ssc.recorder.Eventf(set, v1.EventTypeNormal, "RecreatingTerminatedPod",
				"StatefulSet %s/%s is recreating terminated Pod %s",
				set.Namespace,
				set.Name,
				replicas[i].Name)
		}
		if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
			return true, err
		}
		// Replace the deleted Pod with a new, not-yet-created Pod object for the same ordinal so
		// that the creation step below picks it up.
		replicaOrd := i + getStartOrdinal(set)
		replicas[i] = newVersionedStatefulSetPod(
			currentSet,
			updateSet,
			currentRevision.Name,
			updateRevision.Name,
			replicaOrd)
	}
	// If we find a Pod that has not been created we create the Pod
	if !isCreated(replicas[i]) {
		if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
			if isStale, err := ssc.podControl.PodClaimIsStale(set, replicas[i]); err != nil {
				return true, err
			} else if isStale {
				// If a pod has a stale PVC, no more work can be done this round.
				// err is nil in this branch, so this returns (true, nil).
				return true, err
			}
		}
		if err := ssc.podControl.CreateStatefulPod(ctx, set, replicas[i]); err != nil {
			return true, err
		}
		if monotonic {
			// if the set does not allow bursting, return immediately
			return true, nil
		}
	}

	// If the Pod is in pending state then trigger PVC creation to create missing PVCs
	if isPending(replicas[i]) {
		logger.V(4).Info(
			"StatefulSet is triggering PVC creation for pending Pod",
			"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
		if err := ssc.podControl.createMissingPersistentVolumeClaims(ctx, set, replicas[i]); err != nil {
			return true, err
		}
	}

	// If we find a Pod that is currently terminating, we must wait until graceful deletion
	// completes before we continue to make progress.
	if isTerminating(replicas[i]) && monotonic {
		logger.V(4).Info("StatefulSet is waiting for Pod to Terminate",
			"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
		return true, nil
	}

	// If we have a Pod that has been created but is not running and ready we can not make progress.
	// We must ensure that for each Pod, when we create it, all of its predecessors, with respect to its
	// ordinal, are Running and Ready.
	if !isRunningAndReady(replicas[i]) && monotonic {
		logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready",
			"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
		return true, nil
	}

	// If we have a Pod that has been created but is not available we can not make progress.
	// We must ensure that for each Pod, when we create it, all of its predecessors, with respect to its
	// ordinal, are Available.
	if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
		logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
			"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
		return true, nil
	}

	// Enforce the StatefulSet invariants
	// retentionMatch stays true when the StatefulSetAutoDeletePVC feature gate is disabled.
	retentionMatch := true
	if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
		var err error
		retentionMatch, err = ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, replicas[i])
		// An error is expected if the pod is not yet fully updated, and so return is treated as matching.
		if err != nil {
			retentionMatch = true
		}
	}

	// Nothing to do when identity, storage, and retention policy already match.
	if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) && retentionMatch {
		return false, nil
	}

	// Make a deep copy so we don't mutate the shared cache
	replica := replicas[i].DeepCopy()
	if err := ssc.podControl.UpdateStatefulPod(ctx, updateSet, replica); err != nil {
		return true, err
	}

	return false, nil
}
   488  
   489  func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int) (bool, error) {
   490  	logger := klog.FromContext(ctx)
   491  	if isTerminating(condemned[i]) {
   492  		// if we are in monotonic mode, block and wait for terminating pods to expire
   493  		if monotonic {
   494  			logger.V(4).Info("StatefulSet is waiting for Pod to Terminate prior to scale down",
   495  				"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))
   496  			return true, nil
   497  		}
   498  		return false, nil
   499  	}
   500  	// if we are in monotonic mode and the condemned target is not the first unhealthy Pod block
   501  	if !isRunningAndReady(condemned[i]) && monotonic && condemned[i] != firstUnhealthyPod {
   502  		logger.V(4).Info("StatefulSet is waiting for Pod to be Running and Ready prior to scale down",
   503  			"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
   504  		return true, nil
   505  	}
   506  	// if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
   507  	if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds) && monotonic && condemned[i] != firstUnhealthyPod {
   508  		logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down",
   509  			"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
   510  		return true, nil
   511  	}
   512  
   513  	logger.V(2).Info("Pod of StatefulSet is terminating for scale down",
   514  		"statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i]))
   515  	return true, ssc.podControl.DeleteStatefulPod(set, condemned[i])
   516  }
   517  
   518  func runForAll(pods []*v1.Pod, fn func(i int) (bool, error), monotonic bool) (bool, error) {
   519  	if monotonic {
   520  		for i := range pods {
   521  			if shouldExit, err := fn(i); shouldExit || err != nil {
   522  				return true, err
   523  			}
   524  		}
   525  	} else {
   526  		if _, err := slowStartBatch(1, len(pods), fn); err != nil {
   527  			return true, err
   528  		}
   529  	}
   530  	return false, nil
   531  }
   532  
   533  // updateStatefulSet performs the update function for a StatefulSet. This method creates, updates, and deletes Pods in
   534  // the set in order to conform the system to the target state for the set. The target state always contains
   535  // set.Spec.Replicas Pods with a Ready Condition. If the UpdateStrategy.Type for the set is
   536  // RollingUpdateStatefulSetStrategyType then all Pods in the set must be at set.Status.CurrentRevision.
   537  // If the UpdateStrategy.Type for the set is OnDeleteStatefulSetStrategyType, the target state implies nothing about
   538  // the revisions of Pods in the set. If the UpdateStrategy.Type for the set is PartitionStatefulSetStrategyType, then
   539  // all Pods with ordinal less than UpdateStrategy.Partition.Ordinal must be at Status.CurrentRevision and all other
   540  // Pods must be at Status.UpdateRevision. If the returned error is nil, the returned StatefulSetStatus is valid and the
   541  // update must be recorded. If the error is not nil, the method should be retried until successful.
   542  func (ssc *defaultStatefulSetControl) updateStatefulSet(
   543  	ctx context.Context,
   544  	set *apps.StatefulSet,
   545  	currentRevision *apps.ControllerRevision,
   546  	updateRevision *apps.ControllerRevision,
   547  	collisionCount int32,
   548  	pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
   549  	logger := klog.FromContext(ctx)
   550  	// get the current and update revisions of the set.
   551  	currentSet, err := ApplyRevision(set, currentRevision)
   552  	if err != nil {
   553  		return nil, err
   554  	}
   555  	updateSet, err := ApplyRevision(set, updateRevision)
   556  	if err != nil {
   557  		return nil, err
   558  	}
   559  
   560  	// set the generation, and revisions in the returned status
   561  	status := apps.StatefulSetStatus{}
   562  	status.ObservedGeneration = set.Generation
   563  	status.CurrentRevision = currentRevision.Name
   564  	status.UpdateRevision = updateRevision.Name
   565  	status.CollisionCount = new(int32)
   566  	*status.CollisionCount = collisionCount
   567  
   568  	updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, pods)
   569  
   570  	replicaCount := int(*set.Spec.Replicas)
   571  	// slice that will contain all Pods such that getStartOrdinal(set) <= getOrdinal(pod) <= getEndOrdinal(set)
   572  	replicas := make([]*v1.Pod, replicaCount)
   573  	// slice that will contain all Pods such that getOrdinal(pod) < getStartOrdinal(set) OR getOrdinal(pod) > getEndOrdinal(set)
   574  	condemned := make([]*v1.Pod, 0, len(pods))
   575  	unhealthy := 0
   576  	var firstUnhealthyPod *v1.Pod
   577  
   578  	// First we partition pods into two lists valid replicas and condemned Pods
   579  	for _, pod := range pods {
   580  		if podInOrdinalRange(pod, set) {
   581  			// if the ordinal of the pod is within the range of the current number of replicas,
   582  			// insert it at the indirection of its ordinal
   583  			replicas[getOrdinal(pod)-getStartOrdinal(set)] = pod
   584  		} else if getOrdinal(pod) >= 0 {
   585  			// if the ordinal is valid, but not within the range add it to the condemned list
   586  			condemned = append(condemned, pod)
   587  		}
   588  		// If the ordinal could not be parsed (ord < 0), ignore the Pod.
   589  	}
   590  
   591  	// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision
   592  	for ord := getStartOrdinal(set); ord <= getEndOrdinal(set); ord++ {
   593  		replicaIdx := ord - getStartOrdinal(set)
   594  		if replicas[replicaIdx] == nil {
   595  			replicas[replicaIdx] = newVersionedStatefulSetPod(
   596  				currentSet,
   597  				updateSet,
   598  				currentRevision.Name,
   599  				updateRevision.Name, ord)
   600  		}
   601  	}
   602  
   603  	// sort the condemned Pods by their ordinals
   604  	sort.Sort(descendingOrdinal(condemned))
   605  
   606  	// find the first unhealthy Pod
   607  	for i := range replicas {
   608  		if !isHealthy(replicas[i]) {
   609  			unhealthy++
   610  			if firstUnhealthyPod == nil {
   611  				firstUnhealthyPod = replicas[i]
   612  			}
   613  		}
   614  	}
   615  
   616  	// or the first unhealthy condemned Pod (condemned are sorted in descending order for ease of use)
   617  	for i := len(condemned) - 1; i >= 0; i-- {
   618  		if !isHealthy(condemned[i]) {
   619  			unhealthy++
   620  			if firstUnhealthyPod == nil {
   621  				firstUnhealthyPod = condemned[i]
   622  			}
   623  		}
   624  	}
   625  
   626  	if unhealthy > 0 {
   627  		logger.V(4).Info("StatefulSet has unhealthy Pods", "statefulSet", klog.KObj(set), "unhealthyReplicas", unhealthy, "pod", klog.KObj(firstUnhealthyPod))
   628  	}
   629  
   630  	// If the StatefulSet is being deleted, don't do anything other than updating
   631  	// status.
   632  	if set.DeletionTimestamp != nil {
   633  		return &status, nil
   634  	}
   635  
   636  	monotonic := !allowsBurst(set)
   637  
   638  	// First, process each living replica. Exit if we run into an error or something blocking in monotonic mode.
   639  	processReplicaFn := func(i int) (bool, error) {
   640  		return ssc.processReplica(ctx, set, currentRevision, updateRevision, currentSet, updateSet, monotonic, replicas, i)
   641  	}
   642  	if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil {
   643  		updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
   644  		return &status, err
   645  	}
   646  
   647  	// Fix pod claims for condemned pods, if necessary.
   648  	if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
   649  		fixPodClaim := func(i int) (bool, error) {
   650  			if matchPolicy, err := ssc.podControl.ClaimsMatchRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
   651  				return true, err
   652  			} else if !matchPolicy {
   653  				if err := ssc.podControl.UpdatePodClaimForRetentionPolicy(ctx, updateSet, condemned[i]); err != nil {
   654  					return true, err
   655  				}
   656  			}
   657  			return false, nil
   658  		}
   659  		if shouldExit, err := runForAll(condemned, fixPodClaim, monotonic); shouldExit || err != nil {
   660  			updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
   661  			return &status, err
   662  		}
   663  	}
   664  
   665  	// At this point, in monotonic mode all of the current Replicas are Running, Ready and Available,
   666  	// and we can consider termination.
   667  	// We will wait for all predecessors to be Running and Ready prior to attempting a deletion.
   668  	// We will terminate Pods in a monotonically decreasing order.
   669  	// Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
   670  	// updates.
   671  	processCondemnedFn := func(i int) (bool, error) {
   672  		return ssc.processCondemned(ctx, set, firstUnhealthyPod, monotonic, condemned, i)
   673  	}
   674  	if shouldExit, err := runForAll(condemned, processCondemnedFn, monotonic); shouldExit || err != nil {
   675  		updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
   676  		return &status, err
   677  	}
   678  
   679  	updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
   680  
   681  	// for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
   682  	if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
   683  		return &status, nil
   684  	}
   685  
   686  	if utilfeature.DefaultFeatureGate.Enabled(features.MaxUnavailableStatefulSet) {
   687  		return updateStatefulSetAfterInvariantEstablished(ctx,
   688  			ssc,
   689  			set,
   690  			replicas,
   691  			updateRevision,
   692  			status,
   693  		)
   694  	}
   695  
   696  	// we compute the minimum ordinal of the target sequence for a destructive update based on the strategy.
   697  	updateMin := 0
   698  	if set.Spec.UpdateStrategy.RollingUpdate != nil {
   699  		updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
   700  	}
   701  	// we terminate the Pod with the largest ordinal that does not match the update revision.
   702  	for target := len(replicas) - 1; target >= updateMin; target-- {
   703  
   704  		// delete the Pod if it is not already terminating and does not match the update revision.
   705  		if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
   706  			logger.V(2).Info("Pod of StatefulSet is terminating for update",
   707  				"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
   708  			if err := ssc.podControl.DeleteStatefulPod(set, replicas[target]); err != nil {
   709  				if !errors.IsNotFound(err) {
   710  					return &status, err
   711  				}
   712  			}
   713  			status.CurrentReplicas--
   714  			return &status, err
   715  		}
   716  
   717  		// wait for unhealthy Pods on update
   718  		if !isHealthy(replicas[target]) {
   719  			logger.V(4).Info("StatefulSet is waiting for Pod to update",
   720  				"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
   721  			return &status, nil
   722  		}
   723  
   724  	}
   725  	return &status, nil
   726  }
   727  
   728  func updateStatefulSetAfterInvariantEstablished(
   729  	ctx context.Context,
   730  	ssc *defaultStatefulSetControl,
   731  	set *apps.StatefulSet,
   732  	replicas []*v1.Pod,
   733  	updateRevision *apps.ControllerRevision,
   734  	status apps.StatefulSetStatus,
   735  ) (*apps.StatefulSetStatus, error) {
   736  
   737  	logger := klog.FromContext(ctx)
   738  	replicaCount := int(*set.Spec.Replicas)
   739  
   740  	// we compute the minimum ordinal of the target sequence for a destructive update based on the strategy.
   741  	updateMin := 0
   742  	maxUnavailable := 1
   743  	if set.Spec.UpdateStrategy.RollingUpdate != nil {
   744  		updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
   745  
   746  		// if the feature was enabled and then later disabled, MaxUnavailable may have a value
   747  		// more than 1. Ignore the passed in value and Use maxUnavailable as 1 to enforce
   748  		// expected behavior when feature gate is not enabled.
   749  		var err error
   750  		maxUnavailable, err = getStatefulSetMaxUnavailable(set.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, replicaCount)
   751  		if err != nil {
   752  			return &status, err
   753  		}
   754  	}
   755  
   756  	// Collect all targets in the range between getStartOrdinal(set) and getEndOrdinal(set). Count any targets in that range
   757  	// that are unhealthy i.e. terminated or not running and ready as unavailable). Select the
   758  	// (MaxUnavailable - Unavailable) Pods, in order with respect to their ordinal for termination. Delete
   759  	// those pods and count the successful deletions. Update the status with the correct number of deletions.
   760  	unavailablePods := 0
   761  	for target := len(replicas) - 1; target >= 0; target-- {
   762  		if !isHealthy(replicas[target]) {
   763  			unavailablePods++
   764  		}
   765  	}
   766  
   767  	if unavailablePods >= maxUnavailable {
   768  		logger.V(2).Info("StatefulSet found unavailablePods, more than or equal to allowed maxUnavailable",
   769  			"statefulSet", klog.KObj(set),
   770  			"unavailablePods", unavailablePods,
   771  			"maxUnavailable", maxUnavailable)
   772  		return &status, nil
   773  	}
   774  
   775  	// Now we need to delete MaxUnavailable- unavailablePods
   776  	// start deleting one by one starting from the highest ordinal first
   777  	podsToDelete := maxUnavailable - unavailablePods
   778  
   779  	deletedPods := 0
   780  	for target := len(replicas) - 1; target >= updateMin && deletedPods < podsToDelete; target-- {
   781  
   782  		// delete the Pod if it is healthy and the revision doesnt match the target
   783  		if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) {
   784  			// delete the Pod if it is healthy and the revision doesnt match the target
   785  			logger.V(2).Info("StatefulSet terminating Pod for update",
   786  				"statefulSet", klog.KObj(set),
   787  				"pod", klog.KObj(replicas[target]))
   788  			if err := ssc.podControl.DeleteStatefulPod(set, replicas[target]); err != nil {
   789  				if !errors.IsNotFound(err) {
   790  					return &status, err
   791  				}
   792  			}
   793  			deletedPods++
   794  			status.CurrentReplicas--
   795  		}
   796  	}
   797  	return &status, nil
   798  }
   799  
   800  // updateStatefulSetStatus updates set's Status to be equal to status. If status indicates a complete update, it is
   801  // mutated to indicate completion. If status is semantically equivalent to set's Status no update is performed. If the
   802  // returned error is nil, the update is successful.
   803  func (ssc *defaultStatefulSetControl) updateStatefulSetStatus(
   804  	ctx context.Context,
   805  	set *apps.StatefulSet,
   806  	status *apps.StatefulSetStatus) error {
   807  	// complete any in progress rolling update if necessary
   808  	completeRollingUpdate(set, status)
   809  
   810  	// if the status is not inconsistent do not perform an update
   811  	if !inconsistentStatus(set, status) {
   812  		return nil
   813  	}
   814  
   815  	// copy set and update its status
   816  	set = set.DeepCopy()
   817  	if err := ssc.statusUpdater.UpdateStatefulSetStatus(ctx, set, status); err != nil {
   818  		return err
   819  	}
   820  
   821  	return nil
   822  }
   823  
   824  var _ StatefulSetControlInterface = &defaultStatefulSetControl{}
   825  

View as plain text