...

Source file src/k8s.io/kubernetes/pkg/controller/nodelifecycle/node_lifecycle_controller.go

Documentation: k8s.io/kubernetes/pkg/controller/nodelifecycle

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // The Controller sets taints on nodes.
    18  // Tainted nodes should not be used for new workloads and
    19  // some effort should be given to getting existing
    20  // workloads off of tainted nodes.
    21  
    22  package nodelifecycle
    23  
    24  import (
    25  	"context"
    26  	"fmt"
    27  	"sync"
    28  	"time"
    29  
    30  	"k8s.io/klog/v2"
    31  
    32  	coordv1 "k8s.io/api/coordination/v1"
    33  	v1 "k8s.io/api/core/v1"
    34  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    35  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    36  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    37  	"k8s.io/apimachinery/pkg/labels"
    38  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    39  	"k8s.io/apimachinery/pkg/util/wait"
    40  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    41  	appsv1informers "k8s.io/client-go/informers/apps/v1"
    42  	coordinformers "k8s.io/client-go/informers/coordination/v1"
    43  	coreinformers "k8s.io/client-go/informers/core/v1"
    44  	clientset "k8s.io/client-go/kubernetes"
    45  	"k8s.io/client-go/kubernetes/scheme"
    46  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    47  	appsv1listers "k8s.io/client-go/listers/apps/v1"
    48  	coordlisters "k8s.io/client-go/listers/coordination/v1"
    49  	corelisters "k8s.io/client-go/listers/core/v1"
    50  	"k8s.io/client-go/tools/cache"
    51  	"k8s.io/client-go/tools/record"
    52  	"k8s.io/client-go/util/flowcontrol"
    53  	"k8s.io/client-go/util/workqueue"
    54  	nodetopology "k8s.io/component-helpers/node/topology"
    55  	kubeletapis "k8s.io/kubelet/pkg/apis"
    56  	"k8s.io/kubernetes/pkg/controller"
    57  	"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
    58  	"k8s.io/kubernetes/pkg/controller/tainteviction"
    59  	controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
    60  	"k8s.io/kubernetes/pkg/features"
    61  	taintutils "k8s.io/kubernetes/pkg/util/taints"
    62  )
    63  
    64  func init() {
    65  	// Register prometheus metrics
    66  	Register()
    67  }
    68  
    69  var (
    70  	// UnreachableTaintTemplate is the taint for when a node becomes unreachable.
    71  	UnreachableTaintTemplate = &v1.Taint{
    72  		Key:    v1.TaintNodeUnreachable,
    73  		Effect: v1.TaintEffectNoExecute,
    74  	}
    75  
    76  	// NotReadyTaintTemplate is the taint for when a node is not ready to
    77  	// execute pods.
    78  	NotReadyTaintTemplate = &v1.Taint{
    79  		Key:    v1.TaintNodeNotReady,
    80  		Effect: v1.TaintEffectNoExecute,
    81  	}
    82  
    83  	// map {NodeConditionType: {ConditionStatus: TaintKey}}
    84  	// represents which NodeConditionType under which ConditionStatus should be
    85  	// tainted with which TaintKey
    86  	// for certain NodeConditionType, there are multiple {ConditionStatus,TaintKey} pairs
    87  	nodeConditionToTaintKeyStatusMap = map[v1.NodeConditionType]map[v1.ConditionStatus]string{
    88  		v1.NodeReady: {
    89  			v1.ConditionFalse:   v1.TaintNodeNotReady,
    90  			v1.ConditionUnknown: v1.TaintNodeUnreachable,
    91  		},
    92  		v1.NodeMemoryPressure: {
    93  			v1.ConditionTrue: v1.TaintNodeMemoryPressure,
    94  		},
    95  		v1.NodeDiskPressure: {
    96  			v1.ConditionTrue: v1.TaintNodeDiskPressure,
    97  		},
    98  		v1.NodeNetworkUnavailable: {
    99  			v1.ConditionTrue: v1.TaintNodeNetworkUnavailable,
   100  		},
   101  		v1.NodePIDPressure: {
   102  			v1.ConditionTrue: v1.TaintNodePIDPressure,
   103  		},
   104  	}
   105  
   106  	taintKeyToNodeConditionMap = map[string]v1.NodeConditionType{
   107  		v1.TaintNodeNotReady:           v1.NodeReady,
   108  		v1.TaintNodeUnreachable:        v1.NodeReady,
   109  		v1.TaintNodeNetworkUnavailable: v1.NodeNetworkUnavailable,
   110  		v1.TaintNodeMemoryPressure:     v1.NodeMemoryPressure,
   111  		v1.TaintNodeDiskPressure:       v1.NodeDiskPressure,
   112  		v1.TaintNodePIDPressure:        v1.NodePIDPressure,
   113  	}
   114  )
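
The two maps above are what doNoScheduleTaintingPass (further down in this file) consults when turning node conditions into NoSchedule taints. A minimal illustrative sketch of the lookup; the helper name taintKeyForCondition is hypothetical and not part of this file:

	// taintKeyForCondition is an illustrative sketch: it returns the NoSchedule taint
	// key the controller would derive from a single node condition, if any.
	func taintKeyForCondition(c v1.NodeCondition) (string, bool) {
		statusMap, ok := nodeConditionToTaintKeyStatusMap[c.Type]
		if !ok {
			return "", false
		}
		key, ok := statusMap[c.Status]
		return key, ok
	}

	// For example, {Type: NodeMemoryPressure, Status: True} yields v1.TaintNodeMemoryPressure,
	// while {Type: NodeReady, Status: Unknown} yields v1.TaintNodeUnreachable.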
   115  
   116  // ZoneState is the state of a given zone.
   117  type ZoneState string
   118  
   119  const (
   120  	stateInitial           = ZoneState("Initial")
   121  	stateNormal            = ZoneState("Normal")
   122  	stateFullDisruption    = ZoneState("FullDisruption")
   123  	statePartialDisruption = ZoneState("PartialDisruption")
   124  )
   125  
   126  const (
   127  	// The amount of time the node controller should sleep between retrying node health updates.
   128  	retrySleepTime   = 20 * time.Millisecond
   129  	nodeNameKeyIndex = "spec.nodeName"
   130  	// podUpdateWorkerSize assumes that in most cases pods will be handled by the monitorNodeHealth pass.
   131  	// Pod update workers will only handle lagging cache pods. 4 workers should be enough.
   132  	podUpdateWorkerSize = 4
   133  	// nodeUpdateWorkerSize defines the number of workers for node updates and/or pod updates.
   134  	nodeUpdateWorkerSize = 8
   135  
   136  	// taintEvictionController is defined here in order to prevent imports of
   137  	// k8s.io/kubernetes/cmd/kube-controller-manager/names which would result in validation errors.
   138  	// This constant will be removed upon graduation of the SeparateTaintEvictionController feature.
   139  	taintEvictionController = "taint-eviction-controller"
   140  )
   141  
   142  // labelReconcileInfo lists Node labels to reconcile, and how to reconcile them.
   143  // primaryKey and secondaryKey are keys of labels to reconcile.
   144  //   - If both keys exist but their values don't match, use the value of the
   145  //     primaryKey as the source of truth to reconcile.
   146  //   - If ensureSecondaryExists is true, and the secondaryKey does not
   147  //     exist, secondaryKey will be added with the value of the primaryKey.
   148  var labelReconcileInfo = []struct {
   149  	primaryKey            string
   150  	secondaryKey          string
   151  	ensureSecondaryExists bool
   152  }{
   153  	{
   154  		// Reconcile the beta and the stable OS label using the stable label as the source of truth.
   155  		// TODO(#89477): no earlier than 1.22: drop the beta labels if they differ from the GA labels
   156  		primaryKey:            v1.LabelOSStable,
   157  		secondaryKey:          kubeletapis.LabelOS,
   158  		ensureSecondaryExists: true,
   159  	},
   160  	{
   161  		// Reconcile the beta and the stable arch label using the stable label as the source of truth.
   162  		// TODO(#89477): no earlier than 1.22: drop the beta labels if they differ from the GA labels
   163  		primaryKey:            v1.LabelArchStable,
   164  		secondaryKey:          kubeletapis.LabelArch,
   165  		ensureSecondaryExists: true,
   166  	},
   167  }
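
A hedged sketch of how the rules above play out for one hypothetical label set; the helper name and label values are illustrative, and the real logic lives in reconcileNodeLabels near the end of this file:

	// exampleLabelReconcile is illustrative only. With a stable OS label of "linux" and a
	// stale beta OS label of "windows", the beta key is rewritten from the stable value;
	// consistent arch labels are left untouched.
	func exampleLabelReconcile() map[string]string {
		nodeLabels := map[string]string{
			v1.LabelOSStable:      "linux",
			kubeletapis.LabelOS:   "windows",
			v1.LabelArchStable:    "amd64",
			kubeletapis.LabelArch: "amd64",
		}
		labelsToUpdate := map[string]string{}
		for _, r := range labelReconcileInfo {
			primaryValue, primaryExists := nodeLabels[r.primaryKey]
			secondaryValue, secondaryExists := nodeLabels[r.secondaryKey]
			if !primaryExists {
				continue
			}
			if (secondaryExists && primaryValue != secondaryValue) ||
				(!secondaryExists && r.ensureSecondaryExists) {
				labelsToUpdate[r.secondaryKey] = primaryValue
			}
		}
		return labelsToUpdate // {"beta.kubernetes.io/os": "linux"}
	}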
   168  
   169  type nodeHealthData struct {
   170  	probeTimestamp           metav1.Time
   171  	readyTransitionTimestamp metav1.Time
   172  	status                   *v1.NodeStatus
   173  	lease                    *coordv1.Lease
   174  }
   175  
   176  func (n *nodeHealthData) deepCopy() *nodeHealthData {
   177  	if n == nil {
   178  		return nil
   179  	}
   180  	return &nodeHealthData{
   181  		probeTimestamp:           n.probeTimestamp,
   182  		readyTransitionTimestamp: n.readyTransitionTimestamp,
   183  		status:                   n.status.DeepCopy(),
   184  		lease:                    n.lease.DeepCopy(),
   185  	}
   186  }
   187  
   188  type nodeHealthMap struct {
   189  	lock        sync.RWMutex
   190  	nodeHealths map[string]*nodeHealthData
   191  }
   192  
   193  func newNodeHealthMap() *nodeHealthMap {
   194  	return &nodeHealthMap{
   195  		nodeHealths: make(map[string]*nodeHealthData),
   196  	}
   197  }
   198  
   199  // getDeepCopy returns a copy of the node health data.
   200  // It prevents the data from being changed after it is retrieved from the map.
   201  func (n *nodeHealthMap) getDeepCopy(name string) *nodeHealthData {
   202  	n.lock.RLock()
   203  	defer n.lock.RUnlock()
   204  	return n.nodeHealths[name].deepCopy()
   205  }
   206  
   207  func (n *nodeHealthMap) set(name string, data *nodeHealthData) {
   208  	n.lock.Lock()
   209  	defer n.lock.Unlock()
   210  	n.nodeHealths[name] = data
   211  }
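
Callers use this map with a copy-on-read, explicit-write pattern, as tryUpdateNodeHealth does below; a minimal sketch, with a hypothetical helper name:

	// touchProbeTimestamp is illustrative only: read an independent copy, mutate it,
	// then publish it back, so concurrent readers never observe a partially-updated entry.
	func touchProbeTimestamp(nc *Controller, nodeName string) {
		health := nc.nodeHealthMap.getDeepCopy(nodeName) // nil if the node was never observed
		if health == nil {
			health = &nodeHealthData{}
		}
		health.probeTimestamp = nc.now()
		nc.nodeHealthMap.set(nodeName, health)
	}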
   212  
   213  type podUpdateItem struct {
   214  	namespace string
   215  	name      string
   216  }
   217  
   218  // Controller is the controller that manages node's life cycle.
   219  type Controller struct {
   220  	taintManager *tainteviction.Controller
   221  
   222  	podLister         corelisters.PodLister
   223  	podInformerSynced cache.InformerSynced
   224  	kubeClient        clientset.Interface
   225  
   226  	// This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
   227  	// to avoid the problem with time skew across the cluster.
   228  	now func() metav1.Time
   229  
   230  	enterPartialDisruptionFunc func(nodeNum int) float32
   231  	enterFullDisruptionFunc    func(nodeNum int) float32
   232  	computeZoneStateFunc       func(nodeConditions []*v1.NodeCondition) (int, ZoneState)
   233  
   234  	knownNodeSet map[string]*v1.Node
   235  	// per Node map storing last observed health together with a local time when it was observed.
   236  	nodeHealthMap *nodeHealthMap
   237  
   238  	// evictorLock protects zonePodEvictor and zoneNoExecuteTainter.
   239  	evictorLock sync.Mutex
   240  	// workers that are responsible for tainting nodes.
   241  	zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue
   242  
   243  	nodesToRetry sync.Map
   244  
   245  	zoneStates map[string]ZoneState
   246  
   247  	daemonSetStore          appsv1listers.DaemonSetLister
   248  	daemonSetInformerSynced cache.InformerSynced
   249  
   250  	leaseLister         coordlisters.LeaseLister
   251  	leaseInformerSynced cache.InformerSynced
   252  	nodeLister          corelisters.NodeLister
   253  	nodeInformerSynced  cache.InformerSynced
   254  
   255  	getPodsAssignedToNode func(nodeName string) ([]*v1.Pod, error)
   256  
   257  	broadcaster record.EventBroadcaster
   258  	recorder    record.EventRecorder
   259  
   260  	// Value controlling the Controller monitoring period, i.e. how often the Controller
   261  	// checks the node health signal posted by the kubelet. This value should be lower than
   262  	// nodeMonitorGracePeriod.
   263  	// TODO: Change node health monitor to watch based.
   264  	nodeMonitorPeriod time.Duration
   265  
   266  	// When a node is just created, e.g. during cluster bootstrap or node creation, we give
   267  	// it a longer grace period.
   268  	nodeStartupGracePeriod time.Duration
   269  
   270  	// Controller will not proactively sync node health, but will monitor node
   271  	// health signal updated from kubelet. There are 2 kinds of node healthiness
   272  	// signals: NodeStatus and NodeLease. If it doesn't receive update for this amount
   273  	// of time, it will start posting "NodeReady==ConditionUnknown". The amount of
   274  	// time before the Controller starts evicting pods is controlled via the flag
   275  	// 'pod-eviction-timeout'.
   276  	// Note: be cautious when changing the constant, it must work with
   277  	// nodeStatusUpdateFrequency in kubelet and renewInterval in NodeLease
   278  	// controller. The node health signal update frequency is the minimal of the
   279  	// two.
   280  	// There are several constraints:
   281  	// 1. nodeMonitorGracePeriod must be N times more than the node health signal
   282  	//    update frequency, where N means number of retries allowed for kubelet to
   283  	//    post node status/lease. It is pointless to make nodeMonitorGracePeriod
   284  	//    be less than the node health signal update frequency, since there will
   285  	//    only be fresh values from Kubelet at an interval of node health signal
   286  	//    update frequency.
   287  	// 2. nodeMonitorGracePeriod can't be too large for user experience - larger
   288  	//    value takes longer for user to see up-to-date node health.
   289  	nodeMonitorGracePeriod time.Duration
   290  
   291  	// Number of workers Controller uses to process node monitor health updates.
   292  	// Defaults to scheduler.UpdateWorkerSize.
   293  	nodeUpdateWorkerSize int
   294  
   295  	evictionLimiterQPS          float32
   296  	secondaryEvictionLimiterQPS float32
   297  	largeClusterThreshold       int32
   298  	unhealthyZoneThreshold      float32
   299  
   300  	nodeUpdateQueue workqueue.Interface
   301  	podUpdateQueue  workqueue.RateLimitingInterface
   302  }
   303  
   304  // NewNodeLifecycleController returns a new node lifecycle controller.
   305  func NewNodeLifecycleController(
   306  	ctx context.Context,
   307  	leaseInformer coordinformers.LeaseInformer,
   308  	podInformer coreinformers.PodInformer,
   309  	nodeInformer coreinformers.NodeInformer,
   310  	daemonSetInformer appsv1informers.DaemonSetInformer,
   311  	kubeClient clientset.Interface,
   312  	nodeMonitorPeriod time.Duration,
   313  	nodeStartupGracePeriod time.Duration,
   314  	nodeMonitorGracePeriod time.Duration,
   315  	evictionLimiterQPS float32,
   316  	secondaryEvictionLimiterQPS float32,
   317  	largeClusterThreshold int32,
   318  	unhealthyZoneThreshold float32,
   319  ) (*Controller, error) {
   320  	logger := klog.FromContext(ctx)
   321  	if kubeClient == nil {
   322  		logger.Error(nil, "kubeClient is nil when starting nodelifecycle Controller")
   323  		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
   324  	}
   325  
   326  	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
   327  	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "node-controller"})
   328  
   329  	nc := &Controller{
   330  		kubeClient:                  kubeClient,
   331  		now:                         metav1.Now,
   332  		knownNodeSet:                make(map[string]*v1.Node),
   333  		nodeHealthMap:               newNodeHealthMap(),
   334  		broadcaster:                 eventBroadcaster,
   335  		recorder:                    recorder,
   336  		nodeMonitorPeriod:           nodeMonitorPeriod,
   337  		nodeStartupGracePeriod:      nodeStartupGracePeriod,
   338  		nodeMonitorGracePeriod:      nodeMonitorGracePeriod,
   339  		nodeUpdateWorkerSize:        nodeUpdateWorkerSize,
   340  		zoneNoExecuteTainter:        make(map[string]*scheduler.RateLimitedTimedQueue),
   341  		nodesToRetry:                sync.Map{},
   342  		zoneStates:                  make(map[string]ZoneState),
   343  		evictionLimiterQPS:          evictionLimiterQPS,
   344  		secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS,
   345  		largeClusterThreshold:       largeClusterThreshold,
   346  		unhealthyZoneThreshold:      unhealthyZoneThreshold,
   347  		nodeUpdateQueue:             workqueue.NewNamed("node_lifecycle_controller"),
   348  		podUpdateQueue:              workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_lifecycle_controller_pods"),
   349  	}
   350  
   351  	nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
   352  	nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
   353  	nc.computeZoneStateFunc = nc.ComputeZoneState
   354  
   355  	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   356  		AddFunc: func(obj interface{}) {
   357  			pod := obj.(*v1.Pod)
   358  			nc.podUpdated(nil, pod)
   359  		},
   360  		UpdateFunc: func(prev, obj interface{}) {
   361  			prevPod := prev.(*v1.Pod)
   362  			newPod := obj.(*v1.Pod)
   363  			nc.podUpdated(prevPod, newPod)
   364  		},
   365  		DeleteFunc: func(obj interface{}) {
   366  			pod, isPod := obj.(*v1.Pod)
   367  			// We can get DeletedFinalStateUnknown instead of *v1.Pod here and we need to handle that correctly.
   368  			if !isPod {
   369  				deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
   370  				if !ok {
   371  					logger.Error(nil, "Received unexpected object", "object", obj)
   372  					return
   373  				}
   374  				pod, ok = deletedState.Obj.(*v1.Pod)
   375  				if !ok {
   376  					logger.Error(nil, "DeletedFinalStateUnknown contained non-Pod object", "object", deletedState.Obj)
   377  					return
   378  				}
   379  			}
   380  			nc.podUpdated(pod, nil)
   381  		},
   382  	})
   383  	nc.podInformerSynced = podInformer.Informer().HasSynced
   384  	podInformer.Informer().AddIndexers(cache.Indexers{
   385  		nodeNameKeyIndex: func(obj interface{}) ([]string, error) {
   386  			pod, ok := obj.(*v1.Pod)
   387  			if !ok {
   388  				return []string{}, nil
   389  			}
   390  			if len(pod.Spec.NodeName) == 0 {
   391  				return []string{}, nil
   392  			}
   393  			return []string{pod.Spec.NodeName}, nil
   394  		},
   395  	})
   396  
   397  	podIndexer := podInformer.Informer().GetIndexer()
   398  	nc.getPodsAssignedToNode = func(nodeName string) ([]*v1.Pod, error) {
   399  		objs, err := podIndexer.ByIndex(nodeNameKeyIndex, nodeName)
   400  		if err != nil {
   401  			return nil, err
   402  		}
   403  		pods := make([]*v1.Pod, 0, len(objs))
   404  		for _, obj := range objs {
   405  			pod, ok := obj.(*v1.Pod)
   406  			if !ok {
   407  				continue
   408  			}
   409  			pods = append(pods, pod)
   410  		}
   411  		return pods, nil
   412  	}
   413  	nc.podLister = podInformer.Lister()
   414  	nc.nodeLister = nodeInformer.Lister()
   415  
   416  	if !utilfeature.DefaultFeatureGate.Enabled(features.SeparateTaintEvictionController) {
   417  		logger.Info("Running TaintEvictionController as part of NodeLifecycleController")
   418  		tm, err := tainteviction.New(ctx, kubeClient, podInformer, nodeInformer, taintEvictionController)
   419  		if err != nil {
   420  			return nil, err
   421  		}
   422  		nc.taintManager = tm
   423  	}
   424  
   425  	logger.Info("Controller will reconcile labels")
   426  	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   427  		AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
   428  			nc.nodeUpdateQueue.Add(node.Name)
   429  			return nil
   430  		}),
   431  		UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
   432  			nc.nodeUpdateQueue.Add(newNode.Name)
   433  			return nil
   434  		}),
   435  		DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
   436  			nc.nodesToRetry.Delete(node.Name)
   437  			return nil
   438  		}),
   439  	})
   440  
   441  	nc.leaseLister = leaseInformer.Lister()
   442  	nc.leaseInformerSynced = leaseInformer.Informer().HasSynced
   443  
   444  	nc.nodeInformerSynced = nodeInformer.Informer().HasSynced
   445  
   446  	nc.daemonSetStore = daemonSetInformer.Lister()
   447  	nc.daemonSetInformerSynced = daemonSetInformer.Informer().HasSynced
   448  
   449  	return nc, nil
   450  }
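
A hedged wiring sketch of how this constructor and Run are typically driven from outside the package. The function name, the informer-factory plumbing, and the numeric values are assumptions for illustration (they mirror common kube-controller-manager defaults) rather than anything defined in this file; it assumes the usual client-go imports (informers, the kubernetes clientset) plus context and time, and this package imported as nodelifecycle:

	// startNodeLifecycleController is an illustrative sketch, not part of this file.
	func startNodeLifecycleController(ctx context.Context, kubeClient clientset.Interface) error {
		factory := informers.NewSharedInformerFactory(kubeClient, 0)
		nlc, err := nodelifecycle.NewNodeLifecycleController(
			ctx,
			factory.Coordination().V1().Leases(),
			factory.Core().V1().Pods(),
			factory.Core().V1().Nodes(),
			factory.Apps().V1().DaemonSets(),
			kubeClient,
			5*time.Second,  // nodeMonitorPeriod
			60*time.Second, // nodeStartupGracePeriod
			40*time.Second, // nodeMonitorGracePeriod
			0.1,            // evictionLimiterQPS
			0.01,           // secondaryEvictionLimiterQPS
			50,             // largeClusterThreshold
			0.55,           // unhealthyZoneThreshold
		)
		if err != nil {
			return err
		}
		factory.Start(ctx.Done())
		go nlc.Run(ctx)
		return nil
	}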
   451  
   452  // Run starts an asynchronous loop that monitors the status of cluster nodes.
   453  func (nc *Controller) Run(ctx context.Context) {
   454  	defer utilruntime.HandleCrash()
   455  
   456  	// Start events processing pipeline.
   457  	nc.broadcaster.StartStructuredLogging(3)
   458  	logger := klog.FromContext(ctx)
   459  	logger.Info("Sending events to api server")
   460  	nc.broadcaster.StartRecordingToSink(
   461  		&v1core.EventSinkImpl{
   462  			Interface: v1core.New(nc.kubeClient.CoreV1().RESTClient()).Events(""),
   463  		})
   464  	defer nc.broadcaster.Shutdown()
   465  
   466  	// Close the node update queue to clean up its goroutine.
   467  	defer nc.nodeUpdateQueue.ShutDown()
   468  	defer nc.podUpdateQueue.ShutDown()
   469  
   470  	logger.Info("Starting node controller")
   471  	defer logger.Info("Shutting down node controller")
   472  
   473  	if !cache.WaitForNamedCacheSync("taint", ctx.Done(), nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
   474  		return
   475  	}
   476  
   477  	if !utilfeature.DefaultFeatureGate.Enabled(features.SeparateTaintEvictionController) {
   478  		logger.Info("Starting", "controller", taintEvictionController)
   479  		go nc.taintManager.Run(ctx)
   480  	}
   481  
   482  	// Start workers to reconcile labels and/or update NoSchedule taint for nodes.
   483  	for i := 0; i < nodeUpdateWorkerSize; i++ {
   484  		// Thanks to "workqueue", each worker just needs to get an item from the queue, because
   485  		// the item is flagged when taken from the queue: if a new event comes in, the item will
   486  		// be re-queued until "Done", so no more than one worker handles the same item and
   487  		// no event is missed.
   488  		go wait.UntilWithContext(ctx, nc.doNodeProcessingPassWorker, time.Second)
   489  	}
   490  
   491  	for i := 0; i < podUpdateWorkerSize; i++ {
   492  		go wait.UntilWithContext(ctx, nc.doPodProcessingWorker, time.Second)
   493  	}
   494  
   495  	// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
   496  	// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
   497  	go wait.UntilWithContext(ctx, nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod)
   498  
   499  	// Incorporate the results of node health signal pushed from kubelet to master.
   500  	go wait.UntilWithContext(ctx, func(ctx context.Context) {
   501  		if err := nc.monitorNodeHealth(ctx); err != nil {
   502  			logger.Error(err, "Error monitoring node health")
   503  		}
   504  	}, nc.nodeMonitorPeriod)
   505  
   506  	<-ctx.Done()
   507  }
   508  
   509  func (nc *Controller) doNodeProcessingPassWorker(ctx context.Context) {
   510  	logger := klog.FromContext(ctx)
   511  	for {
   512  		obj, shutdown := nc.nodeUpdateQueue.Get()
   513  		// "nodeUpdateQueue" will be shut down when "stopCh" is closed;
   514  		// we do not need to re-check "stopCh" again.
   515  		if shutdown {
   516  			return
   517  		}
   518  		nodeName := obj.(string)
   519  		if err := nc.doNoScheduleTaintingPass(ctx, nodeName); err != nil {
   520  			logger.Error(err, "Failed to taint NoSchedule on node, requeue it", "node", klog.KRef("", nodeName))
   521  			// TODO(k82cn): Add nodeName back to the queue
   522  		}
   523  		// TODO: re-evaluate whether there are any labels that need to be
   524  		// reconciled in 1.19. Remove this function if it's no longer necessary.
   525  		if err := nc.reconcileNodeLabels(ctx, nodeName); err != nil {
   526  			logger.Error(err, "Failed to reconcile labels for node, requeue it", "node", klog.KRef("", nodeName))
   527  			// TODO(yujuhong): Add nodeName back to the queue
   528  		}
   529  		nc.nodeUpdateQueue.Done(nodeName)
   530  	}
   531  }
   532  
   533  func (nc *Controller) doNoScheduleTaintingPass(ctx context.Context, nodeName string) error {
   534  	node, err := nc.nodeLister.Get(nodeName)
   535  	if err != nil {
   536  		// If node not found, just ignore it.
   537  		if apierrors.IsNotFound(err) {
   538  			return nil
   539  		}
   540  		return err
   541  	}
   542  
   543  	// Map node's condition to Taints.
   544  	var taints []v1.Taint
   545  	for _, condition := range node.Status.Conditions {
   546  		if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
   547  			if taintKey, found := taintMap[condition.Status]; found {
   548  				taints = append(taints, v1.Taint{
   549  					Key:    taintKey,
   550  					Effect: v1.TaintEffectNoSchedule,
   551  				})
   552  			}
   553  		}
   554  	}
   555  	if node.Spec.Unschedulable {
   556  		// If unschedulable, append related taint.
   557  		taints = append(taints, v1.Taint{
   558  			Key:    v1.TaintNodeUnschedulable,
   559  			Effect: v1.TaintEffectNoSchedule,
   560  		})
   561  	}
   562  
   563  	// Get the existing taints of the node.
   564  	nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
   565  		// only NoSchedule taints are candidates to be compared with "taints" later
   566  		if t.Effect != v1.TaintEffectNoSchedule {
   567  			return false
   568  		}
   569  		// Find unschedulable taint of node.
   570  		if t.Key == v1.TaintNodeUnschedulable {
   571  			return true
   572  		}
   573  		// Find node condition taints of node.
   574  		_, found := taintKeyToNodeConditionMap[t.Key]
   575  		return found
   576  	})
   577  	taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
   578  	// If there is nothing to add or delete, return directly.
   579  	if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
   580  		return nil
   581  	}
   582  	if !controllerutil.SwapNodeControllerTaint(ctx, nc.kubeClient, taintsToAdd, taintsToDel, node) {
   583  		return fmt.Errorf("failed to swap taints of node %+v", node)
   584  	}
   585  	return nil
   586  }
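
To make the diff step above concrete, a small sketch with illustrative taints; the helper name is hypothetical:

	// exampleTaintDiff is illustrative only: the desired set is derived from node conditions,
	// the current set is the node's filtered NoSchedule taints, and TaintSetDiff yields what
	// SwapNodeControllerTaint is then asked to add and remove.
	func exampleTaintDiff() (toAdd, toDel []*v1.Taint) {
		desired := []v1.Taint{{Key: v1.TaintNodeMemoryPressure, Effect: v1.TaintEffectNoSchedule}}
		current := []v1.Taint{{Key: v1.TaintNodeDiskPressure, Effect: v1.TaintEffectNoSchedule}}
		// Here toAdd holds the memory-pressure taint and toDel the disk-pressure taint.
		return taintutils.TaintSetDiff(desired, current)
	}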
   587  
   588  func (nc *Controller) doNoExecuteTaintingPass(ctx context.Context) {
   589  	// Extract the keys of the map in order to not hold
   590  	// the evictorLock for the entire function and hold it
   591  	// only when necessary.
   592  	var zoneNoExecuteTainterKeys []string
   593  	func() {
   594  		nc.evictorLock.Lock()
   595  		defer nc.evictorLock.Unlock()
   596  
   597  		zoneNoExecuteTainterKeys = make([]string, 0, len(nc.zoneNoExecuteTainter))
   598  		for k := range nc.zoneNoExecuteTainter {
   599  			zoneNoExecuteTainterKeys = append(zoneNoExecuteTainterKeys, k)
   600  		}
   601  	}()
   602  	logger := klog.FromContext(ctx)
   603  	for _, k := range zoneNoExecuteTainterKeys {
   604  		var zoneNoExecuteTainterWorker *scheduler.RateLimitedTimedQueue
   605  		func() {
   606  			nc.evictorLock.Lock()
   607  			defer nc.evictorLock.Unlock()
   608  			// Extracting the value without checking if the key
   609  			// exists or not is safe to do here since zones do
   610  			// not get removed, and consequently pod evictors for
   611  			// these zones also do not get removed, only added.
   612  			zoneNoExecuteTainterWorker = nc.zoneNoExecuteTainter[k]
   613  		}()
   614  		// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
   615  		zoneNoExecuteTainterWorker.Try(logger, func(value scheduler.TimedValue) (bool, time.Duration) {
   616  			node, err := nc.nodeLister.Get(value.Value)
   617  			if apierrors.IsNotFound(err) {
   618  				logger.Info("Node no longer present in nodeLister", "node", klog.KRef("", value.Value))
   619  				return true, 0
   620  			} else if err != nil {
   621  				logger.Info("Failed to get Node from the nodeLister", "node", klog.KRef("", value.Value), "err", err)
   622  				// retry in 50 milliseconds
   623  				return false, 50 * time.Millisecond
   624  			}
   625  			_, condition := controllerutil.GetNodeCondition(&node.Status, v1.NodeReady)
   626  			if condition == nil {
   627  				logger.Info("Failed to get NodeCondition from the node status", "node", klog.KRef("", value.Value))
   628  				// retry in 50 milliseconds
   629  				return false, 50 * time.Millisecond
   630  			}
   631  			// Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive.
   632  			taintToAdd := v1.Taint{}
   633  			oppositeTaint := v1.Taint{}
   634  			switch condition.Status {
   635  			case v1.ConditionFalse:
   636  				taintToAdd = *NotReadyTaintTemplate
   637  				oppositeTaint = *UnreachableTaintTemplate
   638  			case v1.ConditionUnknown:
   639  				taintToAdd = *UnreachableTaintTemplate
   640  				oppositeTaint = *NotReadyTaintTemplate
   641  			default:
   642  				// It seems that the Node is ready again, so there's no need to taint it.
   643  				logger.V(4).Info("Node was in a taint queue, but it's ready now. Ignoring taint request", "node", klog.KRef("", value.Value))
   644  				return true, 0
   645  			}
   646  			result := controllerutil.SwapNodeControllerTaint(ctx, nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node)
   647  			if result {
   648  				// Count the number of evictions.
   649  				zone := nodetopology.GetZoneKey(node)
   650  				evictionsTotal.WithLabelValues(zone).Inc()
   651  			}
   652  
   653  			return result, 0
   654  		})
   655  	}
   656  }
   657  
   658  // monitorNodeHealth verifies that node health is constantly updated by the kubelet, and if not, posts "NodeReady==ConditionUnknown".
   659  // This function will
   660  //   - add nodes which are not ready or not reachable for a long period of time to a rate-limited
   661  //     queue so that NoExecute taints can be added by the goroutine running the doNoExecuteTaintingPass function,
   662  //   - update the PodReady condition of Pods according to the state of the Node Ready condition.
   663  func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
   664  	start := nc.now()
   665  	defer func() {
   666  		updateAllNodesHealthDuration.Observe(time.Since(start.Time).Seconds())
   667  	}()
   668  
   669  	// We are listing nodes from the local cache as we can tolerate some small delays
   670  	// compared to the state in etcd, and there is eventual consistency anyway.
   671  	nodes, err := nc.nodeLister.List(labels.Everything())
   672  	if err != nil {
   673  		return err
   674  	}
   675  	added, deleted, newZoneRepresentatives := nc.classifyNodes(nodes)
   676  	logger := klog.FromContext(ctx)
   677  	for i := range newZoneRepresentatives {
   678  		nc.addPodEvictorForNewZone(logger, newZoneRepresentatives[i])
   679  	}
   680  	for i := range added {
   681  		logger.V(1).Info("Controller observed a new Node", "node", klog.KRef("", added[i].Name))
   682  		controllerutil.RecordNodeEvent(ctx, nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name))
   683  		nc.knownNodeSet[added[i].Name] = added[i]
   684  		nc.addPodEvictorForNewZone(logger, added[i])
   685  		nc.markNodeAsReachable(ctx, added[i])
   686  	}
   687  
   688  	for i := range deleted {
   689  		logger.V(1).Info("Controller observed a Node deletion", "node", klog.KRef("", deleted[i].Name))
   690  		controllerutil.RecordNodeEvent(ctx, nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name))
   691  		delete(nc.knownNodeSet, deleted[i].Name)
   692  	}
   693  
   694  	var zoneToNodeConditionsLock sync.Mutex
   695  	zoneToNodeConditions := map[string][]*v1.NodeCondition{}
   696  	updateNodeFunc := func(piece int) {
   697  		start := nc.now()
   698  		defer func() {
   699  			updateNodeHealthDuration.Observe(time.Since(start.Time).Seconds())
   700  		}()
   701  
   702  		var observedReadyCondition v1.NodeCondition
   703  		var currentReadyCondition *v1.NodeCondition
   704  		node := nodes[piece].DeepCopy()
   705  
   706  		if err := wait.PollImmediate(retrySleepTime, retrySleepTime*scheduler.NodeHealthUpdateRetry, func() (bool, error) {
   707  			var err error
   708  			_, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeHealth(ctx, node)
   709  			if err == nil {
   710  				return true, nil
   711  			}
   712  			name := node.Name
   713  			node, err = nc.kubeClient.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
   714  			if err != nil {
   715  				logger.Error(nil, "Failed while getting a Node to retry updating node health. Probably Node was deleted", "node", klog.KRef("", name))
   716  				return false, err
   717  			}
   718  			return false, nil
   719  		}); err != nil {
   720  			logger.Error(err, "Update health of Node from Controller error, Skipping - no pods will be evicted", "node", klog.KObj(node))
   721  			return
   722  		}
   723  
   724  		// Some nodes may be excluded from disruption checking
   725  		if !isNodeExcludedFromDisruptionChecks(node) {
   726  			zoneToNodeConditionsLock.Lock()
   727  			zoneToNodeConditions[nodetopology.GetZoneKey(node)] = append(zoneToNodeConditions[nodetopology.GetZoneKey(node)], currentReadyCondition)
   728  			zoneToNodeConditionsLock.Unlock()
   729  		}
   730  
   731  		if currentReadyCondition != nil {
   732  			pods, err := nc.getPodsAssignedToNode(node.Name)
   733  			if err != nil {
   734  				utilruntime.HandleError(fmt.Errorf("unable to list pods of node %v: %v", node.Name, err))
   735  				if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
   736  					// If error happened during node status transition (Ready -> NotReady)
   737  					// we need to mark node for retry to force MarkPodsNotReady execution
   738  					// in the next iteration.
   739  					nc.nodesToRetry.Store(node.Name, struct{}{})
   740  				}
   741  				return
   742  			}
   743  			nc.processTaintBaseEviction(ctx, node, &observedReadyCondition)
   744  
   745  			_, needsRetry := nc.nodesToRetry.Load(node.Name)
   746  			switch {
   747  			case currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue:
   748  				// Report node event only once when status changed.
   749  				controllerutil.RecordNodeStatusChange(logger, nc.recorder, node, "NodeNotReady")
   750  				fallthrough
   751  			case needsRetry && observedReadyCondition.Status != v1.ConditionTrue:
   752  				if err = controllerutil.MarkPodsNotReady(ctx, nc.kubeClient, nc.recorder, pods, node.Name); err != nil {
   753  					utilruntime.HandleError(fmt.Errorf("unable to mark all pods NotReady on node %v: %v; queuing for retry", node.Name, err))
   754  					nc.nodesToRetry.Store(node.Name, struct{}{})
   755  					return
   756  				}
   757  			}
   758  		}
   759  		nc.nodesToRetry.Delete(node.Name)
   760  	}
   761  
   762  	// Marking the pods not ready on a node requires looping over them and
   763  	// updating each pod's status one at a time. This is performed serially, and
   764  	// can take a while if we're processing each node serially as well. So we
   765  	// process them with bounded concurrency instead, since most of the time is
   766  	// spent waiting on io.
   767  	workqueue.ParallelizeUntil(ctx, nc.nodeUpdateWorkerSize, len(nodes), updateNodeFunc)
   768  
   769  	nc.handleDisruption(ctx, zoneToNodeConditions, nodes)
   770  
   771  	return nil
   772  }
   773  
   774  func (nc *Controller) processTaintBaseEviction(ctx context.Context, node *v1.Node, observedReadyCondition *v1.NodeCondition) {
   775  	decisionTimestamp := nc.now()
   776  	// Check eviction timeout against decisionTimestamp
   777  	logger := klog.FromContext(ctx)
   778  	switch observedReadyCondition.Status {
   779  	case v1.ConditionFalse:
   780  		// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
   781  		if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
   782  			taintToAdd := *NotReadyTaintTemplate
   783  			if !controllerutil.SwapNodeControllerTaint(ctx, nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) {
   784  				logger.Error(nil, "Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle")
   785  			}
   786  		} else if nc.markNodeForTainting(node, v1.ConditionFalse) {
   787  			logger.V(2).Info("Node is NotReady. Adding it to the Taint queue", "node", klog.KObj(node), "timeStamp", decisionTimestamp)
   788  		}
   789  	case v1.ConditionUnknown:
   790  		// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
   791  		if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
   792  			taintToAdd := *UnreachableTaintTemplate
   793  			if !controllerutil.SwapNodeControllerTaint(ctx, nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) {
   794  				logger.Error(nil, "Failed to instantly swap NotReadyTaint to UnreachableTaint. Will try again in the next cycle")
   795  			}
   796  		} else if nc.markNodeForTainting(node, v1.ConditionUnknown) {
   797  			logger.V(2).Info("Node is unresponsive. Adding it to the Taint queue", "node", klog.KObj(node), "timeStamp", decisionTimestamp)
   798  		}
   799  	case v1.ConditionTrue:
   800  		removed, err := nc.markNodeAsReachable(ctx, node)
   801  		if err != nil {
   802  			logger.Error(nil, "Failed to remove taints from node. Will retry in next iteration", "node", klog.KObj(node))
   803  		}
   804  		if removed {
   805  			logger.V(2).Info("Node is healthy again, removing all taints", "node", klog.KObj(node))
   806  		}
   807  	}
   808  }
   809  
   810  // labelNodeDisruptionExclusion is a label on nodes that controls whether they are
   811  // excluded from being considered for disruption checks by the node controller.
   812  const labelNodeDisruptionExclusion = "node.kubernetes.io/exclude-disruption"
   813  
   814  func isNodeExcludedFromDisruptionChecks(node *v1.Node) bool {
   815  	if _, ok := node.Labels[labelNodeDisruptionExclusion]; ok {
   816  		return true
   817  	}
   818  	return false
   819  }
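
Only the presence of the label key matters, not its value. A minimal sketch of opting a node out; the helper name is hypothetical:

	// markExcludedFromDisruptionChecks is illustrative only: after this, the node's Ready
	// condition no longer counts toward its zone's disruption state in monitorNodeHealth.
	func markExcludedFromDisruptionChecks(node *v1.Node) {
		if node.Labels == nil {
			node.Labels = map[string]string{}
		}
		node.Labels[labelNodeDisruptionExclusion] = "true" // any value works
	}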
   820  
   821  // tryUpdateNodeHealth checks a given node's conditions and tries to update it. Returns grace period to
   822  // which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred.
   823  func (nc *Controller) tryUpdateNodeHealth(ctx context.Context, node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) {
   824  	nodeHealth := nc.nodeHealthMap.getDeepCopy(node.Name)
   825  	defer func() {
   826  		nc.nodeHealthMap.set(node.Name, nodeHealth)
   827  	}()
   828  
   829  	var gracePeriod time.Duration
   830  	var observedReadyCondition v1.NodeCondition
   831  	_, currentReadyCondition := controllerutil.GetNodeCondition(&node.Status, v1.NodeReady)
   832  	if currentReadyCondition == nil {
   833  		// If the ready condition is nil, then the kubelet (or nodecontroller) never posted the node status.
   834  		// A fake ready condition is created, where LastHeartbeatTime and LastTransitionTime are set
   835  		// to node.CreationTimestamp to avoid handling this corner case.
   836  		observedReadyCondition = v1.NodeCondition{
   837  			Type:               v1.NodeReady,
   838  			Status:             v1.ConditionUnknown,
   839  			LastHeartbeatTime:  node.CreationTimestamp,
   840  			LastTransitionTime: node.CreationTimestamp,
   841  		}
   842  		gracePeriod = nc.nodeStartupGracePeriod
   843  		if nodeHealth != nil {
   844  			nodeHealth.status = &node.Status
   845  		} else {
   846  			nodeHealth = &nodeHealthData{
   847  				status:                   &node.Status,
   848  				probeTimestamp:           node.CreationTimestamp,
   849  				readyTransitionTimestamp: node.CreationTimestamp,
   850  			}
   851  		}
   852  	} else {
   853  		// If ready condition is not nil, make a copy of it, since we may modify it in place later.
   854  		observedReadyCondition = *currentReadyCondition
   855  		gracePeriod = nc.nodeMonitorGracePeriod
   856  	}
   857  	// There are the following cases to check:
   858  	// - both the saved and the new status have no Ready Condition set - we leave everything as it is,
   859  	// - the saved status has no Ready Condition, but the current one does - the Controller was restarted with Node data already present in etcd,
   860  	// - the saved status has some Ready Condition, but the current one does not - it's an error, but we fill it up because that's probably a good thing to do,
   861  	// - both the saved and the current statuses have Ready Conditions and they have the same LastProbeTime - nothing happened on that Node, it may be
   862  	//   unresponsive, so we leave it as it is,
   863  	// - both the saved and the current statuses have Ready Conditions, they have different LastProbeTimes, but the same Ready Condition State -
   864  	//   everything's in order, no transition occurred, we update only probeTimestamp,
   865  	// - both the saved and the current statuses have Ready Conditions, different LastProbeTimes and a different Ready Condition State -
   866  	//   the Ready Condition changed its state since we last saw it, so we update both probeTimestamp and readyTransitionTimestamp.
   867  	// TODO: things to consider:
   868  	//   - if 'LastProbeTime' has gone back in time it's probably an error, currently we ignore it,
   869  	//   - currently the only correct Ready State transition outside of the Node Controller is the Kubelet marking it Ready; we don't check
   870  	//     if that's the case, but it does not seem necessary.
   871  	var savedCondition *v1.NodeCondition
   872  	var savedLease *coordv1.Lease
   873  	if nodeHealth != nil {
   874  		_, savedCondition = controllerutil.GetNodeCondition(nodeHealth.status, v1.NodeReady)
   875  		savedLease = nodeHealth.lease
   876  	}
   877  	logger := klog.FromContext(ctx)
   878  	if nodeHealth == nil {
   879  		logger.Info("Missing timestamp for Node. Assuming now as a timestamp", "node", klog.KObj(node))
   880  		nodeHealth = &nodeHealthData{
   881  			status:                   &node.Status,
   882  			probeTimestamp:           nc.now(),
   883  			readyTransitionTimestamp: nc.now(),
   884  		}
   885  	} else if savedCondition == nil && currentReadyCondition != nil {
   886  		logger.V(1).Info("Creating timestamp entry for newly observed Node", "node", klog.KObj(node))
   887  		nodeHealth = &nodeHealthData{
   888  			status:                   &node.Status,
   889  			probeTimestamp:           nc.now(),
   890  			readyTransitionTimestamp: nc.now(),
   891  		}
   892  	} else if savedCondition != nil && currentReadyCondition == nil {
   893  		logger.Error(nil, "ReadyCondition was removed from Status of Node", "node", klog.KObj(node))
   894  		// TODO: figure out what to do in this case. For now we do the same thing as above.
   895  		nodeHealth = &nodeHealthData{
   896  			status:                   &node.Status,
   897  			probeTimestamp:           nc.now(),
   898  			readyTransitionTimestamp: nc.now(),
   899  		}
   900  	} else if savedCondition != nil && currentReadyCondition != nil && savedCondition.LastHeartbeatTime != currentReadyCondition.LastHeartbeatTime {
   901  		var transitionTime metav1.Time
   902  		// If ReadyCondition changed since the last time we checked, we update the transition timestamp to "now",
   903  		// otherwise we leave it as it is.
   904  		if savedCondition.LastTransitionTime != currentReadyCondition.LastTransitionTime {
   905  			logger.V(3).Info("ReadyCondition for Node transitioned from savedCondition to currentReadyCondition", "node", klog.KObj(node), "savedCondition", savedCondition, "currentReadyCondition", currentReadyCondition)
   906  			transitionTime = nc.now()
   907  		} else {
   908  			transitionTime = nodeHealth.readyTransitionTimestamp
   909  		}
   910  		if loggerV := logger.V(5); loggerV.Enabled() {
   911  			loggerV.Info("Node ReadyCondition updated. Updating timestamp", "node", klog.KObj(node), "nodeHealthStatus", nodeHealth.status, "nodeStatus", node.Status)
   912  		} else {
   913  			logger.V(3).Info("Node ReadyCondition updated. Updating timestamp", "node", klog.KObj(node))
   914  		}
   915  		nodeHealth = &nodeHealthData{
   916  			status:                   &node.Status,
   917  			probeTimestamp:           nc.now(),
   918  			readyTransitionTimestamp: transitionTime,
   919  		}
   920  	}
   921  	// Always update the probe time if node lease is renewed.
   922  	// Note: If kubelet never posted the node status, but continues renewing the
   923  	// heartbeat leases, the node controller will assume the node is healthy and
   924  	// take no action.
   925  	observedLease, _ := nc.leaseLister.Leases(v1.NamespaceNodeLease).Get(node.Name)
   926  	if observedLease != nil && (savedLease == nil || savedLease.Spec.RenewTime.Before(observedLease.Spec.RenewTime)) {
   927  		nodeHealth.lease = observedLease
   928  		nodeHealth.probeTimestamp = nc.now()
   929  	}
   930  
   931  	if nc.now().After(nodeHealth.probeTimestamp.Add(gracePeriod)) {
   932  		// NodeReady condition or lease was last set longer ago than gracePeriod, so
   933  		// update it to Unknown (regardless of its current value) in the master.
   934  
   935  		nodeConditionTypes := []v1.NodeConditionType{
   936  			v1.NodeReady,
   937  			v1.NodeMemoryPressure,
   938  			v1.NodeDiskPressure,
   939  			v1.NodePIDPressure,
   940  			// We don't change 'NodeNetworkUnavailable' condition, as it's managed on a control plane level.
   941  			// v1.NodeNetworkUnavailable,
   942  		}
   943  
   944  		nowTimestamp := nc.now()
   945  		for _, nodeConditionType := range nodeConditionTypes {
   946  			_, currentCondition := controllerutil.GetNodeCondition(&node.Status, nodeConditionType)
   947  			if currentCondition == nil {
   948  				logger.V(2).Info("Condition of node was never updated by kubelet", "nodeConditionType", nodeConditionType, "node", klog.KObj(node))
   949  				node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
   950  					Type:               nodeConditionType,
   951  					Status:             v1.ConditionUnknown,
   952  					Reason:             "NodeStatusNeverUpdated",
   953  					Message:            "Kubelet never posted node status.",
   954  					LastHeartbeatTime:  node.CreationTimestamp,
   955  					LastTransitionTime: nowTimestamp,
   956  				})
   957  			} else {
   958  				logger.V(2).Info("Node hasn't been updated",
   959  					"node", klog.KObj(node), "duration", nc.now().Time.Sub(nodeHealth.probeTimestamp.Time), "nodeConditionType", nodeConditionType, "currentCondition", currentCondition)
   960  				if currentCondition.Status != v1.ConditionUnknown {
   961  					currentCondition.Status = v1.ConditionUnknown
   962  					currentCondition.Reason = "NodeStatusUnknown"
   963  					currentCondition.Message = "Kubelet stopped posting node status."
   964  					currentCondition.LastTransitionTime = nowTimestamp
   965  				}
   966  			}
   967  		}
   968  		// We need to update currentReadyCondition because its value may have changed.
   969  		_, currentReadyCondition = controllerutil.GetNodeCondition(&node.Status, v1.NodeReady)
   970  
   971  		if !apiequality.Semantic.DeepEqual(currentReadyCondition, &observedReadyCondition) {
   972  			if _, err := nc.kubeClient.CoreV1().Nodes().UpdateStatus(ctx, node, metav1.UpdateOptions{}); err != nil {
   973  				logger.Error(err, "Error updating node", "node", klog.KObj(node))
   974  				return gracePeriod, observedReadyCondition, currentReadyCondition, err
   975  			}
   976  			nodeHealth = &nodeHealthData{
   977  				status:                   &node.Status,
   978  				probeTimestamp:           nodeHealth.probeTimestamp,
   979  				readyTransitionTimestamp: nc.now(),
   980  				lease:                    observedLease,
   981  			}
   982  			return gracePeriod, observedReadyCondition, currentReadyCondition, nil
   983  		}
   984  	}
   985  
   986  	return gracePeriod, observedReadyCondition, currentReadyCondition, nil
   987  }
   988  
   989  func (nc *Controller) handleDisruption(ctx context.Context, zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
   990  	newZoneStates := map[string]ZoneState{}
   991  	allAreFullyDisrupted := true
   992  	logger := klog.FromContext(ctx)
   993  	for k, v := range zoneToNodeConditions {
   994  		zoneSize.WithLabelValues(k).Set(float64(len(v)))
   995  		unhealthy, newState := nc.computeZoneStateFunc(v)
   996  		zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
   997  		unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
   998  		if newState != stateFullDisruption {
   999  			allAreFullyDisrupted = false
  1000  		}
  1001  		newZoneStates[k] = newState
  1002  		if _, had := nc.zoneStates[k]; !had {
  1003  			logger.Error(nil, "Setting initial state for unseen zone", "zone", k)
  1004  			nc.zoneStates[k] = stateInitial
  1005  		}
  1006  	}
  1007  
  1008  	allWasFullyDisrupted := true
  1009  	for k, v := range nc.zoneStates {
  1010  		if _, have := zoneToNodeConditions[k]; !have {
  1011  			zoneSize.WithLabelValues(k).Set(0)
  1012  			zoneHealth.WithLabelValues(k).Set(100)
  1013  			unhealthyNodes.WithLabelValues(k).Set(0)
  1014  			delete(nc.zoneStates, k)
  1015  			continue
  1016  		}
  1017  		if v != stateFullDisruption {
  1018  			allWasFullyDisrupted = false
  1019  			break
  1020  		}
  1021  	}
  1022  
  1023  	// At least one node was responding in the previous pass or in the current pass. The semantics are as follows:
  1024  	// - if the new state is "partialDisruption" we call a user-defined function that returns a new limiter to use,
  1025  	// - if the new state is "normal" we resume normal operation (go back to default limiter settings),
  1026  	// - if the new state is "fullDisruption" we restore the normal eviction rate,
  1027  	//   - unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions.
  1028  	if !allAreFullyDisrupted || !allWasFullyDisrupted {
  1029  		// We're switching to full disruption mode
  1030  		if allAreFullyDisrupted {
  1031  			logger.Info("Controller detected that all Nodes are not-Ready. Entering master disruption mode")
  1032  			for i := range nodes {
  1033  				_, err := nc.markNodeAsReachable(ctx, nodes[i])
  1034  				if err != nil {
  1035  					logger.Error(nil, "Failed to remove taints from Node", "node", klog.KObj(nodes[i]))
  1036  				}
  1037  			}
  1038  			// We stop all evictions.
  1039  			for k := range nc.zoneStates {
  1040  				nc.zoneNoExecuteTainter[k].SwapLimiter(0)
  1041  			}
  1042  			for k := range nc.zoneStates {
  1043  				nc.zoneStates[k] = stateFullDisruption
  1044  			}
  1045  			// All rate limiters are updated, so we can return early here.
  1046  			return
  1047  		}
  1048  		// We're exiting full disruption mode
  1049  		if allWasFullyDisrupted {
  1050  			logger.Info("Controller detected that some Nodes are Ready. Exiting master disruption mode")
  1051  			// When exiting disruption mode update probe timestamps on all Nodes.
  1052  			now := nc.now()
  1053  			for i := range nodes {
  1054  				v := nc.nodeHealthMap.getDeepCopy(nodes[i].Name)
  1055  				v.probeTimestamp = now
  1056  				v.readyTransitionTimestamp = now
  1057  				nc.nodeHealthMap.set(nodes[i].Name, v)
  1058  			}
  1059  			// We reset all rate limiters to settings appropriate for the given state.
  1060  			for k := range nc.zoneStates {
  1061  				nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
  1062  				nc.zoneStates[k] = newZoneStates[k]
  1063  			}
  1064  			return
  1065  		}
  1066  		// We know that there's at least one not-fully disrupted so,
  1067  		// we can use default behavior for rate limiters
  1068  		for k, v := range nc.zoneStates {
  1069  			newState := newZoneStates[k]
  1070  			if v == newState {
  1071  				continue
  1072  			}
  1073  			logger.Info("Controller detected that zone is now in new state", "zone", k, "newState", newState)
  1074  			nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
  1075  			nc.zoneStates[k] = newState
  1076  		}
  1077  	}
  1078  }
  1079  
  1080  func (nc *Controller) podUpdated(oldPod, newPod *v1.Pod) {
  1081  	if newPod == nil {
  1082  		return
  1083  	}
  1084  	if len(newPod.Spec.NodeName) != 0 && (oldPod == nil || newPod.Spec.NodeName != oldPod.Spec.NodeName) {
  1085  		podItem := podUpdateItem{newPod.Namespace, newPod.Name}
  1086  		nc.podUpdateQueue.Add(podItem)
  1087  	}
  1088  }
  1089  
  1090  func (nc *Controller) doPodProcessingWorker(ctx context.Context) {
  1091  	for {
  1092  		obj, shutdown := nc.podUpdateQueue.Get()
  1093  		// "podUpdateQueue" will be shut down when "stopCh" is closed;
  1094  		// we do not need to re-check "stopCh" again.
  1095  		if shutdown {
  1096  			return
  1097  		}
  1098  
  1099  		podItem := obj.(podUpdateItem)
  1100  		nc.processPod(ctx, podItem)
  1101  	}
  1102  }
  1103  
  1104  // processPod processes events of assigning pods to nodes. In particular:
  1105  // 1. for a NodeReady=true node, taint eviction for this pod will be cancelled
  1106  // 2. for a NodeReady=false or unknown node, taint eviction of the pod will happen and the pod will be marked as not ready
  1107  // 3. if the node doesn't exist in the cache, it will be skipped.
  1108  func (nc *Controller) processPod(ctx context.Context, podItem podUpdateItem) {
  1109  	defer nc.podUpdateQueue.Done(podItem)
  1110  	pod, err := nc.podLister.Pods(podItem.namespace).Get(podItem.name)
  1111  	logger := klog.FromContext(ctx)
  1112  	if err != nil {
  1113  		if apierrors.IsNotFound(err) {
  1114  			// If the pod was deleted, there is no need to requeue.
  1115  			return
  1116  		}
  1117  		logger.Info("Failed to read pod", "pod", klog.KRef(podItem.namespace, podItem.name), "err", err)
  1118  		nc.podUpdateQueue.AddRateLimited(podItem)
  1119  		return
  1120  	}
  1121  
  1122  	nodeName := pod.Spec.NodeName
  1123  
  1124  	nodeHealth := nc.nodeHealthMap.getDeepCopy(nodeName)
  1125  	if nodeHealth == nil {
  1126  		// Node data is not gathered yet or node has been removed in the meantime.
  1127  		return
  1128  	}
  1129  
  1130  	_, err = nc.nodeLister.Get(nodeName)
  1131  	if err != nil {
  1132  		logger.Info("Failed to read node", "node", klog.KRef("", nodeName), "err", err)
  1133  		nc.podUpdateQueue.AddRateLimited(podItem)
  1134  		return
  1135  	}
  1136  
  1137  	_, currentReadyCondition := controllerutil.GetNodeCondition(nodeHealth.status, v1.NodeReady)
  1138  	if currentReadyCondition == nil {
  1139  		// Lack of NodeReady condition may only happen after node addition (or if it was maliciously deleted).
  1140  		// In both cases, the pod will be handled correctly (evicted if needed) during processing
  1141  		// of the next node update event.
  1142  		return
  1143  	}
  1144  
  1145  	pods := []*v1.Pod{pod}
  1146  	if currentReadyCondition.Status != v1.ConditionTrue {
  1147  		if err := controllerutil.MarkPodsNotReady(ctx, nc.kubeClient, nc.recorder, pods, nodeName); err != nil {
  1148  			logger.Info("Unable to mark pod NotReady on node", "pod", klog.KRef(podItem.namespace, podItem.name), "node", klog.KRef("", nodeName), "err", err)
  1149  			nc.podUpdateQueue.AddRateLimited(podItem)
  1150  		}
  1151  	}
  1152  }
  1153  
  1154  func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) {
  1155  	switch state {
  1156  	case stateNormal:
  1157  		nc.zoneNoExecuteTainter[zone].SwapLimiter(nc.evictionLimiterQPS)
  1158  	case statePartialDisruption:
  1159  		nc.zoneNoExecuteTainter[zone].SwapLimiter(
  1160  			nc.enterPartialDisruptionFunc(zoneSize))
  1161  	case stateFullDisruption:
  1162  		nc.zoneNoExecuteTainter[zone].SwapLimiter(
  1163  			nc.enterFullDisruptionFunc(zoneSize))
  1164  	}
  1165  }
  1166  
  1167  // classifyNodes classifies allNodes into three categories:
  1168  //  1. added: the nodes that are in 'allNodes', but not in 'knownNodeSet'
  1169  //  2. deleted: the nodes that are in 'knownNodeSet', but not in 'allNodes'
  1170  //  3. newZoneRepresentatives: the nodes that are in both 'knownNodeSet' and 'allNodes', but have no zone states
  1171  func (nc *Controller) classifyNodes(allNodes []*v1.Node) (added, deleted, newZoneRepresentatives []*v1.Node) {
  1172  	for i := range allNodes {
  1173  		if _, has := nc.knownNodeSet[allNodes[i].Name]; !has {
  1174  			added = append(added, allNodes[i])
  1175  		} else {
  1176  			// Currently, we only consider new zone as updated.
  1177  			zone := nodetopology.GetZoneKey(allNodes[i])
  1178  			if _, found := nc.zoneStates[zone]; !found {
  1179  				newZoneRepresentatives = append(newZoneRepresentatives, allNodes[i])
  1180  			}
  1181  		}
  1182  	}
  1183  
  1184  	// If there's a difference between the number of known Nodes and observed nodes,
  1185  	// we must have removed some Nodes.
  1186  	if len(nc.knownNodeSet)+len(added) != len(allNodes) {
  1187  		knowSetCopy := map[string]*v1.Node{}
  1188  		for k, v := range nc.knownNodeSet {
  1189  			knowSetCopy[k] = v
  1190  		}
  1191  		for i := range allNodes {
  1192  			delete(knowSetCopy, allNodes[i].Name)
  1193  		}
  1194  		for i := range knowSetCopy {
  1195  			deleted = append(deleted, knowSetCopy[i])
  1196  		}
  1197  	}
  1198  	return
  1199  }
  1200  
  1201  // HealthyQPSFunc returns the default value for cluster eviction rate - we take
  1202  // nodeNum for consistency with ReducedQPSFunc.
  1203  func (nc *Controller) HealthyQPSFunc(nodeNum int) float32 {
  1204  	return nc.evictionLimiterQPS
  1205  }
  1206  
  1207  // ReducedQPSFunc returns the QPS for when the cluster is large: make
  1208  // evictions slower; if the cluster is small, stop evictions altogether.
  1209  func (nc *Controller) ReducedQPSFunc(nodeNum int) float32 {
  1210  	if int32(nodeNum) > nc.largeClusterThreshold {
  1211  		return nc.secondaryEvictionLimiterQPS
  1212  	}
  1213  	return 0
  1214  }
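
A worked sketch of the two rate functions above; the QPS and threshold values mirror common kube-controller-manager defaults and are assumptions here, not values defined in this file:

	// Illustrative only, assuming evictionLimiterQPS = 0.1, secondaryEvictionLimiterQPS = 0.01
	// and largeClusterThreshold = 50.
	func exampleEvictionQPS(nc *Controller) {
		_ = nc.HealthyQPSFunc(100) // 0.1  - normal rate, regardless of zone size
		_ = nc.ReducedQPSFunc(100) // 0.01 - large zone (>50 nodes): taint more slowly
		_ = nc.ReducedQPSFunc(30)  // 0    - small zone: stop NoExecute tainting entirely
	}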
  1215  
  1216  // addPodEvictorForNewZone checks if a new zone appeared, and if so adds a new evictor.
  1217  func (nc *Controller) addPodEvictorForNewZone(logger klog.Logger, node *v1.Node) {
  1218  	nc.evictorLock.Lock()
  1219  	defer nc.evictorLock.Unlock()
  1220  	zone := nodetopology.GetZoneKey(node)
  1221  	if _, found := nc.zoneStates[zone]; !found {
  1222  		nc.zoneStates[zone] = stateInitial
  1223  		nc.zoneNoExecuteTainter[zone] =
  1224  			scheduler.NewRateLimitedTimedQueue(
  1225  				flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))
  1226  		// Init the metric for the new zone.
  1227  		logger.Info("Initializing eviction metric for zone", "zone", zone)
  1228  		evictionsTotal.WithLabelValues(zone).Add(0)
  1229  	}
  1230  }
  1231  
  1232  func (nc *Controller) markNodeForTainting(node *v1.Node, status v1.ConditionStatus) bool {
  1233  	nc.evictorLock.Lock()
  1234  	defer nc.evictorLock.Unlock()
  1235  	if status == v1.ConditionFalse {
  1236  		if !taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
  1237  			nc.zoneNoExecuteTainter[nodetopology.GetZoneKey(node)].Remove(node.Name)
  1238  		}
  1239  	}
  1240  
  1241  	if status == v1.ConditionUnknown {
  1242  		if !taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
  1243  			nc.zoneNoExecuteTainter[nodetopology.GetZoneKey(node)].Remove(node.Name)
  1244  		}
  1245  	}
  1246  
  1247  	return nc.zoneNoExecuteTainter[nodetopology.GetZoneKey(node)].Add(node.Name, string(node.UID))
  1248  }
  1249  
  1250  func (nc *Controller) markNodeAsReachable(ctx context.Context, node *v1.Node) (bool, error) {
  1251  	err := controller.RemoveTaintOffNode(ctx, nc.kubeClient, node.Name, node, UnreachableTaintTemplate)
  1252  	logger := klog.FromContext(ctx)
  1253  	if err != nil {
  1254  		logger.Error(err, "Failed to remove taint from node", "node", klog.KObj(node))
  1255  		return false, err
  1256  	}
  1257  	err = controller.RemoveTaintOffNode(ctx, nc.kubeClient, node.Name, node, NotReadyTaintTemplate)
  1258  	if err != nil {
  1259  		logger.Error(err, "Failed to remove taint from node", "node", klog.KObj(node))
  1260  		return false, err
  1261  	}
  1262  	nc.evictorLock.Lock()
  1263  	defer nc.evictorLock.Unlock()
  1264  
  1265  	return nc.zoneNoExecuteTainter[nodetopology.GetZoneKey(node)].Remove(node.Name), nil
  1266  }
  1267  
  1268  // ComputeZoneState computes the state of a zone from the NodeReady conditions of all Nodes in that zone,
  1269  // and also returns the number of Nodes that are not Ready. The zone is considered:
  1270  // - fullyDisrupted if there are no Ready Nodes,
  1271  // - partiallyDisrupted if at least nc.unhealthyZoneThreshold percent of Nodes are not Ready,
  1272  // - normal otherwise
  1273  func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, ZoneState) {
  1274  	readyNodes := 0
  1275  	notReadyNodes := 0
  1276  	for i := range nodeReadyConditions {
  1277  		if nodeReadyConditions[i] != nil && nodeReadyConditions[i].Status == v1.ConditionTrue {
  1278  			readyNodes++
  1279  		} else {
  1280  			notReadyNodes++
  1281  		}
  1282  	}
  1283  	switch {
  1284  	case readyNodes == 0 && notReadyNodes > 0:
  1285  		return notReadyNodes, stateFullDisruption
  1286  	case notReadyNodes > 2 && float32(notReadyNodes)/float32(notReadyNodes+readyNodes) >= nc.unhealthyZoneThreshold:
  1287  		return notReadyNodes, statePartialDisruption
  1288  	default:
  1289  		return notReadyNodes, stateNormal
  1290  	}
  1291  }
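
A worked sketch of the threshold logic above; the conditions are fabricated for illustration, and 0.55 is the usual unhealthyZoneThreshold default, assumed here rather than defined in this file:

	// exampleZoneState is illustrative only: a zone of 10 nodes, 6 of them NotReady.
	func exampleZoneState(nc *Controller) {
		conds := make([]*v1.NodeCondition, 0, 10)
		for i := 0; i < 10; i++ {
			status := v1.ConditionTrue
			if i < 6 {
				status = v1.ConditionFalse
			}
			conds = append(conds, &v1.NodeCondition{Type: v1.NodeReady, Status: status})
		}
		notReady, state := nc.ComputeZoneState(conds)
		// With unhealthyZoneThreshold = 0.55: notReady == 6, 6/10 >= 0.55 and notReady > 2,
		// so state == statePartialDisruption; with zero Ready nodes it would be stateFullDisruption.
		_, _ = notReady, state
	}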
  1292  
  1293  // reconcileNodeLabels reconciles node labels.
  1294  func (nc *Controller) reconcileNodeLabels(ctx context.Context, nodeName string) error {
  1295  	node, err := nc.nodeLister.Get(nodeName)
  1296  	if err != nil {
  1297  		// If node not found, just ignore it.
  1298  		if apierrors.IsNotFound(err) {
  1299  			return nil
  1300  		}
  1301  		return err
  1302  	}
  1303  
  1304  	if node.Labels == nil {
  1305  		// Nothing to reconcile.
  1306  		return nil
  1307  	}
  1308  
  1309  	labelsToUpdate := map[string]string{}
  1310  	for _, r := range labelReconcileInfo {
  1311  		primaryValue, primaryExists := node.Labels[r.primaryKey]
  1312  		secondaryValue, secondaryExists := node.Labels[r.secondaryKey]
  1313  
  1314  		if !primaryExists {
  1315  			// The primary label key does not exist. This should not happen
  1316  			// within our supported version skew range, when no external
  1317  			// components/factors modifying the node object. Ignore this case.
  1318  			continue
  1319  		}
  1320  		if secondaryExists && primaryValue != secondaryValue {
  1321  			// Secondary label exists, but not consistent with the primary
  1322  			// label. Need to reconcile.
  1323  			labelsToUpdate[r.secondaryKey] = primaryValue
  1324  
  1325  		} else if !secondaryExists && r.ensureSecondaryExists {
  1326  			// Apply secondary label based on primary label.
  1327  			labelsToUpdate[r.secondaryKey] = primaryValue
  1328  		}
  1329  	}
  1330  
  1331  	if len(labelsToUpdate) == 0 {
  1332  		return nil
  1333  	}
  1334  	if !controllerutil.AddOrUpdateLabelsOnNode(ctx, nc.kubeClient, labelsToUpdate, node) {
  1335  		return fmt.Errorf("failed to update labels for node %+v", node)
  1336  	}
  1337  	return nil
  1338  }
  1339  
