...

Source file src/k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go

Documentation: k8s.io/kubernetes/pkg/controller/daemon

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package daemon
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"reflect"
    23  	"sort"
    24  	"sync"
    25  	"time"
    26  
    27  	apps "k8s.io/api/apps/v1"
    28  	v1 "k8s.io/api/core/v1"
    29  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    30  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/labels"
    33  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    34  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    35  	"k8s.io/apimachinery/pkg/util/wait"
    36  	appsinformers "k8s.io/client-go/informers/apps/v1"
    37  	coreinformers "k8s.io/client-go/informers/core/v1"
    38  	clientset "k8s.io/client-go/kubernetes"
    39  	"k8s.io/client-go/kubernetes/scheme"
    40  	unversionedapps "k8s.io/client-go/kubernetes/typed/apps/v1"
    41  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    42  	appslisters "k8s.io/client-go/listers/apps/v1"
    43  	corelisters "k8s.io/client-go/listers/core/v1"
    44  	"k8s.io/client-go/tools/cache"
    45  	"k8s.io/client-go/tools/record"
    46  	"k8s.io/client-go/util/flowcontrol"
    47  	"k8s.io/client-go/util/workqueue"
    48  	v1helper "k8s.io/component-helpers/scheduling/corev1"
    49  	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
    50  	"k8s.io/klog/v2"
    51  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    52  	"k8s.io/kubernetes/pkg/controller"
    53  	"k8s.io/kubernetes/pkg/controller/daemon/util"
    54  )
    55  
    56  const (
    57  	// BurstReplicas is a rate limiter for booting pods on a lot of pods.
    58  	// The value of 250 is chosen b/c values that are too high can cause registry DoS issues.
    59  	BurstReplicas = 250
    60  
    61  	// StatusUpdateRetries limits the number of retries if sending a status update to API server fails.
    62  	StatusUpdateRetries = 1
    63  
    64  	// BackoffGCInterval is the time that has to pass before next iteration of backoff GC is run
    65  	BackoffGCInterval = 1 * time.Minute
    66  )
    67  
    68  // Reasons for DaemonSet events
    69  const (
    70  	// SelectingAllReason is added to an event when a DaemonSet selects all Pods.
    71  	SelectingAllReason = "SelectingAll"
    72  	// FailedPlacementReason is added to an event when a DaemonSet can't schedule a Pod to a specified node.
    73  	FailedPlacementReason = "FailedPlacement"
    74  	// FailedDaemonPodReason is added to an event when the status of a Pod of a DaemonSet is 'Failed'.
    75  	FailedDaemonPodReason = "FailedDaemonPod"
    76  	// SucceededDaemonPodReason is added to an event when the status of a Pod of a DaemonSet is 'Succeeded'.
    77  	SucceededDaemonPodReason = "SucceededDaemonPod"
    78  )
    79  
    80  // controllerKind contains the schema.GroupVersionKind for this controller type.
    81  var controllerKind = apps.SchemeGroupVersion.WithKind("DaemonSet")
    82  
    83  // DaemonSetsController is responsible for synchronizing DaemonSet objects stored
    84  // in the system with actual running pods.
    85  type DaemonSetsController struct {
    86  	kubeClient clientset.Interface
    87  
    88  	eventBroadcaster record.EventBroadcaster
    89  	eventRecorder    record.EventRecorder
    90  
    91  	podControl controller.PodControlInterface
    92  	crControl  controller.ControllerRevisionControlInterface
    93  
    94  	// An dsc is temporarily suspended after creating/deleting these many replicas.
    95  	// It resumes normal action after observing the watch events for them.
    96  	burstReplicas int
    97  
    98  	// To allow injection of syncDaemonSet for testing.
    99  	syncHandler func(ctx context.Context, dsKey string) error
   100  	// used for unit testing
   101  	enqueueDaemonSet func(ds *apps.DaemonSet)
   102  	// A TTLCache of pod creates/deletes each ds expects to see
   103  	expectations controller.ControllerExpectationsInterface
   104  	// dsLister can list/get daemonsets from the shared informer's store
   105  	dsLister appslisters.DaemonSetLister
   106  	// dsStoreSynced returns true if the daemonset store has been synced at least once.
   107  	// Added as a member to the struct to allow injection for testing.
   108  	dsStoreSynced cache.InformerSynced
   109  	// historyLister get list/get history from the shared informers's store
   110  	historyLister appslisters.ControllerRevisionLister
   111  	// historyStoreSynced returns true if the history store has been synced at least once.
   112  	// Added as a member to the struct to allow injection for testing.
   113  	historyStoreSynced cache.InformerSynced
   114  	// podLister get list/get pods from the shared informers's store
   115  	podLister corelisters.PodLister
   116  	// podStoreSynced returns true if the pod store has been synced at least once.
   117  	// Added as a member to the struct to allow injection for testing.
   118  	podStoreSynced cache.InformerSynced
   119  	// nodeLister can list/get nodes from the shared informer's store
   120  	nodeLister corelisters.NodeLister
   121  	// nodeStoreSynced returns true if the node store has been synced at least once.
   122  	// Added as a member to the struct to allow injection for testing.
   123  	nodeStoreSynced cache.InformerSynced
   124  
   125  	// DaemonSet keys that need to be synced.
   126  	queue workqueue.RateLimitingInterface
   127  
   128  	failedPodsBackoff *flowcontrol.Backoff
   129  }
   130  
   131  // NewDaemonSetsController creates a new DaemonSetsController
   132  func NewDaemonSetsController(
   133  	ctx context.Context,
   134  	daemonSetInformer appsinformers.DaemonSetInformer,
   135  	historyInformer appsinformers.ControllerRevisionInformer,
   136  	podInformer coreinformers.PodInformer,
   137  	nodeInformer coreinformers.NodeInformer,
   138  	kubeClient clientset.Interface,
   139  	failedPodsBackoff *flowcontrol.Backoff,
   140  ) (*DaemonSetsController, error) {
   141  	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
   142  	logger := klog.FromContext(ctx)
   143  	dsc := &DaemonSetsController{
   144  		kubeClient:       kubeClient,
   145  		eventBroadcaster: eventBroadcaster,
   146  		eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
   147  		podControl: controller.RealPodControl{
   148  			KubeClient: kubeClient,
   149  			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
   150  		},
   151  		crControl: controller.RealControllerRevisionControl{
   152  			KubeClient: kubeClient,
   153  		},
   154  		burstReplicas: BurstReplicas,
   155  		expectations:  controller.NewControllerExpectations(),
   156  		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
   157  	}
   158  
   159  	daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   160  		AddFunc: func(obj interface{}) {
   161  			dsc.addDaemonset(logger, obj)
   162  		},
   163  		UpdateFunc: func(oldObj, newObj interface{}) {
   164  			dsc.updateDaemonset(logger, oldObj, newObj)
   165  		},
   166  		DeleteFunc: func(obj interface{}) {
   167  			dsc.deleteDaemonset(logger, obj)
   168  		},
   169  	})
   170  	dsc.dsLister = daemonSetInformer.Lister()
   171  	dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced
   172  
   173  	historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   174  		AddFunc: func(obj interface{}) {
   175  			dsc.addHistory(logger, obj)
   176  		},
   177  		UpdateFunc: func(oldObj, newObj interface{}) {
   178  			dsc.updateHistory(logger, oldObj, newObj)
   179  		},
   180  		DeleteFunc: func(obj interface{}) {
   181  			dsc.deleteHistory(logger, obj)
   182  		},
   183  	})
   184  	dsc.historyLister = historyInformer.Lister()
   185  	dsc.historyStoreSynced = historyInformer.Informer().HasSynced
   186  
   187  	// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
   188  	// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
   189  	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   190  		AddFunc: func(obj interface{}) {
   191  			dsc.addPod(logger, obj)
   192  		},
   193  		UpdateFunc: func(oldObj, newObj interface{}) {
   194  			dsc.updatePod(logger, oldObj, newObj)
   195  		},
   196  		DeleteFunc: func(obj interface{}) {
   197  			dsc.deletePod(logger, obj)
   198  		},
   199  	})
   200  	dsc.podLister = podInformer.Lister()
   201  	dsc.podStoreSynced = podInformer.Informer().HasSynced
   202  
   203  	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   204  		AddFunc: func(obj interface{}) {
   205  			dsc.addNode(logger, obj)
   206  		},
   207  		UpdateFunc: func(oldObj, newObj interface{}) {
   208  			dsc.updateNode(logger, oldObj, newObj)
   209  		},
   210  	},
   211  	)
   212  	dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
   213  	dsc.nodeLister = nodeInformer.Lister()
   214  
   215  	dsc.syncHandler = dsc.syncDaemonSet
   216  	dsc.enqueueDaemonSet = dsc.enqueue
   217  
   218  	dsc.failedPodsBackoff = failedPodsBackoff
   219  
   220  	return dsc, nil
   221  }
   222  
   223  func (dsc *DaemonSetsController) addDaemonset(logger klog.Logger, obj interface{}) {
   224  	ds := obj.(*apps.DaemonSet)
   225  	logger.V(4).Info("Adding daemon set", "daemonset", klog.KObj(ds))
   226  	dsc.enqueueDaemonSet(ds)
   227  }
   228  
   229  func (dsc *DaemonSetsController) updateDaemonset(logger klog.Logger, cur, old interface{}) {
   230  	oldDS := old.(*apps.DaemonSet)
   231  	curDS := cur.(*apps.DaemonSet)
   232  
   233  	// TODO: make a KEP and fix informers to always call the delete event handler on re-create
   234  	if curDS.UID != oldDS.UID {
   235  		key, err := controller.KeyFunc(oldDS)
   236  		if err != nil {
   237  			utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldDS, err))
   238  			return
   239  		}
   240  		dsc.deleteDaemonset(logger, cache.DeletedFinalStateUnknown{
   241  			Key: key,
   242  			Obj: oldDS,
   243  		})
   244  	}
   245  
   246  	logger.V(4).Info("Updating daemon set", "daemonset", klog.KObj(oldDS))
   247  	dsc.enqueueDaemonSet(curDS)
   248  }
   249  
   250  func (dsc *DaemonSetsController) deleteDaemonset(logger klog.Logger, obj interface{}) {
   251  	ds, ok := obj.(*apps.DaemonSet)
   252  	if !ok {
   253  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   254  		if !ok {
   255  			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
   256  			return
   257  		}
   258  		ds, ok = tombstone.Obj.(*apps.DaemonSet)
   259  		if !ok {
   260  			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a DaemonSet %#v", obj))
   261  			return
   262  		}
   263  	}
   264  	logger.V(4).Info("Deleting daemon set", "daemonset", klog.KObj(ds))
   265  
   266  	key, err := controller.KeyFunc(ds)
   267  	if err != nil {
   268  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", ds, err))
   269  		return
   270  	}
   271  
   272  	// Delete expectations for the DaemonSet so if we create a new one with the same name it starts clean
   273  	dsc.expectations.DeleteExpectations(logger, key)
   274  
   275  	dsc.queue.Add(key)
   276  }
   277  
   278  // Run begins watching and syncing daemon sets.
   279  func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
   280  	defer utilruntime.HandleCrash()
   281  
   282  	dsc.eventBroadcaster.StartStructuredLogging(3)
   283  	dsc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dsc.kubeClient.CoreV1().Events("")})
   284  	defer dsc.eventBroadcaster.Shutdown()
   285  
   286  	defer dsc.queue.ShutDown()
   287  
   288  	logger := klog.FromContext(ctx)
   289  	logger.Info("Starting daemon sets controller")
   290  	defer logger.Info("Shutting down daemon sets controller")
   291  
   292  	if !cache.WaitForNamedCacheSync("daemon sets", ctx.Done(), dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
   293  		return
   294  	}
   295  
   296  	for i := 0; i < workers; i++ {
   297  		go wait.UntilWithContext(ctx, dsc.runWorker, time.Second)
   298  	}
   299  
   300  	go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, ctx.Done())
   301  
   302  	<-ctx.Done()
   303  }
   304  
   305  func (dsc *DaemonSetsController) runWorker(ctx context.Context) {
   306  	for dsc.processNextWorkItem(ctx) {
   307  	}
   308  }
   309  
   310  // processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
   311  func (dsc *DaemonSetsController) processNextWorkItem(ctx context.Context) bool {
   312  	dsKey, quit := dsc.queue.Get()
   313  	if quit {
   314  		return false
   315  	}
   316  	defer dsc.queue.Done(dsKey)
   317  
   318  	err := dsc.syncHandler(ctx, dsKey.(string))
   319  	if err == nil {
   320  		dsc.queue.Forget(dsKey)
   321  		return true
   322  	}
   323  
   324  	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
   325  	dsc.queue.AddRateLimited(dsKey)
   326  
   327  	return true
   328  }
   329  
   330  func (dsc *DaemonSetsController) enqueue(ds *apps.DaemonSet) {
   331  	key, err := controller.KeyFunc(ds)
   332  	if err != nil {
   333  		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", ds, err))
   334  		return
   335  	}
   336  
   337  	// TODO: Handle overlapping controllers better. See comment in ReplicationManager.
   338  	dsc.queue.Add(key)
   339  }
   340  
   341  func (dsc *DaemonSetsController) enqueueDaemonSetAfter(obj interface{}, after time.Duration) {
   342  	key, err := controller.KeyFunc(obj)
   343  	if err != nil {
   344  		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
   345  		return
   346  	}
   347  
   348  	// TODO: Handle overlapping controllers better. See comment in ReplicationManager.
   349  	dsc.queue.AddAfter(key, after)
   350  }
   351  
   352  // getDaemonSetsForPod returns a list of DaemonSets that potentially match the pod.
   353  func (dsc *DaemonSetsController) getDaemonSetsForPod(pod *v1.Pod) []*apps.DaemonSet {
   354  	sets, err := dsc.dsLister.GetPodDaemonSets(pod)
   355  	if err != nil {
   356  		return nil
   357  	}
   358  	if len(sets) > 1 {
   359  		// ControllerRef will ensure we don't do anything crazy, but more than one
   360  		// item in this list nevertheless constitutes user error.
   361  		utilruntime.HandleError(fmt.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels))
   362  	}
   363  	return sets
   364  }
   365  
   366  // getDaemonSetsForHistory returns a list of DaemonSets that potentially
   367  // match a ControllerRevision.
   368  func (dsc *DaemonSetsController) getDaemonSetsForHistory(logger klog.Logger, history *apps.ControllerRevision) []*apps.DaemonSet {
   369  	daemonSets, err := dsc.dsLister.GetHistoryDaemonSets(history)
   370  	if err != nil || len(daemonSets) == 0 {
   371  		return nil
   372  	}
   373  	if len(daemonSets) > 1 {
   374  		// ControllerRef will ensure we don't do anything crazy, but more than one
   375  		// item in this list nevertheless constitutes user error.
   376  		logger.V(4).Info("Found more than one DaemonSet selecting the ControllerRevision. This is potentially a user error",
   377  			"controllerRevision", klog.KObj(history), "labels", history.Labels)
   378  	}
   379  	return daemonSets
   380  }
   381  
   382  // addHistory enqueues the DaemonSet that manages a ControllerRevision when the ControllerRevision is created
   383  // or when the controller manager is restarted.
   384  func (dsc *DaemonSetsController) addHistory(logger klog.Logger, obj interface{}) {
   385  	history := obj.(*apps.ControllerRevision)
   386  	if history.DeletionTimestamp != nil {
   387  		// On a restart of the controller manager, it's possible for an object to
   388  		// show up in a state that is already pending deletion.
   389  		dsc.deleteHistory(logger, history)
   390  		return
   391  	}
   392  
   393  	// If it has a ControllerRef, that's all that matters.
   394  	if controllerRef := metav1.GetControllerOf(history); controllerRef != nil {
   395  		ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
   396  		if ds == nil {
   397  			return
   398  		}
   399  		logger.V(4).Info("Observed a ControllerRevision", "controllerRevision", klog.KObj(history))
   400  		return
   401  	}
   402  
   403  	// Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync
   404  	// them to see if anyone wants to adopt it.
   405  	daemonSets := dsc.getDaemonSetsForHistory(logger, history)
   406  	if len(daemonSets) == 0 {
   407  		return
   408  	}
   409  	logger.V(4).Info("Orphan ControllerRevision added", "controllerRevision", klog.KObj(history))
   410  	for _, ds := range daemonSets {
   411  		dsc.enqueueDaemonSet(ds)
   412  	}
   413  }
   414  
   415  // updateHistory figures out what DaemonSet(s) manage a ControllerRevision when the ControllerRevision
   416  // is updated and wake them up. If anything of the ControllerRevision has changed, we need to  awaken
   417  // both the old and new DaemonSets.
   418  func (dsc *DaemonSetsController) updateHistory(logger klog.Logger, old, cur interface{}) {
   419  	curHistory := cur.(*apps.ControllerRevision)
   420  	oldHistory := old.(*apps.ControllerRevision)
   421  	if curHistory.ResourceVersion == oldHistory.ResourceVersion {
   422  		// Periodic resync will send update events for all known ControllerRevisions.
   423  		return
   424  	}
   425  
   426  	curControllerRef := metav1.GetControllerOf(curHistory)
   427  	oldControllerRef := metav1.GetControllerOf(oldHistory)
   428  	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
   429  	if controllerRefChanged && oldControllerRef != nil {
   430  		// The ControllerRef was changed. Sync the old controller, if any.
   431  		if ds := dsc.resolveControllerRef(oldHistory.Namespace, oldControllerRef); ds != nil {
   432  			dsc.enqueueDaemonSet(ds)
   433  		}
   434  	}
   435  
   436  	// If it has a ControllerRef, that's all that matters.
   437  	if curControllerRef != nil {
   438  		ds := dsc.resolveControllerRef(curHistory.Namespace, curControllerRef)
   439  		if ds == nil {
   440  			return
   441  		}
   442  		logger.V(4).Info("Observed an update to a ControllerRevision", "controllerRevision", klog.KObj(curHistory))
   443  		dsc.enqueueDaemonSet(ds)
   444  		return
   445  	}
   446  
   447  	// Otherwise, it's an orphan. If anything changed, sync matching controllers
   448  	// to see if anyone wants to adopt it now.
   449  	labelChanged := !reflect.DeepEqual(curHistory.Labels, oldHistory.Labels)
   450  	if labelChanged || controllerRefChanged {
   451  		daemonSets := dsc.getDaemonSetsForHistory(logger, curHistory)
   452  		if len(daemonSets) == 0 {
   453  			return
   454  		}
   455  		logger.V(4).Info("Orphan ControllerRevision updated", "controllerRevision", klog.KObj(curHistory))
   456  		for _, ds := range daemonSets {
   457  			dsc.enqueueDaemonSet(ds)
   458  		}
   459  	}
   460  }
   461  
   462  // deleteHistory enqueues the DaemonSet that manages a ControllerRevision when
   463  // the ControllerRevision is deleted. obj could be an *app.ControllerRevision, or
   464  // a DeletionFinalStateUnknown marker item.
   465  func (dsc *DaemonSetsController) deleteHistory(logger klog.Logger, obj interface{}) {
   466  	history, ok := obj.(*apps.ControllerRevision)
   467  
   468  	// When a delete is dropped, the relist will notice a ControllerRevision in the store not
   469  	// in the list, leading to the insertion of a tombstone object which contains
   470  	// the deleted key/value. Note that this value might be stale. If the ControllerRevision
   471  	// changed labels the new DaemonSet will not be woken up till the periodic resync.
   472  	if !ok {
   473  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   474  		if !ok {
   475  			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
   476  			return
   477  		}
   478  		history, ok = tombstone.Obj.(*apps.ControllerRevision)
   479  		if !ok {
   480  			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ControllerRevision %#v", obj))
   481  			return
   482  		}
   483  	}
   484  
   485  	controllerRef := metav1.GetControllerOf(history)
   486  	if controllerRef == nil {
   487  		// No controller should care about orphans being deleted.
   488  		return
   489  	}
   490  	ds := dsc.resolveControllerRef(history.Namespace, controllerRef)
   491  	if ds == nil {
   492  		return
   493  	}
   494  	logger.V(4).Info("ControllerRevision deleted", "controllerRevision", klog.KObj(history))
   495  	dsc.enqueueDaemonSet(ds)
   496  }
   497  
   498  func (dsc *DaemonSetsController) addPod(logger klog.Logger, obj interface{}) {
   499  	pod := obj.(*v1.Pod)
   500  
   501  	if pod.DeletionTimestamp != nil {
   502  		// on a restart of the controller manager, it's possible a new pod shows up in a state that
   503  		// is already pending deletion. Prevent the pod from being a creation observation.
   504  		dsc.deletePod(logger, pod)
   505  		return
   506  	}
   507  
   508  	// If it has a ControllerRef, that's all that matters.
   509  	if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
   510  		ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
   511  		if ds == nil {
   512  			return
   513  		}
   514  		dsKey, err := controller.KeyFunc(ds)
   515  		if err != nil {
   516  			return
   517  		}
   518  		logger.V(4).Info("Pod added", "pod", klog.KObj(pod))
   519  		dsc.expectations.CreationObserved(logger, dsKey)
   520  		dsc.enqueueDaemonSet(ds)
   521  		return
   522  	}
   523  
   524  	// Otherwise, it's an orphan. Get a list of all matching DaemonSets and sync
   525  	// them to see if anyone wants to adopt it.
   526  	// DO NOT observe creation because no controller should be waiting for an
   527  	// orphan.
   528  	dss := dsc.getDaemonSetsForPod(pod)
   529  	if len(dss) == 0 {
   530  		return
   531  	}
   532  	logger.V(4).Info("Orphan Pod added", "pod", klog.KObj(pod))
   533  	for _, ds := range dss {
   534  		dsc.enqueueDaemonSet(ds)
   535  	}
   536  }
   537  
   538  // When a pod is updated, figure out what sets manage it and wake them
   539  // up. If the labels of the pod have changed we need to awaken both the old
   540  // and new set. old and cur must be *v1.Pod types.
   541  func (dsc *DaemonSetsController) updatePod(logger klog.Logger, old, cur interface{}) {
   542  	curPod := cur.(*v1.Pod)
   543  	oldPod := old.(*v1.Pod)
   544  	if curPod.ResourceVersion == oldPod.ResourceVersion {
   545  		// Periodic resync will send update events for all known pods.
   546  		// Two different versions of the same pod will always have different RVs.
   547  		return
   548  	}
   549  
   550  	if curPod.DeletionTimestamp != nil {
   551  		// when a pod is deleted gracefully its deletion timestamp is first modified to reflect a grace period,
   552  		// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
   553  		// for modification of the deletion timestamp and expect an ds to create more replicas asap, not wait
   554  		// until the kubelet actually deletes the pod.
   555  		dsc.deletePod(logger, curPod)
   556  		return
   557  	}
   558  
   559  	curControllerRef := metav1.GetControllerOf(curPod)
   560  	oldControllerRef := metav1.GetControllerOf(oldPod)
   561  	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
   562  	if controllerRefChanged && oldControllerRef != nil {
   563  		// The ControllerRef was changed. Sync the old controller, if any.
   564  		if ds := dsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); ds != nil {
   565  			dsc.enqueueDaemonSet(ds)
   566  		}
   567  	}
   568  
   569  	// If it has a ControllerRef, that's all that matters.
   570  	if curControllerRef != nil {
   571  		ds := dsc.resolveControllerRef(curPod.Namespace, curControllerRef)
   572  		if ds == nil {
   573  			return
   574  		}
   575  		logger.V(4).Info("Pod updated", "pod", klog.KObj(curPod))
   576  		dsc.enqueueDaemonSet(ds)
   577  		changedToReady := !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod)
   578  		// See https://github.com/kubernetes/kubernetes/pull/38076 for more details
   579  		if changedToReady && ds.Spec.MinReadySeconds > 0 {
   580  			// Add a second to avoid milliseconds skew in AddAfter.
   581  			// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
   582  			dsc.enqueueDaemonSetAfter(ds, (time.Duration(ds.Spec.MinReadySeconds)*time.Second)+time.Second)
   583  		}
   584  		return
   585  	}
   586  
   587  	// Otherwise, it's an orphan. If anything changed, sync matching controllers
   588  	// to see if anyone wants to adopt it now.
   589  	dss := dsc.getDaemonSetsForPod(curPod)
   590  	if len(dss) == 0 {
   591  		return
   592  	}
   593  	logger.V(4).Info("Orphan Pod updated", "pod", klog.KObj(curPod))
   594  	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
   595  	if labelChanged || controllerRefChanged {
   596  		for _, ds := range dss {
   597  			dsc.enqueueDaemonSet(ds)
   598  		}
   599  	}
   600  }
   601  
   602  func (dsc *DaemonSetsController) deletePod(logger klog.Logger, obj interface{}) {
   603  	pod, ok := obj.(*v1.Pod)
   604  	// When a delete is dropped, the relist will notice a pod in the store not
   605  	// in the list, leading to the insertion of a tombstone object which contains
   606  	// the deleted key/value. Note that this value might be stale. If the pod
   607  	// changed labels the new daemonset will not be woken up till the periodic
   608  	// resync.
   609  	if !ok {
   610  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   611  		if !ok {
   612  			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
   613  			return
   614  		}
   615  		pod, ok = tombstone.Obj.(*v1.Pod)
   616  		if !ok {
   617  			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
   618  			return
   619  		}
   620  	}
   621  
   622  	controllerRef := metav1.GetControllerOf(pod)
   623  	if controllerRef == nil {
   624  		// No controller should care about orphans being deleted.
   625  		return
   626  	}
   627  	ds := dsc.resolveControllerRef(pod.Namespace, controllerRef)
   628  	if ds == nil {
   629  		return
   630  	}
   631  	dsKey, err := controller.KeyFunc(ds)
   632  	if err != nil {
   633  		return
   634  	}
   635  	logger.V(4).Info("Pod deleted", "pod", klog.KObj(pod))
   636  	dsc.expectations.DeletionObserved(logger, dsKey)
   637  	dsc.enqueueDaemonSet(ds)
   638  }
   639  
   640  func (dsc *DaemonSetsController) addNode(logger klog.Logger, obj interface{}) {
   641  	// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
   642  	dsList, err := dsc.dsLister.List(labels.Everything())
   643  	if err != nil {
   644  		logger.V(4).Info("Error enqueueing daemon sets", "err", err)
   645  		return
   646  	}
   647  	node := obj.(*v1.Node)
   648  	for _, ds := range dsList {
   649  		if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); shouldRun {
   650  			dsc.enqueueDaemonSet(ds)
   651  		}
   652  	}
   653  }
   654  
   655  // shouldIgnoreNodeUpdate returns true if Node labels and taints have not changed, otherwise returns false.
   656  // If other calling functions need to use other properties of Node, shouldIgnoreNodeUpdate needs to be updated.
   657  func shouldIgnoreNodeUpdate(oldNode, curNode v1.Node) bool {
   658  	return apiequality.Semantic.DeepEqual(oldNode.Labels, curNode.Labels) &&
   659  		apiequality.Semantic.DeepEqual(oldNode.Spec.Taints, curNode.Spec.Taints)
   660  }
   661  
   662  func (dsc *DaemonSetsController) updateNode(logger klog.Logger, old, cur interface{}) {
   663  	oldNode := old.(*v1.Node)
   664  	curNode := cur.(*v1.Node)
   665  	if shouldIgnoreNodeUpdate(*oldNode, *curNode) {
   666  		return
   667  	}
   668  
   669  	dsList, err := dsc.dsLister.List(labels.Everything())
   670  	if err != nil {
   671  		logger.V(4).Info("Error listing daemon sets", "err", err)
   672  		return
   673  	}
   674  	// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
   675  	for _, ds := range dsList {
   676  		// If NodeShouldRunDaemonPod needs to uses other than Labels and Taints (mutable) properties of node, it needs to update shouldIgnoreNodeUpdate.
   677  		oldShouldRun, oldShouldContinueRunning := NodeShouldRunDaemonPod(oldNode, ds)
   678  		currentShouldRun, currentShouldContinueRunning := NodeShouldRunDaemonPod(curNode, ds)
   679  		if (oldShouldRun != currentShouldRun) || (oldShouldContinueRunning != currentShouldContinueRunning) {
   680  			dsc.enqueueDaemonSet(ds)
   681  		}
   682  	}
   683  }
   684  
   685  // getDaemonPods returns daemon pods owned by the given ds.
   686  // This also reconciles ControllerRef by adopting/orphaning.
   687  // Note that returned Pods are pointers to objects in the cache.
   688  // If you want to modify one, you need to deep-copy it first.
   689  func (dsc *DaemonSetsController) getDaemonPods(ctx context.Context, ds *apps.DaemonSet) ([]*v1.Pod, error) {
   690  	selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
   691  	if err != nil {
   692  		return nil, err
   693  	}
   694  
   695  	// List all pods to include those that don't match the selector anymore but
   696  	// have a ControllerRef pointing to this controller.
   697  	pods, err := dsc.podLister.Pods(ds.Namespace).List(labels.Everything())
   698  	if err != nil {
   699  		return nil, err
   700  	}
   701  	// If any adoptions are attempted, we should first recheck for deletion with
   702  	// an uncached quorum read sometime after listing Pods (see #42639).
   703  	dsNotDeleted := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
   704  		fresh, err := dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace).Get(ctx, ds.Name, metav1.GetOptions{})
   705  		if err != nil {
   706  			return nil, err
   707  		}
   708  		if fresh.UID != ds.UID {
   709  			return nil, fmt.Errorf("original DaemonSet %v/%v is gone: got uid %v, wanted %v", ds.Namespace, ds.Name, fresh.UID, ds.UID)
   710  		}
   711  		return fresh, nil
   712  	})
   713  
   714  	// Use ControllerRefManager to adopt/orphan as needed.
   715  	cm := controller.NewPodControllerRefManager(dsc.podControl, ds, selector, controllerKind, dsNotDeleted)
   716  	return cm.ClaimPods(ctx, pods)
   717  }
   718  
   719  // getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to ds) created for the nodes.
   720  // This also reconciles ControllerRef by adopting/orphaning.
   721  // Note that returned Pods are pointers to objects in the cache.
   722  // If you want to modify one, you need to deep-copy it first.
   723  func (dsc *DaemonSetsController) getNodesToDaemonPods(ctx context.Context, ds *apps.DaemonSet, includeDeletedTerminal bool) (map[string][]*v1.Pod, error) {
   724  	claimedPods, err := dsc.getDaemonPods(ctx, ds)
   725  	if err != nil {
   726  		return nil, err
   727  	}
   728  	// Group Pods by Node name.
   729  	nodeToDaemonPods := make(map[string][]*v1.Pod)
   730  	logger := klog.FromContext(ctx)
   731  	for _, pod := range claimedPods {
   732  		if !includeDeletedTerminal && podutil.IsPodTerminal(pod) && pod.DeletionTimestamp != nil {
   733  			// This Pod has a finalizer or is already scheduled for deletion from the
   734  			// store by the kubelet or the Pod GC. The DS controller doesn't have
   735  			// anything else to do with it.
   736  			continue
   737  		}
   738  		nodeName, err := util.GetTargetNodeName(pod)
   739  		if err != nil {
   740  			logger.V(4).Info("Failed to get target node name of Pod in DaemonSet",
   741  				"pod", klog.KObj(pod), "daemonset", klog.KObj(ds))
   742  			continue
   743  		}
   744  
   745  		nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
   746  	}
   747  
   748  	return nodeToDaemonPods, nil
   749  }
   750  
   751  // resolveControllerRef returns the controller referenced by a ControllerRef,
   752  // or nil if the ControllerRef could not be resolved to a matching controller
   753  // of the correct Kind.
   754  func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.DaemonSet {
   755  	// We can't look up by UID, so look up by Name and then verify UID.
   756  	// Don't even try to look up by Name if it's the wrong Kind.
   757  	if controllerRef.Kind != controllerKind.Kind {
   758  		return nil
   759  	}
   760  	ds, err := dsc.dsLister.DaemonSets(namespace).Get(controllerRef.Name)
   761  	if err != nil {
   762  		return nil
   763  	}
   764  	if ds.UID != controllerRef.UID {
   765  		// The controller we found with this Name is not the same one that the
   766  		// ControllerRef points to.
   767  		return nil
   768  	}
   769  	return ds
   770  }
   771  
   772  // podsShouldBeOnNode figures out the DaemonSet pods to be created and deleted on the given node:
   773  //   - nodesNeedingDaemonPods: the pods need to start on the node
   774  //   - podsToDelete: the Pods need to be deleted on the node
   775  //   - err: unexpected error
   776  func (dsc *DaemonSetsController) podsShouldBeOnNode(
   777  	logger klog.Logger,
   778  	node *v1.Node,
   779  	nodeToDaemonPods map[string][]*v1.Pod,
   780  	ds *apps.DaemonSet,
   781  	hash string,
   782  ) (nodesNeedingDaemonPods, podsToDelete []string) {
   783  
   784  	shouldRun, shouldContinueRunning := NodeShouldRunDaemonPod(node, ds)
   785  	daemonPods, exists := nodeToDaemonPods[node.Name]
   786  
   787  	switch {
   788  	case shouldRun && !exists:
   789  		// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
   790  		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
   791  	case shouldContinueRunning:
   792  		// If a daemon pod failed, delete it
   793  		// If there's non-daemon pods left on this node, we will create it in the next sync loop
   794  		var daemonPodsRunning []*v1.Pod
   795  		for _, pod := range daemonPods {
   796  			if pod.DeletionTimestamp != nil {
   797  				continue
   798  			}
   799  			if pod.Status.Phase == v1.PodFailed {
   800  				// This is a critical place where DS is often fighting with kubelet that rejects pods.
   801  				// We need to avoid hot looping and backoff.
   802  				backoffKey := failedPodsBackoffKey(ds, node.Name)
   803  
   804  				now := dsc.failedPodsBackoff.Clock.Now()
   805  				inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
   806  				if inBackoff {
   807  					delay := dsc.failedPodsBackoff.Get(backoffKey)
   808  					logger.V(4).Info("Deleting failed pod on node has been limited by backoff",
   809  						"pod", klog.KObj(pod), "node", klog.KObj(node), "currentDelay", delay)
   810  					dsc.enqueueDaemonSetAfter(ds, delay)
   811  					continue
   812  				}
   813  
   814  				dsc.failedPodsBackoff.Next(backoffKey, now)
   815  
   816  				msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
   817  				logger.V(2).Info("Found failed daemon pod on node, will try to kill it", "pod", klog.KObj(pod), "node", klog.KObj(node))
   818  				// Emit an event so that it's discoverable to users.
   819  				dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
   820  				podsToDelete = append(podsToDelete, pod.Name)
   821  			} else if pod.Status.Phase == v1.PodSucceeded {
   822  				msg := fmt.Sprintf("Found succeeded daemon pod %s/%s on node %s, will try to delete it", pod.Namespace, pod.Name, node.Name)
   823  				logger.V(2).Info("Found succeeded daemon pod on node, will try to delete it", "pod", klog.KObj(pod), "node", klog.KObj(node))
   824  				// Emit an event so that it's discoverable to users.
   825  				dsc.eventRecorder.Eventf(ds, v1.EventTypeNormal, SucceededDaemonPodReason, msg)
   826  				podsToDelete = append(podsToDelete, pod.Name)
   827  			} else {
   828  				daemonPodsRunning = append(daemonPodsRunning, pod)
   829  			}
   830  		}
   831  
   832  		// When surge is not enabled, if there is more than 1 running pod on a node delete all but the oldest
   833  		if !util.AllowsSurge(ds) {
   834  			if len(daemonPodsRunning) <= 1 {
   835  				// There are no excess pods to be pruned, and no pods to create
   836  				break
   837  			}
   838  
   839  			sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
   840  			for i := 1; i < len(daemonPodsRunning); i++ {
   841  				podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
   842  			}
   843  			break
   844  		}
   845  
   846  		if len(daemonPodsRunning) <= 1 {
   847  			// // There are no excess pods to be pruned
   848  			if len(daemonPodsRunning) == 0 && shouldRun {
   849  				// We are surging so we need to have at least one non-deleted pod on the node
   850  				nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
   851  			}
   852  			break
   853  		}
   854  
   855  		// When surge is enabled, we allow 2 pods if and only if the oldest pod matching the current hash state
   856  		// is not ready AND the oldest pod that doesn't match the current hash state is ready. All other pods are
   857  		// deleted. If neither pod is ready, only the one matching the current hash revision is kept.
   858  		var oldestNewPod, oldestOldPod *v1.Pod
   859  		sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
   860  		for _, pod := range daemonPodsRunning {
   861  			if pod.Labels[apps.ControllerRevisionHashLabelKey] == hash {
   862  				if oldestNewPod == nil {
   863  					oldestNewPod = pod
   864  					continue
   865  				}
   866  			} else {
   867  				if oldestOldPod == nil {
   868  					oldestOldPod = pod
   869  					continue
   870  				}
   871  			}
   872  			podsToDelete = append(podsToDelete, pod.Name)
   873  		}
   874  		if oldestNewPod != nil && oldestOldPod != nil {
   875  			switch {
   876  			case !podutil.IsPodReady(oldestOldPod):
   877  				logger.V(5).Info("Pod from daemonset is no longer ready and will be replaced with newer pod", "oldPod", klog.KObj(oldestOldPod), "daemonset", klog.KObj(ds), "newPod", klog.KObj(oldestNewPod))
   878  				podsToDelete = append(podsToDelete, oldestOldPod.Name)
   879  			case podutil.IsPodAvailable(oldestNewPod, ds.Spec.MinReadySeconds, metav1.Time{Time: dsc.failedPodsBackoff.Clock.Now()}):
   880  				logger.V(5).Info("Pod from daemonset is now ready and will replace older pod", "newPod", klog.KObj(oldestNewPod), "daemonset", klog.KObj(ds), "oldPod", klog.KObj(oldestOldPod))
   881  				podsToDelete = append(podsToDelete, oldestOldPod.Name)
   882  			}
   883  		}
   884  
   885  	case !shouldContinueRunning && exists:
   886  		// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
   887  		for _, pod := range daemonPods {
   888  			if pod.DeletionTimestamp != nil {
   889  				continue
   890  			}
   891  			podsToDelete = append(podsToDelete, pod.Name)
   892  		}
   893  	}
   894  
   895  	return nodesNeedingDaemonPods, podsToDelete
   896  }
   897  
   898  func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash, key string, old []*apps.ControllerRevision) error {
   899  	err := dsc.manage(ctx, ds, nodeList, hash)
   900  	if err != nil {
   901  		return err
   902  	}
   903  
   904  	// Process rolling updates if we're ready.
   905  	if dsc.expectations.SatisfiedExpectations(klog.FromContext(ctx), key) {
   906  		switch ds.Spec.UpdateStrategy.Type {
   907  		case apps.OnDeleteDaemonSetStrategyType:
   908  		case apps.RollingUpdateDaemonSetStrategyType:
   909  			err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
   910  		}
   911  		if err != nil {
   912  			return err
   913  		}
   914  	}
   915  
   916  	err = dsc.cleanupHistory(ctx, ds, old)
   917  	if err != nil {
   918  		return fmt.Errorf("failed to clean up revisions of DaemonSet: %w", err)
   919  	}
   920  
   921  	return nil
   922  }
   923  
   924  // manage manages the scheduling and running of Pods of ds on nodes.
   925  // After figuring out which nodes should run a Pod of ds but not yet running one and
   926  // which nodes should not run a Pod of ds but currently running one, it calls function
   927  // syncNodes with a list of pods to remove and a list of nodes to run a Pod of ds.
   928  func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
   929  	// Find out the pods which are created for the nodes by DaemonSet.
   930  	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
   931  	if err != nil {
   932  		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
   933  	}
   934  
   935  	// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
   936  	// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
   937  	logger := klog.FromContext(ctx)
   938  	var nodesNeedingDaemonPods, podsToDelete []string
   939  	for _, node := range nodeList {
   940  		nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(
   941  			logger, node, nodeToDaemonPods, ds, hash)
   942  
   943  		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
   944  		podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
   945  	}
   946  
   947  	// Remove unscheduled pods assigned to not existing nodes when daemonset pods are scheduled by scheduler.
   948  	// If node doesn't exist then pods are never scheduled and can't be deleted by PodGCController.
   949  	podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)
   950  
   951  	// Label new pods using the hash label value of the current history when creating them
   952  	if err = dsc.syncNodes(ctx, ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
   953  		return err
   954  	}
   955  
   956  	return nil
   957  }
   958  
   959  // syncNodes deletes given pods and creates new daemon set pods on the given nodes
   960  // returns slice with errors if any
   961  func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
   962  	// We need to set expectations before creating/deleting pods to avoid race conditions.
   963  	logger := klog.FromContext(ctx)
   964  	dsKey, err := controller.KeyFunc(ds)
   965  	if err != nil {
   966  		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
   967  	}
   968  
   969  	createDiff := len(nodesNeedingDaemonPods)
   970  	deleteDiff := len(podsToDelete)
   971  
   972  	if createDiff > dsc.burstReplicas {
   973  		createDiff = dsc.burstReplicas
   974  	}
   975  	if deleteDiff > dsc.burstReplicas {
   976  		deleteDiff = dsc.burstReplicas
   977  	}
   978  
   979  	dsc.expectations.SetExpectations(logger, dsKey, createDiff, deleteDiff)
   980  
   981  	// error channel to communicate back failures.  make the buffer big enough to avoid any blocking
   982  	errCh := make(chan error, createDiff+deleteDiff)
   983  
   984  	logger.V(4).Info("Nodes needing daemon pods for daemon set, creating", "daemonset", klog.KObj(ds), "needCount", nodesNeedingDaemonPods, "createCount", createDiff)
   985  	createWait := sync.WaitGroup{}
   986  	// If the returned error is not nil we have a parse error.
   987  	// The controller handles this via the hash.
   988  	generation, err := util.GetTemplateGeneration(ds)
   989  	if err != nil {
   990  		generation = nil
   991  	}
   992  	template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
   993  	// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
   994  	// and double with each successful iteration in a kind of "slow start".
   995  	// This handles attempts to start large numbers of pods that would
   996  	// likely all fail with the same error. For example a project with a
   997  	// low quota that attempts to create a large number of pods will be
   998  	// prevented from spamming the API service with the pod create requests
   999  	// after one of its pods fails.  Conveniently, this also prevents the
  1000  	// event spam that those failures would generate.
  1001  	batchSize := min(createDiff, controller.SlowStartInitialBatchSize)
  1002  	for pos := 0; createDiff > pos; batchSize, pos = min(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
  1003  		errorCount := len(errCh)
  1004  		createWait.Add(batchSize)
  1005  		for i := pos; i < pos+batchSize; i++ {
  1006  			go func(ix int) {
  1007  				defer createWait.Done()
  1008  
  1009  				podTemplate := template.DeepCopy()
  1010  				// The pod's NodeAffinity will be updated to make sure the Pod is bound
  1011  				// to the target node by default scheduler. It is safe to do so because there
  1012  				// should be no conflicting node affinity with the target node.
  1013  				podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
  1014  					podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
  1015  
  1016  				err := dsc.podControl.CreatePods(ctx, ds.Namespace, podTemplate,
  1017  					ds, metav1.NewControllerRef(ds, controllerKind))
  1018  
  1019  				if err != nil {
  1020  					if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
  1021  						// If the namespace is being torn down, we can safely ignore
  1022  						// this error since all subsequent creations will fail.
  1023  						return
  1024  					}
  1025  				}
  1026  				if err != nil {
  1027  					logger.V(2).Info("Failed creation, decrementing expectations for daemon set", "daemonset", klog.KObj(ds))
  1028  					dsc.expectations.CreationObserved(logger, dsKey)
  1029  					errCh <- err
  1030  					utilruntime.HandleError(err)
  1031  				}
  1032  			}(i)
  1033  		}
  1034  		createWait.Wait()
  1035  		// any skipped pods that we never attempted to start shouldn't be expected.
  1036  		skippedPods := createDiff - (batchSize + pos)
  1037  		if errorCount < len(errCh) && skippedPods > 0 {
  1038  			logger.V(2).Info("Slow-start failure. Skipping creation pods, decrementing expectations for daemon set", "skippedPods", skippedPods, "daemonset", klog.KObj(ds))
  1039  			dsc.expectations.LowerExpectations(logger, dsKey, skippedPods, 0)
  1040  			// The skipped pods will be retried later. The next controller resync will
  1041  			// retry the slow start process.
  1042  			break
  1043  		}
  1044  	}
  1045  
  1046  	logger.V(4).Info("Pods to delete for daemon set, deleting", "daemonset", klog.KObj(ds), "toDeleteCount", podsToDelete, "deleteCount", deleteDiff)
  1047  	deleteWait := sync.WaitGroup{}
  1048  	deleteWait.Add(deleteDiff)
  1049  	for i := 0; i < deleteDiff; i++ {
  1050  		go func(ix int) {
  1051  			defer deleteWait.Done()
  1052  			if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil {
  1053  				dsc.expectations.DeletionObserved(logger, dsKey)
  1054  				if !apierrors.IsNotFound(err) {
  1055  					logger.V(2).Info("Failed deletion, decremented expectations for daemon set", "daemonset", klog.KObj(ds))
  1056  					errCh <- err
  1057  					utilruntime.HandleError(err)
  1058  				}
  1059  			}
  1060  		}(i)
  1061  	}
  1062  	deleteWait.Wait()
  1063  
  1064  	// collect errors if any for proper reporting/retry logic in the controller
  1065  	errors := []error{}
  1066  	close(errCh)
  1067  	for err := range errCh {
  1068  		errors = append(errors, err)
  1069  	}
  1070  	return utilerrors.NewAggregate(errors)
  1071  }
  1072  
  1073  func storeDaemonSetStatus(
  1074  	ctx context.Context,
  1075  	dsClient unversionedapps.DaemonSetInterface,
  1076  	ds *apps.DaemonSet, desiredNumberScheduled,
  1077  	currentNumberScheduled,
  1078  	numberMisscheduled,
  1079  	numberReady,
  1080  	updatedNumberScheduled,
  1081  	numberAvailable,
  1082  	numberUnavailable int,
  1083  	updateObservedGen bool) error {
  1084  	if int(ds.Status.DesiredNumberScheduled) == desiredNumberScheduled &&
  1085  		int(ds.Status.CurrentNumberScheduled) == currentNumberScheduled &&
  1086  		int(ds.Status.NumberMisscheduled) == numberMisscheduled &&
  1087  		int(ds.Status.NumberReady) == numberReady &&
  1088  		int(ds.Status.UpdatedNumberScheduled) == updatedNumberScheduled &&
  1089  		int(ds.Status.NumberAvailable) == numberAvailable &&
  1090  		int(ds.Status.NumberUnavailable) == numberUnavailable &&
  1091  		ds.Status.ObservedGeneration >= ds.Generation {
  1092  		return nil
  1093  	}
  1094  
  1095  	toUpdate := ds.DeepCopy()
  1096  
  1097  	var updateErr, getErr error
  1098  	for i := 0; ; i++ {
  1099  		if updateObservedGen {
  1100  			toUpdate.Status.ObservedGeneration = ds.Generation
  1101  		}
  1102  		toUpdate.Status.DesiredNumberScheduled = int32(desiredNumberScheduled)
  1103  		toUpdate.Status.CurrentNumberScheduled = int32(currentNumberScheduled)
  1104  		toUpdate.Status.NumberMisscheduled = int32(numberMisscheduled)
  1105  		toUpdate.Status.NumberReady = int32(numberReady)
  1106  		toUpdate.Status.UpdatedNumberScheduled = int32(updatedNumberScheduled)
  1107  		toUpdate.Status.NumberAvailable = int32(numberAvailable)
  1108  		toUpdate.Status.NumberUnavailable = int32(numberUnavailable)
  1109  
  1110  		if _, updateErr = dsClient.UpdateStatus(ctx, toUpdate, metav1.UpdateOptions{}); updateErr == nil {
  1111  			return nil
  1112  		}
  1113  
  1114  		// Stop retrying if we exceed statusUpdateRetries - the DaemonSet will be requeued with a rate limit.
  1115  		if i >= StatusUpdateRetries {
  1116  			break
  1117  		}
  1118  		// Update the set with the latest resource version for the next poll
  1119  		if toUpdate, getErr = dsClient.Get(ctx, ds.Name, metav1.GetOptions{}); getErr != nil {
  1120  			// If the GET fails we can't trust status.Replicas anymore. This error
  1121  			// is bound to be more interesting than the update failure.
  1122  			return getErr
  1123  		}
  1124  	}
  1125  	return updateErr
  1126  }
  1127  
  1128  func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string, updateObservedGen bool) error {
  1129  	logger := klog.FromContext(ctx)
  1130  	logger.V(4).Info("Updating daemon set status")
  1131  	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
  1132  	if err != nil {
  1133  		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
  1134  	}
  1135  
  1136  	var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int
  1137  	now := dsc.failedPodsBackoff.Clock.Now()
  1138  	for _, node := range nodeList {
  1139  		shouldRun, _ := NodeShouldRunDaemonPod(node, ds)
  1140  		scheduled := len(nodeToDaemonPods[node.Name]) > 0
  1141  
  1142  		if shouldRun {
  1143  			desiredNumberScheduled++
  1144  			if !scheduled {
  1145  				continue
  1146  			}
  1147  
  1148  			currentNumberScheduled++
  1149  			// Sort the daemon pods by creation time, so that the oldest is first.
  1150  			daemonPods, _ := nodeToDaemonPods[node.Name]
  1151  			sort.Sort(podByCreationTimestampAndPhase(daemonPods))
  1152  			pod := daemonPods[0]
  1153  			if podutil.IsPodReady(pod) {
  1154  				numberReady++
  1155  				if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
  1156  					numberAvailable++
  1157  				}
  1158  			}
  1159  			// If the returned error is not nil we have a parse error.
  1160  			// The controller handles this via the hash.
  1161  			generation, err := util.GetTemplateGeneration(ds)
  1162  			if err != nil {
  1163  				generation = nil
  1164  			}
  1165  			if util.IsPodUpdated(pod, hash, generation) {
  1166  				updatedNumberScheduled++
  1167  			}
  1168  		} else {
  1169  			if scheduled {
  1170  				numberMisscheduled++
  1171  			}
  1172  		}
  1173  	}
  1174  	numberUnavailable := desiredNumberScheduled - numberAvailable
  1175  
  1176  	err = storeDaemonSetStatus(ctx, dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen)
  1177  	if err != nil {
  1178  		return fmt.Errorf("error storing status for daemon set %#v: %w", ds, err)
  1179  	}
  1180  
  1181  	// Resync the DaemonSet after MinReadySeconds as a last line of defense to guard against clock-skew.
  1182  	if ds.Spec.MinReadySeconds > 0 && numberReady != numberAvailable {
  1183  		dsc.enqueueDaemonSetAfter(ds, time.Duration(ds.Spec.MinReadySeconds)*time.Second)
  1184  	}
  1185  	return nil
  1186  }
  1187  
  1188  func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) error {
  1189  	logger := klog.FromContext(ctx)
  1190  	startTime := dsc.failedPodsBackoff.Clock.Now()
  1191  
  1192  	defer func() {
  1193  		logger.V(4).Info("Finished syncing daemon set", "daemonset", key, "time", dsc.failedPodsBackoff.Clock.Now().Sub(startTime))
  1194  	}()
  1195  
  1196  	namespace, name, err := cache.SplitMetaNamespaceKey(key)
  1197  	if err != nil {
  1198  		return err
  1199  	}
  1200  	ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
  1201  	if apierrors.IsNotFound(err) {
  1202  		logger.V(3).Info("Daemon set has been deleted", "daemonset", key)
  1203  		dsc.expectations.DeleteExpectations(logger, key)
  1204  		return nil
  1205  	}
  1206  	if err != nil {
  1207  		return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
  1208  	}
  1209  
  1210  	nodeList, err := dsc.nodeLister.List(labels.Everything())
  1211  	if err != nil {
  1212  		return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
  1213  	}
  1214  
  1215  	everything := metav1.LabelSelector{}
  1216  	if reflect.DeepEqual(ds.Spec.Selector, &everything) {
  1217  		dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
  1218  		return nil
  1219  	}
  1220  
  1221  	// Don't process a daemon set until all its creations and deletions have been processed.
  1222  	// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
  1223  	// then we do not want to call manage on foo until the daemon pods have been created.
  1224  	dsKey, err := controller.KeyFunc(ds)
  1225  	if err != nil {
  1226  		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
  1227  	}
  1228  
  1229  	// If the DaemonSet is being deleted (either by foreground deletion or
  1230  	// orphan deletion), we cannot be sure if the DaemonSet history objects
  1231  	// it owned still exist -- those history objects can either be deleted
  1232  	// or orphaned. Garbage collector doesn't guarantee that it will delete
  1233  	// DaemonSet pods before deleting DaemonSet history objects, because
  1234  	// DaemonSet history doesn't own DaemonSet pods. We cannot reliably
  1235  	// calculate the status of a DaemonSet being deleted. Therefore, return
  1236  	// here without updating status for the DaemonSet being deleted.
  1237  	if ds.DeletionTimestamp != nil {
  1238  		return nil
  1239  	}
  1240  
  1241  	// Construct histories of the DaemonSet, and get the hash of current history
  1242  	cur, old, err := dsc.constructHistory(ctx, ds)
  1243  	if err != nil {
  1244  		return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
  1245  	}
  1246  	hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
  1247  
  1248  	if !dsc.expectations.SatisfiedExpectations(logger, dsKey) {
  1249  		// Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
  1250  		return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false)
  1251  	}
  1252  
  1253  	err = dsc.updateDaemonSet(ctx, ds, nodeList, hash, dsKey, old)
  1254  	statusErr := dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true)
  1255  	switch {
  1256  	case err != nil && statusErr != nil:
  1257  		// If there was an error, and we failed to update status,
  1258  		// log it and return the original error.
  1259  		logger.Error(statusErr, "Failed to update status", "daemonSet", klog.KObj(ds))
  1260  		return err
  1261  	case err != nil:
  1262  		return err
  1263  	case statusErr != nil:
  1264  		return statusErr
  1265  	}
  1266  
  1267  	return nil
  1268  }
  1269  
  1270  // NodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a
  1271  // summary. Returned booleans are:
  1272  //   - shouldRun:
  1273  //     Returns true when a daemonset should run on the node if a daemonset pod is not already
  1274  //     running on that node.
  1275  //   - shouldContinueRunning:
  1276  //     Returns true when a daemonset should continue running on a node if a daemonset pod is already
  1277  //     running on that node.
  1278  func NodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (bool, bool) {
  1279  	pod := NewPod(ds, node.Name)
  1280  
  1281  	// If the daemon set specifies a node name, check that it matches with node.Name.
  1282  	if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
  1283  		return false, false
  1284  	}
  1285  
  1286  	taints := node.Spec.Taints
  1287  	fitsNodeName, fitsNodeAffinity, fitsTaints := predicates(pod, node, taints)
  1288  	if !fitsNodeName || !fitsNodeAffinity {
  1289  		return false, false
  1290  	}
  1291  
  1292  	if !fitsTaints {
  1293  		// Scheduled daemon pods should continue running if they tolerate NoExecute taint.
  1294  		_, hasUntoleratedTaint := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
  1295  			return t.Effect == v1.TaintEffectNoExecute
  1296  		})
  1297  		return false, !hasUntoleratedTaint
  1298  	}
  1299  
  1300  	return true, true
  1301  }
  1302  
  1303  // predicates checks if a DaemonSet's pod can run on a node.
  1304  func predicates(pod *v1.Pod, node *v1.Node, taints []v1.Taint) (fitsNodeName, fitsNodeAffinity, fitsTaints bool) {
  1305  	fitsNodeName = len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == node.Name
  1306  	// Ignore parsing errors for backwards compatibility.
  1307  	fitsNodeAffinity, _ = nodeaffinity.GetRequiredNodeAffinity(pod).Match(node)
  1308  	_, hasUntoleratedTaint := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
  1309  		return t.Effect == v1.TaintEffectNoExecute || t.Effect == v1.TaintEffectNoSchedule
  1310  	})
  1311  	fitsTaints = !hasUntoleratedTaint
  1312  	return
  1313  }
  1314  
  1315  // NewPod creates a new pod
  1316  func NewPod(ds *apps.DaemonSet, nodeName string) *v1.Pod {
  1317  	newPod := &v1.Pod{Spec: ds.Spec.Template.Spec, ObjectMeta: ds.Spec.Template.ObjectMeta}
  1318  	newPod.Namespace = ds.Namespace
  1319  	newPod.Spec.NodeName = nodeName
  1320  
  1321  	// Added default tolerations for DaemonSet pods.
  1322  	util.AddOrUpdateDaemonPodTolerations(&newPod.Spec)
  1323  
  1324  	return newPod
  1325  }
  1326  
  1327  type podByCreationTimestampAndPhase []*v1.Pod
  1328  
  1329  func (o podByCreationTimestampAndPhase) Len() int      { return len(o) }
  1330  func (o podByCreationTimestampAndPhase) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
  1331  
  1332  func (o podByCreationTimestampAndPhase) Less(i, j int) bool {
  1333  	// Scheduled Pod first
  1334  	if len(o[i].Spec.NodeName) != 0 && len(o[j].Spec.NodeName) == 0 {
  1335  		return true
  1336  	}
  1337  
  1338  	if len(o[i].Spec.NodeName) == 0 && len(o[j].Spec.NodeName) != 0 {
  1339  		return false
  1340  	}
  1341  
  1342  	if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
  1343  		return o[i].Name < o[j].Name
  1344  	}
  1345  	return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
  1346  }
  1347  
  1348  func failedPodsBackoffKey(ds *apps.DaemonSet, nodeName string) string {
  1349  	return fmt.Sprintf("%s/%d/%s", ds.UID, ds.Status.ObservedGeneration, nodeName)
  1350  }
  1351  
  1352  // getUnscheduledPodsWithoutNode returns list of unscheduled pods assigned to not existing nodes.
  1353  // Returned pods can't be deleted by PodGCController so they should be deleted by DaemonSetController.
  1354  func getUnscheduledPodsWithoutNode(runningNodesList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) []string {
  1355  	var results []string
  1356  	isNodeRunning := make(map[string]bool, len(runningNodesList))
  1357  	for _, node := range runningNodesList {
  1358  		isNodeRunning[node.Name] = true
  1359  	}
  1360  
  1361  	for n, pods := range nodeToDaemonPods {
  1362  		if isNodeRunning[n] {
  1363  			continue
  1364  		}
  1365  		for _, pod := range pods {
  1366  			if len(pod.Spec.NodeName) == 0 {
  1367  				results = append(results, pod.Name)
  1368  			}
  1369  		}
  1370  	}
  1371  
  1372  	return results
  1373  }
  1374  

View as plain text