
Source file src/k8s.io/kubernetes/pkg/controller/deployment/sync.go

Documentation: k8s.io/kubernetes/pkg/controller/deployment

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deployment

import (
	"context"
	"fmt"
	"reflect"
	"sort"
	"strconv"

	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/controller"
	deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
	labelsutil "k8s.io/kubernetes/pkg/util/labels"
)
// syncStatusOnly only updates the Deployment's status and doesn't take any mutating actions.
func (dc *DeploymentController) syncStatusOnly(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return err
	}

	allRSs := append(oldRSs, newRS)
	return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}

// sync is responsible for reconciling deployments on scaling events or when they
// are paused.
func (dc *DeploymentController) sync(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return err
	}
	if err := dc.scale(ctx, d, newRS, oldRSs); err != nil {
		// If we get an error while trying to scale, the deployment will be requeued
		// so we can abort this resync
		return err
	}

	// Clean up the deployment when it's paused and no rollback is in flight.
	if d.Spec.Paused && getRollbackTo(d) == nil {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	allRSs := append(oldRSs, newRS)
	return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}

// checkPausedConditions checks if the given deployment is paused or not and adds an appropriate condition.
// These conditions are needed so that we won't accidentally report lack of progress for resumed deployments
// that were paused for longer than progressDeadlineSeconds.
func (dc *DeploymentController) checkPausedConditions(ctx context.Context, d *apps.Deployment) error {
	if !deploymentutil.HasProgressDeadline(d) {
		return nil
	}
	cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
	if cond != nil && cond.Reason == deploymentutil.TimedOutReason {
		// If we have reported lack of progress, do not overwrite it with a paused condition.
		return nil
	}
	pausedCondExists := cond != nil && cond.Reason == deploymentutil.PausedDeployReason

	needsUpdate := false
	if d.Spec.Paused && !pausedCondExists {
		condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.PausedDeployReason, "Deployment is paused")
		deploymentutil.SetDeploymentCondition(&d.Status, *condition)
		needsUpdate = true
	} else if !d.Spec.Paused && pausedCondExists {
		condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.ResumedDeployReason, "Deployment is resumed")
		deploymentutil.SetDeploymentCondition(&d.Status, *condition)
		needsUpdate = true
	}

	if !needsUpdate {
		return nil
	}

	var err error
	_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
	return err
}
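
// Illustrative example (reason strings assumed, not confirmed by this file):
// when a deployment is paused, the Progressing condition written above
// serializes roughly as:
//
//	type:    Progressing
//	status:  Unknown
//	reason:  DeploymentPaused
//	message: Deployment is paused
//
// On resume, the reason flips to the ResumedDeployReason value and the
// message to "Deployment is resumed", again with status Unknown, so progress
// timing restarts from the resume rather than from the original rollout.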

// getAllReplicaSetsAndSyncRevision returns all the replica sets for the provided deployment (new and all old), with new RS's and deployment's revision updated.
//
// rsList should come from getReplicaSetsForDeployment(d).
//
//  1. Get all old RSes this deployment targets, and calculate the max revision number among them (maxOldV).
//  2. Get new RS this deployment targets (whose pod template matches deployment's), and update new RS's revision number to (maxOldV + 1),
//     only if its revision number is smaller than (maxOldV + 1). If this step fails, we'll update it in the next deployment sync loop.
//  3. Copy new RS's revision number to the deployment (update the deployment's revision). If this step fails, we'll update it in the next deployment sync loop.
//
// Note that currently the deployment controller is using caches to avoid querying the server for reads.
// This may lead to stale reads of replica sets, thus incorrect deployment status.
func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, []*apps.ReplicaSet, error) {
	_, allOldRSs := deploymentutil.FindOldReplicaSets(d, rsList)

	// Get new replica set with the updated revision number
	newRS, err := dc.getNewReplicaSet(ctx, d, rsList, allOldRSs, createIfNotExisted)
	if err != nil {
		return nil, nil, err
	}

	return newRS, allOldRSs, nil
}

const (
	// limit revision history length to 100 elements (~2000 chars)
	maxRevHistoryLengthInChars = 2000
)
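
// Illustrative note (annotation name assumed from deploymentutil, not shown
// in this file): the character budget above bounds the comma-separated
// revision list kept in the replica set's revision-history annotation, e.g.
// a value like "1,2,5,6"; roughly 100 revision numbers fit in ~2000 chars.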

// Returns a replica set that matches the intent of the given deployment. Returns nil if the new replica set doesn't exist yet.
// 1. Get the existing new RS (the RS that the given deployment targets, whose pod template is the same as the deployment's).
// 2. If there's an existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes.
// 3. If there's no existing new RS and createIfNotExisted is true, create one with the appropriate revision number (maxOldRevision + 1) and replicas.
// Note that the pod-template-hash will be added to adopted RSes and pods.
func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) {
	logger := klog.FromContext(ctx)
	existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList)

	// Calculate the max revision number among all old RSes
	maxOldRevision := deploymentutil.MaxRevision(logger, oldRSs)
	// Calculate revision number for this new replica set
	newRevision := strconv.FormatInt(maxOldRevision+1, 10)
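	// Illustrative arithmetic (values made up): if the old replica sets carry
	// revision annotations "1" and "3", maxOldRevision is 3 and newRevision
	// becomes "4"; replica sets without a parsable revision annotation simply
	// don't raise the maximum.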

	// Latest replica set exists. We need to sync its annotations (this includes
	// copying all but annotationsToSkip from the parent deployment, and updating
	// revision, desiredReplicas, and maxReplicas) and also update the revision
	// annotation in the deployment with the latest revision.
	if existingNewRS != nil {
		rsCopy := existingNewRS.DeepCopy()

		// Set existing new replica set's annotation
		annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(ctx, d, rsCopy, newRevision, true, maxRevHistoryLengthInChars)
		minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds
		if annotationsUpdated || minReadySecondsNeedsUpdate {
			rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
			return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
		}

		// Use the revision in existingNewRS's annotation, since it was set by
		// SetNewReplicaSetAnnotations above.
		needsUpdate := deploymentutil.SetDeploymentRevision(d, rsCopy.Annotations[deploymentutil.RevisionAnnotation])
		// If no other Progressing condition has been recorded and we need to estimate the progress
		// of this deployment then it is likely that users of an existing deployment have just
		// started caring about progress. In that case we need to take into account the first time
		// we noticed their new replica set.
		cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
		if deploymentutil.HasProgressDeadline(d) && cond == nil {
			msg := fmt.Sprintf("Found new replica set %q", rsCopy.Name)
			condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, deploymentutil.FoundNewRSReason, msg)
			deploymentutil.SetDeploymentCondition(&d.Status, *condition)
			needsUpdate = true
		}

		if needsUpdate {
			var err error
			if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}); err != nil {
				return nil, err
			}
		}
		return rsCopy, nil
	}

	if !createIfNotExisted {
		return nil, nil
	}

	// The new ReplicaSet does not exist; create one.
	newRSTemplate := *d.Spec.Template.DeepCopy()
	podTemplateSpecHash := controller.ComputeHash(&newRSTemplate, d.Status.CollisionCount)
	newRSTemplate.Labels = labelsutil.CloneAndAddLabel(d.Spec.Template.Labels, apps.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
	// Add podTemplateHash label to selector.
	newRSSelector := labelsutil.CloneSelectorAndAddLabel(d.Spec.Selector, apps.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)

	// Create new ReplicaSet
	newRS := apps.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{
			// Make the name deterministic, to ensure idempotence
			Name:            d.Name + "-" + podTemplateSpecHash,
			Namespace:       d.Namespace,
			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(d, controllerKind)},
			Labels:          newRSTemplate.Labels,
		},
		Spec: apps.ReplicaSetSpec{
			Replicas:        new(int32),
			MinReadySeconds: d.Spec.MinReadySeconds,
			Selector:        newRSSelector,
			Template:        newRSTemplate,
		},
	}
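	// Illustrative example (hash value made up): for a deployment named
	// "nginx" whose pod template hashes to "5d59d67564", the replica set
	// above is named "nginx-5d59d67564", and its selector and pod template
	// both carry the label pod-template-hash=5d59d67564.
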
	allRSs := append(oldRSs, &newRS)
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(d, allRSs, &newRS)
	if err != nil {
		return nil, err
	}

	*(newRS.Spec.Replicas) = newReplicasCount
	// Set new replica set's annotation
	deploymentutil.SetNewReplicaSetAnnotations(ctx, d, &newRS, newRevision, false, maxRevHistoryLengthInChars)
	// Create the new ReplicaSet. If it already exists, then we need to check for possible
	// hash collisions. If there is any other error, we need to report it in the status of
	// the Deployment.
	alreadyExists := false
	createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(ctx, &newRS, metav1.CreateOptions{})
	switch {
	// We may end up hitting this due to a slow cache or a fast resync of the Deployment.
	case errors.IsAlreadyExists(err):
		alreadyExists = true

		// Fetch a copy of the ReplicaSet.
		rs, rsErr := dc.rsLister.ReplicaSets(newRS.Namespace).Get(newRS.Name)
		if rsErr != nil {
			return nil, rsErr
		}

		// If the Deployment owns the ReplicaSet and the ReplicaSet's PodTemplateSpec is semantically
		// deep equal to the PodTemplateSpec of the Deployment, it's the Deployment's new ReplicaSet.
		// Otherwise, this is a hash collision and we need to increment the collisionCount field in
		// the status of the Deployment and requeue to try the creation in the next sync.
		controllerRef := metav1.GetControllerOf(rs)
		if controllerRef != nil && controllerRef.UID == d.UID && deploymentutil.EqualIgnoreHash(&d.Spec.Template, &rs.Spec.Template) {
			createdRS = rs
			err = nil
			break
		}

		// Matching ReplicaSet is not equal - increment the collisionCount in the DeploymentStatus
		// and requeue the Deployment.
		if d.Status.CollisionCount == nil {
			d.Status.CollisionCount = new(int32)
		}
		preCollisionCount := *d.Status.CollisionCount
		*d.Status.CollisionCount++
		// Update the collisionCount for the Deployment and let it requeue by returning the original
		// error.
		_, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
		if dErr == nil {
			logger.V(2).Info("Found a hash collision for deployment - bumping collisionCount to resolve it", "deployment", klog.KObj(d), "oldCollisionCount", preCollisionCount, "newCollisionCount", *d.Status.CollisionCount)
		}
		return nil, err
	case errors.HasStatusCause(err, v1.NamespaceTerminatingCause):
		// if the namespace is terminating, all subsequent creates will fail and we can safely do nothing
		return nil, err
	case err != nil:
		msg := fmt.Sprintf("Failed to create new replica set %q: %v", newRS.Name, err)
		if deploymentutil.HasProgressDeadline(d) {
			cond := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionFalse, deploymentutil.FailedRSCreateReason, msg)
			deploymentutil.SetDeploymentCondition(&d.Status, *cond)
			// We don't really care about this error at this point, since we have a bigger issue to report.
			// TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account
			// these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568
			_, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
		}
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg)
		return nil, err
	}
	if !alreadyExists && newReplicasCount > 0 {
		dc.eventRecorder.Eventf(d, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled up replica set %s to %d", createdRS.Name, newReplicasCount)
	}

	needsUpdate := deploymentutil.SetDeploymentRevision(d, newRevision)
	if !alreadyExists && deploymentutil.HasProgressDeadline(d) {
		msg := fmt.Sprintf("Created new replica set %q", createdRS.Name)
		condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, deploymentutil.NewReplicaSetReason, msg)
		deploymentutil.SetDeploymentCondition(&d.Status, *condition)
		needsUpdate = true
	}
	if needsUpdate {
		_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
	}
	return createdRS, err
}
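
// Illustrative collision walk-through (names made up): if an unrelated
// replica set already owns the name "nginx-5d59d67564", the IsAlreadyExists
// branch above bumps d.Status.CollisionCount (nil -> 1) and returns the
// original error; on the next sync, controller.ComputeHash is fed the new
// collisionCount, yielding a different hash and a non-colliding name.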

// scale scales proportionally in order to mitigate risk. Otherwise, scaling up can increase the size
// of the new replica set and scaling down can decrease the sizes of the old ones, both of which would
// have the effect of hastening the rollout progress, which could produce a higher proportion of unavailable
// replicas in the event of a problem with the rolled out template. Should run only on scaling events or
// when a deployment is paused and not during the normal rollout process.
func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
	// If there is only one active replica set then we should scale that up to the full count of the
	// deployment. If there is no active replica set, then we should scale up the newest replica set.
	if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
		if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) {
			return nil
		}
		_, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, activeOrLatest, *(deployment.Spec.Replicas), deployment)
		return err
	}

	// If the new replica set is saturated, old replica sets should be fully scaled down.
	// This case handles replica set adoption while the new replica set is saturated.
	if deploymentutil.IsSaturated(deployment, newRS) {
		for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
			if _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, old, 0, deployment); err != nil {
				return err
			}
		}
		return nil
	}

	// There are old replica sets with pods and the new replica set is not saturated.
	// We need to proportionally scale all replica sets (new and old) in case of a
	// rolling deployment.
	if deploymentutil.IsRollingUpdate(deployment) {
		allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
		allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

		allowedSize := int32(0)
		if *(deployment.Spec.Replicas) > 0 {
			allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
		}

		// Number of additional replicas that can be either added or removed from the total
		// replicas count. These replicas should be distributed proportionally to the active
		// replica sets.
		deploymentReplicasToAdd := allowedSize - allRSsReplicas
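		// Worked example (values made up): with *(deployment.Spec.Replicas) == 10
		// and a maxSurge of 3, allowedSize is 13; if the active replica sets
		// currently sum to 10 replicas, deploymentReplicasToAdd is 3, and those
		// 3 replicas are spread proportionally over the replica sets below,
		// largest first, with any rounding leftover going to the largest one.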

		// The additional replicas should be distributed proportionally amongst the active
		// replica sets, from the largest to the smallest replica set. Scaling direction
		// drives what happens in case we are trying to scale replica sets of the same size.
		// In such a case when scaling up, we should scale up newer replica sets first, and
		// when scaling down, we should scale down older replica sets first.
		var scalingOperation string
		switch {
		case deploymentReplicasToAdd > 0:
			sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
			scalingOperation = "up"

		case deploymentReplicasToAdd < 0:
			sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
			scalingOperation = "down"
		}

		// Iterate over all active replica sets and estimate proportions for each of them.
		// The absolute value of deploymentReplicasAdded should never exceed the absolute
		// value of deploymentReplicasToAdd.
		deploymentReplicasAdded := int32(0)
		nameToSize := make(map[string]int32)
		logger := klog.FromContext(ctx)
		for i := range allRSs {
			rs := allRSs[i]

			// Estimate proportions if we have replicas to add, otherwise simply populate
			// nameToSize with the current sizes for each replica set.
			if deploymentReplicasToAdd != 0 {
				proportion := deploymentutil.GetProportion(logger, rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)

				nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
				deploymentReplicasAdded += proportion
			} else {
				nameToSize[rs.Name] = *(rs.Spec.Replicas)
			}
		}

		// Update all replica sets
		for i := range allRSs {
			rs := allRSs[i]

			// Add/remove any leftover replicas to/from the largest replica set.
			if i == 0 && deploymentReplicasToAdd != 0 {
				leftover := deploymentReplicasToAdd - deploymentReplicasAdded
				nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
				if nameToSize[rs.Name] < 0 {
					nameToSize[rs.Name] = 0
				}
			}

			// TODO: Use transactions when we have them.
			if _, _, err := dc.scaleReplicaSet(ctx, rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
				// Return as soon as we fail, the deployment is requeued
				return err
			}
		}
	}
	return nil
}

func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(ctx context.Context, rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment) (bool, *apps.ReplicaSet, error) {
	// No need to scale
	if *(rs.Spec.Replicas) == newScale {
		return false, rs, nil
	}
	var scalingOperation string
	if *(rs.Spec.Replicas) < newScale {
		scalingOperation = "up"
	} else {
		scalingOperation = "down"
	}
	scaled, newRS, err := dc.scaleReplicaSet(ctx, rs, newScale, deployment, scalingOperation)
	return scaled, newRS, err
}

func (dc *DeploymentController) scaleReplicaSet(ctx context.Context, rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) {
	sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale

	annotationsNeedUpdate := deploymentutil.ReplicasAnnotationsNeedUpdate(rs, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))

	scaled := false
	var err error
	if sizeNeedsUpdate || annotationsNeedUpdate {
		oldScale := *(rs.Spec.Replicas)
		rsCopy := rs.DeepCopy()
		*(rsCopy.Spec.Replicas) = newScale
		deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))
		rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
		if err == nil && sizeNeedsUpdate {
			scaled = true
			dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d from %d", scalingOperation, rs.Name, newScale, oldScale)
		}
	}
	return scaled, rs, err
}

// cleanupDeployment is responsible for cleaning up a deployment, i.e. it deletes all but the latest
// N old replica sets, where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of
// the pod template of a deployment, kept around by default 1) for historical reasons and 2) for the
// ability to roll back a deployment.
func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error {
	logger := klog.FromContext(ctx)
	if !deploymentutil.HasRevisionHistoryLimit(deployment) {
		return nil
	}

	// Avoid deleting replica sets with a deletion timestamp set
	aliveFilter := func(rs *apps.ReplicaSet) bool {
		return rs != nil && rs.ObjectMeta.DeletionTimestamp == nil
	}
	cleanableRSes := controller.FilterReplicaSets(oldRSs, aliveFilter)

	diff := int32(len(cleanableRSes)) - *deployment.Spec.RevisionHistoryLimit
	if diff <= 0 {
		return nil
	}

	sort.Sort(deploymentutil.ReplicaSetsByRevision(cleanableRSes))
	logger.V(4).Info("Looking to cleanup old replica sets for deployment", "deployment", klog.KObj(deployment))

	for i := int32(0); i < diff; i++ {
		rs := cleanableRSes[i]
		// Avoid deleting replica sets with non-zero replica counts
		if rs.Status.Replicas != 0 || *(rs.Spec.Replicas) != 0 || rs.Generation > rs.Status.ObservedGeneration || rs.DeletionTimestamp != nil {
			continue
		}
		logger.V(4).Info("Trying to cleanup replica set for deployment", "replicaSet", klog.KObj(rs), "deployment", klog.KObj(deployment))
		if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
			// Return error instead of aggregating and continuing DELETEs on the theory
			// that we may be overloading the api server.
			return err
		}
	}

	return nil
}
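
// Worked example (values made up): with RevisionHistoryLimit == 2 and five
// alive old replica sets, diff is 3, so the three lowest-revision replica
// sets are deleted, provided each is fully scaled down and fully observed;
// otherwise a candidate is skipped and retried on a later sync.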

// syncDeploymentStatus checks if the status is up-to-date and syncs it if necessary
func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
	newStatus := calculateStatus(allRSs, newRS, d)

	if reflect.DeepEqual(d.Status, newStatus) {
		return nil
	}

	newDeployment := d
	newDeployment.Status = newStatus
	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
	return err
}

// calculateStatus calculates the latest status for the provided deployment by looking into the provided replica sets.
func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) apps.DeploymentStatus {
	availableReplicas := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
	totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
	unavailableReplicas := totalReplicas - availableReplicas
	// If unavailableReplicas is negative, then that means the Deployment has more available replicas running than
	// desired, e.g. whenever it scales down. In such a case we should simply default unavailableReplicas to zero.
	if unavailableReplicas < 0 {
		unavailableReplicas = 0
	}

	status := apps.DeploymentStatus{
		// TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
		ObservedGeneration:  deployment.Generation,
		Replicas:            deploymentutil.GetActualReplicaCountForReplicaSets(allRSs),
		UpdatedReplicas:     deploymentutil.GetActualReplicaCountForReplicaSets([]*apps.ReplicaSet{newRS}),
		ReadyReplicas:       deploymentutil.GetReadyReplicaCountForReplicaSets(allRSs),
		AvailableReplicas:   availableReplicas,
		UnavailableReplicas: unavailableReplicas,
		CollisionCount:      deployment.Status.CollisionCount,
	}

	// Copy conditions one by one so we won't mutate the original object.
	conditions := deployment.Status.Conditions
	for i := range conditions {
		status.Conditions = append(status.Conditions, conditions[i])
	}

	if availableReplicas >= *(deployment.Spec.Replicas)-deploymentutil.MaxUnavailable(*deployment) {
		minAvailability := deploymentutil.NewDeploymentCondition(apps.DeploymentAvailable, v1.ConditionTrue, deploymentutil.MinimumReplicasAvailable, "Deployment has minimum availability.")
		deploymentutil.SetDeploymentCondition(&status, *minAvailability)
	} else {
		noMinAvailability := deploymentutil.NewDeploymentCondition(apps.DeploymentAvailable, v1.ConditionFalse, deploymentutil.MinimumReplicasUnavailable, "Deployment does not have minimum availability.")
		deploymentutil.SetDeploymentCondition(&status, *noMinAvailability)
	}

	return status
}
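
// Illustrative arithmetic (values made up): with 10 replicas total across all
// replica sets, 8 of them available, spec.Replicas == 10, and a
// maxUnavailable of 2, unavailableReplicas is 2 and 8 >= 10-2 holds, so the
// Available condition is set to True with reason MinimumReplicasAvailable.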

// isScalingEvent checks whether the provided deployment has been updated with a scaling event
// by looking at the desired-replicas annotation in the active replica sets of the deployment.
//
// rsList should come from getReplicaSetsForDeployment(d).
func (dc *DeploymentController) isScalingEvent(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) (bool, error) {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return false, err
	}
	allRSs := append(oldRSs, newRS)
	logger := klog.FromContext(ctx)
	for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
		desired, ok := deploymentutil.GetDesiredReplicasAnnotation(logger, rs)
		if !ok {
			continue
		}
		if desired != *(d.Spec.Replicas) {
			return true, nil
		}
	}
	return false, nil
}
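
// Illustrative check (values made up): if an active replica set carries a
// desired-replicas annotation of "3" (written when the deployment wanted 3
// replicas) while *(d.Spec.Replicas) is now 5, isScalingEvent returns true
// and sync takes the scaling path rather than a rollout.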