...

Source file src/k8s.io/kubernetes/pkg/controller/deployment/rolling.go

Documentation: k8s.io/kubernetes/pkg/controller/deployment

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deployment

import (
	"context"
	"fmt"
	"sort"

	apps "k8s.io/api/apps/v1"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/controller"
	deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
)

// rolloutRolling implements the logic for rolling a new replica set.
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Scale up, if we can.
	scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// Scale down, if we can.
	scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	if deploymentutil.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status
	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}
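
// Illustrative trace, not part of the upstream file: for a Deployment with
// spec.Replicas=10, maxSurge=3, and maxUnavailable=2 (both absolute values),
// and assuming every new pod becomes available before the next sync, the
// steps above converge roughly as follows:
//
//	sync 1: reconcileNewReplicaSet scales newRS 0 -> 3   (10 old + 3 new = 13, the surge ceiling)
//	sync 2: reconcileOldReplicaSets scales oldRS 10 -> 5 (13 available - 8 minAvailable = 5 may go)
//	sync 3: reconcileNewReplicaSet scales newRS 3 -> 8   (back up to the 13-pod ceiling)
//	sync 4: reconcileOldReplicaSets scales oldRS 5 -> 0
//	sync 5: reconcileNewReplicaSet scales newRS 8 -> 10; once those pods are available the rollout is complete
//
// Only one of the two reconcile steps makes progress in a given sync, because
// a successful scale-up or scale-down returns early after updating the status.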

func (dc *DeploymentController) reconcileNewReplicaSet(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
		// Scaling not required.
		return false, nil
	}
	if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
		// Scale down.
		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, *(deployment.Spec.Replicas), deployment)
		return scaled, err
	}
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
	if err != nil {
		return false, err
	}
	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, newReplicasCount, deployment)
	return scaled, err
}
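
// surgeBoundedNewReplicas is an illustrative sketch, not part of the upstream
// file, of the bound that deploymentutil.NewRSNewReplicas enforces for the
// RollingUpdate strategy: the new ReplicaSet may only grow while the total pod
// count stays at or below spec.Replicas + maxSurge, and it is never scaled
// past spec.Replicas itself. The real helper also resolves percentage values
// for maxSurge and handles the Recreate strategy.
func surgeBoundedNewReplicas(desired, maxSurge, currentPodCount, currentNewReplicas int32) int32 {
	maxTotalPods := desired + maxSurge
	if currentPodCount >= maxTotalPods {
		// No surge headroom left; keep the new ReplicaSet at its current size.
		return currentNewReplicas
	}
	// Grow into the remaining headroom, but never beyond the desired replica count.
	scaleUpCount := min(maxTotalPods-currentPodCount, desired-currentNewReplicas)
	return currentNewReplicas + scaleUpCount
}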

func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	logger := klog.FromContext(ctx)
	oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
	if oldPodsCount == 0 {
		// Can't scale down further
		return false, nil
	}
	allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
	logger.V(4).Info("New replica set", "replicaSet", klog.KObj(newRS), "availableReplicas", newRS.Status.AvailableReplicas)
	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)

	// Check if we can scale down. We can scale down in the following 2 cases:
	// * Some old replica sets have unhealthy replicas; we can safely scale down those unhealthy replicas since that won't further
	//   increase unavailability.
	// * The new replica set has scaled up and its replicas have become ready, so we can scale down old replica sets in a further step.
	//
	// maxScaledDown := allPodsCount - minAvailable - newReplicaSetPodsUnavailable
	// This takes into account not only maxUnavailable and any surge pods that have been created, but also unavailable pods from
	// the newRS, so that unavailable pods from the newRS do not make us scale down old replica sets in a further
	// step (which would increase unavailability).
	//
	// Concrete example:
	//
	// * 10 replicas
	// * 2 maxUnavailable (absolute number, not percent)
	// * 3 maxSurge (absolute number, not percent)
	//
	// case 1:
	// * Deployment is updated, newRS is created with 3 replicas, oldRS is scaled down to 8, and newRS is scaled up to 5.
	// * The new replica set pods crashloop and never become available.
	// * allPodsCount is 13. minAvailable is 8. newRSPodsUnavailable is 5.
	// * A node fails and causes one of the oldRS pods to become unavailable. However, 13 - 8 - 5 = 0, so the oldRS won't be scaled down.
	// * The user notices the crashloop and runs kubectl rollout undo to roll back.
	// * newRSPodsUnavailable is 1, since we rolled back to the good replica set, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping pods will be scaled down.
	// * The total number of pods will then be 9 and the newRS can be scaled up to 10.
	//
	// case 2:
	// Same example, but pushing a new pod template instead of rolling back (aka "roll over"):
	// * The new replica set created must start with 0 replicas because allPodsCount is already at 13.
	// * However, newRSPodsUnavailable would also be 0, so the 2 old replica sets could be scaled down by 5 (13 - 8 - 0), which would then
	//   allow the new replica set to be scaled up by 5.
	minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
	newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
	maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
	if maxScaledDown <= 0 {
		return false, nil
	}

	// Clean up unhealthy replicas first, otherwise unhealthy replicas will block the deployment
	// and cause a timeout. See https://github.com/kubernetes/kubernetes/issues/16737
	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(ctx, oldRSs, deployment, maxScaledDown)
	if err != nil {
		return false, nil
	}
	logger.V(4).Info("Cleaned up unhealthy replicas from old RSes", "count", cleanupCount)

	// Scale down old replica sets; we need to check maxUnavailable to ensure we can scale down.
	allRSs = append(oldRSs, newRS)
	scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(ctx, allRSs, oldRSs, deployment)
	if err != nil {
		return false, nil
	}
	logger.V(4).Info("Scaled down old RSes", "deployment", klog.KObj(deployment), "count", scaledDownCount)

	totalScaledDown := cleanupCount + scaledDownCount
	return totalScaledDown > 0, nil
}
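
// Worked example for the two-phase scale-down above (illustrative numbers,
// not from the upstream file): spec.Replicas=10 and maxUnavailable=2 give
// minAvailable=8. Suppose the old RS runs 10 pods of which 2 are unhealthy,
// and the new RS runs 3 pods, none of which is available yet. Then
// maxScaledDown = 13 - 8 - 3 = 2, so cleanupUnhealthyReplicas removes exactly
// the 2 unhealthy old pods (10 -> 8). The second phase then sees
// availablePodCount = 8 + 0 = 8, which equals minAvailable, so
// scaleDownOldReplicaSetsForRollingUpdate removes nothing further and the
// function reports scaledDown=true from the cleanup alone.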

// cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted.
func (dc *DeploymentController) cleanupUnhealthyReplicas(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment, maxCleanupCount int32) ([]*apps.ReplicaSet, int32, error) {
	logger := klog.FromContext(ctx)
	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
	// Safely scale down all old replica sets with unhealthy replicas. The replica set will sort the pods in the order
	// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will
	// be deleted first and won't increase unavailability.
	totalScaledDown := int32(0)
	for i, targetRS := range oldRSs {
		if totalScaledDown >= maxCleanupCount {
			break
		}
		if *(targetRS.Spec.Replicas) == 0 {
			// cannot scale down this replica set.
			continue
		}
		logger.V(4).Info("Found available pods in old RS", "replicaSet", klog.KObj(targetRS), "availableReplicas", targetRS.Status.AvailableReplicas)
		if *(targetRS.Spec.Replicas) == targetRS.Status.AvailableReplicas {
			// no unhealthy replicas found, no scaling required.
			continue
		}

		scaledDownCount := min(maxCleanupCount-totalScaledDown, *(targetRS.Spec.Replicas)-targetRS.Status.AvailableReplicas)
		newReplicasCount := *(targetRS.Spec.Replicas) - scaledDownCount
		if newReplicasCount > *(targetRS.Spec.Replicas) {
			return nil, 0, fmt.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
		}
		_, updatedOldRS, err := dc.scaleReplicaSetAndRecordEvent(ctx, targetRS, newReplicasCount, deployment)
		if err != nil {
			return nil, totalScaledDown, err
		}
		totalScaledDown += scaledDownCount
		oldRSs[i] = updatedOldRS
	}
	return oldRSs, totalScaledDown, nil
}
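
// Illustrative numbers for the cleanup loop above (not from the upstream
// file): with maxCleanupCount=4 and two old ReplicaSets, the older one at
// spec=6/available=3 and the newer one at spec=5/available=4, the loop scales
// the first RS 6 -> 3 (3 unhealthy pods removed, 1 left in the budget) and the
// second RS 5 -> 4 (1 removed), returning totalScaledDown=4.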

// scaleDownOldReplicaSetsForRollingUpdate scales down old replica sets when the deployment strategy is "RollingUpdate".
// We need to check maxUnavailable to ensure availability.
func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) {
	logger := klog.FromContext(ctx)
	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)

	// Check if we can scale down.
	minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
	// Find the number of available pods.
	availablePodCount := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
	if availablePodCount <= minAvailable {
		// Cannot scale down.
		return 0, nil
	}
	logger.V(4).Info("Found available pods in deployment, scaling down old RSes", "deployment", klog.KObj(deployment), "availableReplicas", availablePodCount)

	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))

	totalScaledDown := int32(0)
	totalScaleDownCount := availablePodCount - minAvailable
	for _, targetRS := range oldRSs {
		if totalScaledDown >= totalScaleDownCount {
			// No further scaling required.
			break
		}
		if *(targetRS.Spec.Replicas) == 0 {
			// cannot scale down this ReplicaSet.
			continue
		}
		// Scale down.
		scaleDownCount := min(*(targetRS.Spec.Replicas), totalScaleDownCount-totalScaledDown)
		newReplicasCount := *(targetRS.Spec.Replicas) - scaleDownCount
		if newReplicasCount > *(targetRS.Spec.Replicas) {
			return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
		}
		_, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, targetRS, newReplicasCount, deployment)
		if err != nil {
			return totalScaledDown, err
		}

		totalScaledDown += scaleDownCount
	}

	return totalScaledDown, nil
}
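
// Illustrative numbers for the availability bound above (not from the
// upstream file): with spec.Replicas=10 and maxUnavailable=2, minAvailable is
// 8. If the ReplicaSets currently report 12 available pods in total, at most
// 12 - 8 = 4 old replicas may be removed in this pass, taken from the oldest
// ReplicaSets first; with only 7 available pods nothing is scaled down.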
