...

Source file src/k8s.io/kubernetes/pkg/scheduler/scheduler.go

Documentation: k8s.io/kubernetes/pkg/scheduler

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package scheduler
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"time"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	"k8s.io/apimachinery/pkg/api/meta"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/util/wait"
    29  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    30  	"k8s.io/client-go/dynamic/dynamicinformer"
    31  	"k8s.io/client-go/informers"
    32  	coreinformers "k8s.io/client-go/informers/core/v1"
    33  	clientset "k8s.io/client-go/kubernetes"
    34  	restclient "k8s.io/client-go/rest"
    35  	"k8s.io/client-go/tools/cache"
    36  	"k8s.io/klog/v2"
    37  	configv1 "k8s.io/kube-scheduler/config/v1"
    38  	"k8s.io/kubernetes/pkg/features"
    39  	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
    40  	"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
    41  	"k8s.io/kubernetes/pkg/scheduler/framework"
    42  	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
    43  	frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
    44  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
    45  	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
    46  	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
    47  	cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
    48  	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
    49  	"k8s.io/kubernetes/pkg/scheduler/metrics"
    50  	"k8s.io/kubernetes/pkg/scheduler/profile"
    51  )
    52  
    53  const (
    54  	// Duration the scheduler will wait before expiring an assumed pod.
    55  	// See issue #106361 for more details about this parameter and its value.
    56  	durationToExpireAssumedPod time.Duration = 0
    57  )
    58  
    59  // ErrNoNodesAvailable is used to describe the error that no nodes are available to schedule pods.
    60  var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")
    61  
    62  // Scheduler watches for new unscheduled pods. It attempts to find
    63  // nodes that they fit on and writes bindings back to the api server.
    64  type Scheduler struct {
    65  	// It is expected that changes made via Cache will be observed
    66  	// by NodeLister and Algorithm.
    67  	Cache internalcache.Cache
    68  
    69  	Extenders []framework.Extender
    70  
    71  	// NextPod should be a function that blocks until the next pod
    72  	// is available. We don't use a channel for this, because scheduling
    73  	// a pod may take some amount of time and we don't want pods to get
    74  	// stale while they sit in a channel.
    75  	NextPod func(logger klog.Logger) (*framework.QueuedPodInfo, error)
    76  
    77  	// FailureHandler is called upon a scheduling failure.
    78  	FailureHandler FailureHandlerFn
    79  
    80  	// SchedulePod tries to schedule the given pod to one of the nodes in the node list.
    81  	// It returns a ScheduleResult with the name of the suggested host on success,
    82  	// otherwise it returns a FitError with reasons.
    83  	SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)
    84  
    85  	// Close this to shut down the scheduler.
    86  	StopEverything <-chan struct{}
    87  
    88  	// SchedulingQueue holds pods to be scheduled
    89  	SchedulingQueue internalqueue.SchedulingQueue
    90  
    91  	// Profiles are the scheduling profiles.
    92  	Profiles profile.Map
    93  
    94  	client clientset.Interface
    95  
    96  	nodeInfoSnapshot *internalcache.Snapshot
    97  
    98  	percentageOfNodesToScore int32
    99  
   100  	nextStartNodeIndex int
   101  
   102  	// logger *must* be initialized when creating a Scheduler,
   103  	// otherwise logging functions will access a nil sink and
   104  	// panic.
   105  	logger klog.Logger
   106  
   107  	// registeredHandlers contains the registrations of all handlers. It's used to check if all handlers have finished syncing before the scheduling cycles start.
   108  	registeredHandlers []cache.ResourceEventHandlerRegistration
   109  }
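
The function-typed fields above (NextPod, SchedulePod, FailureHandler) are what the scheduling loop stitches together. A minimal sketch of one iteration, written as a hypothetical helper on Scheduler; binding, cycle-state plumbing and most error handling are elided:

	func (sched *Scheduler) scheduleOneSketch(ctx context.Context, logger klog.Logger) {
		// Block until a pod is popped from the scheduling queue.
		podInfo, err := sched.NextPod(logger)
		if err != nil || podInfo == nil {
			return
		}
		fwk := sched.Profiles[podInfo.Pod.Spec.SchedulerName]
		result, err := sched.SchedulePod(ctx, fwk, framework.NewCycleState(), podInfo.Pod)
		if err != nil {
			sched.FailureHandler(ctx, fwk, podInfo, framework.AsStatus(err), result.nominatingInfo, time.Now())
			return
		}
		// On success the pod is assumed into sched.Cache and bound to result.SuggestedHost asynchronously.
		_ = result.SuggestedHost
	}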
   110  
   111  func (sched *Scheduler) applyDefaultHandlers() {
   112  	sched.SchedulePod = sched.schedulePod
   113  	sched.FailureHandler = sched.handleSchedulingFailure
   114  }
   115  
   116  type schedulerOptions struct {
   117  	componentConfigVersion string
   118  	kubeConfig             *restclient.Config
   119  	// Overridden by profile level percentageOfNodesToScore if set in v1.
   120  	percentageOfNodesToScore          int32
   121  	podInitialBackoffSeconds          int64
   122  	podMaxBackoffSeconds              int64
   123  	podMaxInUnschedulablePodsDuration time.Duration
   124  	// Contains out-of-tree plugins to be merged with the in-tree registry.
   125  	frameworkOutOfTreeRegistry frameworkruntime.Registry
   126  	profiles                   []schedulerapi.KubeSchedulerProfile
   127  	extenders                  []schedulerapi.Extender
   128  	frameworkCapturer          FrameworkCapturer
   129  	parallelism                int32
   130  	applyDefaultProfile        bool
   131  }
   132  
   133  // Option configures a Scheduler
   134  type Option func(*schedulerOptions)
   135  
   136  // ScheduleResult represents the result of scheduling a pod.
   137  type ScheduleResult struct {
   138  	// Name of the selected node.
   139  	SuggestedHost string
   140  	// The number of nodes the scheduler evaluated the pod against in the filtering
   141  	// phase and beyond.
   142  	// Note that this count includes the nodes filtered out by PreFilterResult.
   143  	EvaluatedNodes int
   144  	// The number of nodes out of the evaluated ones that fit the pod.
   145  	FeasibleNodes int
   146  	// The nominating info for the scheduling cycle.
   147  	nominatingInfo *framework.NominatingInfo
   148  }
   149  
   150  // WithComponentConfigVersion sets the component config version to the
   151  // KubeSchedulerConfiguration version used. The string should be the full
   152  // scheme group/version of the external type we converted from (for example
   153  // "kubescheduler.config.k8s.io/v1")
   154  func WithComponentConfigVersion(apiVersion string) Option {
   155  	return func(o *schedulerOptions) {
   156  		o.componentConfigVersion = apiVersion
   157  	}
   158  }
   159  
   160  // WithKubeConfig sets the kube config for Scheduler.
   161  func WithKubeConfig(cfg *restclient.Config) Option {
   162  	return func(o *schedulerOptions) {
   163  		o.kubeConfig = cfg
   164  	}
   165  }
   166  
   167  // WithProfiles sets profiles for Scheduler. By default, there is one profile
   168  // with the name "default-scheduler".
   169  func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option {
   170  	return func(o *schedulerOptions) {
   171  		o.profiles = p
   172  		o.applyDefaultProfile = false
   173  	}
   174  }
   175  
   176  // WithParallelism sets the parallelism for all scheduler algorithms. Default is 16.
   177  func WithParallelism(threads int32) Option {
   178  	return func(o *schedulerOptions) {
   179  		o.parallelism = threads
   180  	}
   181  }
   182  
   183  // WithPercentageOfNodesToScore sets percentageOfNodesToScore for Scheduler.
   184  // The default value of 0 will use an adaptive percentage: 50 - (num of nodes)/125.
   185  func WithPercentageOfNodesToScore(percentageOfNodesToScore *int32) Option {
   186  	return func(o *schedulerOptions) {
   187  		if percentageOfNodesToScore != nil {
   188  			o.percentageOfNodesToScore = *percentageOfNodesToScore
   189  		}
   190  	}
   191  }
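
As a worked example of the adaptive default, a sketch of the formula in the comment above, assuming the 5% lower bound the scheduler applies when sizing the feasible-node search:

	// adaptivePercentage is an illustrative helper, not part of this file.
	func adaptivePercentage(numAllNodes int32) int32 {
		p := int32(50) - numAllNodes/125
		if p < 5 {
			p = 5
		}
		return p
	}
	// e.g. 100 nodes -> 50%, 1000 nodes -> 42%, 5000 nodes -> 10%.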
   192  
   193  // WithFrameworkOutOfTreeRegistry sets the registry for out-of-tree plugins. Those plugins
   194  // will be appended to the default registry.
   195  func WithFrameworkOutOfTreeRegistry(registry frameworkruntime.Registry) Option {
   196  	return func(o *schedulerOptions) {
   197  		o.frameworkOutOfTreeRegistry = registry
   198  	}
   199  }
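
A common way to populate such a registry is through the kube-scheduler command's WithPlugin option rather than calling this function directly. A sketch, where "MyPlugin" and NewMyPlugin (a plugin factory compatible with frameworkruntime.Registry) are assumptions:

	package main

	import (
		"os"

		"k8s.io/kubernetes/cmd/kube-scheduler/app"
	)

	func main() {
		command := app.NewSchedulerCommand(
			app.WithPlugin("MyPlugin", NewMyPlugin), // NewMyPlugin is hypothetical
		)
		if err := command.Execute(); err != nil {
			os.Exit(1)
		}
	}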
   200  
   201  // WithPodInitialBackoffSeconds sets podInitialBackoffSeconds for Scheduler; the default value is 1.
   202  func WithPodInitialBackoffSeconds(podInitialBackoffSeconds int64) Option {
   203  	return func(o *schedulerOptions) {
   204  		o.podInitialBackoffSeconds = podInitialBackoffSeconds
   205  	}
   206  }
   207  
   208  // WithPodMaxBackoffSeconds sets podMaxBackoffSeconds for Scheduler; the default value is 10.
   209  func WithPodMaxBackoffSeconds(podMaxBackoffSeconds int64) Option {
   210  	return func(o *schedulerOptions) {
   211  		o.podMaxBackoffSeconds = podMaxBackoffSeconds
   212  	}
   213  }
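
Together with WithPodInitialBackoffSeconds, this bounds the retry delay for pods that failed scheduling. A sketch of the progression, assuming the queue doubles the backoff on each failed attempt, which is what the defaults of 1s and 10s suggest:

	// backoffSketch is illustrative only; the real calculation lives in the internal queue.
	func backoffSketch(attempts int, initial, max time.Duration) time.Duration {
		d := initial
		for i := 1; i < attempts; i++ {
			d *= 2
			if d >= max {
				return max
			}
		}
		return d
	}
	// attempts 1..5 with the defaults: 1s, 2s, 4s, 8s, 10s (capped).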
   214  
   215  // WithPodMaxInUnschedulablePodsDuration sets podMaxInUnschedulablePodsDuration for PriorityQueue.
   216  func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option {
   217  	return func(o *schedulerOptions) {
   218  		o.podMaxInUnschedulablePodsDuration = duration
   219  	}
   220  }
   221  
   222  // WithExtenders sets extenders for the Scheduler
   223  func WithExtenders(e ...schedulerapi.Extender) Option {
   224  	return func(o *schedulerOptions) {
   225  		o.extenders = e
   226  	}
   227  }
   228  
   229  // FrameworkCapturer is used for registering a notify function that is called while building the framework.
   230  type FrameworkCapturer func(schedulerapi.KubeSchedulerProfile)
   231  
   232  // WithBuildFrameworkCapturer sets a notify function for getting buildFramework details.
   233  func WithBuildFrameworkCapturer(fc FrameworkCapturer) Option {
   234  	return func(o *schedulerOptions) {
   235  		o.frameworkCapturer = fc
   236  	}
   237  }
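
For instance, a test or tooling can use the capturer to record every profile as it is materialized (a small sketch using only types already imported here):

	var captured []schedulerapi.KubeSchedulerProfile
	opt := WithBuildFrameworkCapturer(func(p schedulerapi.KubeSchedulerProfile) {
		captured = append(captured, p)
	})
	// Pass opt to New alongside the other options.
	_ = opt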
   238  
   239  var defaultSchedulerOptions = schedulerOptions{
   240  	percentageOfNodesToScore:          schedulerapi.DefaultPercentageOfNodesToScore,
   241  	podInitialBackoffSeconds:          int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
   242  	podMaxBackoffSeconds:              int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
   243  	podMaxInUnschedulablePodsDuration: internalqueue.DefaultPodMaxInUnschedulablePodsDuration,
   244  	parallelism:                       int32(parallelize.DefaultParallelism),
   245  	// Ideally we would statically set the default profile here, but we can't because
   246  	// creating the default profile may require testing feature gates, which may get
   247  	// set dynamically in tests. Therefore, we delay creating it until New is actually
   248  	// invoked.
   249  	applyDefaultProfile: true,
   250  }
   251  
   252  // New returns a Scheduler
   253  func New(ctx context.Context,
   254  	client clientset.Interface,
   255  	informerFactory informers.SharedInformerFactory,
   256  	dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
   257  	recorderFactory profile.RecorderFactory,
   258  	opts ...Option) (*Scheduler, error) {
   259  
   260  	logger := klog.FromContext(ctx)
   261  	stopEverything := ctx.Done()
   262  
   263  	options := defaultSchedulerOptions
   264  	for _, opt := range opts {
   265  		opt(&options)
   266  	}
   267  
   268  	if options.applyDefaultProfile {
   269  		var versionedCfg configv1.KubeSchedulerConfiguration
   270  		scheme.Scheme.Default(&versionedCfg)
   271  		cfg := schedulerapi.KubeSchedulerConfiguration{}
   272  		if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
   273  			return nil, err
   274  		}
   275  		options.profiles = cfg.Profiles
   276  	}
   277  
   278  	registry := frameworkplugins.NewInTreeRegistry()
   279  	if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
   280  		return nil, err
   281  	}
   282  
   283  	metrics.Register()
   284  
   285  	extenders, err := buildExtenders(logger, options.extenders, options.profiles)
   286  	if err != nil {
   287  		return nil, fmt.Errorf("couldn't build extenders: %w", err)
   288  	}
   289  
   290  	podLister := informerFactory.Core().V1().Pods().Lister()
   291  	nodeLister := informerFactory.Core().V1().Nodes().Lister()
   292  
   293  	snapshot := internalcache.NewEmptySnapshot()
   294  	metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
   295  
   296  	profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
   297  		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
   298  		frameworkruntime.WithClientSet(client),
   299  		frameworkruntime.WithKubeConfig(options.kubeConfig),
   300  		frameworkruntime.WithInformerFactory(informerFactory),
   301  		frameworkruntime.WithSnapshotSharedLister(snapshot),
   302  		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
   303  		frameworkruntime.WithParallelism(int(options.parallelism)),
   304  		frameworkruntime.WithExtenders(extenders),
   305  		frameworkruntime.WithMetricsRecorder(metricsRecorder),
   306  	)
   307  	if err != nil {
   308  		return nil, fmt.Errorf("initializing profiles: %v", err)
   309  	}
   310  
   311  	if len(profiles) == 0 {
   312  		return nil, errors.New("at least one profile is required")
   313  	}
   314  
   315  	preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)
   316  	queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile)
   317  	for profileName, profile := range profiles {
   318  		preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()
   319  		queueingHintsPerProfile[profileName] = buildQueueingHintMap(profile.EnqueueExtensions())
   320  	}
   321  
   322  	podQueue := internalqueue.NewSchedulingQueue(
   323  		profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
   324  		informerFactory,
   325  		internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
   326  		internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
   327  		internalqueue.WithPodLister(podLister),
   328  		internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
   329  		internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
   330  		internalqueue.WithQueueingHintMapPerProfile(queueingHintsPerProfile),
   331  		internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
   332  		internalqueue.WithMetricsRecorder(*metricsRecorder),
   333  	)
   334  
   335  	for _, fwk := range profiles {
   336  		fwk.SetPodNominator(podQueue)
   337  	}
   338  
   339  	schedulerCache := internalcache.New(ctx, durationToExpireAssumedPod)
   340  
   341  	// Setup cache debugger.
   342  	debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
   343  	debugger.ListenForSignal(ctx)
   344  
   345  	sched := &Scheduler{
   346  		Cache:                    schedulerCache,
   347  		client:                   client,
   348  		nodeInfoSnapshot:         snapshot,
   349  		percentageOfNodesToScore: options.percentageOfNodesToScore,
   350  		Extenders:                extenders,
   351  		StopEverything:           stopEverything,
   352  		SchedulingQueue:          podQueue,
   353  		Profiles:                 profiles,
   354  		logger:                   logger,
   355  	}
   356  	sched.NextPod = podQueue.Pop
   357  	sched.applyDefaultHandlers()
   358  
   359  	if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {
   360  		return nil, fmt.Errorf("adding event handlers: %w", err)
   361  	}
   362  
   363  	return sched, nil
   364  }
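
A minimal construction sketch; newSchedulerSketch is hypothetical, and the caller supplies the clientset, dynamic client and recorder factory. dynClient is a dynamic.Interface from k8s.io/client-go/dynamic, an import this file does not otherwise need:

	func newSchedulerSketch(ctx context.Context, client clientset.Interface, dynClient dynamic.Interface, recorderFactory profile.RecorderFactory) (*Scheduler, error) {
		informerFactory := NewInformerFactory(client, 0)
		dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynClient, 0)

		pct := int32(70)
		return New(ctx, client, informerFactory, dynInformerFactory, recorderFactory,
			WithPercentageOfNodesToScore(&pct),
			WithParallelism(32),
		)
	}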
   365  
   366  // defaultQueueingHintFn is the default queueing hint function.
   367  // It always returns Queue as the queueing hint.
   368  var defaultQueueingHintFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
   369  	return framework.Queue, nil
   370  }
   371  
   372  func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap {
   373  	queueingHintMap := make(internalqueue.QueueingHintMap)
   374  	for _, e := range es {
   375  		events := e.EventsToRegister()
   376  
   377  		// This happens when a plugin registers with empty events. It is usually the case that a pod
   378  		// becomes reschedulable only on self-update, e.g. the schedulingGates plugin, where the pod
   379  		// enters the activeQ via priorityQueue.Update().
   380  		if len(events) == 0 {
   381  			continue
   382  		}
   383  
   384  		// Note: Rarely, a plugin implements EnqueueExtensions but returns nil.
   385  		// We treat it as: the plugin is not interested in any event, and hence a pod rejected by that plugin
   386  		// cannot be moved by any regular cluster event.
   387  		// So, we can just ignore such EventsToRegister here.
   388  
   389  		registerNodeAdded := false
   390  		registerNodeTaintUpdated := false
   391  		for _, event := range events {
   392  			fn := event.QueueingHintFn
   393  			if fn == nil || !utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
   394  				fn = defaultQueueingHintFn
   395  			}
   396  
   397  			if event.Event.Resource == framework.Node {
   398  				if event.Event.ActionType&framework.Add != 0 {
   399  					registerNodeAdded = true
   400  				}
   401  				if event.Event.ActionType&framework.UpdateNodeTaint != 0 {
   402  					registerNodeTaintUpdated = true
   403  				}
   404  			}
   405  
   406  			queueingHintMap[event.Event] = append(queueingHintMap[event.Event], &internalqueue.QueueingHintFunction{
   407  				PluginName:     e.Name(),
   408  				QueueingHintFn: fn,
   409  			})
   410  		}
   411  		if registerNodeAdded && !registerNodeTaintUpdated {
   412  			// Temporary fix for the issue https://github.com/kubernetes/kubernetes/issues/109437
   413  			// NodeAdded QueueingHint isn't always called because of preCheck.
   414  			// It's definitely not something plugin developers expect,
   415  			// and registering the UpdateNodeTaint event is the only mitigation for now.
   416  			//
   417  			// So, here we register the UpdateNodeTaint event for plugins that have the NodeAdded event but don't have the UpdateNodeTaint event.
   418  			// It hurts requeuing efficiency, though it is a lot better than some Pods being stuck in the
   419  			// unschedulable pod pool.
   420  			// This behavior will be removed when we remove the preCheck feature.
   421  			// See: https://github.com/kubernetes/kubernetes/issues/110175
   422  			queueingHintMap[framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}] =
   423  				append(queueingHintMap[framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}],
   424  					&internalqueue.QueueingHintFunction{
   425  						PluginName:     e.Name(),
   426  						QueueingHintFn: defaultQueueingHintFn,
   427  					},
   428  				)
   429  		}
   430  	}
   431  	return queueingHintMap
   432  }
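
For context, a sketch of the plugin side that feeds this map; MyPlugin is hypothetical, and the shape follows how event.Event and event.QueueingHintFn are consumed above:

	func (pl *MyPlugin) EventsToRegister() []framework.ClusterEventWithHint {
		return []framework.ClusterEventWithHint{
			{
				Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel},
				QueueingHintFn: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
					// Return framework.Queue only when the change could make the pod schedulable.
					return framework.Queue, nil
				},
			},
		}
	}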
   433  
   434  // Run begins watching and scheduling. It starts scheduling and blocks until the context is done.
   435  func (sched *Scheduler) Run(ctx context.Context) {
   436  	logger := klog.FromContext(ctx)
   437  	sched.SchedulingQueue.Run(logger)
   438  
   439  	// We need to start the scheduleOne loop in a dedicated goroutine,
   440  	// because scheduleOne blocks on getting the next item
   441  	// from the SchedulingQueue.
   442  	// If there are no new pods to schedule, it will hang there,
   443  	// and if it ran in this goroutine it would block closing the
   444  	// SchedulingQueue, in effect causing a deadlock on shutdown.
   445  	go wait.UntilWithContext(ctx, sched.ScheduleOne, 0)
   446  
   447  	<-ctx.Done()
   448  	sched.SchedulingQueue.Close()
   449  
   450  	// If the plugins satisfy the io.Closer interface, they are closed.
   451  	err := sched.Profiles.Close()
   452  	if err != nil {
   453  		logger.Error(err, "Failed to close plugins")
   454  	}
   455  }
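
Callers typically start the shared informers and wait for their caches to sync before invoking Run; a sketch, reusing the factory and scheduler from New above:

	informerFactory.Start(ctx.Done())
	informerFactory.WaitForCacheSync(ctx.Done())
	sched.Run(ctx) // blocks until ctx is cancelled, then closes the queue and plugins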
   456  
   457  // NewInformerFactory creates a SharedInformerFactory and initializes a scheduler-specific
   458  // in-place podInformer.
   459  func NewInformerFactory(cs clientset.Interface, resyncPeriod time.Duration) informers.SharedInformerFactory {
   460  	informerFactory := informers.NewSharedInformerFactory(cs, resyncPeriod)
   461  	informerFactory.InformerFor(&v1.Pod{}, newPodInformer)
   462  	return informerFactory
   463  }
   464  
   465  func buildExtenders(logger klog.Logger, extenders []schedulerapi.Extender, profiles []schedulerapi.KubeSchedulerProfile) ([]framework.Extender, error) {
   466  	var fExtenders []framework.Extender
   467  	if len(extenders) == 0 {
   468  		return nil, nil
   469  	}
   470  
   471  	var ignoredExtendedResources []string
   472  	var ignorableExtenders []framework.Extender
   473  	for i := range extenders {
   474  		logger.V(2).Info("Creating extender", "extender", extenders[i])
   475  		extender, err := NewHTTPExtender(&extenders[i])
   476  		if err != nil {
   477  			return nil, err
   478  		}
   479  		if !extender.IsIgnorable() {
   480  			fExtenders = append(fExtenders, extender)
   481  		} else {
   482  			ignorableExtenders = append(ignorableExtenders, extender)
   483  		}
   484  		for _, r := range extenders[i].ManagedResources {
   485  			if r.IgnoredByScheduler {
   486  				ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
   487  			}
   488  		}
   489  	}
   490  	// place ignorable extenders at the tail of the extenders list
   491  	fExtenders = append(fExtenders, ignorableExtenders...)
   492  
   493  	// If the Extenders reported any ignored extended resources, append them to the pluginConfig for each profile.
   494  	// This should only have an effect on ComponentConfig, where it is possible to configure Extenders and
   495  	// plugin args (and in which case the extender ignored resources take precedence).
   496  	if len(ignoredExtendedResources) == 0 {
   497  		return fExtenders, nil
   498  	}
   499  
   500  	for i := range profiles {
   501  		prof := &profiles[i]
   502  		var found = false
   503  		for k := range prof.PluginConfig {
   504  			if prof.PluginConfig[k].Name == noderesources.Name {
   505  				// Update the existing args
   506  				pc := &prof.PluginConfig[k]
   507  				args, ok := pc.Args.(*schedulerapi.NodeResourcesFitArgs)
   508  				if !ok {
   509  					return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", pc.Args)
   510  				}
   511  				args.IgnoredResources = ignoredExtendedResources
   512  				found = true
   513  				break
   514  			}
   515  		}
   516  		if !found {
   517  			return nil, fmt.Errorf("can't find NodeResourcesFitArgs in plugin config")
   518  		}
   519  	}
   520  	return fExtenders, nil
   521  }
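
For reference, a sketch of an extender entry as it arrives here via WithExtenders; the field names follow the internal scheduler config API imported as schedulerapi, and the values are illustrative:

	ext := schedulerapi.Extender{
		URLPrefix:  "http://extender.example.svc:8080/scheduler",
		FilterVerb: "filter",
		Weight:     1,
		Ignorable:  true, // ignorable extenders are placed at the tail above
		ManagedResources: []schedulerapi.ExtenderManagedResource{
			{Name: "example.com/foo", IgnoredByScheduler: true}, // feeds ignoredExtendedResources
		},
	}
	_ = ext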
   522  
   523  type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time)
   524  
   525  func unionedGVKs(queueingHintsPerProfile internalqueue.QueueingHintMapPerProfile) map[framework.GVK]framework.ActionType {
   526  	gvkMap := make(map[framework.GVK]framework.ActionType)
   527  	for _, queueingHints := range queueingHintsPerProfile {
   528  		for evt := range queueingHints {
   529  			if _, ok := gvkMap[evt.Resource]; ok {
   530  				gvkMap[evt.Resource] |= evt.ActionType
   531  			} else {
   532  				gvkMap[evt.Resource] = evt.ActionType
   533  			}
   534  		}
   535  	}
   536  	return gvkMap
   537  }
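
The bitwise OR means two profiles interested in different actions on the same resource collapse into a single registration; for example:

	// If one profile registers {Node: Add} and another {Node: UpdateNodeTaint},
	// gvkMap[framework.Node] ends up holding the union of both action types.
	merged := framework.Add | framework.UpdateNodeTaint
	_ = merged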
   538  
   539  // newPodInformer creates a shared index informer that returns only non-terminal pods.
   540  // The PodInformer allows indexers to be added, but note that only non-conflicting indexers are allowed.
   541  func newPodInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
   542  	selector := fmt.Sprintf("status.phase!=%v,status.phase!=%v", v1.PodSucceeded, v1.PodFailed)
   543  	tweakListOptions := func(options *metav1.ListOptions) {
   544  		options.FieldSelector = selector
   545  	}
   546  	informer := coreinformers.NewFilteredPodInformer(cs, metav1.NamespaceAll, resyncPeriod, cache.Indexers{}, tweakListOptions)
   547  
   548  	// Dropping `.metadata.managedFields` to improve memory usage.
   549  	// The Extract workflow (i.e. `ExtractPod`) should be unused.
   550  	trim := func(obj interface{}) (interface{}, error) {
   551  		if accessor, err := meta.Accessor(obj); err == nil {
   552  			if accessor.GetManagedFields() != nil {
   553  				accessor.SetManagedFields(nil)
   554  			}
   555  		}
   556  		return obj, nil
   557  	}
   558  	informer.SetTransform(trim)
   559  	return informer
   560  }
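
With v1.PodSucceeded and v1.PodFailed substituted, the field selector built above is equivalent to the hard-coded form below, so terminal pods never enter the scheduler's informer cache:

	tweak := func(options *metav1.ListOptions) {
		options.FieldSelector = "status.phase!=Succeeded,status.phase!=Failed"
	}
	_ = tweak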
   561  
