    17  // Package deployment contains all the logic for handling Kubernetes Deployments.
    18  // It implements a set of strategies (rolling, recreate) for deploying an application,
    19  // the means to rollback to previous versions, proportional scaling for mitigating
    20  // risk, cleanup policy, and other useful features of Deployments.
    21  package deployment
    23  import (
    24  	"context"
    25  	"fmt"
    26  	"reflect"
    27  	"time"
    29  	apps "k8s.io/api/apps/v1"
    30  	v1 "k8s.io/api/core/v1"
    31  	"k8s.io/apimachinery/pkg/api/errors"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/labels"
    34  	"k8s.io/apimachinery/pkg/types"
    35  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    36  	"k8s.io/apimachinery/pkg/util/wait"
    37  	appsinformers "k8s.io/client-go/informers/apps/v1"
    38  	coreinformers "k8s.io/client-go/informers/core/v1"
    39  	clientset "k8s.io/client-go/kubernetes"
    40  	"k8s.io/client-go/kubernetes/scheme"
    41  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    42  	appslisters "k8s.io/client-go/listers/apps/v1"
    43  	corelisters "k8s.io/client-go/listers/core/v1"
    44  	"k8s.io/client-go/tools/cache"
    45  	"k8s.io/client-go/tools/record"
    46  	"k8s.io/client-go/util/workqueue"
    47  	"k8s.io/klog/v2"
    48  	"k8s.io/kubernetes/pkg/controller"
    49  	"k8s.io/kubernetes/pkg/controller/deployment/util"
    50  )
const (
	// maxRetries is the number of times a deployment will be retried before it is dropped out of the queue.
	// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
	// a deployment is going to be requeued:
	//
	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
	//
	// handleErr compares the queue's requeue count for a key against this limit to
	// decide between a rate-limited re-add and dropping the key.
	maxRetries = 15
)
// controllerKind contains the schema.GroupVersionKind for this controller type.
// Its Kind is compared against OwnerReference kinds in resolveControllerRef, and the
// whole value is passed to the ReplicaSet ControllerRefManager in
// getReplicaSetsForDeployment.
var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment")
// DeploymentController is responsible for synchronizing Deployment objects stored
// in the system with actual running replica sets and pods.
type DeploymentController struct {
	// rsControl is used for adopting/releasing replica sets.
	// client is the Kubernetes API client. It is used for calls that go to the
	// API server rather than the informer caches: the fresh Deployment read made
	// before adoption, Deployment status updates, and the event sink.
	rsControl controller.RSControlInterface
	client    clientset.Interface

	// eventBroadcaster is started in Run and shut down when Run returns.
	// eventRecorder is created from eventBroadcaster in NewDeploymentController
	// with the component name "deployment-controller".
	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	// To allow injection of syncDeployment for testing.
	// processNextWorkItem invokes it once for every key taken off the queue.
	syncHandler func(ctx context.Context, dKey string) error
	// used for unit testing
	// NewDeploymentController sets it to dc.enqueue; all event handlers enqueue
	// through this field.
	enqueueDeployment func(deployment *apps.Deployment)

	// dLister can list/get deployments from the shared informer's store
	dLister appslisters.DeploymentLister
	// rsLister can list/get replica sets from the shared informer's store
	rsLister appslisters.ReplicaSetLister
	// podLister can list/get pods from the shared informer's store
	podLister corelisters.PodLister

	// dListerSynced returns true if the Deployment store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	dListerSynced cache.InformerSynced
	// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	rsListerSynced cache.InformerSynced
	// podListerSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podListerSynced cache.InformerSynced

	// Deployments that need to be synced
	// Items are the string keys produced by controller.KeyFunc.
	queue workqueue.RateLimitingInterface
}
   100  // NewDeploymentController creates a new DeploymentController.
   101  func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
   102  	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
   103  	logger := klog.FromContext(ctx)
   104  	dc := &DeploymentController{
   105  		client:           client,
   106  		eventBroadcaster: eventBroadcaster,
   107  		eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
   108  		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
   109  	}
   110  	dc.rsControl = controller.RealRSControl{
   111  		KubeClient: client,
   112  		Recorder:   dc.eventRecorder,
   113  	}
   115  	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   116  		AddFunc: func(obj interface{}) {
   117  			dc.addDeployment(logger, obj)
   118  		},
   119  		UpdateFunc: func(oldObj, newObj interface{}) {
   120  			dc.updateDeployment(logger, oldObj, newObj)
   121  		},
   122  		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
   123  		DeleteFunc: func(obj interface{}) {
   124  			dc.deleteDeployment(logger, obj)
   125  		},
   126  	})
   127  	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   128  		AddFunc: func(obj interface{}) {
   129  			dc.addReplicaSet(logger, obj)
   130  		},
   131  		UpdateFunc: func(oldObj, newObj interface{}) {
   132  			dc.updateReplicaSet(logger, oldObj, newObj)
   133  		},
   134  		DeleteFunc: func(obj interface{}) {
   135  			dc.deleteReplicaSet(logger, obj)
   136  		},
   137  	})
   138  	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   139  		DeleteFunc: func(obj interface{}) {
   140  			dc.deletePod(logger, obj)
   141  		},
   142  	})
   144  	dc.syncHandler = dc.syncDeployment
   145  	dc.enqueueDeployment = dc.enqueue
   147  	dc.dLister = dInformer.Lister()
   148  	dc.rsLister = rsInformer.Lister()
   149  	dc.podLister = podInformer.Lister()
   150  	dc.dListerSynced = dInformer.Informer().HasSynced
   151  	dc.rsListerSynced = rsInformer.Informer().HasSynced
   152  	dc.podListerSynced = podInformer.Informer().HasSynced
   153  	return dc, nil
   154  }
   156  // Run begins watching and syncing.
   157  func (dc *DeploymentController) Run(ctx context.Context, workers int) {
   158  	defer utilruntime.HandleCrash()
   160  	// Start events processing pipeline.
   161  	dc.eventBroadcaster.StartStructuredLogging(3)
   162  	dc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.client.CoreV1().Events("")})
   163  	defer dc.eventBroadcaster.Shutdown()
   165  	defer dc.queue.ShutDown()
   167  	logger := klog.FromContext(ctx)
   168  	logger.Info("Starting controller", "controller", "deployment")
   169  	defer logger.Info("Shutting down controller", "controller", "deployment")
   171  	if !cache.WaitForNamedCacheSync("deployment", ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
   172  		return
   173  	}
   175  	for i := 0; i < workers; i++ {
   176  		go wait.UntilWithContext(ctx, dc.worker, time.Second)
   177  	}
   179  	<-ctx.Done()
   180  }
   182  func (dc *DeploymentController) addDeployment(logger klog.Logger, obj interface{}) {
   183  	d := obj.(*apps.Deployment)
   184  	logger.V(4).Info("Adding deployment", "deployment", klog.KObj(d))
   185  	dc.enqueueDeployment(d)
   186  }
   188  func (dc *DeploymentController) updateDeployment(logger klog.Logger, old, cur interface{}) {
   189  	oldD := old.(*apps.Deployment)
   190  	curD := cur.(*apps.Deployment)
   191  	logger.V(4).Info("Updating deployment", "deployment", klog.KObj(oldD))
   192  	dc.enqueueDeployment(curD)
   193  }
   195  func (dc *DeploymentController) deleteDeployment(logger klog.Logger, obj interface{}) {
   196  	d, ok := obj.(*apps.Deployment)
   197  	if !ok {
   198  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   199  		if !ok {
   200  			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
   201  			return
   202  		}
   203  		d, ok = tombstone.Obj.(*apps.Deployment)
   204  		if !ok {
   205  			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Deployment %#v", obj))
   206  			return
   207  		}
   208  	}
   209  	logger.V(4).Info("Deleting deployment", "deployment", klog.KObj(d))
   210  	dc.enqueueDeployment(d)
   211  }
   213  // addReplicaSet enqueues the deployment that manages a ReplicaSet when the ReplicaSet is created.
   214  func (dc *DeploymentController) addReplicaSet(logger klog.Logger, obj interface{}) {
   215  	rs := obj.(*apps.ReplicaSet)
   217  	if rs.DeletionTimestamp != nil {
   218  		// On a restart of the controller manager, it's possible for an object to
   219  		// show up in a state that is already pending deletion.
   220  		dc.deleteReplicaSet(logger, rs)
   221  		return
   222  	}
   223  	// If it has a ControllerRef, that's all that matters.
   224  	if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
   225  		d := dc.resolveControllerRef(rs.Namespace, controllerRef)
   226  		if d == nil {
   227  			return
   228  		}
   229  		logger.V(4).Info("ReplicaSet added", "replicaSet", klog.KObj(rs))
   230  		dc.enqueueDeployment(d)
   231  		return
   232  	}
   234  	// Otherwise, it's an orphan. Get a list of all matching Deployments and sync
   235  	// them to see if anyone wants to adopt it.
   236  	ds := dc.getDeploymentsForReplicaSet(logger, rs)
   237  	if len(ds) == 0 {
   238  		return
   239  	}
   240  	logger.V(4).Info("Orphan ReplicaSet added", "replicaSet", klog.KObj(rs))
   241  	for _, d := range ds {
   242  		dc.enqueueDeployment(d)
   243  	}
   244  }
   246  // getDeploymentsForReplicaSet returns a list of Deployments that potentially
   247  // match a ReplicaSet.
   248  func (dc *DeploymentController) getDeploymentsForReplicaSet(logger klog.Logger, rs *apps.ReplicaSet) []*apps.Deployment {
   249  	deployments, err := util.GetDeploymentsForReplicaSet(dc.dLister, rs)
   250  	if err != nil || len(deployments) == 0 {
   251  		return nil
   252  	}
   253  	// Because all ReplicaSet's belonging to a deployment should have a unique label key,
   254  	// there should never be more than one deployment returned by the above method.
   255  	// If that happens we should probably dynamically repair the situation by ultimately
   256  	// trying to clean up one of the controllers, for now we just return the older one
   257  	if len(deployments) > 1 {
   258  		// ControllerRef will ensure we don't do anything crazy, but more than one
   259  		// item in this list nevertheless constitutes user error.
   260  		logger.V(4).Info("user error! more than one deployment is selecting replica set",
   261  			"replicaSet", klog.KObj(rs), "labels", rs.Labels, "deployment", klog.KObj(deployments[0]))
   262  	}
   263  	return deployments
   264  }
   266  // updateReplicaSet figures out what deployment(s) manage a ReplicaSet when the ReplicaSet
   267  // is updated and wake them up. If the anything of the ReplicaSets have changed, we need to
   268  // awaken both the old and new deployments. old and cur must be *apps.ReplicaSet
   269  // types.
   270  func (dc *DeploymentController) updateReplicaSet(logger klog.Logger, old, cur interface{}) {
   271  	curRS := cur.(*apps.ReplicaSet)
   272  	oldRS := old.(*apps.ReplicaSet)
   273  	if curRS.ResourceVersion == oldRS.ResourceVersion {
   274  		// Periodic resync will send update events for all known replica sets.
   275  		// Two different versions of the same replica set will always have different RVs.
   276  		return
   277  	}
   279  	curControllerRef := metav1.GetControllerOf(curRS)
   280  	oldControllerRef := metav1.GetControllerOf(oldRS)
   281  	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
   282  	if controllerRefChanged && oldControllerRef != nil {
   283  		// The ControllerRef was changed. Sync the old controller, if any.
   284  		if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
   285  			dc.enqueueDeployment(d)
   286  		}
   287  	}
   288  	// If it has a ControllerRef, that's all that matters.
   289  	if curControllerRef != nil {
   290  		d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
   291  		if d == nil {
   292  			return
   293  		}
   294  		logger.V(4).Info("ReplicaSet updated", "replicaSet", klog.KObj(curRS))
   295  		dc.enqueueDeployment(d)
   296  		return
   297  	}
   299  	// Otherwise, it's an orphan. If anything changed, sync matching controllers
   300  	// to see if anyone wants to adopt it now.
   301  	labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
   302  	if labelChanged || controllerRefChanged {
   303  		ds := dc.getDeploymentsForReplicaSet(logger, curRS)
   304  		if len(ds) == 0 {
   305  			return
   306  		}
   307  		logger.V(4).Info("Orphan ReplicaSet updated", "replicaSet", klog.KObj(curRS))
   308  		for _, d := range ds {
   309  			dc.enqueueDeployment(d)
   310  		}
   311  	}
   312  }
   314  // deleteReplicaSet enqueues the deployment that manages a ReplicaSet when
   315  // the ReplicaSet is deleted. obj could be an *apps.ReplicaSet, or
   316  // a DeletionFinalStateUnknown marker item.
   317  func (dc *DeploymentController) deleteReplicaSet(logger klog.Logger, obj interface{}) {
   318  	rs, ok := obj.(*apps.ReplicaSet)
   320  	// When a delete is dropped, the relist will notice a pod in the store not
   321  	// in the list, leading to the insertion of a tombstone object which contains
   322  	// the deleted key/value. Note that this value might be stale. If the ReplicaSet
   323  	// changed labels the new deployment will not be woken up till the periodic resync.
   324  	if !ok {
   325  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   326  		if !ok {
   327  			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
   328  			return
   329  		}
   330  		rs, ok = tombstone.Obj.(*apps.ReplicaSet)
   331  		if !ok {
   332  			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
   333  			return
   334  		}
   335  	}
   337  	controllerRef := metav1.GetControllerOf(rs)
   338  	if controllerRef == nil {
   339  		// No controller should care about orphans being deleted.
   340  		return
   341  	}
   342  	d := dc.resolveControllerRef(rs.Namespace, controllerRef)
   343  	if d == nil {
   344  		return
   345  	}
   346  	logger.V(4).Info("ReplicaSet deleted", "replicaSet", klog.KObj(rs))
   347  	dc.enqueueDeployment(d)
   348  }
   350  // deletePod will enqueue a Recreate Deployment once all of its pods have stopped running.
   351  func (dc *DeploymentController) deletePod(logger klog.Logger, obj interface{}) {
   352  	pod, ok := obj.(*v1.Pod)
   354  	// When a delete is dropped, the relist will notice a pod in the store not
   355  	// in the list, leading to the insertion of a tombstone object which contains
   356  	// the deleted key/value. Note that this value might be stale. If the Pod
   357  	// changed labels the new deployment will not be woken up till the periodic resync.
   358  	if !ok {
   359  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   360  		if !ok {
   361  			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
   362  			return
   363  		}
   364  		pod, ok = tombstone.Obj.(*v1.Pod)
   365  		if !ok {
   366  			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
   367  			return
   368  		}
   369  	}
   370  	d := dc.getDeploymentForPod(logger, pod)
   371  	if d == nil {
   372  		return
   373  	}
   374  	logger.V(4).Info("Pod deleted", "pod", klog.KObj(pod))
   375  	if d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType {
   376  		// Sync if this Deployment now has no more Pods.
   377  		rsList, err := util.ListReplicaSets(d, util.RsListFromClient(dc.client.AppsV1()))
   378  		if err != nil {
   379  			return
   380  		}
   381  		podMap, err := dc.getPodMapForDeployment(d, rsList)
   382  		if err != nil {
   383  			return
   384  		}
   385  		numPods := 0
   386  		for _, podList := range podMap {
   387  			numPods += len(podList)
   388  		}
   389  		if numPods == 0 {
   390  			dc.enqueueDeployment(d)
   391  		}
   392  	}
   393  }
   395  func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
   396  	key, err := controller.KeyFunc(deployment)
   397  	if err != nil {
   398  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
   399  		return
   400  	}
   402  	dc.queue.Add(key)
   403  }
   405  func (dc *DeploymentController) enqueueRateLimited(deployment *apps.Deployment) {
   406  	key, err := controller.KeyFunc(deployment)
   407  	if err != nil {
   408  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
   409  		return
   410  	}
   412  	dc.queue.AddRateLimited(key)
   413  }
   415  // enqueueAfter will enqueue a deployment after the provided amount of time.
   416  func (dc *DeploymentController) enqueueAfter(deployment *apps.Deployment, after time.Duration) {
   417  	key, err := controller.KeyFunc(deployment)
   418  	if err != nil {
   419  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
   420  		return
   421  	}
   423  	dc.queue.AddAfter(key, after)
   424  }
   426  // getDeploymentForPod returns the deployment managing the given Pod.
   427  func (dc *DeploymentController) getDeploymentForPod(logger klog.Logger, pod *v1.Pod) *apps.Deployment {
   428  	// Find the owning replica set
   429  	var rs *apps.ReplicaSet
   430  	var err error
   431  	controllerRef := metav1.GetControllerOf(pod)
   432  	if controllerRef == nil {
   433  		// No controller owns this Pod.
   434  		return nil
   435  	}
   436  	if controllerRef.Kind != apps.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
   437  		// Not a pod owned by a replica set.
   438  		return nil
   439  	}
   440  	rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
   441  	if err != nil || rs.UID != controllerRef.UID {
   442  		logger.V(4).Info("Cannot get replicaset for pod", "ownerReference", controllerRef.Name, "pod", klog.KObj(pod), "err", err)
   443  		return nil
   444  	}
   446  	// Now find the Deployment that owns that ReplicaSet.
   447  	controllerRef = metav1.GetControllerOf(rs)
   448  	if controllerRef == nil {
   449  		return nil
   450  	}
   451  	return dc.resolveControllerRef(rs.Namespace, controllerRef)
   452  }
   454  // resolveControllerRef returns the controller referenced by a ControllerRef,
   455  // or nil if the ControllerRef could not be resolved to a matching controller
   456  // of the correct Kind.
   457  func (dc *DeploymentController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.Deployment {
   458  	// We can't look up by UID, so look up by Name and then verify UID.
   459  	// Don't even try to look up by Name if it's the wrong Kind.
   460  	if controllerRef.Kind != controllerKind.Kind {
   461  		return nil
   462  	}
   463  	d, err := dc.dLister.Deployments(namespace).Get(controllerRef.Name)
   464  	if err != nil {
   465  		return nil
   466  	}
   467  	if d.UID != controllerRef.UID {
   468  		// The controller we found with this Name is not the same one that the
   469  		// ControllerRef points to.
   470  		return nil
   471  	}
   472  	return d
   473  }
   475  // worker runs a worker thread that just dequeues items, processes them, and marks them done.
   476  // It enforces that the syncHandler is never invoked concurrently with the same key.
   477  func (dc *DeploymentController) worker(ctx context.Context) {
   478  	for dc.processNextWorkItem(ctx) {
   479  	}
   480  }
   482  func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
   483  	key, quit := dc.queue.Get()
   484  	if quit {
   485  		return false
   486  	}
   487  	defer dc.queue.Done(key)
   489  	err := dc.syncHandler(ctx, key.(string))
   490  	dc.handleErr(ctx, err, key)
   492  	return true
   493  }
   495  func (dc *DeploymentController) handleErr(ctx context.Context, err error, key interface{}) {
   496  	logger := klog.FromContext(ctx)
   497  	if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
   498  		dc.queue.Forget(key)
   499  		return
   500  	}
   501  	ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string))
   502  	if keyErr != nil {
   503  		logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key)
   504  	}
   506  	if dc.queue.NumRequeues(key) < maxRetries {
   507  		logger.V(2).Info("Error syncing deployment", "deployment", klog.KRef(ns, name), "err", err)
   508  		dc.queue.AddRateLimited(key)
   509  		return
   510  	}
   512  	utilruntime.HandleError(err)
   513  	logger.V(2).Info("Dropping deployment out of the queue", "deployment", klog.KRef(ns, name), "err", err)
   514  	dc.queue.Forget(key)
   515  }
   517  // getReplicaSetsForDeployment uses ControllerRefManager to reconcile
   518  // ControllerRef by adopting and orphaning.
   519  // It returns the list of ReplicaSets that this Deployment should manage.
   520  func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context, d *apps.Deployment) ([]*apps.ReplicaSet, error) {
   521  	// List all ReplicaSets to find those we own but that no longer match our
   522  	// selector. They will be orphaned by ClaimReplicaSets().
   523  	rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything())
   524  	if err != nil {
   525  		return nil, err
   526  	}
   527  	deploymentSelector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
   528  	if err != nil {
   529  		return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
   530  	}
   531  	// If any adoptions are attempted, we should first recheck for deletion with
   532  	// an uncached quorum read sometime after listing ReplicaSets (see #42639).
   533  	canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
   534  		fresh, err := dc.client.AppsV1().Deployments(d.Namespace).Get(ctx, d.Name, metav1.GetOptions{})
   535  		if err != nil {
   536  			return nil, err
   537  		}
   538  		if fresh.UID != d.UID {
   539  			return nil, fmt.Errorf("original Deployment %v/%v is gone: got uid %v, wanted %v", d.Namespace, d.Name, fresh.UID, d.UID)
   540  		}
   541  		return fresh, nil
   542  	})
   543  	cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind, canAdoptFunc)
   544  	return cm.ClaimReplicaSets(ctx, rsList)
   545  }
   547  // getPodMapForDeployment returns the Pods managed by a Deployment.
   548  //
   549  // It returns a map from ReplicaSet UID to a list of Pods controlled by that RS,
   550  // according to the Pod's ControllerRef.
   551  // NOTE: The pod pointers returned by this method point the pod objects in the cache and thus
   552  // shouldn't be modified in any way.
   553  func (dc *DeploymentController) getPodMapForDeployment(d *apps.Deployment, rsList []*apps.ReplicaSet) (map[types.UID][]*v1.Pod, error) {
   554  	// Get all Pods that potentially belong to this Deployment.
   555  	selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
   556  	if err != nil {
   557  		return nil, err
   558  	}
   559  	pods, err := dc.podLister.Pods(d.Namespace).List(selector)
   560  	if err != nil {
   561  		return nil, err
   562  	}
   563  	// Group Pods by their controller (if it's in rsList).
   564  	podMap := make(map[types.UID][]*v1.Pod, len(rsList))
   565  	for _, rs := range rsList {
   566  		podMap[rs.UID] = []*v1.Pod{}
   567  	}
   568  	for _, pod := range pods {
   569  		// Do not ignore inactive Pods because Recreate Deployments need to verify that no
   570  		// Pods from older versions are running before spinning up new Pods.
   571  		controllerRef := metav1.GetControllerOf(pod)
   572  		if controllerRef == nil {
   573  			continue
   574  		}
   575  		// Only append if we care about this UID.
   576  		if _, ok := podMap[controllerRef.UID]; ok {
   577  			podMap[controllerRef.UID] = append(podMap[controllerRef.UID], pod)
   578  		}
   579  	}
   580  	return podMap, nil
   581  }
   583  // syncDeployment will sync the deployment with the given key.
   584  // This function is not meant to be invoked concurrently with the same key.
   585  func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
   586  	logger := klog.FromContext(ctx)
   587  	namespace, name, err := cache.SplitMetaNamespaceKey(key)
   588  	if err != nil {
   589  		logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key)
   590  		return err
   591  	}
   593  	startTime := time.Now()
   594  	logger.V(4).Info("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
   595  	defer func() {
   596  		logger.V(4).Info("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
   597  	}()
   599  	deployment, err := dc.dLister.Deployments(namespace).Get(name)
   600  	if errors.IsNotFound(err) {
   601  		logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
   602  		return nil
   603  	}
   604  	if err != nil {
   605  		return err
   606  	}
   608  	// Deep-copy otherwise we are mutating our cache.
   609  	// TODO: Deep-copy only when needed.
   610  	d := deployment.DeepCopy()
   612  	everything := metav1.LabelSelector{}
   613  	if reflect.DeepEqual(d.Spec.Selector, &everything) {
   614  		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
   615  		if d.Status.ObservedGeneration < d.Generation {
   616  			d.Status.ObservedGeneration = d.Generation
   617  			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
   618  		}
   619  		return nil
   620  	}
   622  	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
   623  	// through adoption/orphaning.
   624  	rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
   625  	if err != nil {
   626  		return err
   627  	}
   628  	// List all Pods owned by this Deployment, grouped by their ReplicaSet.
   629  	// Current uses of the podMap are:
   630  	//
   631  	// * check if a Pod is labeled correctly with the pod-template-hash label.
   632  	// * check that no old Pods are running in the middle of Recreate Deployments.
   633  	podMap, err := dc.getPodMapForDeployment(d, rsList)
   634  	if err != nil {
   635  		return err
   636  	}
   638  	if d.DeletionTimestamp != nil {
   639  		return dc.syncStatusOnly(ctx, d, rsList)
   640  	}
   642  	// Update deployment conditions with an Unknown condition when pausing/resuming
   643  	// a deployment. In this way, we can be sure that we won't timeout when a user
   644  	// resumes a Deployment with a set progressDeadlineSeconds.
   645  	if err = dc.checkPausedConditions(ctx, d); err != nil {
   646  		return err
   647  	}
   649  	if d.Spec.Paused {
   650  		return dc.sync(ctx, d, rsList)
   651  	}
   653  	// rollback is not re-entrant in case the underlying replica sets are updated with a new
   654  	// revision so we should ensure that we won't proceed to update replica sets until we
   655  	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
   656  	if getRollbackTo(d) != nil {
   657  		return dc.rollback(ctx, d, rsList)
   658  	}
   660  	scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
   661  	if err != nil {
   662  		return err
   663  	}
   664  	if scalingEvent {
   665  		return dc.sync(ctx, d, rsList)
   666  	}
   668  	switch d.Spec.Strategy.Type {
   669  	case apps.RecreateDeploymentStrategyType:
   670  		return dc.rolloutRecreate(ctx, d, rsList, podMap)
   671  	case apps.RollingUpdateDeploymentStrategyType:
   672  		return dc.rolloutRolling(ctx, d, rsList)
   673  	}
   674  	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
   675  }

