...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/runtime/framework.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework/runtime

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package runtime
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"io"
    24  	"reflect"
    25  	"sort"
    26  	"time"
    27  
    28  	v1 "k8s.io/api/core/v1"
    29  	"k8s.io/apimachinery/pkg/runtime"
    30  	"k8s.io/apimachinery/pkg/types"
    31  	"k8s.io/apimachinery/pkg/util/sets"
    32  	"k8s.io/client-go/informers"
    33  	clientset "k8s.io/client-go/kubernetes"
    34  	restclient "k8s.io/client-go/rest"
    35  	"k8s.io/client-go/tools/events"
    36  	"k8s.io/component-helpers/scheduling/corev1"
    37  	"k8s.io/klog/v2"
    38  	"k8s.io/kubernetes/pkg/scheduler/apis/config"
    39  	"k8s.io/kubernetes/pkg/scheduler/framework"
    40  	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
    41  	"k8s.io/kubernetes/pkg/scheduler/metrics"
    42  	"k8s.io/kubernetes/pkg/util/slice"
    43  )
    44  
    45  const (
    46  	// Specifies the maximum timeout a permit plugin can return.
    47  	maxTimeout = 15 * time.Minute
    48  )
    49  
    50  // frameworkImpl is the component responsible for initializing and running scheduler
    51  // plugins.
    52  type frameworkImpl struct {
    53  	registry             Registry
    54  	snapshotSharedLister framework.SharedLister
    55  	waitingPods          *waitingPodsMap
    56  	scorePluginWeight    map[string]int
    57  	preEnqueuePlugins    []framework.PreEnqueuePlugin
    58  	enqueueExtensions    []framework.EnqueueExtensions
    59  	queueSortPlugins     []framework.QueueSortPlugin
    60  	preFilterPlugins     []framework.PreFilterPlugin
    61  	filterPlugins        []framework.FilterPlugin
    62  	postFilterPlugins    []framework.PostFilterPlugin
    63  	preScorePlugins      []framework.PreScorePlugin
    64  	scorePlugins         []framework.ScorePlugin
    65  	reservePlugins       []framework.ReservePlugin
    66  	preBindPlugins       []framework.PreBindPlugin
    67  	bindPlugins          []framework.BindPlugin
    68  	postBindPlugins      []framework.PostBindPlugin
    69  	permitPlugins        []framework.PermitPlugin
    70  
    71  	// pluginsMap contains all plugins, by name.
    72  	pluginsMap map[string]framework.Plugin
    73  
    74  	clientSet       clientset.Interface
    75  	kubeConfig      *restclient.Config
    76  	eventRecorder   events.EventRecorder
    77  	informerFactory informers.SharedInformerFactory
    78  	logger          klog.Logger
    79  
    80  	metricsRecorder          *metrics.MetricAsyncRecorder
    81  	profileName              string
    82  	percentageOfNodesToScore *int32
    83  
    84  	extenders []framework.Extender
    85  	framework.PodNominator
    86  
    87  	parallelizer parallelize.Parallelizer
    88  }
    89  
     90  // extensionPoint encapsulates the desired and applied set of plugins at a specific extension
    91  // point. This is used to simplify iterating over all extension points supported by the
    92  // frameworkImpl.
    93  type extensionPoint struct {
    94  	// the set of plugins to be configured at this extension point.
    95  	plugins *config.PluginSet
     96  	// a pointer to the slice storing the plugin implementations that will run at this
    97  	// extension point.
    98  	slicePtr interface{}
    99  }
   100  
   101  func (f *frameworkImpl) getExtensionPoints(plugins *config.Plugins) []extensionPoint {
   102  	return []extensionPoint{
   103  		{&plugins.PreFilter, &f.preFilterPlugins},
   104  		{&plugins.Filter, &f.filterPlugins},
   105  		{&plugins.PostFilter, &f.postFilterPlugins},
   106  		{&plugins.Reserve, &f.reservePlugins},
   107  		{&plugins.PreScore, &f.preScorePlugins},
   108  		{&plugins.Score, &f.scorePlugins},
   109  		{&plugins.PreBind, &f.preBindPlugins},
   110  		{&plugins.Bind, &f.bindPlugins},
   111  		{&plugins.PostBind, &f.postBindPlugins},
   112  		{&plugins.Permit, &f.permitPlugins},
   113  		{&plugins.PreEnqueue, &f.preEnqueuePlugins},
   114  		{&plugins.QueueSort, &f.queueSortPlugins},
   115  	}
   116  }
   117  
   118  // Extenders returns the registered extenders.
   119  func (f *frameworkImpl) Extenders() []framework.Extender {
   120  	return f.extenders
   121  }
   122  
   123  type frameworkOptions struct {
   124  	componentConfigVersion string
   125  	clientSet              clientset.Interface
   126  	kubeConfig             *restclient.Config
   127  	eventRecorder          events.EventRecorder
   128  	informerFactory        informers.SharedInformerFactory
   129  	snapshotSharedLister   framework.SharedLister
   130  	metricsRecorder        *metrics.MetricAsyncRecorder
   131  	podNominator           framework.PodNominator
   132  	extenders              []framework.Extender
   133  	captureProfile         CaptureProfile
   134  	parallelizer           parallelize.Parallelizer
   135  	logger                 *klog.Logger
   136  }
   137  
   138  // Option for the frameworkImpl.
   139  type Option func(*frameworkOptions)
   140  
   141  // WithComponentConfigVersion sets the component config version to the
   142  // KubeSchedulerConfiguration version used. The string should be the full
   143  // scheme group/version of the external type we converted from (for example
   144  // "kubescheduler.config.k8s.io/v1")
   145  func WithComponentConfigVersion(componentConfigVersion string) Option {
   146  	return func(o *frameworkOptions) {
   147  		o.componentConfigVersion = componentConfigVersion
   148  	}
   149  }
   150  
   151  // WithClientSet sets clientSet for the scheduling frameworkImpl.
   152  func WithClientSet(clientSet clientset.Interface) Option {
   153  	return func(o *frameworkOptions) {
   154  		o.clientSet = clientSet
   155  	}
   156  }
   157  
   158  // WithKubeConfig sets kubeConfig for the scheduling frameworkImpl.
   159  func WithKubeConfig(kubeConfig *restclient.Config) Option {
   160  	return func(o *frameworkOptions) {
   161  		o.kubeConfig = kubeConfig
   162  	}
   163  }
   164  
    165  // WithEventRecorder sets the event recorder for the scheduling frameworkImpl.
   166  func WithEventRecorder(recorder events.EventRecorder) Option {
   167  	return func(o *frameworkOptions) {
   168  		o.eventRecorder = recorder
   169  	}
   170  }
   171  
   172  // WithInformerFactory sets informer factory for the scheduling frameworkImpl.
   173  func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option {
   174  	return func(o *frameworkOptions) {
   175  		o.informerFactory = informerFactory
   176  	}
   177  }
   178  
   179  // WithSnapshotSharedLister sets the SharedLister of the snapshot.
   180  func WithSnapshotSharedLister(snapshotSharedLister framework.SharedLister) Option {
   181  	return func(o *frameworkOptions) {
   182  		o.snapshotSharedLister = snapshotSharedLister
   183  	}
   184  }
   185  
   186  // WithPodNominator sets podNominator for the scheduling frameworkImpl.
   187  func WithPodNominator(nominator framework.PodNominator) Option {
   188  	return func(o *frameworkOptions) {
   189  		o.podNominator = nominator
   190  	}
   191  }
   192  
   193  // WithExtenders sets extenders for the scheduling frameworkImpl.
   194  func WithExtenders(extenders []framework.Extender) Option {
   195  	return func(o *frameworkOptions) {
   196  		o.extenders = extenders
   197  	}
   198  }
   199  
   200  // WithParallelism sets parallelism for the scheduling frameworkImpl.
   201  func WithParallelism(parallelism int) Option {
   202  	return func(o *frameworkOptions) {
   203  		o.parallelizer = parallelize.NewParallelizer(parallelism)
   204  	}
   205  }
   206  
   207  // CaptureProfile is a callback to capture a finalized profile.
   208  type CaptureProfile func(config.KubeSchedulerProfile)
   209  
   210  // WithCaptureProfile sets a callback to capture the finalized profile.
   211  func WithCaptureProfile(c CaptureProfile) Option {
   212  	return func(o *frameworkOptions) {
   213  		o.captureProfile = c
   214  	}
   215  }
   216  
   217  // WithMetricsRecorder sets metrics recorder for the scheduling frameworkImpl.
   218  func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option {
   219  	return func(o *frameworkOptions) {
   220  		o.metricsRecorder = r
   221  	}
   222  }
   223  
   224  // WithLogger overrides the default logger from k8s.io/klog.
   225  func WithLogger(logger klog.Logger) Option {
   226  	return func(o *frameworkOptions) {
   227  		o.logger = &logger
   228  	}
   229  }
   230  
    231  // defaultFrameworkOptions provides the defaults that apply when no corresponding Option is set.
   232  func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
   233  	return frameworkOptions{
   234  		metricsRecorder: metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh),
   235  		parallelizer:    parallelize.NewParallelizer(parallelize.DefaultParallelism),
   236  	}
   237  }
   238  
   239  var _ framework.Framework = &frameworkImpl{}
   240  
   241  // NewFramework initializes plugins given the configuration and the registry.
   242  func NewFramework(ctx context.Context, r Registry, profile *config.KubeSchedulerProfile, opts ...Option) (framework.Framework, error) {
   243  	options := defaultFrameworkOptions(ctx.Done())
   244  	for _, opt := range opts {
   245  		opt(&options)
   246  	}
   247  
   248  	logger := klog.FromContext(ctx)
   249  	if options.logger != nil {
   250  		logger = *options.logger
   251  	}
   252  
   253  	f := &frameworkImpl{
   254  		registry:             r,
   255  		snapshotSharedLister: options.snapshotSharedLister,
   256  		scorePluginWeight:    make(map[string]int),
   257  		waitingPods:          newWaitingPodsMap(),
   258  		clientSet:            options.clientSet,
   259  		kubeConfig:           options.kubeConfig,
   260  		eventRecorder:        options.eventRecorder,
   261  		informerFactory:      options.informerFactory,
   262  		metricsRecorder:      options.metricsRecorder,
   263  		extenders:            options.extenders,
   264  		PodNominator:         options.podNominator,
   265  		parallelizer:         options.parallelizer,
   266  		logger:               logger,
   267  	}
   268  
   269  	if len(f.extenders) > 0 {
    270  		// Extenders don't support any kind of requeueing feature like EnqueueExtensions in the scheduling framework.
    271  		// We register a defaultEnqueueExtension under framework.ExtenderName here.
    272  		// And, in the scheduling cycle, when Extenders reject some Nodes and the pod ends up being unschedulable,
    273  		// we add framework.ExtenderName to pInfo.UnschedulablePlugins.
   274  		f.enqueueExtensions = []framework.EnqueueExtensions{&defaultEnqueueExtension{pluginName: framework.ExtenderName}}
   275  	}
   276  
   277  	if profile == nil {
   278  		return f, nil
   279  	}
   280  
   281  	f.profileName = profile.SchedulerName
   282  	f.percentageOfNodesToScore = profile.PercentageOfNodesToScore
   283  	if profile.Plugins == nil {
   284  		return f, nil
   285  	}
   286  
   287  	// get needed plugins from config
   288  	pg := f.pluginsNeeded(profile.Plugins)
   289  
   290  	pluginConfig := make(map[string]runtime.Object, len(profile.PluginConfig))
   291  	for i := range profile.PluginConfig {
   292  		name := profile.PluginConfig[i].Name
   293  		if _, ok := pluginConfig[name]; ok {
   294  			return nil, fmt.Errorf("repeated config for plugin %s", name)
   295  		}
   296  		pluginConfig[name] = profile.PluginConfig[i].Args
   297  	}
   298  	outputProfile := config.KubeSchedulerProfile{
   299  		SchedulerName:            f.profileName,
   300  		PercentageOfNodesToScore: f.percentageOfNodesToScore,
   301  		Plugins:                  profile.Plugins,
   302  		PluginConfig:             make([]config.PluginConfig, 0, len(pg)),
   303  	}
   304  
   305  	f.pluginsMap = make(map[string]framework.Plugin)
   306  	for name, factory := range r {
   307  		// initialize only needed plugins.
   308  		if !pg.Has(name) {
   309  			continue
   310  		}
   311  
   312  		args := pluginConfig[name]
   313  		if args != nil {
   314  			outputProfile.PluginConfig = append(outputProfile.PluginConfig, config.PluginConfig{
   315  				Name: name,
   316  				Args: args,
   317  			})
   318  		}
   319  		p, err := factory(ctx, args, f)
   320  		if err != nil {
   321  			return nil, fmt.Errorf("initializing plugin %q: %w", name, err)
   322  		}
   323  		f.pluginsMap[name] = p
   324  
   325  		f.fillEnqueueExtensions(p)
   326  	}
   327  
   328  	// initialize plugins per individual extension points
   329  	for _, e := range f.getExtensionPoints(profile.Plugins) {
   330  		if err := updatePluginList(e.slicePtr, *e.plugins, f.pluginsMap); err != nil {
   331  			return nil, err
   332  		}
   333  	}
   334  
   335  	// initialize multiPoint plugins to their expanded extension points
   336  	if len(profile.Plugins.MultiPoint.Enabled) > 0 {
   337  		if err := f.expandMultiPointPlugins(logger, profile); err != nil {
   338  			return nil, err
   339  		}
   340  	}
   341  
   342  	if len(f.queueSortPlugins) != 1 {
   343  		return nil, fmt.Errorf("only one queue sort plugin required for profile with scheduler name %q, but got %d", profile.SchedulerName, len(f.queueSortPlugins))
   344  	}
   345  	if len(f.bindPlugins) == 0 {
   346  		return nil, fmt.Errorf("at least one bind plugin is needed for profile with scheduler name %q", profile.SchedulerName)
   347  	}
   348  
   349  	if err := getScoreWeights(f, append(profile.Plugins.Score.Enabled, profile.Plugins.MultiPoint.Enabled...)); err != nil {
   350  		return nil, err
   351  	}
   352  
   353  	// Verifying the score weights again since Plugin.Name() could return a different
   354  	// value from the one used in the configuration.
   355  	for _, scorePlugin := range f.scorePlugins {
   356  		if f.scorePluginWeight[scorePlugin.Name()] == 0 {
   357  			return nil, fmt.Errorf("score plugin %q is not configured with weight", scorePlugin.Name())
   358  		}
   359  	}
   360  
   361  	if options.captureProfile != nil {
   362  		if len(outputProfile.PluginConfig) != 0 {
   363  			sort.Slice(outputProfile.PluginConfig, func(i, j int) bool {
   364  				return outputProfile.PluginConfig[i].Name < outputProfile.PluginConfig[j].Name
   365  			})
   366  		} else {
   367  			outputProfile.PluginConfig = nil
   368  		}
   369  		options.captureProfile(outputProfile)
   370  	}
   371  
    372  	// Log the enabled plugins at each extension point, taking default plugins, the given config, and MultiPoint into consideration.
   373  	logger.V(2).Info("the scheduler starts to work with those plugins", "Plugins", *f.ListPlugins())
   374  	f.setInstrumentedPlugins()
   375  	return f, nil
   376  }
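
         // The following is an illustrative usage sketch, not part of framework.go: it shows
         // how a caller might combine NewFramework with some of the Options defined above.
         // The registry, profile, and client values are assumed to come from the scheduler's
         // configuration; buildFramework itself is a hypothetical helper.
         func buildFramework(ctx context.Context, r Registry, profile *config.KubeSchedulerProfile, client clientset.Interface) (framework.Framework, error) {
         	// Options are applied in order on top of defaultFrameworkOptions before plugins are initialized.
         	return NewFramework(ctx, r, profile,
         		WithClientSet(client),
         		WithParallelism(16),
         		WithMetricsRecorder(metrics.NewMetricsAsyncRecorder(1000, time.Second, ctx.Done())),
         	)
         }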
   377  
    378  // setInstrumentedPlugins wraps the plugins currently held by frameworkImpl with instrumented versions.
   379  func (f *frameworkImpl) setInstrumentedPlugins() {
   380  	// Cache metric streams for prefilter and filter plugins.
   381  	for i, pl := range f.preFilterPlugins {
   382  		f.preFilterPlugins[i] = &instrumentedPreFilterPlugin{
   383  			PreFilterPlugin: f.preFilterPlugins[i],
   384  			metric:          metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), metrics.PreFilter, f.profileName),
   385  		}
   386  	}
   387  	for i, pl := range f.filterPlugins {
   388  		f.filterPlugins[i] = &instrumentedFilterPlugin{
   389  			FilterPlugin: f.filterPlugins[i],
   390  			metric:       metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), metrics.Filter, f.profileName),
   391  		}
   392  	}
   393  
   394  	// Cache metric streams for prescore and score plugins.
   395  	for i, pl := range f.preScorePlugins {
   396  		f.preScorePlugins[i] = &instrumentedPreScorePlugin{
   397  			PreScorePlugin: f.preScorePlugins[i],
   398  			metric:         metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), metrics.PreScore, f.profileName),
   399  		}
   400  	}
   401  	for i, pl := range f.scorePlugins {
   402  		f.scorePlugins[i] = &instrumentedScorePlugin{
   403  			ScorePlugin: f.scorePlugins[i],
   404  			metric:      metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), metrics.Score, f.profileName),
   405  		}
   406  	}
   407  }
   408  
   409  func (f *frameworkImpl) SetPodNominator(n framework.PodNominator) {
   410  	f.PodNominator = n
   411  }
   412  
    413  // Close closes each plugin if it implements the io.Closer interface.
   414  func (f *frameworkImpl) Close() error {
   415  	var errs []error
   416  	for name, plugin := range f.pluginsMap {
   417  		if closer, ok := plugin.(io.Closer); ok {
   418  			err := closer.Close()
   419  			if err != nil {
   420  				errs = append(errs, fmt.Errorf("%s failed to close: %w", name, err))
   421  				// We try to close all plugins even if we got errors from some.
   422  			}
   423  		}
   424  	}
   425  	return errors.Join(errs...)
   426  }
   427  
    428  // getScoreWeights makes sure that, between MultiPoint-Score plugin weights and individual Score
    429  // plugin weights, the total weighted score does not overflow MaxTotalScore.
   430  func getScoreWeights(f *frameworkImpl, plugins []config.Plugin) error {
   431  	var totalPriority int64
   432  	scorePlugins := reflect.ValueOf(&f.scorePlugins).Elem()
   433  	pluginType := scorePlugins.Type().Elem()
   434  	for _, e := range plugins {
   435  		pg := f.pluginsMap[e.Name]
   436  		if !reflect.TypeOf(pg).Implements(pluginType) {
   437  			continue
   438  		}
   439  
   440  		// We append MultiPoint plugins to the list of Score plugins. So if this plugin has already been
   441  		// encountered, let the individual Score weight take precedence.
   442  		if _, ok := f.scorePluginWeight[e.Name]; ok {
   443  			continue
   444  		}
    445  		// a weight of zero is not permitted; plugins can be disabled explicitly
    446  		// when configured.
   447  		f.scorePluginWeight[e.Name] = int(e.Weight)
   448  		if f.scorePluginWeight[e.Name] == 0 {
   449  			f.scorePluginWeight[e.Name] = 1
   450  		}
   451  
   452  		// Checks totalPriority against MaxTotalScore to avoid overflow
   453  		if int64(f.scorePluginWeight[e.Name])*framework.MaxNodeScore > framework.MaxTotalScore-totalPriority {
   454  			return fmt.Errorf("total score of Score plugins could overflow")
   455  		}
   456  		totalPriority += int64(f.scorePluginWeight[e.Name]) * framework.MaxNodeScore
   457  	}
   458  	return nil
   459  }
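
         // Illustrative sketch only (not in framework.go): the weight handling above means a
         // Score plugin enabled with Weight: 0 effectively runs with weight 1, and the overflow
         // guard bounds the sum of weight*MaxNodeScore by MaxTotalScore. The plugin names and
         // weights below are assumed values; exampleScoreWeights is a hypothetical helper.
         func exampleScoreWeights() map[string]int {
         	weights := map[string]int{}
         	enabled := []config.Plugin{
         		{Name: "NodeResourcesFit", Weight: 2}, // contributes at most 2 * framework.MaxNodeScore per node
         		{Name: "ImageLocality"},               // Weight 0 in config is defaulted to 1, as in getScoreWeights
         	}
         	for _, e := range enabled {
         		w := int(e.Weight)
         		if w == 0 {
         			w = 1
         		}
         		weights[e.Name] = w
         	}
         	return weights // map[ImageLocality:1 NodeResourcesFit:2]
         }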
   460  
   461  type orderedSet struct {
   462  	set         map[string]int
   463  	list        []string
   464  	deletionCnt int
   465  }
   466  
   467  func newOrderedSet() *orderedSet {
   468  	return &orderedSet{set: make(map[string]int)}
   469  }
   470  
   471  func (os *orderedSet) insert(s string) {
   472  	if os.has(s) {
   473  		return
   474  	}
   475  	os.set[s] = len(os.list)
   476  	os.list = append(os.list, s)
   477  }
   478  
   479  func (os *orderedSet) has(s string) bool {
   480  	_, found := os.set[s]
   481  	return found
   482  }
   483  
   484  func (os *orderedSet) delete(s string) {
   485  	if i, found := os.set[s]; found {
   486  		delete(os.set, s)
   487  		os.list = append(os.list[:i-os.deletionCnt], os.list[i+1-os.deletionCnt:]...)
   488  		os.deletionCnt++
   489  	}
   490  }
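
         // Illustrative sketch only (not in framework.go): orderedSet remembers the index each
         // string had when inserted, and deletionCnt compensates for the left shift that every
         // earlier delete applies to the backing list. exampleOrderedSet is a hypothetical helper.
         func exampleOrderedSet() []string {
         	os := newOrderedSet()
         	os.insert("a") // stored index 0
         	os.insert("b") // stored index 1
         	os.insert("c") // stored index 2
         	os.delete("a") // list becomes [b c], deletionCnt becomes 1
         	os.delete("c") // stored index 2 minus deletionCnt 1 locates "c" at position 1 of [b c]
         	return os.list // [b]
         }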
   491  
   492  func (f *frameworkImpl) expandMultiPointPlugins(logger klog.Logger, profile *config.KubeSchedulerProfile) error {
   493  	// initialize MultiPoint plugins
   494  	for _, e := range f.getExtensionPoints(profile.Plugins) {
   495  		plugins := reflect.ValueOf(e.slicePtr).Elem()
   496  		pluginType := plugins.Type().Elem()
   497  		// build enabledSet of plugins already registered via normal extension points
   498  		// to check double registration
   499  		enabledSet := newOrderedSet()
   500  		for _, plugin := range e.plugins.Enabled {
   501  			enabledSet.insert(plugin.Name)
   502  		}
   503  
   504  		disabledSet := sets.New[string]()
   505  		for _, disabledPlugin := range e.plugins.Disabled {
   506  			disabledSet.Insert(disabledPlugin.Name)
   507  		}
   508  		if disabledSet.Has("*") {
   509  			logger.V(4).Info("Skipped MultiPoint expansion because all plugins are disabled for extension point", "extension", pluginType)
   510  			continue
   511  		}
   512  
   513  		// track plugins enabled via multipoint separately from those enabled by specific extensions,
   514  		// so that we can distinguish between double-registration and explicit overrides
   515  		multiPointEnabled := newOrderedSet()
   516  		overridePlugins := newOrderedSet()
   517  		for _, ep := range profile.Plugins.MultiPoint.Enabled {
   518  			pg, ok := f.pluginsMap[ep.Name]
   519  			if !ok {
   520  				return fmt.Errorf("%s %q does not exist", pluginType.Name(), ep.Name)
   521  			}
   522  
   523  			// if this plugin doesn't implement the type for the current extension we're trying to expand, skip
   524  			if !reflect.TypeOf(pg).Implements(pluginType) {
   525  				continue
   526  			}
   527  
   528  			// a plugin that's enabled via MultiPoint can still be disabled for specific extension points
   529  			if disabledSet.Has(ep.Name) {
   530  				logger.V(4).Info("Skipped disabled plugin for extension point", "plugin", ep.Name, "extension", pluginType)
   531  				continue
   532  			}
   533  
   534  			// if this plugin has already been enabled by the specific extension point,
   535  			// the user intent is to override the default plugin or make some other explicit setting.
   536  			// Either way, discard the MultiPoint value for this plugin.
   537  			// This maintains expected behavior for overriding default plugins (see https://github.com/kubernetes/kubernetes/pull/99582)
   538  			if enabledSet.has(ep.Name) {
   539  				overridePlugins.insert(ep.Name)
   540  				logger.Info("MultiPoint plugin is explicitly re-configured; overriding", "plugin", ep.Name)
   541  				continue
   542  			}
   543  
   544  			// if this plugin is already registered via MultiPoint, then this is
   545  			// a double registration and an error in the config.
   546  			if multiPointEnabled.has(ep.Name) {
   547  				return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.Name())
   548  			}
   549  
   550  			// we only need to update the multipoint set, since we already have the specific extension set from above
   551  			multiPointEnabled.insert(ep.Name)
   552  		}
   553  
   554  		// Reorder plugins. Here is the expected order:
    555  		// - part 1: overridePlugins. Their order stays intact, as specified in the regular extension point.
    556  		// - part 2: multiPointEnabled - i.e., plugins defined in MultiPoint but not in the regular extension point.
    557  		// - part 3: other plugins (not covered by parts 1 & 2) in the regular extension point.
   558  		newPlugins := reflect.New(reflect.TypeOf(e.slicePtr).Elem()).Elem()
   559  		// part 1
   560  		for _, name := range slice.CopyStrings(enabledSet.list) {
   561  			if overridePlugins.has(name) {
   562  				newPlugins = reflect.Append(newPlugins, reflect.ValueOf(f.pluginsMap[name]))
   563  				enabledSet.delete(name)
   564  			}
   565  		}
   566  		// part 2
   567  		for _, name := range multiPointEnabled.list {
   568  			newPlugins = reflect.Append(newPlugins, reflect.ValueOf(f.pluginsMap[name]))
   569  		}
   570  		// part 3
   571  		for _, name := range enabledSet.list {
   572  			newPlugins = reflect.Append(newPlugins, reflect.ValueOf(f.pluginsMap[name]))
   573  		}
   574  		plugins.Set(newPlugins)
   575  	}
   576  	return nil
   577  }
   578  
   579  func shouldHaveEnqueueExtensions(p framework.Plugin) bool {
   580  	switch p.(type) {
   581  	// Only PreEnqueue, PreFilter, Filter, Reserve, and Permit plugins can (should) have EnqueueExtensions.
    582  	// See the comment on EnqueueExtensions for a more detailed explanation.
   583  	case framework.PreEnqueuePlugin, framework.PreFilterPlugin, framework.FilterPlugin, framework.ReservePlugin, framework.PermitPlugin:
   584  		return true
   585  	}
   586  	return false
   587  }
   588  
   589  func (f *frameworkImpl) fillEnqueueExtensions(p framework.Plugin) {
   590  	if !shouldHaveEnqueueExtensions(p) {
    591  		// Ignore EnqueueExtensions from plugins that aren't PreEnqueue, PreFilter, Filter, Reserve, or Permit.
   592  		return
   593  	}
   594  
   595  	ext, ok := p.(framework.EnqueueExtensions)
   596  	if !ok {
    597  		// If the EnqueueExtensions interface is not implemented, register the default enqueue extension
    598  		// for the plugin because we don't know which events the plugin is interested in.
    599  		// This is to ensure backward compatibility.
   600  		f.enqueueExtensions = append(f.enqueueExtensions, &defaultEnqueueExtension{pluginName: p.Name()})
   601  		return
   602  	}
   603  
   604  	f.enqueueExtensions = append(f.enqueueExtensions, ext)
   605  }
   606  
    607  // defaultEnqueueExtension is used when a plugin does not implement the EnqueueExtensions interface.
   608  type defaultEnqueueExtension struct {
   609  	pluginName string
   610  }
   611  
   612  func (p *defaultEnqueueExtension) Name() string { return p.pluginName }
   613  func (p *defaultEnqueueExtension) EventsToRegister() []framework.ClusterEventWithHint {
    614  	// need to return all specific cluster events with the framework.All action instead of the wildcard event
    615  	// because the returned values are used to register event handlers.
    616  	// If we returned the wildcard here, it wouldn't affect the event handlers registered by the plugin,
    617  	// and some events might not get registered in the event handlers.
   618  	return framework.UnrollWildCardResource()
   619  }
   620  
   621  func updatePluginList(pluginList interface{}, pluginSet config.PluginSet, pluginsMap map[string]framework.Plugin) error {
   622  	plugins := reflect.ValueOf(pluginList).Elem()
   623  	pluginType := plugins.Type().Elem()
   624  	set := sets.New[string]()
   625  	for _, ep := range pluginSet.Enabled {
   626  		pg, ok := pluginsMap[ep.Name]
   627  		if !ok {
   628  			return fmt.Errorf("%s %q does not exist", pluginType.Name(), ep.Name)
   629  		}
   630  
   631  		if !reflect.TypeOf(pg).Implements(pluginType) {
   632  			return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.Name())
   633  		}
   634  
   635  		if set.Has(ep.Name) {
   636  			return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.Name())
   637  		}
   638  
   639  		set.Insert(ep.Name)
   640  
   641  		newPlugins := reflect.Append(plugins, reflect.ValueOf(pg))
   642  		plugins.Set(newPlugins)
   643  	}
   644  	return nil
   645  }
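
         // Minimal sketch (not part of framework.go) of the reflection idiom used above and in
         // expandMultiPointPlugins: the element type of the destination slice is the extension
         // point interface, so a plugin is accepted only if its concrete type implements it.
         // implementsExtensionPoint is a hypothetical helper.
         func implementsExtensionPoint(pluginList interface{}, pg framework.Plugin) bool {
         	// pluginList is a pointer to a slice of an extension point interface,
         	// for example *[]framework.FilterPlugin.
         	pluginType := reflect.ValueOf(pluginList).Elem().Type().Elem()
         	return reflect.TypeOf(pg).Implements(pluginType)
         }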
   646  
   647  // PreEnqueuePlugins returns the registered preEnqueue plugins.
   648  func (f *frameworkImpl) PreEnqueuePlugins() []framework.PreEnqueuePlugin {
   649  	return f.preEnqueuePlugins
   650  }
   651  
    652  // EnqueueExtensions returns the registered Enqueue extensions.
   653  func (f *frameworkImpl) EnqueueExtensions() []framework.EnqueueExtensions {
   654  	return f.enqueueExtensions
   655  }
   656  
    657  // QueueSortFunc returns the function to sort pods in the scheduling queue.
   658  func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
   659  	if f == nil {
   660  		// If frameworkImpl is nil, simply keep their order unchanged.
   661  		// NOTE: this is primarily for tests.
   662  		return func(_, _ *framework.QueuedPodInfo) bool { return false }
   663  	}
   664  
   665  	if len(f.queueSortPlugins) == 0 {
   666  		panic("No QueueSort plugin is registered in the frameworkImpl.")
   667  	}
   668  
   669  	// Only one QueueSort plugin can be enabled.
   670  	return f.queueSortPlugins[0].Less
   671  }
   672  
   673  // RunPreFilterPlugins runs the set of configured PreFilter plugins. It returns
   674  // *Status and its code is set to non-success if any of the plugins returns
   675  // anything but Success/Skip.
    676  // When it returns a Skip status, the returned PreFilterResult and other fields in the status are ignored,
    677  // and the coupled Filter plugin/PreFilterExtensions() will be skipped in this scheduling cycle.
   678  // If a non-success status is returned, then the scheduling cycle is aborted.
   679  func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) {
   680  	startTime := time.Now()
   681  	skipPlugins := sets.New[string]()
   682  	defer func() {
   683  		state.SkipFilterPlugins = skipPlugins
   684  		metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
   685  	}()
   686  	var result *framework.PreFilterResult
   687  	var pluginsWithNodes []string
   688  	logger := klog.FromContext(ctx)
   689  	verboseLogs := logger.V(4).Enabled()
   690  	if verboseLogs {
   691  		logger = klog.LoggerWithName(logger, "PreFilter")
   692  	}
   693  	var returnStatus *framework.Status
   694  	for _, pl := range f.preFilterPlugins {
   695  		ctx := ctx
   696  		if verboseLogs {
   697  			logger := klog.LoggerWithName(logger, pl.Name())
   698  			ctx = klog.NewContext(ctx, logger)
   699  		}
   700  		r, s := f.runPreFilterPlugin(ctx, pl, state, pod)
   701  		if s.IsSkip() {
   702  			skipPlugins.Insert(pl.Name())
   703  			continue
   704  		}
   705  		if !s.IsSuccess() {
   706  			s.SetPlugin(pl.Name())
   707  			if s.Code() == framework.UnschedulableAndUnresolvable {
   708  				// In this case, the preemption shouldn't happen in this scheduling cycle.
   709  				// So, no need to execute all PreFilter.
   710  				return nil, s
   711  			}
   712  			if s.Code() == framework.Unschedulable {
   713  				// In this case, the preemption should happen later in this scheduling cycle.
   714  				// So we need to execute all PreFilter.
   715  				// https://github.com/kubernetes/kubernetes/issues/119770
   716  				returnStatus = s
   717  				continue
   718  			}
   719  			return nil, framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), s.AsError())).WithPlugin(pl.Name())
   720  		}
   721  		if !r.AllNodes() {
   722  			pluginsWithNodes = append(pluginsWithNodes, pl.Name())
   723  		}
   724  		result = result.Merge(r)
   725  		if !result.AllNodes() && len(result.NodeNames) == 0 {
   726  			msg := fmt.Sprintf("node(s) didn't satisfy plugin(s) %v simultaneously", pluginsWithNodes)
   727  			if len(pluginsWithNodes) == 1 {
   728  				msg = fmt.Sprintf("node(s) didn't satisfy plugin %v", pluginsWithNodes[0])
   729  			}
   730  
   731  			// When PreFilterResult filters out Nodes, the framework considers Nodes that are filtered out as getting "UnschedulableAndUnresolvable".
   732  			return result, framework.NewStatus(framework.UnschedulableAndUnresolvable, msg)
   733  		}
   734  	}
   735  	return result, returnStatus
   736  }
   737  
   738  func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
   739  	if !state.ShouldRecordPluginMetrics() {
   740  		return pl.PreFilter(ctx, state, pod)
   741  	}
   742  	startTime := time.Now()
   743  	result, status := pl.PreFilter(ctx, state, pod)
   744  	f.metricsRecorder.ObservePluginDurationAsync(metrics.PreFilter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
   745  	return result, status
   746  }
   747  
   748  // RunPreFilterExtensionAddPod calls the AddPod interface for the set of configured
   749  // PreFilter plugins. It returns directly if any of the plugins return any
   750  // status other than Success.
   751  func (f *frameworkImpl) RunPreFilterExtensionAddPod(
   752  	ctx context.Context,
   753  	state *framework.CycleState,
   754  	podToSchedule *v1.Pod,
   755  	podInfoToAdd *framework.PodInfo,
   756  	nodeInfo *framework.NodeInfo,
   757  ) (status *framework.Status) {
   758  	logger := klog.FromContext(ctx)
   759  	verboseLogs := logger.V(4).Enabled()
   760  	if verboseLogs {
   761  		logger = klog.LoggerWithName(logger, "PreFilterExtension")
   762  	}
   763  	for _, pl := range f.preFilterPlugins {
   764  		if pl.PreFilterExtensions() == nil || state.SkipFilterPlugins.Has(pl.Name()) {
   765  			continue
   766  		}
   767  		ctx := ctx
   768  		if verboseLogs {
   769  			logger := klog.LoggerWithName(logger, pl.Name())
   770  			ctx = klog.NewContext(ctx, logger)
   771  		}
   772  		status = f.runPreFilterExtensionAddPod(ctx, pl, state, podToSchedule, podInfoToAdd, nodeInfo)
   773  		if !status.IsSuccess() {
   774  			err := status.AsError()
   775  			logger.Error(err, "Plugin failed", "pod", klog.KObj(podToSchedule), "node", klog.KObj(nodeInfo.Node()), "operation", "addPod", "plugin", pl.Name())
   776  			return framework.AsStatus(fmt.Errorf("running AddPod on PreFilter plugin %q: %w", pl.Name(), err))
   777  		}
   778  	}
   779  
   780  	return nil
   781  }
   782  
   783  func (f *frameworkImpl) runPreFilterExtensionAddPod(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
   784  	if !state.ShouldRecordPluginMetrics() {
   785  		return pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podInfoToAdd, nodeInfo)
   786  	}
   787  	startTime := time.Now()
   788  	status := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podInfoToAdd, nodeInfo)
   789  	f.metricsRecorder.ObservePluginDurationAsync(metrics.PreFilterExtensionAddPod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
   790  	return status
   791  }
   792  
   793  // RunPreFilterExtensionRemovePod calls the RemovePod interface for the set of configured
   794  // PreFilter plugins. It returns directly if any of the plugins return any
   795  // status other than Success.
   796  func (f *frameworkImpl) RunPreFilterExtensionRemovePod(
   797  	ctx context.Context,
   798  	state *framework.CycleState,
   799  	podToSchedule *v1.Pod,
   800  	podInfoToRemove *framework.PodInfo,
   801  	nodeInfo *framework.NodeInfo,
   802  ) (status *framework.Status) {
   803  	logger := klog.FromContext(ctx)
   804  	verboseLogs := logger.V(4).Enabled()
   805  	if verboseLogs {
   806  		logger = klog.LoggerWithName(logger, "PreFilterExtension")
   807  	}
   808  	for _, pl := range f.preFilterPlugins {
   809  		if pl.PreFilterExtensions() == nil || state.SkipFilterPlugins.Has(pl.Name()) {
   810  			continue
   811  		}
   812  		ctx := ctx
   813  		if verboseLogs {
   814  			logger := klog.LoggerWithName(logger, pl.Name())
   815  			ctx = klog.NewContext(ctx, logger)
   816  		}
   817  		status = f.runPreFilterExtensionRemovePod(ctx, pl, state, podToSchedule, podInfoToRemove, nodeInfo)
   818  		if !status.IsSuccess() {
   819  			err := status.AsError()
   820  			logger.Error(err, "Plugin failed", "node", klog.KObj(nodeInfo.Node()), "operation", "removePod", "plugin", pl.Name(), "pod", klog.KObj(podToSchedule))
   821  			return framework.AsStatus(fmt.Errorf("running RemovePod on PreFilter plugin %q: %w", pl.Name(), err))
   822  		}
   823  	}
   824  
   825  	return nil
   826  }
   827  
   828  func (f *frameworkImpl) runPreFilterExtensionRemovePod(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
   829  	if !state.ShouldRecordPluginMetrics() {
   830  		return pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podInfoToRemove, nodeInfo)
   831  	}
   832  	startTime := time.Now()
   833  	status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podInfoToRemove, nodeInfo)
   834  	f.metricsRecorder.ObservePluginDurationAsync(metrics.PreFilterExtensionRemovePod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
   835  	return status
   836  }
   837  
    838  // RunFilterPlugins runs the set of configured Filter plugins for the pod on
    839  // the given node. If any of these plugins doesn't return "Success", the
    840  // given node is not suitable for running the pod.
    841  // Meanwhile, the failure message and status are set for the given node.
   842  func (f *frameworkImpl) RunFilterPlugins(
   843  	ctx context.Context,
   844  	state *framework.CycleState,
   845  	pod *v1.Pod,
   846  	nodeInfo *framework.NodeInfo,
   847  ) *framework.Status {
   848  	logger := klog.FromContext(ctx)
   849  	verboseLogs := logger.V(4).Enabled()
   850  	if verboseLogs {
   851  		logger = klog.LoggerWithName(logger, "Filter")
   852  	}
   853  
   854  	for _, pl := range f.filterPlugins {
   855  		if state.SkipFilterPlugins.Has(pl.Name()) {
   856  			continue
   857  		}
   858  		ctx := ctx
   859  		if verboseLogs {
   860  			logger := klog.LoggerWithName(logger, pl.Name())
   861  			ctx = klog.NewContext(ctx, logger)
   862  		}
   863  		if status := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo); !status.IsSuccess() {
   864  			if !status.IsRejected() {
   865  				// Filter plugins are not supposed to return any status other than
   866  				// Success or Unschedulable.
   867  				status = framework.AsStatus(fmt.Errorf("running %q filter plugin: %w", pl.Name(), status.AsError()))
   868  			}
   869  			status.SetPlugin(pl.Name())
   870  			return status
   871  		}
   872  	}
   873  
   874  	return nil
   875  }
   876  
   877  func (f *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.FilterPlugin, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
   878  	if !state.ShouldRecordPluginMetrics() {
   879  		return pl.Filter(ctx, state, pod, nodeInfo)
   880  	}
   881  	startTime := time.Now()
   882  	status := pl.Filter(ctx, state, pod, nodeInfo)
   883  	f.metricsRecorder.ObservePluginDurationAsync(metrics.Filter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
   884  	return status
   885  }
   886  
    887  // RunPostFilterPlugins runs the set of configured PostFilter plugins until the first
    888  // Success, Error, or UnschedulableAndUnresolvable status is returned; otherwise it continues executing all plugins.
   889  func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (_ *framework.PostFilterResult, status *framework.Status) {
   890  	startTime := time.Now()
   891  	defer func() {
   892  		metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
   893  	}()
   894  
   895  	logger := klog.FromContext(ctx)
   896  	verboseLogs := logger.V(4).Enabled()
   897  	if verboseLogs {
   898  		logger = klog.LoggerWithName(logger, "PostFilter")
   899  	}
   900  
    901  	// `result` records the last meaningful (non-noop) PostFilterResult.
   902  	var result *framework.PostFilterResult
   903  	var reasons []string
   904  	var rejectorPlugin string
   905  	for _, pl := range f.postFilterPlugins {
   906  		ctx := ctx
   907  		if verboseLogs {
   908  			logger := klog.LoggerWithName(logger, pl.Name())
   909  			ctx = klog.NewContext(ctx, logger)
   910  		}
   911  		r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap)
   912  		if s.IsSuccess() {
   913  			return r, s
   914  		} else if s.Code() == framework.UnschedulableAndUnresolvable {
   915  			return r, s.WithPlugin(pl.Name())
   916  		} else if !s.IsRejected() {
   917  			// Any status other than Success, Unschedulable or UnschedulableAndUnresolvable is Error.
   918  			return nil, framework.AsStatus(s.AsError()).WithPlugin(pl.Name())
   919  		} else if r != nil && r.Mode() != framework.ModeNoop {
   920  			result = r
   921  		}
   922  
   923  		reasons = append(reasons, s.Reasons()...)
   924  		// Record the first failed plugin unless we proved that
   925  		// the latter is more relevant.
   926  		if len(rejectorPlugin) == 0 {
   927  			rejectorPlugin = pl.Name()
   928  		}
   929  	}
   930  
   931  	return result, framework.NewStatus(framework.Unschedulable, reasons...).WithPlugin(rejectorPlugin)
   932  }
   933  
   934  func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.PostFilterPlugin, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
   935  	if !state.ShouldRecordPluginMetrics() {
   936  		return pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
   937  	}
   938  	startTime := time.Now()
   939  	r, s := pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
   940  	f.metricsRecorder.ObservePluginDurationAsync(metrics.PostFilter, pl.Name(), s.Code().String(), metrics.SinceInSeconds(startTime))
   941  	return r, s
   942  }
   943  
   944  // RunFilterPluginsWithNominatedPods runs the set of configured filter plugins
   945  // for nominated pod on the given node.
   946  // This function is called from two different places: Schedule and Preempt.
   947  // When it is called from Schedule, we want to test whether the pod is
   948  // schedulable on the node with all the existing pods on the node plus higher
   949  // and equal priority pods nominated to run on the node.
   950  // When it is called from Preempt, we should remove the victims of preemption
   951  // and add the nominated pods. Removal of the victims is done by
   952  // SelectVictimsOnNode(). Preempt removes victims from PreFilter state and
   953  // NodeInfo before calling this function.
   954  func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo) *framework.Status {
   955  	var status *framework.Status
   956  
   957  	podsAdded := false
    958  	// We run filters twice in some cases. If the node has greater or equal priority
    959  	// nominated pods, we run the filters with those pods added to the PreFilter state and nodeInfo.
    960  	// If all filters succeed in this pass, we run them again with the
    961  	// nominated pods not added. This second pass is necessary because some
    962  	// filters such as inter-pod affinity may not pass without the nominated pods.
    963  	// If there are no nominated pods for the node or if the first run of the
    964  	// filters fails, we don't run the second pass.
    965  	// We consider only equal or higher priority pods in the first pass, because
    966  	// the current "pod" must yield to those pods and not take a space opened
    967  	// for running them. It is ok if the current "pod" takes resources freed for
    968  	// lower priority pods.
   969  	// Requiring that the new pod is schedulable in both circumstances ensures that
   970  	// we are making a conservative decision: filters like resources and inter-pod
   971  	// anti-affinity are more likely to fail when the nominated pods are treated
   972  	// as running, while filters like pod affinity are more likely to fail when
   973  	// the nominated pods are treated as not running. We can't just assume the
   974  	// nominated pods are running because they are not running right now and in fact,
   975  	// they may end up getting scheduled to a different node.
   976  	logger := klog.FromContext(ctx)
   977  	logger = klog.LoggerWithName(logger, "FilterWithNominatedPods")
   978  	ctx = klog.NewContext(ctx, logger)
   979  	for i := 0; i < 2; i++ {
   980  		stateToUse := state
   981  		nodeInfoToUse := info
   982  		if i == 0 {
   983  			var err error
   984  			podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, f, pod, state, info)
   985  			if err != nil {
   986  				return framework.AsStatus(err)
   987  			}
   988  		} else if !podsAdded || !status.IsSuccess() {
   989  			break
   990  		}
   991  
   992  		status = f.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
   993  		if !status.IsSuccess() && !status.IsRejected() {
   994  			return status
   995  		}
   996  	}
   997  
   998  	return status
   999  }
  1000  
  1001  // addNominatedPods adds pods with equal or greater priority which are nominated
  1002  // to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
  1003  // 3) augmented nodeInfo.
  1004  func addNominatedPods(ctx context.Context, fh framework.Handle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
  1005  	if fh == nil {
  1006  		// This may happen only in tests.
  1007  		return false, state, nodeInfo, nil
  1008  	}
  1009  	nominatedPodInfos := fh.NominatedPodsForNode(nodeInfo.Node().Name)
  1010  	if len(nominatedPodInfos) == 0 {
  1011  		return false, state, nodeInfo, nil
  1012  	}
  1013  	nodeInfoOut := nodeInfo.Snapshot()
  1014  	stateOut := state.Clone()
  1015  	podsAdded := false
  1016  	for _, pi := range nominatedPodInfos {
  1017  		if corev1.PodPriority(pi.Pod) >= corev1.PodPriority(pod) && pi.Pod.UID != pod.UID {
  1018  			nodeInfoOut.AddPodInfo(pi)
  1019  			status := fh.RunPreFilterExtensionAddPod(ctx, stateOut, pod, pi, nodeInfoOut)
  1020  			if !status.IsSuccess() {
  1021  				return false, state, nodeInfo, status.AsError()
  1022  			}
  1023  			podsAdded = true
  1024  		}
  1025  	}
  1026  	return podsAdded, stateOut, nodeInfoOut, nil
  1027  }
  1028  
  1029  // RunPreScorePlugins runs the set of configured pre-score plugins. If any
  1030  // of these plugins returns any status other than Success/Skip, the given pod is rejected.
   1031  // When it returns a Skip status, other fields in the status are ignored,
   1032  // and the coupled Score plugin will be skipped in this scheduling cycle.
  1033  func (f *frameworkImpl) RunPreScorePlugins(
  1034  	ctx context.Context,
  1035  	state *framework.CycleState,
  1036  	pod *v1.Pod,
  1037  	nodes []*framework.NodeInfo,
  1038  ) (status *framework.Status) {
  1039  	startTime := time.Now()
  1040  	skipPlugins := sets.New[string]()
  1041  	defer func() {
  1042  		state.SkipScorePlugins = skipPlugins
  1043  		metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreScore, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
  1044  	}()
  1045  	logger := klog.FromContext(ctx)
  1046  	verboseLogs := logger.V(4).Enabled()
  1047  	if verboseLogs {
  1048  		logger = klog.LoggerWithName(logger, "PreScore")
  1049  	}
  1050  	for _, pl := range f.preScorePlugins {
  1051  		ctx := ctx
  1052  		if verboseLogs {
  1053  			logger := klog.LoggerWithName(logger, pl.Name())
  1054  			ctx = klog.NewContext(ctx, logger)
  1055  		}
  1056  		status = f.runPreScorePlugin(ctx, pl, state, pod, nodes)
  1057  		if status.IsSkip() {
  1058  			skipPlugins.Insert(pl.Name())
  1059  			continue
  1060  		}
  1061  		if !status.IsSuccess() {
  1062  			return framework.AsStatus(fmt.Errorf("running PreScore plugin %q: %w", pl.Name(), status.AsError()))
  1063  		}
  1064  	}
  1065  	return nil
  1066  }
  1067  
  1068  func (f *frameworkImpl) runPreScorePlugin(ctx context.Context, pl framework.PreScorePlugin, state *framework.CycleState, pod *v1.Pod, nodes []*framework.NodeInfo) *framework.Status {
  1069  	if !state.ShouldRecordPluginMetrics() {
  1070  		return pl.PreScore(ctx, state, pod, nodes)
  1071  	}
  1072  	startTime := time.Now()
  1073  	status := pl.PreScore(ctx, state, pod, nodes)
  1074  	f.metricsRecorder.ObservePluginDurationAsync(metrics.PreScore, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
  1075  	return status
  1076  }
  1077  
  1078  // RunScorePlugins runs the set of configured scoring plugins.
  1079  // It returns a list that stores scores from each plugin and total score for each Node.
  1080  // It also returns *Status, which is set to non-success if any of the plugins returns
  1081  // a non-success status.
  1082  func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*framework.NodeInfo) (ns []framework.NodePluginScores, status *framework.Status) {
  1083  	startTime := time.Now()
  1084  	defer func() {
  1085  		metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Score, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
  1086  	}()
  1087  	allNodePluginScores := make([]framework.NodePluginScores, len(nodes))
  1088  	numPlugins := len(f.scorePlugins)
  1089  	plugins := make([]framework.ScorePlugin, 0, numPlugins)
  1090  	pluginToNodeScores := make(map[string]framework.NodeScoreList, numPlugins)
  1091  	for _, pl := range f.scorePlugins {
  1092  		if state.SkipScorePlugins.Has(pl.Name()) {
  1093  			continue
  1094  		}
  1095  		plugins = append(plugins, pl)
  1096  		pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes))
  1097  	}
  1098  	ctx, cancel := context.WithCancel(ctx)
  1099  	defer cancel()
  1100  	errCh := parallelize.NewErrorChannel()
  1101  
  1102  	if len(plugins) > 0 {
  1103  		logger := klog.FromContext(ctx)
  1104  		verboseLogs := logger.V(4).Enabled()
  1105  		if verboseLogs {
  1106  			logger = klog.LoggerWithName(logger, "Score")
  1107  		}
  1108  		// Run Score method for each node in parallel.
  1109  		f.Parallelizer().Until(ctx, len(nodes), func(index int) {
  1110  			nodeName := nodes[index].Node().Name
  1111  			logger := logger
  1112  			if verboseLogs {
  1113  				logger = klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
  1114  			}
  1115  			for _, pl := range plugins {
  1116  				ctx := ctx
  1117  				if verboseLogs {
  1118  					logger := klog.LoggerWithName(logger, pl.Name())
  1119  					ctx = klog.NewContext(ctx, logger)
  1120  				}
  1121  				s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
  1122  				if !status.IsSuccess() {
  1123  					err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
  1124  					errCh.SendErrorWithCancel(err, cancel)
  1125  					return
  1126  				}
  1127  				pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
  1128  					Name:  nodeName,
  1129  					Score: s,
  1130  				}
  1131  			}
  1132  		}, metrics.Score)
  1133  		if err := errCh.ReceiveError(); err != nil {
  1134  			return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err))
  1135  		}
  1136  	}
  1137  
  1138  	// Run NormalizeScore method for each ScorePlugin in parallel.
  1139  	f.Parallelizer().Until(ctx, len(plugins), func(index int) {
  1140  		pl := plugins[index]
  1141  		if pl.ScoreExtensions() == nil {
  1142  			return
  1143  		}
  1144  		nodeScoreList := pluginToNodeScores[pl.Name()]
  1145  		status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)
  1146  		if !status.IsSuccess() {
  1147  			err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
  1148  			errCh.SendErrorWithCancel(err, cancel)
  1149  			return
  1150  		}
  1151  	}, metrics.Score)
  1152  	if err := errCh.ReceiveError(); err != nil {
  1153  		return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err))
  1154  	}
  1155  
  1156  	// Apply score weight for each ScorePlugin in parallel,
  1157  	// and then, build allNodePluginScores.
  1158  	f.Parallelizer().Until(ctx, len(nodes), func(index int) {
  1159  		nodePluginScores := framework.NodePluginScores{
  1160  			Name:   nodes[index].Node().Name,
  1161  			Scores: make([]framework.PluginScore, len(plugins)),
  1162  		}
  1163  
  1164  		for i, pl := range plugins {
  1165  			weight := f.scorePluginWeight[pl.Name()]
  1166  			nodeScoreList := pluginToNodeScores[pl.Name()]
  1167  			score := nodeScoreList[index].Score
  1168  
  1169  			if score > framework.MaxNodeScore || score < framework.MinNodeScore {
   1170  				err := fmt.Errorf("plugin %q returns an invalid score %v, it should be in the range of [%v, %v] after normalizing", pl.Name(), score, framework.MinNodeScore, framework.MaxNodeScore)
  1171  				errCh.SendErrorWithCancel(err, cancel)
  1172  				return
  1173  			}
  1174  			weightedScore := score * int64(weight)
  1175  			nodePluginScores.Scores[i] = framework.PluginScore{
  1176  				Name:  pl.Name(),
  1177  				Score: weightedScore,
  1178  			}
  1179  			nodePluginScores.TotalScore += weightedScore
  1180  		}
  1181  		allNodePluginScores[index] = nodePluginScores
  1182  	}, metrics.Score)
  1183  	if err := errCh.ReceiveError(); err != nil {
  1184  		return nil, framework.AsStatus(fmt.Errorf("applying score defaultWeights on Score plugins: %w", err))
  1185  	}
  1186  
  1187  	return allNodePluginScores, nil
  1188  }
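
         // Illustrative arithmetic only (invented values, not from framework.go): after
         // NormalizeScore, each plugin's node score is multiplied by its configured weight and
         // the products are summed into the node's TotalScore, as done above.
         // exampleWeightedTotal is a hypothetical helper.
         func exampleWeightedTotal() int64 {
         	scores := []framework.PluginScore{
         		{Name: "NodeResourcesFit", Score: 80 * 2}, // normalized score 80, weight 2
         		{Name: "ImageLocality", Score: 50 * 1},    // normalized score 50, weight 1
         	}
         	var total int64
         	for _, s := range scores {
         		total += s.Score
         	}
         	return total // 210
         }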
  1189  
  1190  func (f *frameworkImpl) runScorePlugin(ctx context.Context, pl framework.ScorePlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
  1191  	if !state.ShouldRecordPluginMetrics() {
  1192  		return pl.Score(ctx, state, pod, nodeName)
  1193  	}
  1194  	startTime := time.Now()
  1195  	s, status := pl.Score(ctx, state, pod, nodeName)
  1196  	f.metricsRecorder.ObservePluginDurationAsync(metrics.Score, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
  1197  	return s, status
  1198  }
  1199  
  1200  func (f *frameworkImpl) runScoreExtension(ctx context.Context, pl framework.ScorePlugin, state *framework.CycleState, pod *v1.Pod, nodeScoreList framework.NodeScoreList) *framework.Status {
  1201  	if !state.ShouldRecordPluginMetrics() {
  1202  		return pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
  1203  	}
  1204  	startTime := time.Now()
  1205  	status := pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
  1206  	f.metricsRecorder.ObservePluginDurationAsync(metrics.ScoreExtensionNormalize, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
  1207  	return status
  1208  }
  1209  
   1210  // RunPreBindPlugins runs the set of configured prebind plugins. It returns a
   1211  // non-success status if any of the plugins returns anything other than Success. The
   1212  // status contains the rejection message or the error that occurred in the plugin.
  1213  func (f *frameworkImpl) RunPreBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
  1214  	startTime := time.Now()
  1215  	defer func() {
  1216  		metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreBind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
  1217  	}()
  1218  	logger := klog.FromContext(ctx)
  1219  	verboseLogs := logger.V(4).Enabled()
  1220  	if verboseLogs {
  1221  		logger = klog.LoggerWithName(logger, "PreBind")
  1222  		logger = klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
  1223  	}
  1224  	for _, pl := range f.preBindPlugins {
  1225  		ctx := ctx
  1226  		if verboseLogs {
  1227  			logger := klog.LoggerWithName(logger, pl.Name())
  1228  			ctx = klog.NewContext(ctx, logger)
  1229  		}
  1230  		status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName)
  1231  		if !status.IsSuccess() {
  1232  			if status.IsRejected() {
  1233  				logger.V(4).Info("Pod rejected by PreBind plugin", "pod", klog.KObj(pod), "node", nodeName, "plugin", pl.Name(), "status", status.Message())
  1234  				status.SetPlugin(pl.Name())
  1235  				return status
  1236  			}
  1237  			err := status.AsError()
  1238  			logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName)
  1239  			return framework.AsStatus(fmt.Errorf("running PreBind plugin %q: %w", pl.Name(), err))
  1240  		}
  1241  	}
  1242  	return nil
  1243  }
  1244  
  1245  func (f *frameworkImpl) runPreBindPlugin(ctx context.Context, pl framework.PreBindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
  1246  	if !state.ShouldRecordPluginMetrics() {
  1247  		return pl.PreBind(ctx, state, pod, nodeName)
  1248  	}
  1249  	startTime := time.Now()
  1250  	status := pl.PreBind(ctx, state, pod, nodeName)
  1251  	f.metricsRecorder.ObservePluginDurationAsync(metrics.PreBind, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
  1252  	return status
  1253  }
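
As a sketch of the rejection-versus-error distinction handled above, a hypothetical PreBind plugin could look like this; VolumeGate and volumesReady are invented names, not part of the scheduler.

package example

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// volumeGate is a hypothetical PreBind plugin. Returning an Unschedulable
// status rejects the pod (RunPreBindPlugins logs it at V(4) and returns the
// status as-is), while any other non-success status is surfaced as an error.
type volumeGate struct{}

var _ framework.PreBindPlugin = &volumeGate{}

func (v *volumeGate) Name() string { return "VolumeGate" }

func (v *volumeGate) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
	ready, err := volumesReady(pod, nodeName) // hypothetical helper
	if err != nil {
		return framework.AsStatus(fmt.Errorf("checking volumes for node %s: %w", nodeName, err))
	}
	if !ready {
		return framework.NewStatus(framework.Unschedulable, "volumes not provisioned yet")
	}
	return nil
}

// volumesReady is a placeholder: a real plugin would consult its own state or
// the API server here.
func volumesReady(pod *v1.Pod, nodeName string) (bool, error) {
	return true, nil
}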
  1254  
  1255  // RunBindPlugins runs the set of configured bind plugins until one returns a non `Skip` status.
  1256  func (f *frameworkImpl) RunBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
  1257  	startTime := time.Now()
  1258  	defer func() {
  1259  		metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Bind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
  1260  	}()
  1261  	if len(f.bindPlugins) == 0 {
  1262  		return framework.NewStatus(framework.Skip, "")
  1263  	}
  1264  	logger := klog.FromContext(ctx)
  1265  	verboseLogs := logger.V(4).Enabled()
  1266  	if verboseLogs {
  1267  		logger = klog.LoggerWithName(logger, "Bind")
  1268  	}
  1269  	for _, pl := range f.bindPlugins {
  1270  		ctx := ctx
  1271  		if verboseLogs {
  1272  			logger := klog.LoggerWithName(logger, pl.Name())
  1273  			ctx = klog.NewContext(ctx, logger)
  1274  		}
  1275  		status = f.runBindPlugin(ctx, pl, state, pod, nodeName)
  1276  		if status.IsSkip() {
  1277  			continue
  1278  		}
  1279  		if !status.IsSuccess() {
  1280  			if status.IsRejected() {
  1281  				logger.V(4).Info("Pod rejected by Bind plugin", "pod", klog.KObj(pod), "node", nodeName, "plugin", pl.Name(), "status", status.Message())
  1282  				status.SetPlugin(pl.Name())
  1283  				return status
  1284  			}
  1285  			err := status.AsError()
  1286  			logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName)
  1287  			return framework.AsStatus(fmt.Errorf("running Bind plugin %q: %w", pl.Name(), err))
  1288  		}
  1289  		return status
  1290  	}
  1291  	return status
  1292  }
  1293  
  1294  func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp framework.BindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
  1295  	if !state.ShouldRecordPluginMetrics() {
  1296  		return bp.Bind(ctx, state, pod, nodeName)
  1297  	}
  1298  	startTime := time.Now()
  1299  	status := bp.Bind(ctx, state, pod, nodeName)
  1300  	f.metricsRecorder.ObservePluginDurationAsync(metrics.Bind, bp.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
  1301  	return status
  1302  }
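
The Skip contract described for RunBindPlugins can be illustrated with a hypothetical bind plugin that only claims pods carrying a specific annotation and defers to the next bind plugin otherwise. The annotation key and plugin name are invented for this sketch; the binding call mirrors what the default binder does.

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// annotatedBinder is a hypothetical bind plugin that only handles pods with a
// specific annotation and returns Skip for everything else, so RunBindPlugins
// falls through to the next bind plugin (normally the default binder).
type annotatedBinder struct {
	handle framework.Handle
}

var _ framework.BindPlugin = &annotatedBinder{}

func (b *annotatedBinder) Name() string { return "AnnotatedBinder" }

func (b *annotatedBinder) Bind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
	if pod.Annotations["example.com/custom-bind"] != "true" {
		// Not ours: let the next bind plugin try.
		return framework.NewStatus(framework.Skip, "")
	}
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
		Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
	if err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil {
		return framework.AsStatus(err)
	}
	return nil
}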
  1303  
  1304  // RunPostBindPlugins runs the set of configured postbind plugins.
  1305  func (f *frameworkImpl) RunPostBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
  1306  	startTime := time.Now()
  1307  	defer func() {
  1308  		metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostBind, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
  1309  	}()
  1310  	logger := klog.FromContext(ctx)
  1311  	verboseLogs := logger.V(4).Enabled()
  1312  	if verboseLogs {
  1313  		logger = klog.LoggerWithName(logger, "PostBind")
  1314  	}
  1315  	for _, pl := range f.postBindPlugins {
  1316  		ctx := ctx
  1317  		if verboseLogs {
  1318  			logger := klog.LoggerWithName(logger, pl.Name())
  1319  			ctx = klog.NewContext(ctx, logger)
  1320  		}
  1321  		f.runPostBindPlugin(ctx, pl, state, pod, nodeName)
  1322  	}
  1323  }
  1324  
  1325  func (f *frameworkImpl) runPostBindPlugin(ctx context.Context, pl framework.PostBindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) {
  1326  	if !state.ShouldRecordPluginMetrics() {
  1327  		pl.PostBind(ctx, state, pod, nodeName)
  1328  		return
  1329  	}
  1330  	startTime := time.Now()
  1331  	pl.PostBind(ctx, state, pod, nodeName)
  1332  	f.metricsRecorder.ObservePluginDurationAsync(metrics.PostBind, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime))
  1333  }
  1334  
  1335  // RunReservePluginsReserve runs the Reserve method in the set of configured
  1336  // reserve plugins. If any of these plugins returns an error, it does not
  1337  // continue running the remaining ones and returns the error. In such a case,
  1338  // the pod will not be scheduled and the caller will be expected to call
  1339  // RunReservePluginsUnreserve.
  1340  func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
  1341  	startTime := time.Now()
  1342  	defer func() {
  1343  		metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Reserve, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
  1344  	}()
  1345  	logger := klog.FromContext(ctx)
  1346  	verboseLogs := logger.V(4).Enabled()
  1347  	if verboseLogs {
  1348  		logger = klog.LoggerWithName(logger, "Reserve")
  1349  		logger = klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
  1350  	}
  1351  	for _, pl := range f.reservePlugins {
  1352  		ctx := ctx
  1353  		if verboseLogs {
  1354  			logger := klog.LoggerWithName(logger, pl.Name())
  1355  			ctx = klog.NewContext(ctx, logger)
  1356  		}
  1357  		status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName)
  1358  		if !status.IsSuccess() {
  1359  			if status.IsRejected() {
  1360  				logger.V(4).Info("Pod rejected by plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message())
  1361  				status.SetPlugin(pl.Name())
  1362  				return status
  1363  			}
  1364  			err := status.AsError()
  1365  			logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod))
  1366  			return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err))
  1367  		}
  1368  	}
  1369  	return nil
  1370  }
  1371  
  1372  func (f *frameworkImpl) runReservePluginReserve(ctx context.Context, pl framework.ReservePlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
  1373  	if !state.ShouldRecordPluginMetrics() {
  1374  		return pl.Reserve(ctx, state, pod, nodeName)
  1375  	}
  1376  	startTime := time.Now()
  1377  	status := pl.Reserve(ctx, state, pod, nodeName)
  1378  	f.metricsRecorder.ObservePluginDurationAsync(metrics.Reserve, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
  1379  	return status
  1380  }
  1381  
  1382  // RunReservePluginsUnreserve runs the Unreserve method in the set of
  1383  // configured reserve plugins.
  1384  func (f *frameworkImpl) RunReservePluginsUnreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
  1385  	startTime := time.Now()
  1386  	defer func() {
  1387  		metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Unreserve, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
  1388  	}()
  1389  	// Execute the Unreserve operation of each reserve plugin in the
  1390  	// *reverse* order in which the Reserve operation was executed.
  1391  	logger := klog.FromContext(ctx)
  1392  	verboseLogs := logger.V(4).Enabled()
  1393  	if verboseLogs {
  1394  		logger = klog.LoggerWithName(logger, "Unreserve")
  1395  		logger = klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
  1396  	}
  1397  	for i := len(f.reservePlugins) - 1; i >= 0; i-- {
  1398  		pl := f.reservePlugins[i]
  1399  		ctx := ctx
  1400  		if verboseLogs {
  1401  			logger := klog.LoggerWithName(logger, pl.Name())
  1402  			ctx = klog.NewContext(ctx, logger)
  1403  		}
  1404  		f.runReservePluginUnreserve(ctx, pl, state, pod, nodeName)
  1405  	}
  1406  }
  1407  
  1408  func (f *frameworkImpl) runReservePluginUnreserve(ctx context.Context, pl framework.ReservePlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) {
  1409  	if !state.ShouldRecordPluginMetrics() {
  1410  		pl.Unreserve(ctx, state, pod, nodeName)
  1411  		return
  1412  	}
  1413  	startTime := time.Now()
  1414  	pl.Unreserve(ctx, state, pod, nodeName)
  1415  	f.metricsRecorder.ObservePluginDurationAsync(metrics.Unreserve, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime))
  1416  }
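
A minimal Reserve/Unreserve pair, assuming a plugin that tracks claims in its own memory, might look like the sketch below (names are hypothetical). Because RunReservePluginsUnreserve walks plugins in reverse order and may run even when this plugin's Reserve never did, Unreserve is written to be idempotent.

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// portReserver is a hypothetical Reserve plugin. Reserve claims a resource in
// the plugin's own in-memory state; Unreserve releases it again. A real plugin
// would guard this map with a lock, since plugins can be called concurrently
// for different pods.
type portReserver struct {
	claimed map[string]string // pod UID -> node name
}

var _ framework.ReservePlugin = &portReserver{}

func (r *portReserver) Name() string { return "PortReserver" }

func (r *portReserver) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
	if r.claimed == nil {
		r.claimed = map[string]string{}
	}
	r.claimed[string(pod.UID)] = nodeName
	return nil
}

func (r *portReserver) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
	// Safe even if Reserve never ran for this pod.
	delete(r.claimed, string(pod.UID))
}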
  1417  
  1418  // RunPermitPlugins runs the set of configured permit plugins. If any of these
  1419  // plugins returns a status other than "Success" or "Wait", it does not continue
  1420  // running the remaining plugins and returns an error. Otherwise, if any of the
  1421  // plugins returns "Wait", then this function creates a waiting pod, adds it to
  1422  // the map of currently waiting pods, and returns a status with the "Wait" code.
  1423  // The pod remains a waiting pod for at most the minimum duration returned by the permit plugins.
  1424  func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
  1425  	startTime := time.Now()
  1426  	defer func() {
  1427  		metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Permit, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
  1428  	}()
  1429  	pluginsWaitTime := make(map[string]time.Duration)
  1430  	statusCode := framework.Success
  1431  	logger := klog.FromContext(ctx)
  1432  	verboseLogs := logger.V(4).Enabled()
  1433  	if verboseLogs {
  1434  		logger = klog.LoggerWithName(logger, "Permit")
  1435  		logger = klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
  1436  	}
  1437  	for _, pl := range f.permitPlugins {
  1438  		ctx := ctx
  1439  		if verboseLogs {
  1440  			logger := klog.LoggerWithName(logger, pl.Name())
  1441  			ctx = klog.NewContext(ctx, logger)
  1442  		}
  1443  		status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)
  1444  		if !status.IsSuccess() {
  1445  			if status.IsRejected() {
  1446  				logger.V(4).Info("Pod rejected by plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message())
  1447  				return status.WithPlugin(pl.Name())
  1448  			}
  1449  			if status.IsWait() {
  1450  				// Not allowed to be greater than maxTimeout.
  1451  				if timeout > maxTimeout {
  1452  					timeout = maxTimeout
  1453  				}
  1454  				pluginsWaitTime[pl.Name()] = timeout
  1455  				statusCode = framework.Wait
  1456  			} else {
  1457  				err := status.AsError()
  1458  				logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod))
  1459  				return framework.AsStatus(fmt.Errorf("running Permit plugin %q: %w", pl.Name(), err)).WithPlugin(pl.Name())
  1460  			}
  1461  		}
  1462  	}
  1463  	if statusCode == framework.Wait {
  1464  		waitingPod := newWaitingPod(pod, pluginsWaitTime)
  1465  		f.waitingPods.add(waitingPod)
  1466  		msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
  1467  		logger.V(4).Info("One or more plugins asked to wait and no plugin rejected pod", "pod", klog.KObj(pod))
  1468  		return framework.NewStatus(framework.Wait, msg)
  1469  	}
  1470  	return nil
  1471  }
  1472  
  1473  func (f *frameworkImpl) runPermitPlugin(ctx context.Context, pl framework.PermitPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
  1474  	if !state.ShouldRecordPluginMetrics() {
  1475  		return pl.Permit(ctx, state, pod, nodeName)
  1476  	}
  1477  	startTime := time.Now()
  1478  	status, timeout := pl.Permit(ctx, state, pod, nodeName)
  1479  	f.metricsRecorder.ObservePluginDurationAsync(metrics.Permit, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
  1480  	return status, timeout
  1481  }
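
To show how a pod ends up in the waiting-pods map, here is a hypothetical Permit plugin that asks the framework to wait. The gang-checking helper is a placeholder, and the 30-second timeout is an arbitrary value below the framework's maxTimeout cap.

package example

import (
	"context"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// gangPermit is a hypothetical Permit plugin. Returning a Wait status together
// with a timeout makes RunPermitPlugins register the pod as a waiting pod;
// another component later allows or rejects it, or the timeout expires and the
// pod is rejected.
type gangPermit struct {
	handle framework.Handle
}

var _ framework.PermitPlugin = &gangPermit{}

func (g *gangPermit) Name() string { return "GangPermit" }

func (g *gangPermit) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
	if gangComplete(pod) {
		return nil, 0
	}
	return framework.NewStatus(framework.Wait, "waiting for remaining gang members"), 30 * time.Second
}

// gangComplete is a placeholder: a real plugin would track its gang members
// elsewhere.
func gangComplete(pod *v1.Pod) bool {
	return false
}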
  1482  
  1483  // WaitOnPermit will block, if the pod is a waiting pod, until the waiting pod is rejected or allowed.
  1484  func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *framework.Status {
  1485  	waitingPod := f.waitingPods.get(pod.UID)
  1486  	if waitingPod == nil {
  1487  		return nil
  1488  	}
  1489  	defer f.waitingPods.remove(pod.UID)
  1490  
  1491  	logger := klog.FromContext(ctx)
  1492  	logger.V(4).Info("Pod waiting on permit", "pod", klog.KObj(pod))
  1493  
  1494  	startTime := time.Now()
  1495  	s := <-waitingPod.s
  1496  	metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))
  1497  
  1498  	if !s.IsSuccess() {
  1499  		if s.IsRejected() {
  1500  			logger.V(4).Info("Pod rejected while waiting on permit", "pod", klog.KObj(pod), "status", s.Message())
  1501  			return s
  1502  		}
  1503  		err := s.AsError()
  1504  		logger.Error(err, "Failed waiting on permit for pod", "pod", klog.KObj(pod))
  1505  		return framework.AsStatus(fmt.Errorf("waiting on permit for pod: %w", err)).WithPlugin(s.Plugin())
  1506  	}
  1507  	return nil
  1508  }
  1509  
  1510  // SnapshotSharedLister returns the scheduler's SharedLister of the latest NodeInfo
  1511  // snapshot. The snapshot is taken at the beginning of a scheduling cycle and remains
  1512  // unchanged until a pod finishes "Reserve". There is no guarantee that the information
  1513  // remains unchanged after "Reserve".
  1514  func (f *frameworkImpl) SnapshotSharedLister() framework.SharedLister {
  1515  	return f.snapshotSharedLister
  1516  }
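
As a usage sketch, a plugin holding a framework.Handle can read the per-cycle snapshot through this lister; the helper function below is hypothetical.

package example

import (
	"fmt"

	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// countSnapshotPods shows how a plugin reads the per-cycle snapshot. The data
// is consistent for the whole scheduling cycle but, as noted above, is not
// guaranteed to stay unchanged after "Reserve".
func countSnapshotPods(h framework.Handle) (int, error) {
	nodeInfos, err := h.SnapshotSharedLister().NodeInfos().List()
	if err != nil {
		return 0, fmt.Errorf("listing nodes from snapshot: %w", err)
	}
	total := 0
	for _, ni := range nodeInfos {
		total += len(ni.Pods)
	}
	return total, nil
}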
  1517  
  1518  // IterateOverWaitingPods acquires a read lock and iterates over the WaitingPods map.
  1519  func (f *frameworkImpl) IterateOverWaitingPods(callback func(framework.WaitingPod)) {
  1520  	f.waitingPods.iterate(callback)
  1521  }
  1522  
  1523  // GetWaitingPod returns a reference to a WaitingPod given its UID.
  1524  func (f *frameworkImpl) GetWaitingPod(uid types.UID) framework.WaitingPod {
  1525  	if wp := f.waitingPods.get(uid); wp != nil {
  1526  		return wp
  1527  	}
  1528  	return nil // Returning nil instead of *waitingPod(nil).
  1529  }
  1530  
  1531  // RejectWaitingPod rejects a WaitingPod given its UID.
  1532  // The returned value indicates if the given pod is waiting or not.
  1533  func (f *frameworkImpl) RejectWaitingPod(uid types.UID) bool {
  1534  	if waitingPod := f.waitingPods.get(uid); waitingPod != nil {
  1535  		waitingPod.Reject("", "removed")
  1536  		return true
  1537  	}
  1538  	return false
  1539  }
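
The three waiting-pod accessors above are typically used together by a Permit-style plugin that later allows or rejects pods it previously asked to wait. A hypothetical helper (the plugin name parameter and label key are invented) might look like this:

package example

import (
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// releaseGang walks the waiting-pod map and allows or rejects the pods of one
// gang on behalf of the named plugin. Allow and Reject only take effect on
// pods that are actually waiting on that plugin.
func releaseGang(h framework.Handle, pluginName, gangLabel string, complete bool) {
	h.IterateOverWaitingPods(func(wp framework.WaitingPod) {
		if wp.GetPod().Labels["example.com/gang"] != gangLabel {
			return
		}
		if complete {
			wp.Allow(pluginName)
		} else {
			wp.Reject(pluginName, "gang disbanded")
		}
	})
}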
  1540  
  1541  // HasFilterPlugins returns true if at least one filter plugin is defined.
  1542  func (f *frameworkImpl) HasFilterPlugins() bool {
  1543  	return len(f.filterPlugins) > 0
  1544  }
  1545  
  1546  // HasPostFilterPlugins returns true if at least one postFilter plugin is defined.
  1547  func (f *frameworkImpl) HasPostFilterPlugins() bool {
  1548  	return len(f.postFilterPlugins) > 0
  1549  }
  1550  
  1551  // HasScorePlugins returns true if at least one score plugin is defined.
  1552  func (f *frameworkImpl) HasScorePlugins() bool {
  1553  	return len(f.scorePlugins) > 0
  1554  }
  1555  
  1556  // ListPlugins returns a map of extension point name to plugin names configured at each extension
  1557  // point. The returned config is empty if no plugins were configured.
  1558  func (f *frameworkImpl) ListPlugins() *config.Plugins {
  1559  	m := config.Plugins{}
  1560  
  1561  	for _, e := range f.getExtensionPoints(&m) {
  1562  		plugins := reflect.ValueOf(e.slicePtr).Elem()
  1563  		extName := plugins.Type().Elem().Name()
  1564  		var cfgs []config.Plugin
  1565  		for i := 0; i < plugins.Len(); i++ {
  1566  			name := plugins.Index(i).Interface().(framework.Plugin).Name()
  1567  			p := config.Plugin{Name: name}
  1568  			if extName == "ScorePlugin" {
  1569  				// Weights apply only to score plugins.
  1570  				p.Weight = int32(f.scorePluginWeight[name])
  1571  			}
  1572  			cfgs = append(cfgs, p)
  1573  		}
  1574  		if len(cfgs) > 0 {
  1575  			e.plugins.Enabled = cfgs
  1576  		}
  1577  	}
  1578  	return &m
  1579  }
  1580  
  1581  // ClientSet returns a kubernetes clientset.
  1582  func (f *frameworkImpl) ClientSet() clientset.Interface {
  1583  	return f.clientSet
  1584  }
  1585  
  1586  // KubeConfig returns a kubernetes config.
  1587  func (f *frameworkImpl) KubeConfig() *restclient.Config {
  1588  	return f.kubeConfig
  1589  }
  1590  
  1591  // EventRecorder returns an event recorder.
  1592  func (f *frameworkImpl) EventRecorder() events.EventRecorder {
  1593  	return f.eventRecorder
  1594  }
  1595  
  1596  // SharedInformerFactory returns a shared informer factory.
  1597  func (f *frameworkImpl) SharedInformerFactory() informers.SharedInformerFactory {
  1598  	return f.informerFactory
  1599  }
  1600  
  1601  func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) sets.Set[string] {
  1602  	pgSet := sets.Set[string]{}
  1603  
  1604  	if plugins == nil {
  1605  		return pgSet
  1606  	}
  1607  
  1608  	find := func(pgs *config.PluginSet) {
  1609  		for _, pg := range pgs.Enabled {
  1610  			pgSet.Insert(pg.Name)
  1611  		}
  1612  	}
  1613  
  1614  	for _, e := range f.getExtensionPoints(plugins) {
  1615  		find(e.plugins)
  1616  	}
  1617  	// Parse MultiPoint separately since it is not returned by f.getExtensionPoints()
  1618  	find(&plugins.MultiPoint)
  1619  
  1620  	return pgSet
  1621  }
  1622  
  1623  // ProfileName returns the profile name associated to this framework.
  1624  func (f *frameworkImpl) ProfileName() string {
  1625  	return f.profileName
  1626  }
  1627  
  1628  // PercentageOfNodesToScore returns percentageOfNodesToScore associated to a profile.
  1629  func (f *frameworkImpl) PercentageOfNodesToScore() *int32 {
  1630  	return f.percentageOfNodesToScore
  1631  }
  1632  
  1633  // Parallelizer returns a parallelizer holding parallelism for scheduler.
  1634  func (f *frameworkImpl) Parallelizer() parallelize.Parallelizer {
  1635  	return f.parallelizer
  1636  }
  1637  
