
Source file src/k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go

Documentation: k8s.io/kubernetes/pkg/controller/replicaset

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  // ### ATTENTION ###
    18  //
    19  // This code implements both ReplicaSet and ReplicationController.
    20  //
    21  // For RC, the objects are converted on the way in and out (see ../replication/),
    22  // as if ReplicationController were just an older API version of ReplicaSet.
    23  // However, RC and RS still have separate storage and separate instantiations
    24  // of the ReplicaSetController object.
    25  //
    26  // Use rsc.Kind in log messages rather than hard-coding "ReplicaSet".
    28  package replicaset
    30  import (
    31  	"context"
    32  	"fmt"
    33  	"reflect"
    34  	"sort"
    35  	"strings"
    36  	"sync"
    37  	"time"
    39  	apps "k8s.io/api/apps/v1"
    40  	v1 "k8s.io/api/core/v1"
    41  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    42  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    43  	"k8s.io/apimachinery/pkg/labels"
    44  	"k8s.io/apimachinery/pkg/runtime/schema"
    45  	"k8s.io/apimachinery/pkg/types"
    46  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    47  	"k8s.io/apimachinery/pkg/util/wait"
    48  	appsinformers "k8s.io/client-go/informers/apps/v1"
    49  	coreinformers "k8s.io/client-go/informers/core/v1"
    50  	clientset "k8s.io/client-go/kubernetes"
    51  	"k8s.io/client-go/kubernetes/scheme"
    52  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    53  	appslisters "k8s.io/client-go/listers/apps/v1"
    54  	corelisters "k8s.io/client-go/listers/core/v1"
    55  	"k8s.io/client-go/tools/cache"
    56  	"k8s.io/client-go/tools/record"
    57  	"k8s.io/client-go/util/workqueue"
    58  	"k8s.io/component-base/metrics/legacyregistry"
    59  	"k8s.io/klog/v2"
    60  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    61  	"k8s.io/kubernetes/pkg/controller"
    62  	"k8s.io/kubernetes/pkg/controller/replicaset/metrics"
    63  )
    65  const (
    66  	// Realistic value of the burstReplica field for the replica set manager based off
    67  	// performance requirements for kubernetes 1.0.
    68  	BurstReplicas = 500
    70  	// The number of times we retry updating a ReplicaSet's status.
    71  	statusUpdateRetries = 1
    73  	// controllerUIDIndex is the name for the ReplicaSet store's index function,
    74  	// which is to index by ReplicaSet's controllerUID.
    75  	controllerUIDIndex = "controllerUID"
    76  )
    78  // ReplicaSetController is responsible for synchronizing ReplicaSet objects stored
    79  // in the system with actual running pods.
    80  type ReplicaSetController struct {
    81  	// GroupVersionKind indicates the controller type.
    82  	// Different instances of this struct may handle different GVKs.
    83  	// For example, this struct can be used (with adapters) to handle ReplicationController.
    84  	schema.GroupVersionKind
    86  	kubeClient clientset.Interface
    87  	podControl controller.PodControlInterface
    89  	eventBroadcaster record.EventBroadcaster
    91  	// A ReplicaSet is temporarily suspended after creating/deleting these many replicas.
    92  	// It resumes normal action after observing the watch events for them.
    93  	burstReplicas int
    94  	// To allow injection of syncReplicaSet for testing.
    95  	syncHandler func(ctx context.Context, rsKey string) error
    97  	// A TTLCache of pod creates/deletes each rc expects to see.
    98  	expectations *controller.UIDTrackingControllerExpectations
   100  	// A store of ReplicaSets, populated by the shared informer passed to NewReplicaSetController
   101  	rsLister appslisters.ReplicaSetLister
   102  	// rsListerSynced returns true if the pod store has been synced at least once.
   103  	// Added as a member to the struct to allow injection for testing.
   104  	rsListerSynced cache.InformerSynced
   105  	rsIndexer      cache.Indexer
   107  	// A store of pods, populated by the shared informer passed to NewReplicaSetController
   108  	podLister corelisters.PodLister
   109  	// podListerSynced returns true if the pod store has been synced at least once.
   110  	// Added as a member to the struct to allow injection for testing.
   111  	podListerSynced cache.InformerSynced
   113  	// Controllers that need to be synced
   114  	queue workqueue.RateLimitingInterface
   115  }
   117  // NewReplicaSetController configures a replica set controller with the specified event recorder
   118  func NewReplicaSetController(ctx context.Context, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
   119  	logger := klog.FromContext(ctx)
   120  	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
   121  	if err := metrics.Register(legacyregistry.Register); err != nil {
   122  		logger.Error(err, "unable to register metrics")
   123  	}
   124  	return NewBaseController(logger, rsInformer, podInformer, kubeClient, burstReplicas,
   125  		apps.SchemeGroupVersion.WithKind("ReplicaSet"),
   126  		"replicaset_controller",
   127  		"replicaset",
   128  		controller.RealPodControl{
   129  			KubeClient: kubeClient,
   130  			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
   131  		},
   132  		eventBroadcaster,
   133  	)
   134  }
   136  // NewBaseController is the implementation of NewReplicaSetController with additional injected
   137  // parameters so that it can also serve as the implementation of NewReplicationController.
   138  func NewBaseController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
   139  	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface, eventBroadcaster record.EventBroadcaster) *ReplicaSetController {
   141  	rsc := &ReplicaSetController{
   142  		GroupVersionKind: gvk,
   143  		kubeClient:       kubeClient,
   144  		podControl:       podControl,
   145  		eventBroadcaster: eventBroadcaster,
   146  		burstReplicas:    burstReplicas,
   147  		expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
   148  		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
   149  	}
   151  	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   152  		AddFunc: func(obj interface{}) {
   153  			rsc.addRS(logger, obj)
   154  		},
   155  		UpdateFunc: func(oldObj, newObj interface{}) {
   156  			rsc.updateRS(logger, oldObj, newObj)
   157  		},
   158  		DeleteFunc: func(obj interface{}) {
   159  			rsc.deleteRS(logger, obj)
   160  		},
   161  	})
   162  	rsInformer.Informer().AddIndexers(cache.Indexers{
   163  		controllerUIDIndex: func(obj interface{}) ([]string, error) {
   164  			rs, ok := obj.(*apps.ReplicaSet)
   165  			if !ok {
   166  				return []string{}, nil
   167  			}
   168  			controllerRef := metav1.GetControllerOf(rs)
   169  			if controllerRef == nil {
   170  				return []string{}, nil
   171  			}
   172  			return []string{string(controllerRef.UID)}, nil
   173  		},
   174  	})
   175  	rsc.rsIndexer = rsInformer.Informer().GetIndexer()
   176  	rsc.rsLister = rsInformer.Lister()
   177  	rsc.rsListerSynced = rsInformer.Informer().HasSynced
   179  	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   180  		AddFunc: func(obj interface{}) {
   181  			rsc.addPod(logger, obj)
   182  		},
   183  		// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
   184  		// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
   185  		// local storage, so it should be ok.
   186  		UpdateFunc: func(oldObj, newObj interface{}) {
   187  			rsc.updatePod(logger, oldObj, newObj)
   188  		},
   189  		DeleteFunc: func(obj interface{}) {
   190  			rsc.deletePod(logger, obj)
   191  		},
   192  	})
   193  	rsc.podLister = podInformer.Lister()
   194  	rsc.podListerSynced = podInformer.Informer().HasSynced
   196  	rsc.syncHandler = rsc.syncReplicaSet
   198  	return rsc
   199  }
   201  // Run begins watching and syncing.
   202  func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
   203  	defer utilruntime.HandleCrash()
   205  	// Start events processing pipeline.
   206  	rsc.eventBroadcaster.StartStructuredLogging(3)
   207  	rsc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: rsc.kubeClient.CoreV1().Events("")})
   208  	defer rsc.eventBroadcaster.Shutdown()
   210  	defer rsc.queue.ShutDown()
   212  	controllerName := strings.ToLower(rsc.Kind)
   213  	logger := klog.FromContext(ctx)
   214  	logger.Info("Starting controller", "name", controllerName)
   215  	defer logger.Info("Shutting down controller", "name", controllerName)
   217  	if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) {
   218  		return
   219  	}
   221  	for i := 0; i < workers; i++ {
   222  		go wait.UntilWithContext(ctx, rsc.worker, time.Second)
   223  	}
   225  	<-ctx.Done()
   226  }
   228  // getReplicaSetsWithSameController returns a list of ReplicaSets with the same
   229  // owner as the given ReplicaSet.
   230  func (rsc *ReplicaSetController) getReplicaSetsWithSameController(logger klog.Logger, rs *apps.ReplicaSet) []*apps.ReplicaSet {
   231  	controllerRef := metav1.GetControllerOf(rs)
   232  	if controllerRef == nil {
   233  		utilruntime.HandleError(fmt.Errorf("ReplicaSet has no controller: %v", rs))
   234  		return nil
   235  	}
   237  	objects, err := rsc.rsIndexer.ByIndex(controllerUIDIndex, string(controllerRef.UID))
   238  	if err != nil {
   239  		utilruntime.HandleError(err)
   240  		return nil
   241  	}
   242  	relatedRSs := make([]*apps.ReplicaSet, 0, len(objects))
   243  	for _, obj := range objects {
   244  		relatedRSs = append(relatedRSs, obj.(*apps.ReplicaSet))
   245  	}
   247  	if klogV := logger.V(2); klogV.Enabled() {
   248  		klogV.Info("Found related ReplicaSets", "replicaSet", klog.KObj(rs), "relatedReplicaSets", klog.KObjSlice(relatedRSs))
   249  	}
   251  	return relatedRSs
   252  }
   254  // getPodReplicaSets returns a list of ReplicaSets matching the given pod.
   255  func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*apps.ReplicaSet {
   256  	rss, err := rsc.rsLister.GetPodReplicaSets(pod)
   257  	if err != nil {
   258  		return nil
   259  	}
   260  	if len(rss) > 1 {
   261  		// ControllerRef will ensure we don't do anything crazy, but more than one
   262  		// item in this list nevertheless constitutes user error.
   263  		utilruntime.HandleError(fmt.Errorf("user error! more than one %v is selecting pods with labels: %+v", rsc.Kind, pod.Labels))
   264  	}
   265  	return rss
   266  }
   268  // resolveControllerRef returns the controller referenced by a ControllerRef,
   269  // or nil if the ControllerRef could not be resolved to a matching controller
   270  // of the correct Kind.
   271  func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.ReplicaSet {
   272  	// We can't look up by UID, so look up by Name and then verify UID.
   273  	// Don't even try to look up by Name if it's the wrong Kind.
   274  	if controllerRef.Kind != rsc.Kind {
   275  		return nil
   276  	}
   277  	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
   278  	if err != nil {
   279  		return nil
   280  	}
   281  	if rs.UID != controllerRef.UID {
   282  		// The controller we found with this Name is not the same one that the
   283  		// ControllerRef points to.
   284  		return nil
   285  	}
   286  	return rs
   287  }
   289  func (rsc *ReplicaSetController) enqueueRS(rs *apps.ReplicaSet) {
   290  	key, err := controller.KeyFunc(rs)
   291  	if err != nil {
   292  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
   293  		return
   294  	}
   296  	rsc.queue.Add(key)
   297  }
   299  func (rsc *ReplicaSetController) enqueueRSAfter(rs *apps.ReplicaSet, duration time.Duration) {
   300  	key, err := controller.KeyFunc(rs)
   301  	if err != nil {
   302  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
   303  		return
   304  	}
   306  	rsc.queue.AddAfter(key, duration)
   307  }
   309  func (rsc *ReplicaSetController) addRS(logger klog.Logger, obj interface{}) {
   310  	rs := obj.(*apps.ReplicaSet)
   311  	logger.V(4).Info("Adding", "replicaSet", klog.KObj(rs))
   312  	rsc.enqueueRS(rs)
   313  }
   315  // callback when RS is updated
   316  func (rsc *ReplicaSetController) updateRS(logger klog.Logger, old, cur interface{}) {
   317  	oldRS := old.(*apps.ReplicaSet)
   318  	curRS := cur.(*apps.ReplicaSet)
   320  	// TODO: make a KEP and fix informers to always call the delete event handler on re-create
   321  	if curRS.UID != oldRS.UID {
   322  		key, err := controller.KeyFunc(oldRS)
   323  		if err != nil {
   324  			utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldRS, err))
   325  			return
   326  		}
   327  		rsc.deleteRS(logger, cache.DeletedFinalStateUnknown{
   328  			Key: key,
   329  			Obj: oldRS,
   330  		})
   331  	}
   333  	// You might imagine that we only really need to enqueue the
   334  	// replica set when Spec changes, but it is safer to sync any
   335  	// time this function is triggered. That way a full informer
   336  	// resync can requeue any replica set that don't yet have pods
   337  	// but whose last attempts at creating a pod have failed (since
   338  	// we don't block on creation of pods) instead of those
   339  	// replica sets stalling indefinitely. Enqueueing every time
   340  	// does result in some spurious syncs (like when Status.Replica
   341  	// is updated and the watch notification from it retriggers
   342  	// this function), but in general extra resyncs shouldn't be
   343  	// that bad as ReplicaSets that haven't met expectations yet won't
   344  	// sync, and all the listing is done using local stores.
   345  	if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) {
   346  		logger.V(4).Info("replicaSet updated. Desired pod count change.", "replicaSet", klog.KObj(oldRS), "oldReplicas", *(oldRS.Spec.Replicas), "newReplicas", *(curRS.Spec.Replicas))
   347  	}
   348  	rsc.enqueueRS(curRS)
   349  }
   351  func (rsc *ReplicaSetController) deleteRS(logger klog.Logger, obj interface{}) {
   352  	rs, ok := obj.(*apps.ReplicaSet)
   353  	if !ok {
   354  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   355  		if !ok {
   356  			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
   357  			return
   358  		}
   359  		rs, ok = tombstone.Obj.(*apps.ReplicaSet)
   360  		if !ok {
   361  			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
   362  			return
   363  		}
   364  	}
   366  	key, err := controller.KeyFunc(rs)
   367  	if err != nil {
   368  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
   369  		return
   370  	}
   372  	logger.V(4).Info("Deleting", "replicaSet", klog.KObj(rs))
   374  	// Delete expectations for the ReplicaSet so if we create a new one with the same name it starts clean
   375  	rsc.expectations.DeleteExpectations(logger, key)
   377  	rsc.queue.Add(key)
   378  }
   380  // When a pod is created, enqueue the replica set that manages it and update its expectations.
   381  func (rsc *ReplicaSetController) addPod(logger klog.Logger, obj interface{}) {
   382  	pod := obj.(*v1.Pod)
   384  	if pod.DeletionTimestamp != nil {
   385  		// on a restart of the controller manager, it's possible a new pod shows up in a state that
   386  		// is already pending deletion. Prevent the pod from being a creation observation.
   387  		rsc.deletePod(logger, pod)
   388  		return
   389  	}
   391  	// If it has a ControllerRef, that's all that matters.
   392  	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
   393  		rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
   394  		if rs == nil {
   395  			return
   396  		}
   397  		rsKey, err := controller.KeyFunc(rs)
   398  		if err != nil {
   399  			return
   400  		}
   401  		logger.V(4).Info("Pod created", "pod", klog.KObj(pod), "detail", pod)
   402  		rsc.expectations.CreationObserved(logger, rsKey)
   403  		rsc.queue.Add(rsKey)
   404  		return
   405  	}
   407  	// Otherwise, it's an orphan. Get a list of all matching ReplicaSets and sync
   408  	// them to see if anyone wants to adopt it.
   409  	// DO NOT observe creation because no controller should be waiting for an
   410  	// orphan.
   411  	rss := rsc.getPodReplicaSets(pod)
   412  	if len(rss) == 0 {
   413  		return
   414  	}
   415  	logger.V(4).Info("Orphan Pod created", "pod", klog.KObj(pod), "detail", pod)
   416  	for _, rs := range rss {
   417  		rsc.enqueueRS(rs)
   418  	}
   419  }
   421  // When a pod is updated, figure out what replica set/s manage it and wake them
   422  // up. If the labels of the pod have changed we need to awaken both the old
   423  // and new replica set. old and cur must be *v1.Pod types.
   424  func (rsc *ReplicaSetController) updatePod(logger klog.Logger, old, cur interface{}) {
   425  	curPod := cur.(*v1.Pod)
   426  	oldPod := old.(*v1.Pod)
   427  	if curPod.ResourceVersion == oldPod.ResourceVersion {
   428  		// Periodic resync will send update events for all known pods.
   429  		// Two different versions of the same pod will always have different RVs.
   430  		return
   431  	}
   433  	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
   434  	if curPod.DeletionTimestamp != nil {
   435  		// when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period,
   436  		// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
   437  		// for modification of the deletion timestamp and expect an rs to create more replicas asap, not wait
   438  		// until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
   439  		// an rs never initiates a phase change, and so is never asleep waiting for the same.
   440  		rsc.deletePod(logger, curPod)
   441  		if labelChanged {
   442  			// we don't need to check the oldPod.DeletionTimestamp because DeletionTimestamp cannot be unset.
   443  			rsc.deletePod(logger, oldPod)
   444  		}
   445  		return
   446  	}
   448  	curControllerRef := metav1.GetControllerOf(curPod)
   449  	oldControllerRef := metav1.GetControllerOf(oldPod)
   450  	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
   451  	if controllerRefChanged && oldControllerRef != nil {
   452  		// The ControllerRef was changed. Sync the old controller, if any.
   453  		if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
   454  			rsc.enqueueRS(rs)
   455  		}
   456  	}
   458  	// If it has a ControllerRef, that's all that matters.
   459  	if curControllerRef != nil {
   460  		rs := rsc.resolveControllerRef(curPod.Namespace, curControllerRef)
   461  		if rs == nil {
   462  			return
   463  		}
   464  		logger.V(4).Info("Pod objectMeta updated.", "pod", klog.KObj(oldPod), "oldObjectMeta", oldPod.ObjectMeta, "curObjectMeta", curPod.ObjectMeta)
   465  		rsc.enqueueRS(rs)
   466  		// TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
   467  		// the Pod status which in turn will trigger a requeue of the owning replica set thus
   468  		// having its status updated with the newly available replica. For now, we can fake the
   469  		// update by resyncing the controller MinReadySeconds after the it is requeued because
   470  		// a Pod transitioned to Ready.
   471  		// Note that this still suffers from #29229, we are just moving the problem one level
   472  		// "closer" to kubelet (from the deployment to the replica set controller).
   473  		if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
   474  			logger.V(2).Info("pod will be enqueued after a while for availability check", "duration", rs.Spec.MinReadySeconds, "kind", rsc.Kind, "pod", klog.KObj(oldPod))
   475  			// Add a second to avoid milliseconds skew in AddAfter.
   476  			// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
   477  			rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
   478  		}
   479  		return
   480  	}
   482  	// Otherwise, it's an orphan. If anything changed, sync matching controllers
   483  	// to see if anyone wants to adopt it now.
   484  	if labelChanged || controllerRefChanged {
   485  		rss := rsc.getPodReplicaSets(curPod)
   486  		if len(rss) == 0 {
   487  			return
   488  		}
   489  		logger.V(4).Info("Orphan Pod objectMeta updated.", "pod", klog.KObj(oldPod), "oldObjectMeta", oldPod.ObjectMeta, "curObjectMeta", curPod.ObjectMeta)
   490  		for _, rs := range rss {
   491  			rsc.enqueueRS(rs)
   492  		}
   493  	}
   494  }
   496  // When a pod is deleted, enqueue the replica set that manages the pod and update its expectations.
   497  // obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
   498  func (rsc *ReplicaSetController) deletePod(logger klog.Logger, obj interface{}) {
   499  	pod, ok := obj.(*v1.Pod)
   501  	// When a delete is dropped, the relist will notice a pod in the store not
   502  	// in the list, leading to the insertion of a tombstone object which contains
   503  	// the deleted key/value. Note that this value might be stale. If the pod
   504  	// changed labels the new ReplicaSet will not be woken up till the periodic resync.
   505  	if !ok {
   506  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   507  		if !ok {
   508  			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
   509  			return
   510  		}
   511  		pod, ok = tombstone.Obj.(*v1.Pod)
   512  		if !ok {
   513  			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
   514  			return
   515  		}
   516  	}
   518  	controllerRef := metav1.GetControllerOf(pod)
   519  	if controllerRef == nil {
   520  		// No controller should care about orphans being deleted.
   521  		return
   522  	}
   523  	rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
   524  	if rs == nil {
   525  		return
   526  	}
   527  	rsKey, err := controller.KeyFunc(rs)
   528  	if err != nil {
   529  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
   530  		return
   531  	}
   532  	logger.V(4).Info("Pod deleted", "delete_by", utilruntime.GetCaller(), "deletion_timestamp", pod.DeletionTimestamp, "pod", klog.KObj(pod))
   533  	rsc.expectations.DeletionObserved(logger, rsKey, controller.PodKey(pod))
   534  	rsc.queue.Add(rsKey)
   535  }
   537  // worker runs a worker thread that just dequeues items, processes them, and marks them done.
   538  // It enforces that the syncHandler is never invoked concurrently with the same key.
   539  func (rsc *ReplicaSetController) worker(ctx context.Context) {
   540  	for rsc.processNextWorkItem(ctx) {
   541  	}
   542  }
   544  func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
   545  	key, quit := rsc.queue.Get()
   546  	if quit {
   547  		return false
   548  	}
   549  	defer rsc.queue.Done(key)
   551  	err := rsc.syncHandler(ctx, key.(string))
   552  	if err == nil {
   553  		rsc.queue.Forget(key)
   554  		return true
   555  	}
   557  	utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
   558  	rsc.queue.AddRateLimited(key)
   560  	return true
   561  }
   563  // manageReplicas checks and updates replicas for the given ReplicaSet.
   564  // Does NOT modify <filteredPods>.
   565  // It will requeue the replica set in case of an error while creating/deleting pods.
   566  func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
   567  	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
   568  	rsKey, err := controller.KeyFunc(rs)
   569  	if err != nil {
   570  		utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
   571  		return nil
   572  	}
   573  	logger := klog.FromContext(ctx)
   574  	if diff < 0 {
   575  		diff *= -1
   576  		if diff > rsc.burstReplicas {
   577  			diff = rsc.burstReplicas
   578  		}
   579  		// TODO: Track UIDs of creates just like deletes. The problem currently
   580  		// is we'd need to wait on the result of a create to record the pod's
   581  		// UID, which would require locking *across* the create, which will turn
   582  		// into a performance bottleneck. We should generate a UID for the pod
   583  		// beforehand and store it via ExpectCreations.
   584  		rsc.expectations.ExpectCreations(logger, rsKey, diff)
   585  		logger.V(2).Info("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff)
   586  		// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
   587  		// and double with each successful iteration in a kind of "slow start".
   588  		// This handles attempts to start large numbers of pods that would
   589  		// likely all fail with the same error. For example a project with a
   590  		// low quota that attempts to create a large number of pods will be
   591  		// prevented from spamming the API service with the pod create requests
   592  		// after one of its pods fails.  Conveniently, this also prevents the
   593  		// event spam that those failures would generate.
   594  		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
   595  			err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
   596  			if err != nil {
   597  				if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
   598  					// if the namespace is being terminated, we don't have to do
   599  					// anything because any creation will fail
   600  					return nil
   601  				}
   602  			}
   603  			return err
   604  		})
   606  		// Any skipped pods that we never attempted to start shouldn't be expected.
   607  		// The skipped pods will be retried later. The next controller resync will
   608  		// retry the slow start process.
   609  		if skippedPods := diff - successfulCreations; skippedPods > 0 {
   610  			logger.V(2).Info("Slow-start failure. Skipping creation of pods, decrementing expectations", "podsSkipped", skippedPods, "kind", rsc.Kind, "replicaSet", klog.KObj(rs))
   611  			for i := 0; i < skippedPods; i++ {
   612  				// Decrement the expected number of creates because the informer won't observe this pod
   613  				rsc.expectations.CreationObserved(logger, rsKey)
   614  			}
   615  		}
   616  		return err
   617  	} else if diff > 0 {
   618  		if diff > rsc.burstReplicas {
   619  			diff = rsc.burstReplicas
   620  		}
   621  		logger.V(2).Info("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff)
   623  		relatedPods, err := rsc.getIndirectlyRelatedPods(logger, rs)
   624  		utilruntime.HandleError(err)
   626  		// Choose which Pods to delete, preferring those in earlier phases of startup.
   627  		podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
   629  		// Snapshot the UIDs (ns/name) of the pods we're expecting to see
   630  		// deleted, so we know to record their expectations exactly once either
   631  		// when we see it as an update of the deletion timestamp, or as a delete.
   632  		// Note that if the labels on a pod/rs change in a way that the pod gets
   633  		// orphaned, the rs will only wake up after the expectations have
   634  		// expired even if other pods are deleted.
   635  		rsc.expectations.ExpectDeletions(logger, rsKey, getPodKeys(podsToDelete))
   637  		errCh := make(chan error, diff)
   638  		var wg sync.WaitGroup
   639  		wg.Add(diff)
   640  		for _, pod := range podsToDelete {
   641  			go func(targetPod *v1.Pod) {
   642  				defer wg.Done()
   643  				if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
   644  					// Decrement the expected number of deletes because the informer won't observe this deletion
   645  					podKey := controller.PodKey(targetPod)
   646  					rsc.expectations.DeletionObserved(logger, rsKey, podKey)
   647  					if !apierrors.IsNotFound(err) {
   648  						logger.V(2).Info("Failed to delete pod, decremented expectations", "pod", podKey, "kind", rsc.Kind, "replicaSet", klog.KObj(rs))
   649  						errCh <- err
   650  					}
   651  				}
   652  			}(pod)
   653  		}
   654  		wg.Wait()
   656  		select {
   657  		case err := <-errCh:
   658  			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
   659  			if err != nil {
   660  				return err
   661  			}
   662  		default:
   663  		}
   664  	}
   666  	return nil
   667  }
   669  // syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
   670  // meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
   671  // invoked concurrently with the same key.
   672  func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
   673  	logger := klog.FromContext(ctx)
   674  	startTime := time.Now()
   675  	defer func() {
   676  		logger.Info("Finished syncing", "kind", rsc.Kind, "key", key, "duration", time.Since(startTime))
   677  	}()
   679  	namespace, name, err := cache.SplitMetaNamespaceKey(key)
   680  	if err != nil {
   681  		return err
   682  	}
   683  	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
   684  	if apierrors.IsNotFound(err) {
   685  		logger.V(4).Info("deleted", "kind", rsc.Kind, "key", key)
   686  		rsc.expectations.DeleteExpectations(logger, key)
   687  		return nil
   688  	}
   689  	if err != nil {
   690  		return err
   691  	}
   693  	rsNeedsSync := rsc.expectations.SatisfiedExpectations(logger, key)
   694  	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
   695  	if err != nil {
   696  		utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector for rs %v/%v: %v", namespace, name, err))
   697  		return nil
   698  	}
   700  	// list all pods to include the pods that don't match the rs`s selector
   701  	// anymore but has the stale controller ref.
   702  	// TODO: Do the List and Filter in a single pass, or use an index.
   703  	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
   704  	if err != nil {
   705  		return err
   706  	}
   707  	// Ignore inactive pods.
   708  	filteredPods := controller.FilterActivePods(logger, allPods)
   710  	// NOTE: filteredPods are pointing to objects from cache - if you need to
   711  	// modify them, you need to copy it first.
   712  	filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
   713  	if err != nil {
   714  		return err
   715  	}
   717  	var manageReplicasErr error
   718  	if rsNeedsSync && rs.DeletionTimestamp == nil {
   719  		manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
   720  	}
   721  	rs = rs.DeepCopy()
   722  	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
   724  	// Always updates status as pods come up or die.
   725  	updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
   726  	if err != nil {
   727  		// Multiple things could lead to this update failing. Requeuing the replica set ensures
   728  		// Returning an error causes a requeue without forcing a hotloop
   729  		return err
   730  	}
   731  	// Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.
   732  	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
   733  		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
   734  		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
   735  		rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
   736  	}
   737  	return manageReplicasErr
   738  }
   740  func (rsc *ReplicaSetController) claimPods(ctx context.Context, rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
   741  	// If any adoptions are attempted, we should first recheck for deletion with
   742  	// an uncached quorum read sometime after listing Pods (see #42639).
   743  	canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
   744  		fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(ctx, rs.Name, metav1.GetOptions{})
   745  		if err != nil {
   746  			return nil, err
   747  		}
   748  		if fresh.UID != rs.UID {
   749  			return nil, fmt.Errorf("original %v %v/%v is gone: got uid %v, wanted %v", rsc.Kind, rs.Namespace, rs.Name, fresh.UID, rs.UID)
   750  		}
   751  		return fresh, nil
   752  	})
   753  	cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
   754  	return cm.ClaimPods(ctx, filteredPods)
   755  }
   757  // slowStartBatch tries to call the provided function a total of 'count' times,
   758  // starting slow to check for errors, then speeding up if calls succeed.
   759  //
   760  // It groups the calls into batches, starting with a group of initialBatchSize.
   761  // Within each batch, it may call the function multiple times concurrently.
   762  //
   763  // If a whole batch succeeds, the next batch may get exponentially larger.
   764  // If there are any failures in a batch, all remaining batches are skipped
   765  // after waiting for the current batch to complete.
   766  //
   767  // It returns the number of successful calls to the function.
   768  func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
   769  	remaining := count
   770  	successes := 0
   771  	for batchSize := min(remaining, initialBatchSize); batchSize > 0; batchSize = min(2*batchSize, remaining) {
   772  		errCh := make(chan error, batchSize)
   773  		var wg sync.WaitGroup
   774  		wg.Add(batchSize)
   775  		for i := 0; i < batchSize; i++ {
   776  			go func() {
   777  				defer wg.Done()
   778  				if err := fn(); err != nil {
   779  					errCh <- err
   780  				}
   781  			}()
   782  		}
   783  		wg.Wait()
   784  		curSuccesses := batchSize - len(errCh)
   785  		successes += curSuccesses
   786  		if len(errCh) > 0 {
   787  			return successes, <-errCh
   788  		}
   789  		remaining -= batchSize
   790  	}
   791  	return successes, nil
   792  }
   794  // getIndirectlyRelatedPods returns all pods that are owned by any ReplicaSet
   795  // that is owned by the given ReplicaSet's owner.
   796  func (rsc *ReplicaSetController) getIndirectlyRelatedPods(logger klog.Logger, rs *apps.ReplicaSet) ([]*v1.Pod, error) {
   797  	var relatedPods []*v1.Pod
   798  	seen := make(map[types.UID]*apps.ReplicaSet)
   799  	for _, relatedRS := range rsc.getReplicaSetsWithSameController(logger, rs) {
   800  		selector, err := metav1.LabelSelectorAsSelector(relatedRS.Spec.Selector)
   801  		if err != nil {
   802  			// This object has an invalid selector, it does not match any pods
   803  			continue
   804  		}
   805  		pods, err := rsc.podLister.Pods(relatedRS.Namespace).List(selector)
   806  		if err != nil {
   807  			return nil, err
   808  		}
   809  		for _, pod := range pods {
   810  			if otherRS, found := seen[pod.UID]; found {
   811  				logger.V(5).Info("Pod is owned by both", "pod", klog.KObj(pod), "kind", rsc.Kind, "replicaSets", klog.KObjSlice([]klog.KMetadata{otherRS, relatedRS}))
   812  				continue
   813  			}
   814  			seen[pod.UID] = relatedRS
   815  			relatedPods = append(relatedPods, pod)
   816  		}
   817  	}
   818  	logger.V(4).Info("Found related pods", "kind", rsc.Kind, "replicaSet", klog.KObj(rs), "pods", klog.KObjSlice(relatedPods))
   819  	return relatedPods, nil
   820  }
   822  func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
   823  	// No need to sort pods if we are about to delete all of them.
   824  	// diff will always be <= len(filteredPods), so not need to handle > case.
   825  	if diff < len(filteredPods) {
   826  		podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
   827  		sort.Sort(podsWithRanks)
   828  		reportSortingDeletionAgeRatioMetric(filteredPods, diff)
   829  	}
   830  	return filteredPods[:diff]
   831  }
   833  func reportSortingDeletionAgeRatioMetric(filteredPods []*v1.Pod, diff int) {
   834  	now := time.Now()
   835  	youngestTime := time.Time{}
   836  	// first we need to check all of the ready pods to get the youngest, as they may not necessarily be sorted by timestamp alone
   837  	for _, pod := range filteredPods {
   838  		if pod.CreationTimestamp.Time.After(youngestTime) && podutil.IsPodReady(pod) {
   839  			youngestTime = pod.CreationTimestamp.Time
   840  		}
   841  	}
   843  	// for each pod chosen for deletion, report the ratio of its age to the youngest pod's age
   844  	for _, pod := range filteredPods[:diff] {
   845  		if !podutil.IsPodReady(pod) {
   846  			continue
   847  		}
   848  		ratio := float64(now.Sub(pod.CreationTimestamp.Time).Milliseconds() / now.Sub(youngestTime).Milliseconds())
   849  		metrics.SortingDeletionAgeRatio.Observe(ratio)
   850  	}
   851  }
   853  // getPodsRankedByRelatedPodsOnSameNode returns an ActivePodsWithRanks value
   854  // that wraps podsToRank and assigns each pod a rank equal to the number of
   855  // active pods in relatedPods that are colocated on the same node with the pod.
   856  // relatedPods generally should be a superset of podsToRank.
   857  func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {
   858  	podsOnNode := make(map[string]int)
   859  	for _, pod := range relatedPods {
   860  		if controller.IsPodActive(pod) {
   861  			podsOnNode[pod.Spec.NodeName]++
   862  		}
   863  	}
   864  	ranks := make([]int, len(podsToRank))
   865  	for i, pod := range podsToRank {
   866  		ranks[i] = podsOnNode[pod.Spec.NodeName]
   867  	}
   868  	return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: ranks, Now: metav1.Now()}
   869  }
   871  func getPodKeys(pods []*v1.Pod) []string {
   872  	podKeys := make([]string, 0, len(pods))
   873  	for _, pod := range pods {
   874  		podKeys = append(podKeys, controller.PodKey(pod))
   875  	}
   876  	return podKeys
   877  }

