...

Source file src/k8s.io/kubernetes/pkg/scheduler/eventhandlers.go

Documentation: k8s.io/kubernetes/pkg/scheduler

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package scheduler
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"strings"
    23  	"time"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	storagev1 "k8s.io/api/storage/v1"
    27  	"k8s.io/apimachinery/pkg/api/equality"
    28  	"k8s.io/apimachinery/pkg/runtime/schema"
    29  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    30  	"k8s.io/apimachinery/pkg/util/wait"
    31  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    32  	"k8s.io/client-go/dynamic/dynamicinformer"
    33  	"k8s.io/client-go/informers"
    34  	"k8s.io/client-go/tools/cache"
    35  	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
    36  	corev1nodeaffinity "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
    37  	"k8s.io/klog/v2"
    38  	"k8s.io/kubernetes/pkg/features"
    39  	"k8s.io/kubernetes/pkg/scheduler/framework"
    40  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
    41  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
    42  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
    43  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
    44  	"k8s.io/kubernetes/pkg/scheduler/internal/queue"
    45  	"k8s.io/kubernetes/pkg/scheduler/profile"
    46  )
    47  
    48  func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
    49  	logger := sched.logger
    50  	sc, ok := obj.(*storagev1.StorageClass)
    51  	if !ok {
    52  		logger.Error(nil, "Cannot convert to *storagev1.StorageClass", "obj", obj)
    53  		return
    54  	}
    55  
    56  	// CheckVolumeBindingPred fails if the pod has unbound immediate PVCs. If these
    57  	// PVCs specify a StorageClass name, creating StorageClass objects
    58  	// with late binding will cause the predicate to pass, so we need to move pods
    59  	// to the active queue.
    60  	// We don't need to invalidate cached results because results will not be
    61  	// cached for a pod that has unbound immediate PVCs.
    62  	if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
    63  		sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassAdd, nil, sc, nil)
    64  	}
    65  }
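
// Example (illustrative sketch, not a definitive helper): the condition used
// above restated on its own — only a StorageClass with late
// (WaitForFirstConsumer) binding can unblock pods that were unschedulable
// because of unbound immediate PVCs. The function name is hypothetical.
func isLateBindingStorageClass(sc *storagev1.StorageClass) bool {
	// Mirrors the check in onStorageClassAdd.
	return sc.VolumeBindingMode != nil &&
		*sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer
}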
    66  
    67  func (sched *Scheduler) addNodeToCache(obj interface{}) {
    68  	logger := sched.logger
    69  	node, ok := obj.(*v1.Node)
    70  	if !ok {
    71  		logger.Error(nil, "Cannot convert to *v1.Node", "obj", obj)
    72  		return
    73  	}
    74  
    75  	logger.V(3).Info("Add event for node", "node", klog.KObj(node))
    76  	nodeInfo := sched.Cache.AddNode(logger, node)
    77  	sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.NodeAdd, nil, node, preCheckForNode(nodeInfo))
    78  }
    79  
    80  func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
    81  	logger := sched.logger
    82  	oldNode, ok := oldObj.(*v1.Node)
    83  	if !ok {
    84  		logger.Error(nil, "Cannot convert oldObj to *v1.Node", "oldObj", oldObj)
    85  		return
    86  	}
    87  	newNode, ok := newObj.(*v1.Node)
    88  	if !ok {
    89  		logger.Error(nil, "Cannot convert newObj to *v1.Node", "newObj", newObj)
    90  		return
    91  	}
    92  
    93  	logger.V(4).Info("Update event for node", "node", klog.KObj(newNode))
    94  	nodeInfo := sched.Cache.UpdateNode(logger, oldNode, newNode)
    95  	// Only requeue unschedulable pods if the node became more schedulable.
    96  	for _, evt := range nodeSchedulingPropertiesChange(newNode, oldNode) {
    97  		sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, oldNode, newNode, preCheckForNode(nodeInfo))
    98  	}
    99  }
   100  
   101  func (sched *Scheduler) deleteNodeFromCache(obj interface{}) {
   102  	logger := sched.logger
   103  	var node *v1.Node
   104  	switch t := obj.(type) {
   105  	case *v1.Node:
   106  		node = t
   107  	case cache.DeletedFinalStateUnknown:
   108  		var ok bool
   109  		node, ok = t.Obj.(*v1.Node)
   110  		if !ok {
   111  			logger.Error(nil, "Cannot convert to *v1.Node", "obj", t.Obj)
   112  			return
   113  		}
   114  	default:
   115  		logger.Error(nil, "Cannot convert to *v1.Node", "obj", t)
   116  		return
   117  	}
   118  
   119  	logger.V(3).Info("Delete event for node", "node", klog.KObj(node))
   120  	if err := sched.Cache.RemoveNode(logger, node); err != nil {
   121  		logger.Error(err, "Scheduler cache RemoveNode failed")
   122  	}
   123  }
   124  
   125  func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
   126  	logger := sched.logger
   127  	pod := obj.(*v1.Pod)
   128  	logger.V(3).Info("Add event for unscheduled pod", "pod", klog.KObj(pod))
   129  	if err := sched.SchedulingQueue.Add(logger, pod); err != nil {
   130  		utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
   131  	}
   132  }
   133  
   134  func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
   135  	logger := sched.logger
   136  	oldPod, newPod := oldObj.(*v1.Pod), newObj.(*v1.Pod)
   137  	// Bypass update event that carries identical objects; otherwise, a duplicated
   138  	// Pod may go through scheduling and cause unexpected behavior (see #96071).
   139  	if oldPod.ResourceVersion == newPod.ResourceVersion {
   140  		return
   141  	}
   142  
   143  	isAssumed, err := sched.Cache.IsAssumedPod(newPod)
   144  	if err != nil {
   145  		utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", newPod.Namespace, newPod.Name, err))
   146  	}
   147  	if isAssumed {
   148  		return
   149  	}
   150  
   151  	logger.V(4).Info("Update event for unscheduled pod", "pod", klog.KObj(newPod))
   152  	if err := sched.SchedulingQueue.Update(logger, oldPod, newPod); err != nil {
   153  		utilruntime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
   154  	}
   155  }
   156  
   157  func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) {
   158  	logger := sched.logger
   159  	var pod *v1.Pod
   160  	switch t := obj.(type) {
   161  	case *v1.Pod:
   162  		pod = t
   163  	case cache.DeletedFinalStateUnknown:
   164  		var ok bool
   165  		pod, ok = t.Obj.(*v1.Pod)
   166  		if !ok {
   167  			utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
   168  			return
   169  		}
   170  	default:
   171  		utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
   172  		return
   173  	}
   174  
   175  	logger.V(3).Info("Delete event for unscheduled pod", "pod", klog.KObj(pod))
   176  	if err := sched.SchedulingQueue.Delete(pod); err != nil {
   177  		utilruntime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
   178  	}
   179  	fwk, err := sched.frameworkForPod(pod)
   180  	if err != nil {
   181  		// This shouldn't happen, because we only accept for scheduling the pods
   182  		// which specify a scheduler name that matches one of the profiles.
   183  		logger.Error(err, "Unable to get profile", "pod", klog.KObj(pod))
   184  		return
   185  	}
   186  	// If a waiting pod is rejected, it indicates it was previously assumed and we're
   187  	// removing it from the scheduler cache. In this case, signal an AssignedPodDelete
   188  	// event to immediately retry some unscheduled Pods.
   189  	if fwk.RejectWaitingPod(pod.UID) {
   190  		sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, pod, nil, nil)
   191  	}
   192  }
   193  
   194  func (sched *Scheduler) addPodToCache(obj interface{}) {
   195  	logger := sched.logger
   196  	pod, ok := obj.(*v1.Pod)
   197  	if !ok {
   198  		logger.Error(nil, "Cannot convert to *v1.Pod", "obj", obj)
   199  		return
   200  	}
   201  
   202  	logger.V(3).Info("Add event for scheduled pod", "pod", klog.KObj(pod))
   203  	if err := sched.Cache.AddPod(logger, pod); err != nil {
   204  		logger.Error(err, "Scheduler cache AddPod failed", "pod", klog.KObj(pod))
   205  	}
   206  
   207  	sched.SchedulingQueue.AssignedPodAdded(logger, pod)
   208  }
   209  
   210  func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) {
   211  	logger := sched.logger
   212  	oldPod, ok := oldObj.(*v1.Pod)
   213  	if !ok {
   214  		logger.Error(nil, "Cannot convert oldObj to *v1.Pod", "oldObj", oldObj)
   215  		return
   216  	}
   217  	newPod, ok := newObj.(*v1.Pod)
   218  	if !ok {
   219  		logger.Error(nil, "Cannot convert newObj to *v1.Pod", "newObj", newObj)
   220  		return
   221  	}
   222  
   223  	logger.V(4).Info("Update event for scheduled pod", "pod", klog.KObj(oldPod))
   224  	if err := sched.Cache.UpdatePod(logger, oldPod, newPod); err != nil {
   225  		logger.Error(err, "Scheduler cache UpdatePod failed", "pod", klog.KObj(oldPod))
   226  	}
   227  
   228  	sched.SchedulingQueue.AssignedPodUpdated(logger, oldPod, newPod)
   229  }
   230  
   231  func (sched *Scheduler) deletePodFromCache(obj interface{}) {
   232  	logger := sched.logger
   233  	var pod *v1.Pod
   234  	switch t := obj.(type) {
   235  	case *v1.Pod:
   236  		pod = t
   237  	case cache.DeletedFinalStateUnknown:
   238  		var ok bool
   239  		pod, ok = t.Obj.(*v1.Pod)
   240  		if !ok {
   241  			logger.Error(nil, "Cannot convert to *v1.Pod", "obj", t.Obj)
   242  			return
   243  		}
   244  	default:
   245  		logger.Error(nil, "Cannot convert to *v1.Pod", "obj", t)
   246  		return
   247  	}
   248  
   249  	logger.V(3).Info("Delete event for scheduled pod", "pod", klog.KObj(pod))
   250  	if err := sched.Cache.RemovePod(logger, pod); err != nil {
   251  		logger.Error(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod))
   252  	}
   253  
   254  	sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.AssignedPodDelete, pod, nil, nil)
   255  }
   256  
   257  // assignedPod selects pods that are assigned (scheduled and running).
   258  func assignedPod(pod *v1.Pod) bool {
   259  	return len(pod.Spec.NodeName) != 0
   260  }
   261  
   262  // responsibleForPod returns true if the pod has asked to be scheduled by the given scheduler.
   263  func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool {
   264  	return profiles.HandlesSchedulerName(pod.Spec.SchedulerName)
   265  }
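
// Example (illustrative sketch): the unscheduled-pod event filter in
// addAllEventHandlers admits a pod only when it is unassigned and its
// spec.schedulerName matches one of the configured profiles. The helper name
// and the "default-scheduler" value below are illustrative.
func handlesDefaultSchedulerName(profiles profile.Map) bool {
	pod := &v1.Pod{Spec: v1.PodSpec{SchedulerName: "default-scheduler"}}
	return responsibleForPod(pod, profiles)
}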
   266  
   267  const (
   268  	// syncedPollPeriod controls how often the scheduler polls the status of its sync funcs
   269  	syncedPollPeriod = 100 * time.Millisecond
   270  )
   271  
   272  // WaitForHandlersSync waits for EventHandlers to sync.
   273  // It returns nil once all registered handlers have synced, or an error if the context is cancelled first.
   274  func (sched *Scheduler) WaitForHandlersSync(ctx context.Context) error {
   275  	return wait.PollUntilContextCancel(ctx, syncedPollPeriod, true, func(ctx context.Context) (done bool, err error) {
   276  		for _, handler := range sched.registeredHandlers {
   277  			if !handler.HasSynced() {
   278  				return false, nil
   279  			}
   280  		}
   281  		return true, nil
   282  	})
   283  }
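
// Example (illustrative usage sketch): bounding WaitForHandlersSync with a
// timeout before starting to schedule. The wrapper name and the 30-second
// bound are assumptions for illustration, not part of the scheduler's API.
func waitForHandlersWithTimeout(ctx context.Context, sched *Scheduler) error {
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	return sched.WaitForHandlersSync(ctx)
}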
   284  
   285  // addAllEventHandlers is a helper function used in tests and in Scheduler
   286  // to add event handlers for various informers.
   287  func addAllEventHandlers(
   288  	sched *Scheduler,
   289  	informerFactory informers.SharedInformerFactory,
   290  	dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
   291  	gvkMap map[framework.GVK]framework.ActionType,
   292  ) error {
   293  	var (
   294  		handlerRegistration cache.ResourceEventHandlerRegistration
   295  		err                 error
   296  		handlers            []cache.ResourceEventHandlerRegistration
   297  	)
   298  	// scheduled pod cache
   299  	if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler(
   300  		cache.FilteringResourceEventHandler{
   301  			FilterFunc: func(obj interface{}) bool {
   302  				switch t := obj.(type) {
   303  				case *v1.Pod:
   304  					return assignedPod(t)
   305  				case cache.DeletedFinalStateUnknown:
   306  					if _, ok := t.Obj.(*v1.Pod); ok {
   307  						// The carried object may be stale, so we don't use it to check if
   308  						// it's assigned or not. Attempt the cleanup anyway.
   309  						return true
   310  					}
   311  					utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
   312  					return false
   313  				default:
   314  					utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
   315  					return false
   316  				}
   317  			},
   318  			Handler: cache.ResourceEventHandlerFuncs{
   319  				AddFunc:    sched.addPodToCache,
   320  				UpdateFunc: sched.updatePodInCache,
   321  				DeleteFunc: sched.deletePodFromCache,
   322  			},
   323  		},
   324  	); err != nil {
   325  		return err
   326  	}
   327  	handlers = append(handlers, handlerRegistration)
   328  
   329  	// unscheduled pod queue
   330  	if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler(
   331  		cache.FilteringResourceEventHandler{
   332  			FilterFunc: func(obj interface{}) bool {
   333  				switch t := obj.(type) {
   334  				case *v1.Pod:
   335  					return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
   336  				case cache.DeletedFinalStateUnknown:
   337  					if pod, ok := t.Obj.(*v1.Pod); ok {
   338  						// The carried object may be stale, so we don't use it to check if
   339  						// it's assigned or not.
   340  						return responsibleForPod(pod, sched.Profiles)
   341  					}
   342  					utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
   343  					return false
   344  				default:
   345  					utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
   346  					return false
   347  				}
   348  			},
   349  			Handler: cache.ResourceEventHandlerFuncs{
   350  				AddFunc:    sched.addPodToSchedulingQueue,
   351  				UpdateFunc: sched.updatePodInSchedulingQueue,
   352  				DeleteFunc: sched.deletePodFromSchedulingQueue,
   353  			},
   354  		},
   355  	); err != nil {
   356  		return err
   357  	}
   358  	handlers = append(handlers, handlerRegistration)
   359  
   360  	if handlerRegistration, err = informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
   361  		cache.ResourceEventHandlerFuncs{
   362  			AddFunc:    sched.addNodeToCache,
   363  			UpdateFunc: sched.updateNodeInCache,
   364  			DeleteFunc: sched.deleteNodeFromCache,
   365  		},
   366  	); err != nil {
   367  		return err
   368  	}
   369  	handlers = append(handlers, handlerRegistration)
   370  
   371  	logger := sched.logger
   372  	buildEvtResHandler := func(at framework.ActionType, gvk framework.GVK, shortGVK string) cache.ResourceEventHandlerFuncs {
   373  		funcs := cache.ResourceEventHandlerFuncs{}
   374  		if at&framework.Add != 0 {
   375  			evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Add, Label: fmt.Sprintf("%vAdd", shortGVK)}
   376  			funcs.AddFunc = func(obj interface{}) {
   377  				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, nil, obj, nil)
   378  			}
   379  		}
   380  		if at&framework.Update != 0 {
   381  			evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Update, Label: fmt.Sprintf("%vUpdate", shortGVK)}
   382  			funcs.UpdateFunc = func(old, obj interface{}) {
   383  				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, old, obj, nil)
   384  			}
   385  		}
   386  		if at&framework.Delete != 0 {
   387  			evt := framework.ClusterEvent{Resource: gvk, ActionType: framework.Delete, Label: fmt.Sprintf("%vDelete", shortGVK)}
   388  			funcs.DeleteFunc = func(obj interface{}) {
   389  				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, evt, obj, nil, nil)
   390  			}
   391  		}
   392  		return funcs
   393  	}
   394  
   395  	for gvk, at := range gvkMap {
   396  		switch gvk {
   397  		case framework.Node, framework.Pod:
   398  			// Do nothing.
   399  		case framework.CSINode:
   400  			if handlerRegistration, err = informerFactory.Storage().V1().CSINodes().Informer().AddEventHandler(
   401  				buildEvtResHandler(at, framework.CSINode, "CSINode"),
   402  			); err != nil {
   403  				return err
   404  			}
   405  			handlers = append(handlers, handlerRegistration)
   406  		case framework.CSIDriver:
   407  			if handlerRegistration, err = informerFactory.Storage().V1().CSIDrivers().Informer().AddEventHandler(
   408  				buildEvtResHandler(at, framework.CSIDriver, "CSIDriver"),
   409  			); err != nil {
   410  				return err
   411  			}
   412  			handlers = append(handlers, handlerRegistration)
   413  		case framework.CSIStorageCapacity:
   414  			if handlerRegistration, err = informerFactory.Storage().V1().CSIStorageCapacities().Informer().AddEventHandler(
   415  				buildEvtResHandler(at, framework.CSIStorageCapacity, "CSIStorageCapacity"),
   416  			); err != nil {
   417  				return err
   418  			}
   419  			handlers = append(handlers, handlerRegistration)
   420  		case framework.PersistentVolume:
   421  			// MaxPDVolumeCountPredicate: relies on the count of PVs.
   422  			//
   423  			// PvAdd: Pods created when no PVs are available will be stuck in the
   424  			// unschedulable queue. Unbound PVs created for static provisioning with a
   425  			// delay-binding StorageClass are skipped by the PV controller's dynamic
   426  			// provisioning and binding process, so they will not trigger events to
   427  			// schedule the pod again. We therefore need to move pods to the active
   428  			// queue on PV add for this scenario.
   429  			//
   430  			// PvUpdate: Scheduler.bindVolumesWorker may fail to update assumed pod volume
   431  			// bindings due to conflicts if PVs are updated by the PV controller or other
   432  			// parties; the scheduler then adds the pod back to the unschedulable queue.
   433  			// We need to move pods to the active queue on PV update for this scenario.
   434  			if handlerRegistration, err = informerFactory.Core().V1().PersistentVolumes().Informer().AddEventHandler(
   435  				buildEvtResHandler(at, framework.PersistentVolume, "Pv"),
   436  			); err != nil {
   437  				return err
   438  			}
   439  			handlers = append(handlers, handlerRegistration)
   440  		case framework.PersistentVolumeClaim:
   441  			// MaxPDVolumeCountPredicate: add/update PVC will affect counts of PV when it is bound.
   442  			if handlerRegistration, err = informerFactory.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(
   443  				buildEvtResHandler(at, framework.PersistentVolumeClaim, "Pvc"),
   444  			); err != nil {
   445  				return err
   446  			}
   447  			handlers = append(handlers, handlerRegistration)
   448  		case framework.PodSchedulingContext:
   449  			if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
   450  				if handlerRegistration, err = informerFactory.Resource().V1alpha2().PodSchedulingContexts().Informer().AddEventHandler(
   451  					buildEvtResHandler(at, framework.PodSchedulingContext, "PodSchedulingContext"),
   452  				); err != nil {
   453  					return err
   454  				}
   455  				handlers = append(handlers, handlerRegistration)
   456  			}
   457  		case framework.ResourceClaim:
   458  			if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
   459  				if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler(
   460  					buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"),
   461  				); err != nil {
   462  					return err
   463  				}
   464  				handlers = append(handlers, handlerRegistration)
   465  			}
   466  		case framework.ResourceClass:
   467  			if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
   468  				if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClasses().Informer().AddEventHandler(
   469  					buildEvtResHandler(at, framework.ResourceClass, "ResourceClass"),
   470  				); err != nil {
   471  					return err
   472  				}
   473  				handlers = append(handlers, handlerRegistration)
   474  			}
   475  		case framework.ResourceClaimParameters:
   476  			if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
   477  				if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClaimParameters().Informer().AddEventHandler(
   478  					buildEvtResHandler(at, framework.ResourceClaimParameters, "ResourceClaimParameters"),
   479  				); err != nil {
   480  					return err
   481  				}
   482  				handlers = append(handlers, handlerRegistration)
   483  			}
   484  		case framework.ResourceClassParameters:
   485  			if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
   486  				if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClassParameters().Informer().AddEventHandler(
   487  					buildEvtResHandler(at, framework.ResourceClassParameters, "ResourceClassParameters"),
   488  				); err != nil {
   489  					return err
   490  				}
   491  				handlers = append(handlers, handlerRegistration)
   492  			}
   493  		case framework.StorageClass:
   494  			if at&framework.Add != 0 {
   495  				if handlerRegistration, err = informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
   496  					cache.ResourceEventHandlerFuncs{
   497  						AddFunc: sched.onStorageClassAdd,
   498  					},
   499  				); err != nil {
   500  					return err
   501  				}
   502  				handlers = append(handlers, handlerRegistration)
   503  			}
   504  			if at&framework.Update != 0 {
   505  				if handlerRegistration, err = informerFactory.Storage().V1().StorageClasses().Informer().AddEventHandler(
   506  					cache.ResourceEventHandlerFuncs{
   507  						UpdateFunc: func(old, obj interface{}) {
   508  							sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, queue.StorageClassUpdate, old, obj, nil)
   509  						},
   510  					},
   511  				); err != nil {
   512  					return err
   513  				}
   514  				handlers = append(handlers, handlerRegistration)
   515  			}
   516  		default:
   517  			// Tests may not instantiate dynInformerFactory.
   518  			if dynInformerFactory == nil {
   519  				continue
   520  			}
   521  			// The GVK string is expected to have at least three dot-separated sections:
   522  			// <kind in plural>.<version>.<group>
   523  			// Valid examples:
   524  			// - foos.v1.example.com
   525  			// - bars.v1beta1.a.b.c
   526  			// Invalid examples:
   527  			// - foos.v1 (2 sections)
   528  			// - foo.v1.example.com (the first section should be plural)
   529  			if strings.Count(string(gvk), ".") < 2 {
   530  				logger.Error(nil, "incorrect event registration", "gvk", gvk)
   531  				continue
   532  			}
   533  			// Fall back to try dynamic informers.
   534  			gvr, _ := schema.ParseResourceArg(string(gvk))
   535  			dynInformer := dynInformerFactory.ForResource(*gvr).Informer()
   536  			if handlerRegistration, err = dynInformer.AddEventHandler(
   537  				buildEvtResHandler(at, gvk, strings.Title(gvr.Resource)),
   538  			); err != nil {
   539  				return err
   540  			}
   541  			handlers = append(handlers, handlerRegistration)
   542  		}
   543  	}
   544  	sched.registeredHandlers = handlers
   545  	return nil
   546  }
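
// Example (illustrative): how the dynamic-informer fallback above interprets a
// plugin-supplied GVK string. The CRD name is made up; any string with fewer
// than two dots is rejected before reaching ParseResourceArg.
func exampleParseCustomGVK() *schema.GroupVersionResource {
	// "foos.v1.example.com" parses as Resource "foos", Version "v1", Group "example.com".
	gvr, _ := schema.ParseResourceArg("foos.v1.example.com")
	return gvr
}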
   547  
   548  func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) []framework.ClusterEvent {
   549  	var events []framework.ClusterEvent
   550  
   551  	if nodeSpecUnschedulableChanged(newNode, oldNode) {
   552  		events = append(events, queue.NodeSpecUnschedulableChange)
   553  	}
   554  	if nodeAllocatableChanged(newNode, oldNode) {
   555  		events = append(events, queue.NodeAllocatableChange)
   556  	}
   557  	if nodeLabelsChanged(newNode, oldNode) {
   558  		events = append(events, queue.NodeLabelChange)
   559  	}
   560  	if nodeTaintsChanged(newNode, oldNode) {
   561  		events = append(events, queue.NodeTaintChange)
   562  	}
   563  	if nodeConditionsChanged(newNode, oldNode) {
   564  		events = append(events, queue.NodeConditionChange)
   565  	}
   566  	if nodeAnnotationsChanged(newNode, oldNode) {
   567  		events = append(events, queue.NodeAnnotationChange)
   568  	}
   569  
   570  	return events
   571  }
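
// Example (illustrative, name is hypothetical): for an update that only flips
// Spec.Unschedulable from true to false, the diff above yields a single
// NodeSpecUnschedulableChange event, which updateNodeInCache then uses to move
// matching unschedulable pods back to the active or backoff queue.
func exampleUnschedulableFlipEvents() []framework.ClusterEvent {
	oldNode := &v1.Node{Spec: v1.NodeSpec{Unschedulable: true}}
	newNode := &v1.Node{Spec: v1.NodeSpec{Unschedulable: false}}
	return nodeSchedulingPropertiesChange(newNode, oldNode)
}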
   572  
   573  func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
   574  	return !equality.Semantic.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable)
   575  }
   576  
   577  func nodeLabelsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
   578  	return !equality.Semantic.DeepEqual(oldNode.GetLabels(), newNode.GetLabels())
   579  }
   580  
   581  func nodeTaintsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
   582  	return !equality.Semantic.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints)
   583  }
   584  
   585  func nodeConditionsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
   586  	strip := func(conditions []v1.NodeCondition) map[v1.NodeConditionType]v1.ConditionStatus {
   587  		conditionStatuses := make(map[v1.NodeConditionType]v1.ConditionStatus, len(conditions))
   588  		for i := range conditions {
   589  			conditionStatuses[conditions[i].Type] = conditions[i].Status
   590  		}
   591  		return conditionStatuses
   592  	}
   593  	return !equality.Semantic.DeepEqual(strip(oldNode.Status.Conditions), strip(newNode.Status.Conditions))
   594  }
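
// Example (illustrative, name is hypothetical): only the Type->Status mapping
// matters above, so an update that changes a condition's reason, message, or
// heartbeat timestamps without flipping its status is not reported as a
// condition change.
func exampleHeartbeatOnlyUpdate() bool {
	oldNode := &v1.Node{Status: v1.NodeStatus{Conditions: []v1.NodeCondition{
		{Type: v1.NodeReady, Status: v1.ConditionTrue, Reason: "KubeletReady"},
	}}}
	newNode := &v1.Node{Status: v1.NodeStatus{Conditions: []v1.NodeCondition{
		{Type: v1.NodeReady, Status: v1.ConditionTrue, Reason: "KubeletReady", Message: "kubelet is posting ready status"},
	}}}
	return nodeConditionsChanged(newNode, oldNode) // false
}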
   595  
   596  func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
   597  	return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && !newNode.Spec.Unschedulable
   598  }
   599  
   600  func nodeAnnotationsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
   601  	return !equality.Semantic.DeepEqual(oldNode.GetAnnotations(), newNode.GetAnnotations())
   602  }
   603  
   604  func preCheckForNode(nodeInfo *framework.NodeInfo) queue.PreEnqueueCheck {
   605  	// Note: the following checks don't take preemption into consideration; in very rare
   606  	// cases (e.g., node resizing), "pod" may still fail a check even though preemption would help. We deliberately
   607  	// chose to ignore those cases, as unschedulable pods will be re-queued eventually.
   608  	return func(pod *v1.Pod) bool {
   609  		admissionResults := AdmissionCheck(pod, nodeInfo, false)
   610  		if len(admissionResults) != 0 {
   611  			return false
   612  		}
   613  		_, isUntolerated := corev1helpers.FindMatchingUntoleratedTaint(nodeInfo.Node().Spec.Taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
   614  			return t.Effect == v1.TaintEffectNoSchedule
   615  		})
   616  		return !isUntolerated
   617  	}
   618  }
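
// Example (illustrative sketch): building a pre-enqueue check from a node and
// applying it to a pod. A pod that fails admission, or that leaves a
// NoSchedule taint on the node untolerated, is filtered out and is not moved
// back to the active queue on that node's events. Names are illustrative.
func exampleNodePreCheck(pod *v1.Pod, node *v1.Node) bool {
	nodeInfo := framework.NewNodeInfo()
	nodeInfo.SetNode(node)
	check := preCheckForNode(nodeInfo)
	return check(pod)
}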
   619  
   620  // AdmissionCheck calls the filtering logic of noderesources/nodeports/nodeaffinity/nodename
   621  // and returns the failure reasons. It's used in the kubelet (pkg/kubelet/lifecycle/predicate.go) and the scheduler.
   622  // It returns the first failure if `includeAllFailures` is set to false; otherwise
   623  // it returns all failures.
   624  func AdmissionCheck(pod *v1.Pod, nodeInfo *framework.NodeInfo, includeAllFailures bool) []AdmissionResult {
   625  	var admissionResults []AdmissionResult
   626  	insufficientResources := noderesources.Fits(pod, nodeInfo)
   627  	if len(insufficientResources) != 0 {
   628  		for i := range insufficientResources {
   629  			admissionResults = append(admissionResults, AdmissionResult{InsufficientResource: &insufficientResources[i]})
   630  		}
   631  		if !includeAllFailures {
   632  			return admissionResults
   633  		}
   634  	}
   635  
   636  	if matches, _ := corev1nodeaffinity.GetRequiredNodeAffinity(pod).Match(nodeInfo.Node()); !matches {
   637  		admissionResults = append(admissionResults, AdmissionResult{Name: nodeaffinity.Name, Reason: nodeaffinity.ErrReasonPod})
   638  		if !includeAllFailures {
   639  			return admissionResults
   640  		}
   641  	}
   642  	if !nodename.Fits(pod, nodeInfo) {
   643  		admissionResults = append(admissionResults, AdmissionResult{Name: nodename.Name, Reason: nodename.ErrReason})
   644  		if !includeAllFailures {
   645  			return admissionResults
   646  		}
   647  	}
   648  	if !nodeports.Fits(pod, nodeInfo) {
   649  		admissionResults = append(admissionResults, AdmissionResult{Name: nodeports.Name, Reason: nodeports.ErrReason})
   650  		if !includeAllFailures {
   651  			return admissionResults
   652  		}
   653  	}
   654  	return admissionResults
   655  }
   656  
   657  // AdmissionResult describes the reason why Scheduler can't admit the pod.
   658  // If the reason is a resource fit one, then AdmissionResult.InsufficientResource includes the details.
   659  type AdmissionResult struct {
   660  	Name                 string
   661  	Reason               string
   662  	InsufficientResource *noderesources.InsufficientResource
   663  }
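
// Example (illustrative sketch): collecting every admission failure reason for
// a pod against a node, the way a kubelet-style caller would with
// includeAllFailures set to true. The helper name is hypothetical;
// InsufficientResource carries the per-resource details when the failure is a
// resource-fit one.
func admissionFailureReasons(pod *v1.Pod, nodeInfo *framework.NodeInfo) []string {
	var reasons []string
	for _, r := range AdmissionCheck(pod, nodeInfo, true) {
		if r.InsufficientResource != nil {
			// Resource-fit failures carry their own human-readable reason.
			reasons = append(reasons, r.InsufficientResource.Reason)
			continue
		}
		reasons = append(reasons, fmt.Sprintf("%s: %s", r.Name, r.Reason))
	}
	return reasons
}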
   664  
