
Source file src/k8s.io/kubernetes/pkg/controller/deployment/deployment_controller.go

Documentation: k8s.io/kubernetes/pkg/controller/deployment

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package deployment contains all the logic for handling Kubernetes Deployments.
// It implements a set of strategies (rolling, recreate) for deploying an application,
// the means to rollback to previous versions, proportional scaling for mitigating
// risk, cleanup policy, and other useful features of Deployments.
package deployment

import (
	"context"
	"fmt"
	"reflect"
	"time"

	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	appsinformers "k8s.io/client-go/informers/apps/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	appslisters "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/deployment/util"
)

const (
	// maxRetries is the number of times a deployment will be retried before it is dropped out of the queue.
	// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
	// a deployment is going to be requeued:
	//
	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
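	//
	// (Note: workqueue.DefaultControllerRateLimiter also layers an overall
	// token-bucket limiter on top of this per-item exponential backoff, so
	// delays observed under heavy load can be somewhat longer than the
	// sequence above.)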
	maxRetries = 15
)

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment")

// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running replica sets and pods.
type DeploymentController struct {
	// rsControl is used for adopting/releasing replica sets.
	rsControl controller.RSControlInterface
	client    clientset.Interface

	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	// To allow injection of syncDeployment for testing.
	syncHandler func(ctx context.Context, dKey string) error
	// used for unit testing
	enqueueDeployment func(deployment *apps.Deployment)

	// dLister can list/get deployments from the shared informer's store
	dLister appslisters.DeploymentLister
	// rsLister can list/get replica sets from the shared informer's store
	rsLister appslisters.ReplicaSetLister
	// podLister can list/get pods from the shared informer's store
	podLister corelisters.PodLister

	// dListerSynced returns true if the Deployment store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	dListerSynced cache.InformerSynced
	// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsListerSynced cache.InformerSynced
	// podListerSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podListerSynced cache.InformerSynced

	// Deployments that need to be synced
	queue workqueue.RateLimitingInterface
}

// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
	logger := klog.FromContext(ctx)
	dc := &DeploymentController{
		client:           client,
		eventBroadcaster: eventBroadcaster,
		eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
	dc.rsControl = controller.RealRSControl{
		KubeClient: client,
		Recorder:   dc.eventRecorder,
	}

	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			dc.addDeployment(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			dc.updateDeployment(logger, oldObj, newObj)
		},
		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
		DeleteFunc: func(obj interface{}) {
			dc.deleteDeployment(logger, obj)
		},
	})
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			dc.addReplicaSet(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			dc.updateReplicaSet(logger, oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			dc.deleteReplicaSet(logger, obj)
		},
	})
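	// Only deletions are watched for pods: deletePod uses them to wake up
	// Recreate deployments once all pods of the old revision have terminated.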
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: func(obj interface{}) {
			dc.deletePod(logger, obj)
		},
	})

	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue

	dc.dLister = dInformer.Lister()
	dc.rsLister = rsInformer.Lister()
	dc.podLister = podInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced
	dc.rsListerSynced = rsInformer.Informer().HasSynced
	dc.podListerSynced = podInformer.Informer().HasSynced
	return dc, nil
}

// Run begins watching and syncing.
func (dc *DeploymentController) Run(ctx context.Context, workers int) {
	defer utilruntime.HandleCrash()

	// Start events processing pipeline.
	dc.eventBroadcaster.StartStructuredLogging(3)
	dc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.client.CoreV1().Events("")})
	defer dc.eventBroadcaster.Shutdown()

	defer dc.queue.ShutDown()

	logger := klog.FromContext(ctx)
	logger.Info("Starting controller", "controller", "deployment")
	defer logger.Info("Shutting down controller", "controller", "deployment")

	if !cache.WaitForNamedCacheSync("deployment", ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, dc.worker, time.Second)
	}

	<-ctx.Done()
}

func (dc *DeploymentController) addDeployment(logger klog.Logger, obj interface{}) {
	d := obj.(*apps.Deployment)
	logger.V(4).Info("Adding deployment", "deployment", klog.KObj(d))
	dc.enqueueDeployment(d)
}

func (dc *DeploymentController) updateDeployment(logger klog.Logger, old, cur interface{}) {
	oldD := old.(*apps.Deployment)
	curD := cur.(*apps.Deployment)
	logger.V(4).Info("Updating deployment", "deployment", klog.KObj(oldD))
	dc.enqueueDeployment(curD)
}

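// deleteDeployment enqueues a deleted deployment. obj may be an
// *apps.Deployment or, if the delete event was missed, a
// cache.DeletedFinalStateUnknown tombstone wrapping one.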
func (dc *DeploymentController) deleteDeployment(logger klog.Logger, obj interface{}) {
	d, ok := obj.(*apps.Deployment)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		d, ok = tombstone.Obj.(*apps.Deployment)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Deployment %#v", obj))
			return
		}
	}
	logger.V(4).Info("Deleting deployment", "deployment", klog.KObj(d))
	dc.enqueueDeployment(d)
}

// addReplicaSet enqueues the deployment that manages a ReplicaSet when the ReplicaSet is created.
func (dc *DeploymentController) addReplicaSet(logger klog.Logger, obj interface{}) {
	rs := obj.(*apps.ReplicaSet)

	if rs.DeletionTimestamp != nil {
		// On a restart of the controller manager, it's possible for an object to
		// show up in a state that is already pending deletion.
		dc.deleteReplicaSet(logger, rs)
		return
	}
	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
		d := dc.resolveControllerRef(rs.Namespace, controllerRef)
		if d == nil {
			return
		}
		logger.V(4).Info("ReplicaSet added", "replicaSet", klog.KObj(rs))
		dc.enqueueDeployment(d)
		return
	}

	// Otherwise, it's an orphan. Get a list of all matching Deployments and sync
	// them to see if anyone wants to adopt it.
	ds := dc.getDeploymentsForReplicaSet(logger, rs)
	if len(ds) == 0 {
		return
	}
	logger.V(4).Info("Orphan ReplicaSet added", "replicaSet", klog.KObj(rs))
	for _, d := range ds {
		dc.enqueueDeployment(d)
	}
}

// getDeploymentsForReplicaSet returns a list of Deployments that potentially
// match a ReplicaSet.
func (dc *DeploymentController) getDeploymentsForReplicaSet(logger klog.Logger, rs *apps.ReplicaSet) []*apps.Deployment {
	deployments, err := util.GetDeploymentsForReplicaSet(dc.dLister, rs)
	if err != nil || len(deployments) == 0 {
		return nil
	}
	// Because all ReplicaSets belonging to a deployment should have a unique label key,
	// there should never be more than one deployment returned by the above method.
	// If that happens, we should probably dynamically repair the situation by ultimately
	// trying to clean up one of the controllers; for now we just log a warning and return them all.
	if len(deployments) > 1 {
		// ControllerRef will ensure we don't do anything crazy, but more than one
		// item in this list nevertheless constitutes user error.
		logger.V(4).Info("user error! more than one deployment is selecting replica set",
			"replicaSet", klog.KObj(rs), "labels", rs.Labels, "deployment", klog.KObj(deployments[0]))
	}
	return deployments
}

// updateReplicaSet figures out what deployment(s) manage a ReplicaSet when the
// ReplicaSet is updated and wakes them up. If the ReplicaSet's labels or its
// ControllerRef have changed, we need to awaken both the old and new deployments.
// old and cur must be *apps.ReplicaSet types.
func (dc *DeploymentController) updateReplicaSet(logger klog.Logger, old, cur interface{}) {
	curRS := cur.(*apps.ReplicaSet)
	oldRS := old.(*apps.ReplicaSet)
	if curRS.ResourceVersion == oldRS.ResourceVersion {
		// Periodic resync will send update events for all known replica sets.
		// Two different versions of the same replica set will always have different RVs.
		return
	}

	curControllerRef := metav1.GetControllerOf(curRS)
	oldControllerRef := metav1.GetControllerOf(oldRS)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// The ControllerRef was changed. Sync the old controller, if any.
		if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
			dc.enqueueDeployment(d)
		}
	}
	// If it has a ControllerRef, that's all that matters.
	if curControllerRef != nil {
		d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
		if d == nil {
			return
		}
		logger.V(4).Info("ReplicaSet updated", "replicaSet", klog.KObj(curRS))
		dc.enqueueDeployment(d)
		return
	}

	// Otherwise, it's an orphan. If anything changed, sync matching controllers
	// to see if anyone wants to adopt it now.
	labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
	if labelChanged || controllerRefChanged {
		ds := dc.getDeploymentsForReplicaSet(logger, curRS)
		if len(ds) == 0 {
			return
		}
		logger.V(4).Info("Orphan ReplicaSet updated", "replicaSet", klog.KObj(curRS))
		for _, d := range ds {
			dc.enqueueDeployment(d)
		}
	}
}

// deleteReplicaSet enqueues the deployment that manages a ReplicaSet when
// the ReplicaSet is deleted. obj could be an *apps.ReplicaSet, or
// a cache.DeletedFinalStateUnknown marker item.
func (dc *DeploymentController) deleteReplicaSet(logger klog.Logger, obj interface{}) {
	rs, ok := obj.(*apps.ReplicaSet)

	// When a delete is dropped, the relist will notice a replica set in the store
	// not in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the ReplicaSet
	// changed labels the new deployment will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		rs, ok = tombstone.Obj.(*apps.ReplicaSet)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
			return
		}
	}

	controllerRef := metav1.GetControllerOf(rs)
	if controllerRef == nil {
		// No controller should care about orphans being deleted.
		return
	}
	d := dc.resolveControllerRef(rs.Namespace, controllerRef)
	if d == nil {
		return
	}
	logger.V(4).Info("ReplicaSet deleted", "replicaSet", klog.KObj(rs))
	dc.enqueueDeployment(d)
}

// deletePod will enqueue a Recreate Deployment once all of its pods have stopped running.
func (dc *DeploymentController) deletePod(logger klog.Logger, obj interface{}) {
	pod, ok := obj.(*v1.Pod)

	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the Pod
	// changed labels the new deployment will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
			return
		}
	}
	d := dc.getDeploymentForPod(logger, pod)
	if d == nil {
		return
	}
	logger.V(4).Info("Pod deleted", "pod", klog.KObj(pod))
	if d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType {
		// Sync if this Deployment now has no more Pods.
		rsList, err := util.ListReplicaSets(d, util.RsListFromClient(dc.client.AppsV1()))
		if err != nil {
			return
		}
		podMap, err := dc.getPodMapForDeployment(d, rsList)
		if err != nil {
			return
		}
		numPods := 0
		for _, podList := range podMap {
			numPods += len(podList)
		}
		if numPods == 0 {
			dc.enqueueDeployment(d)
		}
	}
}

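// enqueue adds the given deployment's key to the work queue for an immediate sync.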
func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
	key, err := controller.KeyFunc(deployment)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
		return
	}

	dc.queue.Add(key)
}

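// enqueueRateLimited adds the given deployment's key to the work queue, subject
// to the queue's per-item rate limiter.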
func (dc *DeploymentController) enqueueRateLimited(deployment *apps.Deployment) {
	key, err := controller.KeyFunc(deployment)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
		return
	}

	dc.queue.AddRateLimited(key)
}

// enqueueAfter will enqueue a deployment after the provided amount of time.
func (dc *DeploymentController) enqueueAfter(deployment *apps.Deployment, after time.Duration) {
	key, err := controller.KeyFunc(deployment)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
		return
	}

	dc.queue.AddAfter(key, after)
}

// getDeploymentForPod returns the deployment managing the given Pod.
func (dc *DeploymentController) getDeploymentForPod(logger klog.Logger, pod *v1.Pod) *apps.Deployment {
	// Find the owning replica set
	var rs *apps.ReplicaSet
	var err error
	controllerRef := metav1.GetControllerOf(pod)
	if controllerRef == nil {
		// No controller owns this Pod.
		return nil
	}
	if controllerRef.Kind != apps.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
		// Not a pod owned by a replica set.
		return nil
	}
	rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
	if err != nil || rs.UID != controllerRef.UID {
		logger.V(4).Info("Cannot get replicaset for pod", "ownerReference", controllerRef.Name, "pod", klog.KObj(pod), "err", err)
		return nil
	}

	// Now find the Deployment that owns that ReplicaSet.
	controllerRef = metav1.GetControllerOf(rs)
	if controllerRef == nil {
		return nil
	}
	return dc.resolveControllerRef(rs.Namespace, controllerRef)
}

// resolveControllerRef returns the controller referenced by a ControllerRef,
// or nil if the ControllerRef could not be resolved to a matching controller
// of the correct Kind.
func (dc *DeploymentController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.Deployment {
	// We can't look up by UID, so look up by Name and then verify UID.
	// Don't even try to look up by Name if it's the wrong Kind.
	if controllerRef.Kind != controllerKind.Kind {
		return nil
	}
	d, err := dc.dLister.Deployments(namespace).Get(controllerRef.Name)
	if err != nil {
		return nil
	}
	if d.UID != controllerRef.UID {
		// The controller we found with this Name is not the same one that the
		// ControllerRef points to.
		return nil
	}
	return d
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (dc *DeploymentController) worker(ctx context.Context) {
	for dc.processNextWorkItem(ctx) {
	}
}

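// processNextWorkItem pops a single key off the queue, runs the sync handler for
// it, and handles the result. It returns false only when the queue is shutting down.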
func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(key)

	err := dc.syncHandler(ctx, key.(string))
	dc.handleErr(ctx, err, key)

	return true
}

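// handleErr forgets a key on success (or when its namespace is terminating),
// requeues it with rate-limited backoff for up to maxRetries failures, and then
// drops it from the queue.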
func (dc *DeploymentController) handleErr(ctx context.Context, err error, key interface{}) {
	logger := klog.FromContext(ctx)
	if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
		dc.queue.Forget(key)
		return
	}
	ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string))
	if keyErr != nil {
		logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key)
	}

	if dc.queue.NumRequeues(key) < maxRetries {
		logger.V(2).Info("Error syncing deployment", "deployment", klog.KRef(ns, name), "err", err)
		dc.queue.AddRateLimited(key)
		return
	}

	utilruntime.HandleError(err)
	logger.V(2).Info("Dropping deployment out of the queue", "deployment", klog.KRef(ns, name), "err", err)
	dc.queue.Forget(key)
}

// getReplicaSetsForDeployment uses ControllerRefManager to reconcile
// ControllerRef by adopting and orphaning.
// It returns the list of ReplicaSets that this Deployment should manage.
func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context, d *apps.Deployment) ([]*apps.ReplicaSet, error) {
	// List all ReplicaSets to find those we own but that no longer match our
	// selector. They will be orphaned by ClaimReplicaSets().
	rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything())
	if err != nil {
		return nil, err
	}
	deploymentSelector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
	if err != nil {
		return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
	}
	// If any adoptions are attempted, we should first recheck for deletion with
	// an uncached quorum read sometime after listing ReplicaSets (see #42639).
	canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
		fresh, err := dc.client.AppsV1().Deployments(d.Namespace).Get(ctx, d.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != d.UID {
			return nil, fmt.Errorf("original Deployment %v/%v is gone: got uid %v, wanted %v", d.Namespace, d.Name, fresh.UID, d.UID)
		}
		return fresh, nil
	})
	cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind, canAdoptFunc)
	return cm.ClaimReplicaSets(ctx, rsList)
}

// getPodMapForDeployment returns the Pods managed by a Deployment.
//
// It returns a map from ReplicaSet UID to a list of Pods controlled by that RS,
// according to the Pod's ControllerRef.
// NOTE: The pod pointers returned by this method point to pod objects that live
// in the cache and thus shouldn't be modified in any way.
func (dc *DeploymentController) getPodMapForDeployment(d *apps.Deployment, rsList []*apps.ReplicaSet) (map[types.UID][]*v1.Pod, error) {
	// Get all Pods that potentially belong to this Deployment.
	selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
	if err != nil {
		return nil, err
	}
	pods, err := dc.podLister.Pods(d.Namespace).List(selector)
	if err != nil {
		return nil, err
	}
	// Group Pods by their controller (if it's in rsList).
	podMap := make(map[types.UID][]*v1.Pod, len(rsList))
	for _, rs := range rsList {
		podMap[rs.UID] = []*v1.Pod{}
	}
	for _, pod := range pods {
		// Do not ignore inactive Pods because Recreate Deployments need to verify that no
		// Pods from older versions are running before spinning up new Pods.
		controllerRef := metav1.GetControllerOf(pod)
		if controllerRef == nil {
			continue
		}
		// Only append if we care about this UID.
		if _, ok := podMap[controllerRef.UID]; ok {
			podMap[controllerRef.UID] = append(podMap[controllerRef.UID], pod)
		}
	}
	return podMap, nil
}

// syncDeployment will sync the deployment with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
	logger := klog.FromContext(ctx)
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key)
		return err
	}

	startTime := time.Now()
	logger.V(4).Info("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
	defer func() {
		logger.V(4).Info("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
	}()

	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	if errors.IsNotFound(err) {
		logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
		return nil
	}
	if err != nil {
		return err
	}

	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d := deployment.DeepCopy()

	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
		}
		return nil
	}

	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
	// through adoption/orphaning.
	rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
	if err != nil {
		return err
	}
	// List all Pods owned by this Deployment, grouped by their ReplicaSet.
	// Current uses of the podMap are:
	//
	// * check if a Pod is labeled correctly with the pod-template-hash label.
	// * check that no old Pods are running in the middle of Recreate Deployments.
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}

	if d.DeletionTimestamp != nil {
		return dc.syncStatusOnly(ctx, d, rsList)
	}

	// Update deployment conditions with an Unknown condition when pausing/resuming
	// a deployment. In this way, we can be sure that we won't timeout when a user
	// resumes a Deployment with a set progressDeadlineSeconds.
	if err = dc.checkPausedConditions(ctx, d); err != nil {
		return err
	}

	if d.Spec.Paused {
		return dc.sync(ctx, d, rsList)
	}

	// rollback is not re-entrant in case the underlying replica sets are updated with a new
	// revision so we should ensure that we won't proceed to update replica sets until we
	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
	if getRollbackTo(d) != nil {
		return dc.rollback(ctx, d, rsList)
	}

	scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(ctx, d, rsList)
	}

	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(ctx, d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(ctx, d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
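
For orientation, below is a minimal wiring sketch (not part of this file) showing how the controller might be constructed and run against a shared informer factory. In practice this wiring is done by kube-controller-manager; the kubeconfig path, resync period, and worker count here are illustrative assumptions.

package main

import (
	"context"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"

	"k8s.io/kubernetes/pkg/controller/deployment"
)

func main() {
	// Hypothetical kubeconfig path; in-cluster code would use rest.InClusterConfig() instead.
	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// Shared informers back the controller's listers; the resync period is illustrative.
	factory := informers.NewSharedInformerFactory(client, 12*time.Hour)
	ctx := context.Background()

	dc, err := deployment.NewDeploymentController(
		ctx,
		factory.Apps().V1().Deployments(),
		factory.Apps().V1().ReplicaSets(),
		factory.Core().V1().Pods(),
		client,
	)
	if err != nil {
		panic(err)
	}

	// Start the informers only after NewDeploymentController has registered its
	// event handlers, then run sync workers until the context is cancelled.
	factory.Start(ctx.Done())
	dc.Run(ctx, 5) // 5 matches kube-controller-manager's default concurrent deployment syncs
}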
   676  
