
Source file src/k8s.io/kubernetes/pkg/controller/tainteviction/taint_eviction.go

Documentation: k8s.io/kubernetes/pkg/controller/tainteviction

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package tainteviction
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"hash/fnv"
    23  	"io"
    24  	"math"
    25  	"sync"
    26  	"time"
    27  
    28  	v1 "k8s.io/api/core/v1"
    29  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/types"
    32  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    33  	"k8s.io/apiserver/pkg/util/feature"
    34  	corev1informers "k8s.io/client-go/informers/core/v1"
    35  	clientset "k8s.io/client-go/kubernetes"
    36  	"k8s.io/client-go/kubernetes/scheme"
    37  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    38  	corelisters "k8s.io/client-go/listers/core/v1"
    39  	"k8s.io/client-go/tools/cache"
    40  	"k8s.io/client-go/tools/record"
    41  	"k8s.io/client-go/util/workqueue"
    42  	"k8s.io/klog/v2"
    43  	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
    44  	"k8s.io/kubernetes/pkg/apis/core/helper"
    45  	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
    46  	"k8s.io/kubernetes/pkg/controller/tainteviction/metrics"
    47  	controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
    48  	"k8s.io/kubernetes/pkg/features"
    49  	utilpod "k8s.io/kubernetes/pkg/util/pod"
    50  )
    51  
    52  const (
    53  	// TODO (k82cn): Figure out a reasonable number of workers/channels and propagate
    54  	// the number of workers up, making it a parameter of the Run() function.
    55  
    56  	// NodeUpdateChannelSize defines the size of the channel for node update events.
    57  	NodeUpdateChannelSize = 10
    58  	// UpdateWorkerSize defines the number of workers for node and/or pod updates.
    59  	UpdateWorkerSize     = 8
    60  	podUpdateChannelSize = 1
    61  	retries              = 5
    62  )
    63  
    64  type nodeUpdateItem struct {
    65  	nodeName string
    66  }
    67  
    68  type podUpdateItem struct {
    69  	podName      string
    70  	podNamespace string
    71  	nodeName     string
    72  }
    73  
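        // hash maps val to a worker index in [0, max) using a 32-bit FNV-1a hash. Node and pod
        // updates are sharded by nodeName, so all updates for a given node go to the same worker.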
    74  func hash(val string, max int) int {
    75  	hasher := fnv.New32a()
    76  	io.WriteString(hasher, val)
    77  	return int(hasher.Sum32() % uint32(max))
    78  }
    79  
    80  // GetPodsByNodeNameFunc returns the list of pods assigned to the specified node.
    81  type GetPodsByNodeNameFunc func(nodeName string) ([]*v1.Pod, error)
    82  
    83  // Controller listens to Taint/Toleration changes and is responsible for removing Pods
    84  // from Nodes tainted with NoExecute Taints.
    85  type Controller struct {
    86  	name string
    87  
    88  	client                clientset.Interface
    89  	broadcaster           record.EventBroadcaster
    90  	recorder              record.EventRecorder
    91  	podLister             corelisters.PodLister
    92  	podListerSynced       cache.InformerSynced
    93  	nodeLister            corelisters.NodeLister
    94  	nodeListerSynced      cache.InformerSynced
    95  	getPodsAssignedToNode GetPodsByNodeNameFunc
    96  
    97  	taintEvictionQueue *TimedWorkerQueue
    98  	// taintedNodesLock guards taintedNodes, which maps nodeName to all NoExecute taints on that Node.
    99  	taintedNodesLock sync.Mutex
   100  	taintedNodes     map[string][]v1.Taint
   101  
   102  	nodeUpdateChannels []chan nodeUpdateItem
   103  	podUpdateChannels  []chan podUpdateItem
   104  
   105  	nodeUpdateQueue workqueue.Interface
   106  	podUpdateQueue  workqueue.Interface
   107  }
   108  
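        // deletePodHandler returns the eviction function run by the taintEvictionQueue: it emits the
        // optional event and then retries the pod deletion up to `retries` times, sleeping briefly
        // between failed attempts.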
   109  func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName), controllerName string) func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
   110  	return func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
   111  		ns := args.NamespacedName.Namespace
   112  		name := args.NamespacedName.Name
   113  		klog.FromContext(ctx).Info("Deleting pod", "controller", controllerName, "pod", args.NamespacedName)
   114  		if emitEventFunc != nil {
   115  			emitEventFunc(args.NamespacedName)
   116  		}
   117  		var err error
   118  		for i := 0; i < retries; i++ {
   119  			err = addConditionAndDeletePod(ctx, c, name, ns)
   120  			if err == nil {
   121  				metrics.PodDeletionsTotal.Inc()
   122  				metrics.PodDeletionsLatency.Observe(time.Since(fireAt).Seconds())
   123  				break
   124  			}
   125  			time.Sleep(10 * time.Millisecond)
   126  		}
   127  		return err
   128  	}
   129  }
   130  
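        // addConditionAndDeletePod adds the DisruptionTarget condition to the pod's status (when the
        // PodDisruptionConditions feature gate is enabled) and then deletes the pod.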
   131  func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, name, ns string) (err error) {
   132  	if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
   133  		pod, err := c.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
   134  		if err != nil {
   135  			return err
   136  		}
   137  		newStatus := pod.Status.DeepCopy()
   138  		updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
   139  			Type:    v1.DisruptionTarget,
   140  			Status:  v1.ConditionTrue,
   141  			Reason:  "DeletionByTaintManager",
   142  			Message: "Taint manager: deleting due to NoExecute taint",
   143  		})
   144  		if updated {
   145  			if _, _, _, err := utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
   146  				return err
   147  			}
   148  		}
   149  	}
   150  	return c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
   151  }
   152  
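        // getNoExecuteTaints returns only the taints with the NoExecute effect.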
   153  func getNoExecuteTaints(taints []v1.Taint) []v1.Taint {
   154  	result := []v1.Taint{}
   155  	for i := range taints {
   156  		if taints[i].Effect == v1.TaintEffectNoExecute {
   157  			result = append(result, taints[i])
   158  		}
   159  	}
   160  	return result
   161  }
   162  
   163  // getMinTolerationTime returns the minimal toleration time from the given slice, or -1 if it is infinite.
   164  func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
   165  	minTolerationTime := int64(math.MaxInt64)
   166  	if len(tolerations) == 0 {
   167  		return 0
   168  	}
   169  
   170  	for i := range tolerations {
   171  		if tolerations[i].TolerationSeconds != nil {
   172  			tolerationSeconds := *(tolerations[i].TolerationSeconds)
   173  			if tolerationSeconds <= 0 {
   174  				return 0
   175  			} else if tolerationSeconds < minTolerationTime {
   176  				minTolerationTime = tolerationSeconds
   177  			}
   178  		}
   179  	}
   180  
   181  	if minTolerationTime == int64(math.MaxInt64) {
   182  		return -1
   183  	}
   184  	return time.Duration(minTolerationTime) * time.Second
   185  }
   186  
   187  // New creates a new Controller that will use the passed clientset to communicate with the API server.
   188  func New(ctx context.Context, c clientset.Interface, podInformer corev1informers.PodInformer, nodeInformer corev1informers.NodeInformer, controllerName string) (*Controller, error) {
   189  	logger := klog.FromContext(ctx)
   190  	metrics.Register()
   191  	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
   192  	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
   193  
   194  	podIndexer := podInformer.Informer().GetIndexer()
   195  
   196  	tm := &Controller{
   197  		name: controllerName,
   198  
   199  		client:           c,
   200  		broadcaster:      eventBroadcaster,
   201  		recorder:         recorder,
   202  		podLister:        podInformer.Lister(),
   203  		podListerSynced:  podInformer.Informer().HasSynced,
   204  		nodeLister:       nodeInformer.Lister(),
   205  		nodeListerSynced: nodeInformer.Informer().HasSynced,
   206  		getPodsAssignedToNode: func(nodeName string) ([]*v1.Pod, error) {
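        			// Relies on a "spec.nodeName" index being registered on the shared pod informer elsewhere.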
   207  			objs, err := podIndexer.ByIndex("spec.nodeName", nodeName)
   208  			if err != nil {
   209  				return nil, err
   210  			}
   211  			pods := make([]*v1.Pod, 0, len(objs))
   212  			for _, obj := range objs {
   213  				pod, ok := obj.(*v1.Pod)
   214  				if !ok {
   215  					continue
   216  				}
   217  				pods = append(pods, pod)
   218  			}
   219  			return pods, nil
   220  		},
   221  		taintedNodes: make(map[string][]v1.Taint),
   222  
   223  		nodeUpdateQueue: workqueue.NewWithConfig(workqueue.QueueConfig{Name: "noexec_taint_node"}),
   224  		podUpdateQueue:  workqueue.NewWithConfig(workqueue.QueueConfig{Name: "noexec_taint_pod"}),
   225  	}
   226  	tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent, tm.name))
   227  
   228  	_, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   229  		AddFunc: func(obj interface{}) {
   230  			pod := obj.(*v1.Pod)
   231  			tm.PodUpdated(nil, pod)
   232  		},
   233  		UpdateFunc: func(prev, obj interface{}) {
   234  			prevPod := prev.(*v1.Pod)
   235  			newPod := obj.(*v1.Pod)
   236  			tm.PodUpdated(prevPod, newPod)
   237  		},
   238  		DeleteFunc: func(obj interface{}) {
   239  			pod, isPod := obj.(*v1.Pod)
   240  			// We can get DeletedFinalStateUnknown instead of *v1.Pod here and we need to handle that correctly.
   241  			if !isPod {
   242  				deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
   243  				if !ok {
   244  					logger.Error(nil, "Received unexpected object", "object", obj)
   245  					return
   246  				}
   247  				pod, ok = deletedState.Obj.(*v1.Pod)
   248  				if !ok {
   249  					logger.Error(nil, "DeletedFinalStateUnknown contained non-Pod object", "object", deletedState.Obj)
   250  					return
   251  				}
   252  			}
   253  			tm.PodUpdated(pod, nil)
   254  		},
   255  	})
   256  	if err != nil {
   257  		return nil, fmt.Errorf("unable to add pod event handler: %w", err)
   258  	}
   259  
   260  	_, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   261  		AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
   262  			tm.NodeUpdated(nil, node)
   263  			return nil
   264  		}),
   265  		UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
   266  			tm.NodeUpdated(oldNode, newNode)
   267  			return nil
   268  		}),
   269  		DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
   270  			tm.NodeUpdated(node, nil)
   271  			return nil
   272  		}),
   273  	})
   274  	if err != nil {
   275  		return nil, fmt.Errorf("unable to add node event handler: %w", err)
   276  	}
   277  
   278  	return tm, nil
   279  }
   280  
   281  // Run starts the controller, which runs in a loop until the passed context is cancelled.
   282  func (tc *Controller) Run(ctx context.Context) {
   283  	defer utilruntime.HandleCrash()
   284  	logger := klog.FromContext(ctx)
   285  	logger.Info("Starting", "controller", tc.name)
   286  	defer logger.Info("Shutting down controller", "controller", tc.name)
   287  
   288  	// Start events processing pipeline.
   289  	tc.broadcaster.StartStructuredLogging(3)
   290  	if tc.client != nil {
   291  		logger.Info("Sending events to api server")
   292  		tc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: tc.client.CoreV1().Events("")})
   293  	} else {
   294  		logger.Error(nil, "kubeClient is nil", "controller", tc.name)
   295  		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
   296  	}
   297  	defer tc.broadcaster.Shutdown()
   298  	defer tc.nodeUpdateQueue.ShutDown()
   299  	defer tc.podUpdateQueue.ShutDown()
   300  
   301  	// wait for the cache to be synced
   302  	if !cache.WaitForNamedCacheSync(tc.name, ctx.Done(), tc.podListerSynced, tc.nodeListerSynced) {
   303  		return
   304  	}
   305  
   306  	for i := 0; i < UpdateWorkerSize; i++ {
   307  		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
   308  		tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
   309  	}
   310  
   311  	// Functions that are responsible for taking work items out of the workqueues and putting them
   312  	// into channels.
   313  	go func(stopCh <-chan struct{}) {
   314  		for {
   315  			item, shutdown := tc.nodeUpdateQueue.Get()
   316  			if shutdown {
   317  				break
   318  			}
   319  			nodeUpdate := item.(nodeUpdateItem)
   320  			hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
   321  			select {
   322  			case <-stopCh:
   323  				tc.nodeUpdateQueue.Done(item)
   324  				return
   325  			case tc.nodeUpdateChannels[hash] <- nodeUpdate:
   326  				// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
   327  			}
   328  		}
   329  	}(ctx.Done())
   330  
   331  	go func(stopCh <-chan struct{}) {
   332  		for {
   333  			item, shutdown := tc.podUpdateQueue.Get()
   334  			if shutdown {
   335  				break
   336  			}
   337  			// Pods are processed by the same worker as their node (both are sharded by nodeName)
   338  			// to avoid races between the node worker setting tc.taintedNodes and the pod worker
   339  			// reading it when deciding whether to delete the pod.
   340  			// It's possible that the code is still correct even without this assumption.
   341  			podUpdate := item.(podUpdateItem)
   342  			hash := hash(podUpdate.nodeName, UpdateWorkerSize)
   343  			select {
   344  			case <-stopCh:
   345  				tc.podUpdateQueue.Done(item)
   346  				return
   347  			case tc.podUpdateChannels[hash] <- podUpdate:
   348  				// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
   349  			}
   350  		}
   351  	}(ctx.Done())
   352  
   353  	wg := sync.WaitGroup{}
   354  	wg.Add(UpdateWorkerSize)
   355  	for i := 0; i < UpdateWorkerSize; i++ {
   356  		go tc.worker(ctx, i, wg.Done, ctx.Done())
   357  	}
   358  	wg.Wait()
   359  }
   360  
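        // worker consumes items from its per-worker node and pod update channels until the stop
        // channel is closed, marking each item Done on its workqueue after it has been handled.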
   361  func (tc *Controller) worker(ctx context.Context, worker int, done func(), stopCh <-chan struct{}) {
   362  	defer done()
   363  
   364  	// When processing events we want to prioritize Node updates over Pod updates,
   365  	// as Node updates that interest the controller should be handled as soon as possible -
   366  	// we don't want the user (or the system) to wait until the Pod update queue is drained
   367  	// before eviction of Pods from tainted Nodes can start.
   368  	for {
   369  		select {
   370  		case <-stopCh:
   371  			return
   372  		case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
   373  			tc.handleNodeUpdate(ctx, nodeUpdate)
   374  			tc.nodeUpdateQueue.Done(nodeUpdate)
   375  		case podUpdate := <-tc.podUpdateChannels[worker]:
   376  			// If we found a Pod update we need to empty Node queue first.
   377  		priority:
   378  			for {
   379  				select {
   380  				case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
   381  					tc.handleNodeUpdate(ctx, nodeUpdate)
   382  					tc.nodeUpdateQueue.Done(nodeUpdate)
   383  				default:
   384  					break priority
   385  				}
   386  			}
   387  			// After Node queue is emptied we process podUpdate.
   388  			tc.handlePodUpdate(ctx, podUpdate)
   389  			tc.podUpdateQueue.Done(podUpdate)
   390  		}
   391  	}
   392  }
   393  
   394  // PodUpdated is used to notify NoExecuteTaintManager about Pod changes.
   395  func (tc *Controller) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) {
   396  	podName := ""
   397  	podNamespace := ""
   398  	nodeName := ""
   399  	oldTolerations := []v1.Toleration{}
   400  	if oldPod != nil {
   401  		podName = oldPod.Name
   402  		podNamespace = oldPod.Namespace
   403  		nodeName = oldPod.Spec.NodeName
   404  		oldTolerations = oldPod.Spec.Tolerations
   405  	}
   406  	newTolerations := []v1.Toleration{}
   407  	if newPod != nil {
   408  		podName = newPod.Name
   409  		podNamespace = newPod.Namespace
   410  		nodeName = newPod.Spec.NodeName
   411  		newTolerations = newPod.Spec.Tolerations
   412  	}
   413  
   414  	if oldPod != nil && newPod != nil && helper.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName {
   415  		return
   416  	}
   417  	updateItem := podUpdateItem{
   418  		podName:      podName,
   419  		podNamespace: podNamespace,
   420  		nodeName:     nodeName,
   421  	}
   422  
   423  	tc.podUpdateQueue.Add(updateItem)
   424  }
   425  
   426  // NodeUpdated is used to notify NoExecuteTaintManager about Node changes.
   427  func (tc *Controller) NodeUpdated(oldNode *v1.Node, newNode *v1.Node) {
   428  	nodeName := ""
   429  	oldTaints := []v1.Taint{}
   430  	if oldNode != nil {
   431  		nodeName = oldNode.Name
   432  		oldTaints = getNoExecuteTaints(oldNode.Spec.Taints)
   433  	}
   434  
   435  	newTaints := []v1.Taint{}
   436  	if newNode != nil {
   437  		nodeName = newNode.Name
   438  		newTaints = getNoExecuteTaints(newNode.Spec.Taints)
   439  	}
   440  
   441  	if oldNode != nil && newNode != nil && helper.Semantic.DeepEqual(oldTaints, newTaints) {
   442  		return
   443  	}
   444  	updateItem := nodeUpdateItem{
   445  		nodeName: nodeName,
   446  	}
   447  
   448  	tc.nodeUpdateQueue.Add(updateItem)
   449  }
   450  
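        // cancelWorkWithEvent cancels a pending eviction for the given pod, if any, and emits a
        // cancellation event when work was actually cancelled.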
   451  func (tc *Controller) cancelWorkWithEvent(logger klog.Logger, nsName types.NamespacedName) {
   452  	if tc.taintEvictionQueue.CancelWork(logger, nsName.String()) {
   453  		tc.emitCancelPodDeletionEvent(nsName)
   454  	}
   455  }
   456  
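        // processPodOnNode schedules or cancels eviction of a single pod: the pod is evicted
        // immediately if it does not tolerate all NoExecute taints, evicted after the minimal
        // toleration time if that time is finite, and left alone (with any pending eviction
        // cancelled) if it tolerates the taints forever.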
   457  func (tc *Controller) processPodOnNode(
   458  	ctx context.Context,
   459  	podNamespacedName types.NamespacedName,
   460  	nodeName string,
   461  	tolerations []v1.Toleration,
   462  	taints []v1.Taint,
   463  	now time.Time,
   464  ) {
   465  	logger := klog.FromContext(ctx)
   466  	if len(taints) == 0 {
   467  		tc.cancelWorkWithEvent(logger, podNamespacedName)
   468  	}
   469  	allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
   470  	if !allTolerated {
   471  		logger.V(2).Info("Not all taints are tolerated after update for pod on node", "pod", podNamespacedName.String(), "node", klog.KRef("", nodeName))
   472  		// We're canceling scheduled work (if any), as we're going to delete the Pod right away.
   473  		tc.cancelWorkWithEvent(logger, podNamespacedName)
   474  		tc.taintEvictionQueue.AddWork(ctx, NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), now, now)
   475  		return
   476  	}
   477  	minTolerationTime := getMinTolerationTime(usedTolerations)
   478  	// getMinTolerationTime returns a negative value to denote infinite toleration.
   479  	if minTolerationTime < 0 {
   480  		logger.V(4).Info("Current tolerations for pod tolerate forever, cancelling any scheduled deletion", "pod", podNamespacedName.String())
   481  		tc.cancelWorkWithEvent(logger, podNamespacedName)
   482  		return
   483  	}
   484  
   485  	startTime := now
   486  	triggerTime := startTime.Add(minTolerationTime)
   487  	scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
   488  	if scheduledEviction != nil {
   489  		startTime = scheduledEviction.CreatedAt
   490  		if startTime.Add(minTolerationTime).Before(triggerTime) {
   491  			return
   492  		}
   493  		tc.cancelWorkWithEvent(logger, podNamespacedName)
   494  	}
   495  	tc.taintEvictionQueue.AddWork(ctx, NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
   496  }
   497  
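        // handlePodUpdate looks up the pod and re-evaluates it against the cached NoExecute taints
        // of its node; a pod that no longer exists has any pending eviction cancelled.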
   498  func (tc *Controller) handlePodUpdate(ctx context.Context, podUpdate podUpdateItem) {
   499  	pod, err := tc.podLister.Pods(podUpdate.podNamespace).Get(podUpdate.podName)
   500  	logger := klog.FromContext(ctx)
   501  	if err != nil {
   502  		if apierrors.IsNotFound(err) {
   503  			// Delete
   504  			podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
   505  			logger.V(4).Info("Noticed pod deletion", "pod", podNamespacedName)
   506  			tc.cancelWorkWithEvent(logger, podNamespacedName)
   507  			return
   508  		}
   509  		utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
   510  		return
   511  	}
   512  
   513  	// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object.
   514  	if pod.Spec.NodeName != podUpdate.nodeName {
   515  		return
   516  	}
   517  
   518  	// Create or Update
   519  	podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
   520  	logger.V(4).Info("Noticed pod update", "pod", podNamespacedName)
   521  	nodeName := pod.Spec.NodeName
   522  	if nodeName == "" {
   523  		return
   524  	}
   525  	taints, ok := func() ([]v1.Taint, bool) {
   526  		tc.taintedNodesLock.Lock()
   527  		defer tc.taintedNodesLock.Unlock()
   528  		taints, ok := tc.taintedNodes[nodeName]
   529  		return taints, ok
   530  	}()
   531  	// It's possible that the Node was deleted, or its taints were removed earlier, which
   532  	// already triggered cancellation of the eviction if it was needed.
   533  	if !ok {
   534  		return
   535  	}
   536  	tc.processPodOnNode(ctx, podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
   537  }
   538  
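        // handleNodeUpdate refreshes the cached NoExecute taints for the node and re-evaluates every
        // pod assigned to it, cancelling evictions when all NoExecute taints have been removed.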
   539  func (tc *Controller) handleNodeUpdate(ctx context.Context, nodeUpdate nodeUpdateItem) {
   540  	node, err := tc.nodeLister.Get(nodeUpdate.nodeName)
   541  	logger := klog.FromContext(ctx)
   542  	if err != nil {
   543  		if apierrors.IsNotFound(err) {
   544  			// Delete
   545  			logger.V(4).Info("Noticed node deletion", "node", klog.KRef("", nodeUpdate.nodeName))
   546  			tc.taintedNodesLock.Lock()
   547  			defer tc.taintedNodesLock.Unlock()
   548  			delete(tc.taintedNodes, nodeUpdate.nodeName)
   549  			return
   550  		}
   551  		utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
   552  		return
   553  	}
   554  
   555  	// Create or Update
   556  	logger.V(4).Info("Noticed node update", "node", klog.KObj(node))
   557  	taints := getNoExecuteTaints(node.Spec.Taints)
   558  	func() {
   559  		tc.taintedNodesLock.Lock()
   560  		defer tc.taintedNodesLock.Unlock()
   561  		logger.V(4).Info("Updating known taints on node", "node", klog.KObj(node), "taints", taints)
   562  		if len(taints) == 0 {
   563  			delete(tc.taintedNodes, node.Name)
   564  		} else {
   565  			tc.taintedNodes[node.Name] = taints
   566  		}
   567  	}()
   568  
   569  	// It is critical that we update tc.taintedNodes before we call getPodsAssignedToNode:
   570  	// getPodsAssignedToNode can be arbitrarily delayed, as long as all future pod updates go
   571  	// through tc.PodUpdated, which uses tc.taintedNodes to potentially delete delayed pods.
   572  	pods, err := tc.getPodsAssignedToNode(node.Name)
   573  	if err != nil {
   574  		logger.Error(err, "Failed to get pods assigned to node", "node", klog.KObj(node))
   575  		return
   576  	}
   577  	if len(pods) == 0 {
   578  		return
   579  	}
   580  	// Short circuit, to make this controller a bit faster.
   581  	if len(taints) == 0 {
   582  		logger.V(4).Info("All taints were removed from the node. Cancelling all evictions...", "node", klog.KObj(node))
   583  		for i := range pods {
   584  			tc.cancelWorkWithEvent(logger, types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
   585  		}
   586  		return
   587  	}
   588  
   589  	now := time.Now()
   590  	for _, pod := range pods {
   591  		podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
   592  		tc.processPodOnNode(ctx, podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
   593  	}
   594  }
   595  
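        // emitPodDeletionEvent records a TaintManagerEviction event marking the pod for deletion.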
   596  func (tc *Controller) emitPodDeletionEvent(nsName types.NamespacedName) {
   597  	if tc.recorder == nil {
   598  		return
   599  	}
   600  	ref := &v1.ObjectReference{
   601  		APIVersion: "v1",
   602  		Kind:       "Pod",
   603  		Name:       nsName.Name,
   604  		Namespace:  nsName.Namespace,
   605  	}
   606  	tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Marking for deletion Pod %s", nsName.String())
   607  }
   608  
   609  func (tc *Controller) emitCancelPodDeletionEvent(nsName types.NamespacedName) {
   610  	if tc.recorder == nil {
   611  		return
   612  	}
   613  	ref := &v1.ObjectReference{
   614  		APIVersion: "v1",
   615  		Kind:       "Pod",
   616  		Name:       nsName.Name,
   617  		Namespace:  nsName.Namespace,
   618  	}
   619  	tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Cancelling deletion of Pod %s", nsName.String())
   620  }
   621  
