
Source file src/k8s.io/kubernetes/pkg/scheduler/scheduler_test.go

Documentation: k8s.io/kubernetes/pkg/scheduler

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package scheduler
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sort"
    23  	"strings"
    24  	"testing"
    25  	"time"
    27  	"github.com/google/go-cmp/cmp"
    28  	v1 "k8s.io/api/core/v1"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/runtime"
    31  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    32  	"k8s.io/client-go/informers"
    33  	"k8s.io/client-go/kubernetes"
    34  	"k8s.io/client-go/kubernetes/fake"
    35  	"k8s.io/client-go/kubernetes/scheme"
    36  	"k8s.io/client-go/tools/cache"
    37  	"k8s.io/client-go/tools/events"
    38  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    39  	"k8s.io/klog/v2"
    40  	"k8s.io/klog/v2/ktesting"
    41  	"k8s.io/kubernetes/pkg/features"
    42  	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
    43  	"k8s.io/kubernetes/pkg/scheduler/apis/config/testing/defaults"
    44  	"k8s.io/kubernetes/pkg/scheduler/framework"
    45  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins"
    46  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
    47  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
    48  	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
    49  	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
    50  	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
    51  	"k8s.io/kubernetes/pkg/scheduler/profile"
    52  	st "k8s.io/kubernetes/pkg/scheduler/testing"
    53  	tf "k8s.io/kubernetes/pkg/scheduler/testing/framework"
    54  	testingclock "k8s.io/utils/clock/testing"
    55  	"k8s.io/utils/ptr"
    56  )
    58  func TestSchedulerCreation(t *testing.T) {
    59  	invalidRegistry := map[string]frameworkruntime.PluginFactory{
    60  		defaultbinder.Name: defaultbinder.New,
    61  	}
    62  	validRegistry := map[string]frameworkruntime.PluginFactory{
    63  		"Foo": defaultbinder.New,
    64  	}
    65  	cases := []struct {
    66  		name          string
    67  		opts          []Option
    68  		wantErr       string
    69  		wantProfiles  []string
    70  		wantExtenders []string
    71  	}{
    72  		{
    73  			name: "valid out-of-tree registry",
    74  			opts: []Option{
    75  				WithFrameworkOutOfTreeRegistry(validRegistry),
    76  				WithProfiles(
    77  					schedulerapi.KubeSchedulerProfile{
    78  						SchedulerName: "default-scheduler",
    79  						Plugins: &schedulerapi.Plugins{
    80  							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
    81  							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
    82  						},
    83  					},
    84  				)},
    85  			wantProfiles: []string{"default-scheduler"},
    86  		},
    87  		{
    88  			name: "repeated plugin name in out-of-tree plugin",
    89  			opts: []Option{
    90  				WithFrameworkOutOfTreeRegistry(invalidRegistry),
    91  				WithProfiles(
    92  					schedulerapi.KubeSchedulerProfile{
    93  						SchedulerName: "default-scheduler",
    94  						Plugins: &schedulerapi.Plugins{
    95  							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
    96  							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
    97  						},
    98  					},
    99  				)},
   100  			wantProfiles: []string{"default-scheduler"},
   101  			wantErr:      "a plugin named DefaultBinder already exists",
   102  		},
   103  		{
   104  			name: "multiple profiles",
   105  			opts: []Option{
   106  				WithProfiles(
   107  					schedulerapi.KubeSchedulerProfile{
   108  						SchedulerName: "foo",
   109  						Plugins: &schedulerapi.Plugins{
   110  							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
   111  							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
   112  						},
   113  					},
   114  					schedulerapi.KubeSchedulerProfile{
   115  						SchedulerName: "bar",
   116  						Plugins: &schedulerapi.Plugins{
   117  							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
   118  							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
   119  						},
   120  					},
   121  				)},
   122  			wantProfiles: []string{"bar", "foo"},
   123  		},
   124  		{
   125  			name: "Repeated profiles",
   126  			opts: []Option{
   127  				WithProfiles(
   128  					schedulerapi.KubeSchedulerProfile{
   129  						SchedulerName: "foo",
   130  						Plugins: &schedulerapi.Plugins{
   131  							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
   132  							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
   133  						},
   134  					},
   135  					schedulerapi.KubeSchedulerProfile{
   136  						SchedulerName: "bar",
   137  						Plugins: &schedulerapi.Plugins{
   138  							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
   139  							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
   140  						},
   141  					},
   142  					schedulerapi.KubeSchedulerProfile{
   143  						SchedulerName: "foo",
   144  						Plugins: &schedulerapi.Plugins{
   145  							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
   146  							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
   147  						},
   148  					},
   149  				)},
   150  			wantErr: "duplicate profile with scheduler name \"foo\"",
   151  		},
   152  		{
   153  			name: "With extenders",
   154  			opts: []Option{
   155  				WithProfiles(
   156  					schedulerapi.KubeSchedulerProfile{
   157  						SchedulerName: "default-scheduler",
   158  						Plugins: &schedulerapi.Plugins{
   159  							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
   160  							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
   161  						},
   162  					},
   163  				),
   164  				WithExtenders(
   165  					schedulerapi.Extender{
   166  						URLPrefix: "http://extender.kube-system/",
   167  					},
   168  				),
   169  			},
   170  			wantProfiles:  []string{"default-scheduler"},
   171  			wantExtenders: []string{"http://extender.kube-system/"},
   172  		},
   173  	}
   175  	for _, tc := range cases {
   176  		t.Run(tc.name, func(t *testing.T) {
   177  			client := fake.NewSimpleClientset()
   178  			informerFactory := informers.NewSharedInformerFactory(client, 0)
   180  			eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
   182  			_, ctx := ktesting.NewTestContext(t)
   183  			ctx, cancel := context.WithCancel(ctx)
   184  			defer cancel()
   185  			s, err := New(
   186  				ctx,
   187  				client,
   188  				informerFactory,
   189  				nil,
   190  				profile.NewRecorderFactory(eventBroadcaster),
   191  				tc.opts...,
   192  			)
   194  			// Errors
   195  			if len(tc.wantErr) != 0 {
   196  				if err == nil || !strings.Contains(err.Error(), tc.wantErr) {
   197  					t.Errorf("got error %q, want %q", err, tc.wantErr)
   198  				}
   199  				return
   200  			}
   201  			if err != nil {
   202  				t.Fatalf("Failed to create scheduler: %v", err)
   203  			}
   205  			// Profiles
   206  			profiles := make([]string, 0, len(s.Profiles))
   207  			for name := range s.Profiles {
   208  				profiles = append(profiles, name)
   209  			}
   210  			sort.Strings(profiles)
   211  			if diff := cmp.Diff(tc.wantProfiles, profiles); diff != "" {
   212  				t.Errorf("unexpected profiles (-want, +got):\n%s", diff)
   213  			}
   215  			// Extenders
   216  			if len(tc.wantExtenders) != 0 {
   217  				// Scheduler.Extenders
   218  				extenders := make([]string, 0, len(s.Extenders))
   219  				for _, e := range s.Extenders {
   220  					extenders = append(extenders, e.Name())
   221  				}
   222  				if diff := cmp.Diff(tc.wantExtenders, extenders); diff != "" {
   223  					t.Errorf("unexpected extenders (-want, +got):\n%s", diff)
   224  				}
   226  				// framework.Handle.Extenders()
   227  				for _, p := range s.Profiles {
   228  					extenders := make([]string, 0, len(p.Extenders()))
   229  					for _, e := range p.Extenders() {
   230  						extenders = append(extenders, e.Name())
   231  					}
   232  					if diff := cmp.Diff(tc.wantExtenders, extenders); diff != "" {
   233  						t.Errorf("unexpected extenders (-want, +got):\n%s", diff)
   234  					}
   235  				}
   236  			}
   237  		})
   238  	}
   239  }
   241  func TestFailureHandler(t *testing.T) {
   242  	testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Obj()
   243  	testPodUpdated := testPod.DeepCopy()
   244  	testPodUpdated.Labels = map[string]string{"foo": ""}
   246  	tests := []struct {
   247  		name                       string
   248  		podUpdatedDuringScheduling bool // pod is updated during a scheduling cycle
   249  		podDeletedDuringScheduling bool // pod is deleted during a scheduling cycle
   250  		expect                     *v1.Pod
   251  	}{
   252  		{
   253  			name:                       "pod is updated during a scheduling cycle",
   254  			podUpdatedDuringScheduling: true,
   255  			expect:                     testPodUpdated,
   256  		},
   257  		{
   258  			name:   "pod is not updated during a scheduling cycle",
   259  			expect: testPod,
   260  		},
   261  		{
   262  			name:                       "pod is deleted during a scheduling cycle",
   263  			podDeletedDuringScheduling: true,
   264  			expect:                     nil,
   265  		},
   266  	}
   268  	for _, tt := range tests {
   269  		t.Run(tt.name, func(t *testing.T) {
   270  			logger, ctx := ktesting.NewTestContext(t)
   271  			ctx, cancel := context.WithCancel(ctx)
   272  			defer cancel()
   274  			client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
   275  			informerFactory := informers.NewSharedInformerFactory(client, 0)
   276  			podInformer := informerFactory.Core().V1().Pods()
   277  			// Need to add/update/delete testPod to the store.
   278  			podInformer.Informer().GetStore().Add(testPod)
   280  			queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())))
   281  			schedulerCache := internalcache.New(ctx, 30*time.Second)
   283  			if err := queue.Add(logger, testPod); err != nil {
   284  				t.Fatalf("Add failed: %v", err)
   285  			}
   287  			if _, err := queue.Pop(logger); err != nil {
   288  				t.Fatalf("Pop failed: %v", err)
   289  			}
   291  			if tt.podUpdatedDuringScheduling {
   292  				podInformer.Informer().GetStore().Update(testPodUpdated)
   293  				queue.Update(logger, testPod, testPodUpdated)
   294  			}
   295  			if tt.podDeletedDuringScheduling {
   296  				podInformer.Informer().GetStore().Delete(testPod)
   297  				queue.Delete(testPod)
   298  			}
   300  			s, fwk, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory)
   301  			if err != nil {
   302  				t.Fatal(err)
   303  			}
   305  			testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
   306  			s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable), nil, time.Now())
   308  			var got *v1.Pod
   309  			if tt.podUpdatedDuringScheduling {
   310  				head, e := queue.Pop(logger)
   311  				if e != nil {
   312  					t.Fatalf("Cannot pop pod from the activeQ: %v", e)
   313  				}
   314  				got = head.Pod
   315  			} else {
   316  				got = getPodFromPriorityQueue(queue, testPod)
   317  			}
   319  			if diff := cmp.Diff(tt.expect, got); diff != "" {
   320  				t.Errorf("Unexpected pod (-want, +got): %s", diff)
   321  			}
   322  		})
   323  	}
   324  }
   326  func TestFailureHandler_PodAlreadyBound(t *testing.T) {
   327  	logger, ctx := ktesting.NewTestContext(t)
   328  	ctx, cancel := context.WithCancel(ctx)
   329  	defer cancel()
   331  	nodeFoo := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
   332  	testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Node("foo").Obj()
   334  	client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}, &v1.NodeList{Items: []v1.Node{nodeFoo}})
   335  	informerFactory := informers.NewSharedInformerFactory(client, 0)
   336  	podInformer := informerFactory.Core().V1().Pods()
   337  	// Need to add testPod to the store.
   338  	podInformer.Informer().GetStore().Add(testPod)
   340  	queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())))
   341  	schedulerCache := internalcache.New(ctx, 30*time.Second)
   343  	// Add node to schedulerCache no matter it's deleted in API server or not.
   344  	schedulerCache.AddNode(logger, &nodeFoo)
   346  	s, fwk, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory)
   347  	if err != nil {
   348  		t.Fatal(err)
   349  	}
   351  	testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
   352  	s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable).WithError(fmt.Errorf("binding rejected: timeout")), nil, time.Now())
   354  	pod := getPodFromPriorityQueue(queue, testPod)
   355  	if pod != nil {
   356  		t.Fatalf("Unexpected pod: %v should not be in PriorityQueue when the NodeName of pod is not empty", pod.Name)
   357  	}
   358  }
   360  // TestWithPercentageOfNodesToScore tests scheduler's PercentageOfNodesToScore is set correctly.
   361  func TestWithPercentageOfNodesToScore(t *testing.T) {
   362  	tests := []struct {
   363  		name                           string
   364  		percentageOfNodesToScoreConfig *int32
   365  		wantedPercentageOfNodesToScore int32
   366  	}{
   367  		{
   368  			name:                           "percentageOfNodesScore is nil",
   369  			percentageOfNodesToScoreConfig: nil,
   370  			wantedPercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
   371  		},
   372  		{
   373  			name:                           "percentageOfNodesScore is not nil",
   374  			percentageOfNodesToScoreConfig: ptr.To[int32](10),
   375  			wantedPercentageOfNodesToScore: 10,
   376  		},
   377  	}
   379  	for _, tt := range tests {
   380  		t.Run(tt.name, func(t *testing.T) {
   381  			client := fake.NewSimpleClientset()
   382  			informerFactory := informers.NewSharedInformerFactory(client, 0)
   383  			eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
   384  			_, ctx := ktesting.NewTestContext(t)
   385  			ctx, cancel := context.WithCancel(ctx)
   386  			defer cancel()
   387  			sched, err := New(
   388  				ctx,
   389  				client,
   390  				informerFactory,
   391  				nil,
   392  				profile.NewRecorderFactory(eventBroadcaster),
   393  				WithPercentageOfNodesToScore(tt.percentageOfNodesToScoreConfig),
   394  			)
   395  			if err != nil {
   396  				t.Fatalf("Failed to create scheduler: %v", err)
   397  			}
   398  			if sched.percentageOfNodesToScore != tt.wantedPercentageOfNodesToScore {
   399  				t.Errorf("scheduler.percercentageOfNodesToScore = %v, want %v", sched.percentageOfNodesToScore, tt.wantedPercentageOfNodesToScore)
   400  			}
   401  		})
   402  	}
   403  }
   405  // getPodFromPriorityQueue is the function used in the TestDefaultErrorFunc test to get
   406  // the specific pod from the given priority queue. It returns the found pod in the priority queue.
   407  func getPodFromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod {
   408  	podList, _ := queue.PendingPods()
   409  	if len(podList) == 0 {
   410  		return nil
   411  	}
   413  	queryPodKey, err := cache.MetaNamespaceKeyFunc(pod)
   414  	if err != nil {
   415  		return nil
   416  	}
   418  	for _, foundPod := range podList {
   419  		foundPodKey, err := cache.MetaNamespaceKeyFunc(foundPod)
   420  		if err != nil {
   421  			return nil
   422  		}
   424  		if foundPodKey == queryPodKey {
   425  			return foundPod
   426  		}
   427  	}
   429  	return nil
   430  }
   432  func initScheduler(ctx context.Context, cache internalcache.Cache, queue internalqueue.SchedulingQueue,
   433  	client kubernetes.Interface, informerFactory informers.SharedInformerFactory) (*Scheduler, framework.Framework, error) {
   434  	logger := klog.FromContext(ctx)
   435  	registerPluginFuncs := []tf.RegisterPluginFunc{
   436  		tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
   437  		tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
   438  	}
   439  	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
   440  	fwk, err := tf.NewFramework(ctx,
   441  		registerPluginFuncs,
   442  		testSchedulerName,
   443  		frameworkruntime.WithClientSet(client),
   444  		frameworkruntime.WithInformerFactory(informerFactory),
   445  		frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
   446  	)
   447  	if err != nil {
   448  		return nil, nil, err
   449  	}
   451  	s := &Scheduler{
   452  		Cache:           cache,
   453  		client:          client,
   454  		StopEverything:  ctx.Done(),
   455  		SchedulingQueue: queue,
   456  		Profiles:        profile.Map{testSchedulerName: fwk},
   457  		logger:          logger,
   458  	}
   459  	s.applyDefaultHandlers()
   461  	return s, fwk, nil
   462  }
   464  func TestInitPluginsWithIndexers(t *testing.T) {
   465  	tests := []struct {
   466  		name string
   467  		// the plugin registration ordering must not matter, being map traversal random
   468  		entrypoints map[string]frameworkruntime.PluginFactory
   469  		wantErr     string
   470  	}{
   471  		{
   472  			name: "register indexer, no conflicts",
   473  			entrypoints: map[string]frameworkruntime.PluginFactory{
   474  				"AddIndexer": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
   475  					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
   476  					err := podInformer.Informer().AddIndexers(cache.Indexers{
   477  						"nodeName": indexByPodSpecNodeName,
   478  					})
   479  					return &TestPlugin{name: "AddIndexer"}, err
   480  				},
   481  			},
   482  		},
   483  		{
   484  			name: "register the same indexer name multiple times, conflict",
   485  			// order of registration doesn't matter
   486  			entrypoints: map[string]frameworkruntime.PluginFactory{
   487  				"AddIndexer1": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
   488  					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
   489  					err := podInformer.Informer().AddIndexers(cache.Indexers{
   490  						"nodeName": indexByPodSpecNodeName,
   491  					})
   492  					return &TestPlugin{name: "AddIndexer1"}, err
   493  				},
   494  				"AddIndexer2": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
   495  					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
   496  					err := podInformer.Informer().AddIndexers(cache.Indexers{
   497  						"nodeName": indexByPodAnnotationNodeName,
   498  					})
   499  					return &TestPlugin{name: "AddIndexer1"}, err
   500  				},
   501  			},
   502  			wantErr: "indexer conflict",
   503  		},
   504  		{
   505  			name: "register the same indexer body with different names, no conflicts",
   506  			// order of registration doesn't matter
   507  			entrypoints: map[string]frameworkruntime.PluginFactory{
   508  				"AddIndexer1": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
   509  					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
   510  					err := podInformer.Informer().AddIndexers(cache.Indexers{
   511  						"nodeName1": indexByPodSpecNodeName,
   512  					})
   513  					return &TestPlugin{name: "AddIndexer1"}, err
   514  				},
   515  				"AddIndexer2": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
   516  					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
   517  					err := podInformer.Informer().AddIndexers(cache.Indexers{
   518  						"nodeName2": indexByPodAnnotationNodeName,
   519  					})
   520  					return &TestPlugin{name: "AddIndexer2"}, err
   521  				},
   522  			},
   523  		},
   524  	}
   526  	for _, tt := range tests {
   527  		t.Run(tt.name, func(t *testing.T) {
   528  			fakeInformerFactory := NewInformerFactory(&fake.Clientset{}, 0*time.Second)
   530  			var registerPluginFuncs []tf.RegisterPluginFunc
   531  			for name, entrypoint := range tt.entrypoints {
   532  				registerPluginFuncs = append(registerPluginFuncs,
   533  					// anything supported by TestPlugin is fine
   534  					tf.RegisterFilterPlugin(name, entrypoint),
   535  				)
   536  			}
   537  			// we always need this
   538  			registerPluginFuncs = append(registerPluginFuncs,
   539  				tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
   540  				tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
   541  			)
   542  			_, ctx := ktesting.NewTestContext(t)
   543  			ctx, cancel := context.WithCancel(ctx)
   544  			defer cancel()
   545  			_, err := tf.NewFramework(ctx, registerPluginFuncs, "test", frameworkruntime.WithInformerFactory(fakeInformerFactory))
   547  			if len(tt.wantErr) > 0 {
   548  				if err == nil || !strings.Contains(err.Error(), tt.wantErr) {
   549  					t.Errorf("got error %q, want %q", err, tt.wantErr)
   550  				}
   551  				return
   552  			}
   553  			if err != nil {
   554  				t.Fatalf("Failed to create scheduler: %v", err)
   555  			}
   556  		})
   557  	}
   558  }
   560  func indexByPodSpecNodeName(obj interface{}) ([]string, error) {
   561  	pod, ok := obj.(*v1.Pod)
   562  	if !ok {
   563  		return []string{}, nil
   564  	}
   565  	if len(pod.Spec.NodeName) == 0 {
   566  		return []string{}, nil
   567  	}
   568  	return []string{pod.Spec.NodeName}, nil
   569  }
   571  func indexByPodAnnotationNodeName(obj interface{}) ([]string, error) {
   572  	pod, ok := obj.(*v1.Pod)
   573  	if !ok {
   574  		return []string{}, nil
   575  	}
   576  	if len(pod.Annotations) == 0 {
   577  		return []string{}, nil
   578  	}
   579  	nodeName, ok := pod.Annotations["node-name"]
   580  	if !ok {
   581  		return []string{}, nil
   582  	}
   583  	return []string{nodeName}, nil
   584  }
// Names referenced by the queueing-hint tests in this file, e.g. as the
// PluginName of expected QueueingHintFunction entries or as entries in an
// enabled plugin set.
const (
	filterWithoutEnqueueExtensions = "filterWithoutEnqueueExtensions"
	fakeNode                       = "fakeNode"
	fakePod                        = "fakePod"
	emptyEventsToRegister          = "emptyEventsToRegister"
	queueSort                      = "no-op-queue-sort-plugin"
	fakeBind                       = "bind-plugin"
	emptyEventExtensions           = "emptyEventExtensions"
)
   596  func Test_buildQueueingHintMap(t *testing.T) {
   597  	tests := []struct {
   598  		name                string
   599  		plugins             []framework.Plugin
   600  		want                map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction
   601  		featuregateDisabled bool
   602  	}{
   603  		{
   604  			name:    "filter without EnqueueExtensions plugin",
   605  			plugins: []framework.Plugin{&filterWithoutEnqueueExtensionsPlugin{}},
   606  			want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
   607  				{Resource: framework.Pod, ActionType: framework.All}: {
   608  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   609  				},
   610  				{Resource: framework.Node, ActionType: framework.All}: {
   611  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   612  				},
   613  				{Resource: framework.CSINode, ActionType: framework.All}: {
   614  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   615  				},
   616  				{Resource: framework.CSIDriver, ActionType: framework.All}: {
   617  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   618  				},
   619  				{Resource: framework.CSIStorageCapacity, ActionType: framework.All}: {
   620  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   621  				},
   622  				{Resource: framework.PersistentVolume, ActionType: framework.All}: {
   623  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   624  				},
   625  				{Resource: framework.StorageClass, ActionType: framework.All}: {
   626  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   627  				},
   628  				{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: {
   629  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   630  				},
   631  				{Resource: framework.PodSchedulingContext, ActionType: framework.All}: {
   632  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   633  				},
   634  				{Resource: framework.ResourceClaim, ActionType: framework.All}: {
   635  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   636  				},
   637  				{Resource: framework.ResourceClass, ActionType: framework.All}: {
   638  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   639  				},
   640  				{Resource: framework.ResourceClaimParameters, ActionType: framework.All}: {
   641  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   642  				},
   643  				{Resource: framework.ResourceClassParameters, ActionType: framework.All}: {
   644  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   645  				},
   646  			},
   647  		},
   648  		{
   649  			name:    "node and pod plugin",
   650  			plugins: []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}},
   651  			want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
   652  				{Resource: framework.Pod, ActionType: framework.Add}: {
   653  					{PluginName: fakePod, QueueingHintFn: fakePodPluginQueueingFn},
   654  				},
   655  				{Resource: framework.Node, ActionType: framework.Add}: {
   656  					{PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn},
   657  				},
   658  				{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: {
   659  					{PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
   660  				},
   661  			},
   662  		},
   663  		{
   664  			name:                "node and pod plugin (featuregate is disabled)",
   665  			plugins:             []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}},
   666  			featuregateDisabled: true,
   667  			want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
   668  				{Resource: framework.Pod, ActionType: framework.Add}: {
   669  					{PluginName: fakePod, QueueingHintFn: defaultQueueingHintFn}, // default queueing hint due to disabled feature gate.
   670  				},
   671  				{Resource: framework.Node, ActionType: framework.Add}: {
   672  					{PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // default queueing hint due to disabled feature gate.
   673  				},
   674  				{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: {
   675  					{PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
   676  				},
   677  			},
   678  		},
   679  		{
   680  			name:    "register plugin with empty event",
   681  			plugins: []framework.Plugin{&emptyEventPlugin{}},
   682  			want:    map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{},
   683  		},
   684  		{
   685  			name:    "register plugins including emptyEventPlugin",
   686  			plugins: []framework.Plugin{&emptyEventPlugin{}, &fakeNodePlugin{}},
   687  			want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
   688  				{Resource: framework.Pod, ActionType: framework.Add}: {
   689  					{PluginName: fakePod, QueueingHintFn: fakePodPluginQueueingFn},
   690  				},
   691  				{Resource: framework.Node, ActionType: framework.Add}: {
   692  					{PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn},
   693  				},
   694  				{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: {
   695  					{PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
   696  				},
   697  			},
   698  		},
   699  	}
   701  	for _, tt := range tests {
   702  		t.Run(tt.name, func(t *testing.T) {
   703  			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, !tt.featuregateDisabled)()
   704  			logger, ctx := ktesting.NewTestContext(t)
   705  			ctx, cancel := context.WithCancel(ctx)
   706  			defer cancel()
   707  			registry := frameworkruntime.Registry{}
   708  			cfgPls := &schedulerapi.Plugins{}
   709  			plugins := append(tt.plugins, &fakebindPlugin{}, &fakeQueueSortPlugin{})
   710  			for _, pl := range plugins {
   711  				tmpPl := pl
   712  				if err := registry.Register(pl.Name(), func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
   713  					return tmpPl, nil
   714  				}); err != nil {
   715  					t.Fatalf("fail to register filter plugin (%s)", pl.Name())
   716  				}
   717  				cfgPls.MultiPoint.Enabled = append(cfgPls.MultiPoint.Enabled, schedulerapi.Plugin{Name: pl.Name()})
   718  			}
   720  			profile := schedulerapi.KubeSchedulerProfile{Plugins: cfgPls}
   721  			fwk, err := newFramework(ctx, registry, profile)
   722  			if err != nil {
   723  				t.Fatal(err)
   724  			}
   726  			exts := fwk.EnqueueExtensions()
   727  			// need to sort to make the test result stable.
   728  			sort.Slice(exts, func(i, j int) bool {
   729  				return exts[i].Name() < exts[j].Name()
   730  			})
   732  			got := buildQueueingHintMap(exts)
   734  			for e, fns := range got {
   735  				wantfns, ok := tt.want[e]
   736  				if !ok {
   737  					t.Errorf("got unexpected event %v", e)
   738  					continue
   739  				}
   740  				if len(fns) != len(wantfns) {
   741  					t.Errorf("got %v queueing hint functions, want %v", len(fns), len(wantfns))
   742  					continue
   743  				}
   744  				for i, fn := range fns {
   745  					if fn.PluginName != wantfns[i].PluginName {
   746  						t.Errorf("got plugin name %v, want %v", fn.PluginName, wantfns[i].PluginName)
   747  						continue
   748  					}
   749  					got, gotErr := fn.QueueingHintFn(logger, nil, nil, nil)
   750  					want, wantErr := wantfns[i].QueueingHintFn(logger, nil, nil, nil)
   751  					if got != want || gotErr != wantErr {
   752  						t.Errorf("got queueing hint function (%v) returning (%v, %v), expect it to return (%v, %v)", fn.PluginName, got, gotErr, want, wantErr)
   753  						continue
   754  					}
   755  				}
   756  			}
   757  		})
   758  	}
   759  }
   761  // Test_UnionedGVKs tests UnionedGVKs worked with buildQueueingHintMap.
   762  func Test_UnionedGVKs(t *testing.T) {
   763  	tests := []struct {
   764  		name    string
   765  		plugins schedulerapi.PluginSet
   766  		want    map[framework.GVK]framework.ActionType
   767  	}{
   768  		{
   769  			name: "filter without EnqueueExtensions plugin",
   770  			plugins: schedulerapi.PluginSet{
   771  				Enabled: []schedulerapi.Plugin{
   772  					{Name: filterWithoutEnqueueExtensions},
   773  					{Name: queueSort},
   774  					{Name: fakeBind},
   775  				},
   776  				Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
   777  			},
   778  			want: map[framework.GVK]framework.ActionType{
   779  				framework.Pod:                     framework.All,
   780  				framework.Node:                    framework.All,
   781  				framework.CSINode:                 framework.All,
   782  				framework.CSIDriver:               framework.All,
   783  				framework.CSIStorageCapacity:      framework.All,
   784  				framework.PersistentVolume:        framework.All,
   785  				framework.PersistentVolumeClaim:   framework.All,
   786  				framework.StorageClass:            framework.All,
   787  				framework.PodSchedulingContext:    framework.All,
   788  				framework.ResourceClaim:           framework.All,
   789  				framework.ResourceClass:           framework.All,
   790  				framework.ResourceClaimParameters: framework.All,
   791  				framework.ResourceClassParameters: framework.All,
   792  			},
   793  		},
   794  		{
   795  			name: "node plugin",
   796  			plugins: schedulerapi.PluginSet{
   797  				Enabled: []schedulerapi.Plugin{
   798  					{Name: fakeNode},
   799  					{Name: queueSort},
   800  					{Name: fakeBind},
   801  				},
   802  				Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
   803  			},
   804  			want: map[framework.GVK]framework.ActionType{
   805  				framework.Node: framework.Add | framework.UpdateNodeTaint, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
   806  			},
   807  		},
   808  		{
   809  			name: "pod plugin",
   810  			plugins: schedulerapi.PluginSet{
   811  				Enabled: []schedulerapi.Plugin{
   812  					{Name: fakePod},
   813  					{Name: queueSort},
   814  					{Name: fakeBind},
   815  				},
   816  				Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
   817  			},
   818  			want: map[framework.GVK]framework.ActionType{
   819  				framework.Pod: framework.Add,
   820  			},
   821  		},
   822  		{
   823  			name: "node and pod plugin",
   824  			plugins: schedulerapi.PluginSet{
   825  				Enabled: []schedulerapi.Plugin{
   826  					{Name: fakePod},
   827  					{Name: fakeNode},
   828  					{Name: queueSort},
   829  					{Name: fakeBind},
   830  				},
   831  				Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
   832  			},
   833  			want: map[framework.GVK]framework.ActionType{
   834  				framework.Pod:  framework.Add,
   835  				framework.Node: framework.Add | framework.UpdateNodeTaint, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
   836  			},
   837  		},
   838  		{
   839  			name: "empty EventsToRegister plugin",
   840  			plugins: schedulerapi.PluginSet{
   841  				Enabled: []schedulerapi.Plugin{
   842  					{Name: emptyEventsToRegister},
   843  					{Name: queueSort},
   844  					{Name: fakeBind},
   845  				},
   846  				Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
   847  			},
   848  			want: map[framework.GVK]framework.ActionType{},
   849  		},
   850  		{
   851  			name:    "plugins with default profile",
   852  			plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled},
   853  			want: map[framework.GVK]framework.ActionType{
   854  				framework.Pod:                   framework.All,
   855  				framework.Node:                  framework.All,
   856  				framework.CSINode:               framework.All - framework.Delete,
   857  				framework.CSIDriver:             framework.All - framework.Delete,
   858  				framework.CSIStorageCapacity:    framework.All - framework.Delete,
   859  				framework.PersistentVolume:      framework.All - framework.Delete,
   860  				framework.PersistentVolumeClaim: framework.All - framework.Delete,
   861  				framework.StorageClass:          framework.All - framework.Delete,
   862  			},
   863  		},
   864  	}
   865  	for _, tt := range tests {
   866  		t.Run(tt.name, func(t *testing.T) {
   867  			_, ctx := ktesting.NewTestContext(t)
   868  			ctx, cancel := context.WithCancel(ctx)
   869  			defer cancel()
   870  			registry := plugins.NewInTreeRegistry()
   872  			cfgPls := &schedulerapi.Plugins{MultiPoint: tt.plugins}
   873  			plugins := []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}, &filterWithoutEnqueueExtensionsPlugin{}, &emptyEventsToRegisterPlugin{}, &fakeQueueSortPlugin{}, &fakebindPlugin{}}
   874  			for _, pl := range plugins {
   875  				tmpPl := pl
   876  				if err := registry.Register(pl.Name(), func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
   877  					return tmpPl, nil
   878  				}); err != nil {
   879  					t.Fatalf("fail to register filter plugin (%s)", pl.Name())
   880  				}
   881  			}
   883  			profile := schedulerapi.KubeSchedulerProfile{Plugins: cfgPls, PluginConfig: defaults.PluginConfigsV1}
   884  			fwk, err := newFramework(ctx, registry, profile)
   885  			if err != nil {
   886  				t.Fatal(err)
   887  			}
   889  			queueingHintsPerProfile := internalqueue.QueueingHintMapPerProfile{
   890  				"default": buildQueueingHintMap(fwk.EnqueueExtensions()),
   891  			}
   892  			got := unionedGVKs(queueingHintsPerProfile)
   894  			if diff := cmp.Diff(tt.want, got); diff != "" {
   895  				t.Errorf("Unexpected eventToPlugin map (-want,+got):%s", diff)
   896  			}
   897  		})
   898  	}
   899  }
   901  func newFramework(ctx context.Context, r frameworkruntime.Registry, profile schedulerapi.KubeSchedulerProfile) (framework.Framework, error) {
   902  	return frameworkruntime.NewFramework(ctx, r, &profile,
   903  		frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(nil, nil)),
   904  		frameworkruntime.WithInformerFactory(informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)),
   905  	)
   906  }
   908  var _ framework.QueueSortPlugin = &fakeQueueSortPlugin{}
   910  // fakeQueueSortPlugin is a no-op implementation for QueueSort extension point.
   911  type fakeQueueSortPlugin struct{}
   913  func (pl *fakeQueueSortPlugin) Name() string {
   914  	return queueSort
   915  }
   917  func (pl *fakeQueueSortPlugin) Less(_, _ *framework.QueuedPodInfo) bool {
   918  	return false
   919  }
   921  var _ framework.BindPlugin = &fakebindPlugin{}
   923  // fakebindPlugin is a no-op implementation for Bind extension point.
   924  type fakebindPlugin struct{}
   926  func (t *fakebindPlugin) Name() string {
   927  	return fakeBind
   928  }
   930  func (t *fakebindPlugin) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
   931  	return nil
   932  }
   934  // filterWithoutEnqueueExtensionsPlugin implements Filter, but doesn't implement EnqueueExtensions.
   935  type filterWithoutEnqueueExtensionsPlugin struct{}
   937  func (*filterWithoutEnqueueExtensionsPlugin) Name() string { return filterWithoutEnqueueExtensions }
   939  func (*filterWithoutEnqueueExtensionsPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
   940  	return nil
   941  }
   943  var hintFromFakeNode = framework.QueueingHint(100)
   945  type fakeNodePlugin struct{}
   947  var fakeNodePluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
   948  	return hintFromFakeNode, nil
   949  }
   951  func (*fakeNodePlugin) Name() string { return fakeNode }
   953  func (*fakeNodePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
   954  	return nil
   955  }
   957  func (pl *fakeNodePlugin) EventsToRegister() []framework.ClusterEventWithHint {
   958  	return []framework.ClusterEventWithHint{
   959  		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, QueueingHintFn: fakeNodePluginQueueingFn},
   960  	}
   961  }
   963  var hintFromFakePod = framework.QueueingHint(101)
   965  type fakePodPlugin struct{}
   967  var fakePodPluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
   968  	return hintFromFakePod, nil
   969  }
   971  func (*fakePodPlugin) Name() string { return fakePod }
   973  func (*fakePodPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
   974  	return nil
   975  }
   977  func (pl *fakePodPlugin) EventsToRegister() []framework.ClusterEventWithHint {
   978  	return []framework.ClusterEventWithHint{
   979  		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add}, QueueingHintFn: fakePodPluginQueueingFn},
   980  	}
   981  }
   983  type emptyEventPlugin struct{}
   985  func (*emptyEventPlugin) Name() string { return emptyEventExtensions }
   987  func (*emptyEventPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
   988  	return nil
   989  }
   991  func (pl *emptyEventPlugin) EventsToRegister() []framework.ClusterEventWithHint {
   992  	return nil
   993  }
   995  // emptyEventsToRegisterPlugin implement interface framework.EnqueueExtensions, but returns nil from EventsToRegister.
   996  // This can simulate a plugin registered at scheduler setup, but does nothing
   997  // due to some disabled feature gate.
   998  type emptyEventsToRegisterPlugin struct{}
  1000  func (*emptyEventsToRegisterPlugin) Name() string { return emptyEventsToRegister }
  1002  func (*emptyEventsToRegisterPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
  1003  	return nil
  1004  }
  1006  func (*emptyEventsToRegisterPlugin) EventsToRegister() []framework.ClusterEventWithHint { return nil }

View as plain text