...

Source file src/k8s.io/kubernetes/pkg/scheduler/scheduler_test.go

Documentation: k8s.io/kubernetes/pkg/scheduler

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package scheduler
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sort"
    23  	"strings"
    24  	"testing"
    25  	"time"
    26  
    27  	"github.com/google/go-cmp/cmp"
    28  	v1 "k8s.io/api/core/v1"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/runtime"
    31  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    32  	"k8s.io/client-go/informers"
    33  	"k8s.io/client-go/kubernetes"
    34  	"k8s.io/client-go/kubernetes/fake"
    35  	"k8s.io/client-go/kubernetes/scheme"
    36  	"k8s.io/client-go/tools/cache"
    37  	"k8s.io/client-go/tools/events"
    38  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    39  	"k8s.io/klog/v2"
    40  	"k8s.io/klog/v2/ktesting"
    41  	"k8s.io/kubernetes/pkg/features"
    42  	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
    43  	"k8s.io/kubernetes/pkg/scheduler/apis/config/testing/defaults"
    44  	"k8s.io/kubernetes/pkg/scheduler/framework"
    45  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins"
    46  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
    47  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
    48  	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
    49  	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
    50  	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
    51  	"k8s.io/kubernetes/pkg/scheduler/profile"
    52  	st "k8s.io/kubernetes/pkg/scheduler/testing"
    53  	tf "k8s.io/kubernetes/pkg/scheduler/testing/framework"
    54  	testingclock "k8s.io/utils/clock/testing"
    55  	"k8s.io/utils/ptr"
    56  )
    57  
    58  func TestSchedulerCreation(t *testing.T) {
    59  	invalidRegistry := map[string]frameworkruntime.PluginFactory{
    60  		defaultbinder.Name: defaultbinder.New,
    61  	}
    62  	validRegistry := map[string]frameworkruntime.PluginFactory{
    63  		"Foo": defaultbinder.New,
    64  	}
    65  	cases := []struct {
    66  		name          string
    67  		opts          []Option
    68  		wantErr       string
    69  		wantProfiles  []string
    70  		wantExtenders []string
    71  	}{
    72  		{
    73  			name: "valid out-of-tree registry",
    74  			opts: []Option{
    75  				WithFrameworkOutOfTreeRegistry(validRegistry),
    76  				WithProfiles(
    77  					schedulerapi.KubeSchedulerProfile{
    78  						SchedulerName: "default-scheduler",
    79  						Plugins: &schedulerapi.Plugins{
    80  							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
    81  							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
    82  						},
    83  					},
    84  				)},
    85  			wantProfiles: []string{"default-scheduler"},
    86  		},
    87  		{
    88  			name: "repeated plugin name in out-of-tree plugin",
    89  			opts: []Option{
    90  				WithFrameworkOutOfTreeRegistry(invalidRegistry),
    91  				WithProfiles(
    92  					schedulerapi.KubeSchedulerProfile{
    93  						SchedulerName: "default-scheduler",
    94  						Plugins: &schedulerapi.Plugins{
    95  							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
    96  							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
    97  						},
    98  					},
    99  				)},
   100  			wantProfiles: []string{"default-scheduler"},
   101  			wantErr:      "a plugin named DefaultBinder already exists",
   102  		},
   103  		{
   104  			name: "multiple profiles",
   105  			opts: []Option{
   106  				WithProfiles(
   107  					schedulerapi.KubeSchedulerProfile{
   108  						SchedulerName: "foo",
   109  						Plugins: &schedulerapi.Plugins{
   110  							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
   111  							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
   112  						},
   113  					},
   114  					schedulerapi.KubeSchedulerProfile{
   115  						SchedulerName: "bar",
   116  						Plugins: &schedulerapi.Plugins{
   117  							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
   118  							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
   119  						},
   120  					},
   121  				)},
   122  			wantProfiles: []string{"bar", "foo"},
   123  		},
   124  		{
   125  			name: "Repeated profiles",
   126  			opts: []Option{
   127  				WithProfiles(
   128  					schedulerapi.KubeSchedulerProfile{
   129  						SchedulerName: "foo",
   130  						Plugins: &schedulerapi.Plugins{
   131  							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
   132  							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
   133  						},
   134  					},
   135  					schedulerapi.KubeSchedulerProfile{
   136  						SchedulerName: "bar",
   137  						Plugins: &schedulerapi.Plugins{
   138  							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
   139  							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
   140  						},
   141  					},
   142  					schedulerapi.KubeSchedulerProfile{
   143  						SchedulerName: "foo",
   144  						Plugins: &schedulerapi.Plugins{
   145  							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
   146  							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
   147  						},
   148  					},
   149  				)},
   150  			wantErr: "duplicate profile with scheduler name \"foo\"",
   151  		},
   152  		{
   153  			name: "With extenders",
   154  			opts: []Option{
   155  				WithProfiles(
   156  					schedulerapi.KubeSchedulerProfile{
   157  						SchedulerName: "default-scheduler",
   158  						Plugins: &schedulerapi.Plugins{
   159  							QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
   160  							Bind:      schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
   161  						},
   162  					},
   163  				),
   164  				WithExtenders(
   165  					schedulerapi.Extender{
   166  						URLPrefix: "http://extender.kube-system/",
   167  					},
   168  				),
   169  			},
   170  			wantProfiles:  []string{"default-scheduler"},
   171  			wantExtenders: []string{"http://extender.kube-system/"},
   172  		},
   173  	}
   174  
   175  	for _, tc := range cases {
   176  		t.Run(tc.name, func(t *testing.T) {
   177  			client := fake.NewSimpleClientset()
   178  			informerFactory := informers.NewSharedInformerFactory(client, 0)
   179  
   180  			eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
   181  
   182  			_, ctx := ktesting.NewTestContext(t)
   183  			ctx, cancel := context.WithCancel(ctx)
   184  			defer cancel()
   185  			s, err := New(
   186  				ctx,
   187  				client,
   188  				informerFactory,
   189  				nil,
   190  				profile.NewRecorderFactory(eventBroadcaster),
   191  				tc.opts...,
   192  			)
   193  
   194  			// Errors
   195  			if len(tc.wantErr) != 0 {
   196  				if err == nil || !strings.Contains(err.Error(), tc.wantErr) {
   197  					t.Errorf("got error %q, want %q", err, tc.wantErr)
   198  				}
   199  				return
   200  			}
   201  			if err != nil {
   202  				t.Fatalf("Failed to create scheduler: %v", err)
   203  			}
   204  
   205  			// Profiles
   206  			profiles := make([]string, 0, len(s.Profiles))
   207  			for name := range s.Profiles {
   208  				profiles = append(profiles, name)
   209  			}
   210  			sort.Strings(profiles)
   211  			if diff := cmp.Diff(tc.wantProfiles, profiles); diff != "" {
   212  				t.Errorf("unexpected profiles (-want, +got):\n%s", diff)
   213  			}
   214  
   215  			// Extenders
   216  			if len(tc.wantExtenders) != 0 {
   217  				// Scheduler.Extenders
   218  				extenders := make([]string, 0, len(s.Extenders))
   219  				for _, e := range s.Extenders {
   220  					extenders = append(extenders, e.Name())
   221  				}
   222  				if diff := cmp.Diff(tc.wantExtenders, extenders); diff != "" {
   223  					t.Errorf("unexpected extenders (-want, +got):\n%s", diff)
   224  				}
   225  
   226  				// framework.Handle.Extenders()
   227  				for _, p := range s.Profiles {
   228  					extenders := make([]string, 0, len(p.Extenders()))
   229  					for _, e := range p.Extenders() {
   230  						extenders = append(extenders, e.Name())
   231  					}
   232  					if diff := cmp.Diff(tc.wantExtenders, extenders); diff != "" {
   233  						t.Errorf("unexpected extenders (-want, +got):\n%s", diff)
   234  					}
   235  				}
   236  			}
   237  		})
   238  	}
   239  }
   240  
   241  func TestFailureHandler(t *testing.T) {
   242  	testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Obj()
   243  	testPodUpdated := testPod.DeepCopy()
   244  	testPodUpdated.Labels = map[string]string{"foo": ""}
   245  
   246  	tests := []struct {
   247  		name                       string
   248  		podUpdatedDuringScheduling bool // pod is updated during a scheduling cycle
   249  		podDeletedDuringScheduling bool // pod is deleted during a scheduling cycle
   250  		expect                     *v1.Pod
   251  	}{
   252  		{
   253  			name:                       "pod is updated during a scheduling cycle",
   254  			podUpdatedDuringScheduling: true,
   255  			expect:                     testPodUpdated,
   256  		},
   257  		{
   258  			name:   "pod is not updated during a scheduling cycle",
   259  			expect: testPod,
   260  		},
   261  		{
   262  			name:                       "pod is deleted during a scheduling cycle",
   263  			podDeletedDuringScheduling: true,
   264  			expect:                     nil,
   265  		},
   266  	}
   267  
   268  	for _, tt := range tests {
   269  		t.Run(tt.name, func(t *testing.T) {
   270  			logger, ctx := ktesting.NewTestContext(t)
   271  			ctx, cancel := context.WithCancel(ctx)
   272  			defer cancel()
   273  
   274  			client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
   275  			informerFactory := informers.NewSharedInformerFactory(client, 0)
   276  			podInformer := informerFactory.Core().V1().Pods()
   277  			// Need to add/update/delete testPod to the store.
   278  			podInformer.Informer().GetStore().Add(testPod)
   279  
   280  			queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())))
   281  			schedulerCache := internalcache.New(ctx, 30*time.Second)
   282  
   283  			if err := queue.Add(logger, testPod); err != nil {
   284  				t.Fatalf("Add failed: %v", err)
   285  			}
   286  
   287  			if _, err := queue.Pop(logger); err != nil {
   288  				t.Fatalf("Pop failed: %v", err)
   289  			}
   290  
   291  			if tt.podUpdatedDuringScheduling {
   292  				podInformer.Informer().GetStore().Update(testPodUpdated)
   293  				queue.Update(logger, testPod, testPodUpdated)
   294  			}
   295  			if tt.podDeletedDuringScheduling {
   296  				podInformer.Informer().GetStore().Delete(testPod)
   297  				queue.Delete(testPod)
   298  			}
   299  
   300  			s, fwk, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory)
   301  			if err != nil {
   302  				t.Fatal(err)
   303  			}
   304  
   305  			testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
   306  			s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable), nil, time.Now())
   307  
   308  			var got *v1.Pod
   309  			if tt.podUpdatedDuringScheduling {
   310  				head, e := queue.Pop(logger)
   311  				if e != nil {
   312  					t.Fatalf("Cannot pop pod from the activeQ: %v", e)
   313  				}
   314  				got = head.Pod
   315  			} else {
   316  				got = getPodFromPriorityQueue(queue, testPod)
   317  			}
   318  
   319  			if diff := cmp.Diff(tt.expect, got); diff != "" {
   320  				t.Errorf("Unexpected pod (-want, +got): %s", diff)
   321  			}
   322  		})
   323  	}
   324  }
   325  
   326  func TestFailureHandler_PodAlreadyBound(t *testing.T) {
   327  	logger, ctx := ktesting.NewTestContext(t)
   328  	ctx, cancel := context.WithCancel(ctx)
   329  	defer cancel()
   330  
   331  	nodeFoo := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
   332  	testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Node("foo").Obj()
   333  
   334  	client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}, &v1.NodeList{Items: []v1.Node{nodeFoo}})
   335  	informerFactory := informers.NewSharedInformerFactory(client, 0)
   336  	podInformer := informerFactory.Core().V1().Pods()
   337  	// Need to add testPod to the store.
   338  	podInformer.Informer().GetStore().Add(testPod)
   339  
   340  	queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())))
   341  	schedulerCache := internalcache.New(ctx, 30*time.Second)
   342  
   343  	// Add node to schedulerCache no matter it's deleted in API server or not.
   344  	schedulerCache.AddNode(logger, &nodeFoo)
   345  
   346  	s, fwk, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory)
   347  	if err != nil {
   348  		t.Fatal(err)
   349  	}
   350  
   351  	testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
   352  	s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable).WithError(fmt.Errorf("binding rejected: timeout")), nil, time.Now())
   353  
   354  	pod := getPodFromPriorityQueue(queue, testPod)
   355  	if pod != nil {
   356  		t.Fatalf("Unexpected pod: %v should not be in PriorityQueue when the NodeName of pod is not empty", pod.Name)
   357  	}
   358  }
   359  
   360  // TestWithPercentageOfNodesToScore tests scheduler's PercentageOfNodesToScore is set correctly.
   361  func TestWithPercentageOfNodesToScore(t *testing.T) {
   362  	tests := []struct {
   363  		name                           string
   364  		percentageOfNodesToScoreConfig *int32
   365  		wantedPercentageOfNodesToScore int32
   366  	}{
   367  		{
   368  			name:                           "percentageOfNodesScore is nil",
   369  			percentageOfNodesToScoreConfig: nil,
   370  			wantedPercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
   371  		},
   372  		{
   373  			name:                           "percentageOfNodesScore is not nil",
   374  			percentageOfNodesToScoreConfig: ptr.To[int32](10),
   375  			wantedPercentageOfNodesToScore: 10,
   376  		},
   377  	}
   378  
   379  	for _, tt := range tests {
   380  		t.Run(tt.name, func(t *testing.T) {
   381  			client := fake.NewSimpleClientset()
   382  			informerFactory := informers.NewSharedInformerFactory(client, 0)
   383  			eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
   384  			_, ctx := ktesting.NewTestContext(t)
   385  			ctx, cancel := context.WithCancel(ctx)
   386  			defer cancel()
   387  			sched, err := New(
   388  				ctx,
   389  				client,
   390  				informerFactory,
   391  				nil,
   392  				profile.NewRecorderFactory(eventBroadcaster),
   393  				WithPercentageOfNodesToScore(tt.percentageOfNodesToScoreConfig),
   394  			)
   395  			if err != nil {
   396  				t.Fatalf("Failed to create scheduler: %v", err)
   397  			}
   398  			if sched.percentageOfNodesToScore != tt.wantedPercentageOfNodesToScore {
   399  				t.Errorf("scheduler.percercentageOfNodesToScore = %v, want %v", sched.percentageOfNodesToScore, tt.wantedPercentageOfNodesToScore)
   400  			}
   401  		})
   402  	}
   403  }
   404  
   405  // getPodFromPriorityQueue is the function used in the TestDefaultErrorFunc test to get
   406  // the specific pod from the given priority queue. It returns the found pod in the priority queue.
   407  func getPodFromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod {
   408  	podList, _ := queue.PendingPods()
   409  	if len(podList) == 0 {
   410  		return nil
   411  	}
   412  
   413  	queryPodKey, err := cache.MetaNamespaceKeyFunc(pod)
   414  	if err != nil {
   415  		return nil
   416  	}
   417  
   418  	for _, foundPod := range podList {
   419  		foundPodKey, err := cache.MetaNamespaceKeyFunc(foundPod)
   420  		if err != nil {
   421  			return nil
   422  		}
   423  
   424  		if foundPodKey == queryPodKey {
   425  			return foundPod
   426  		}
   427  	}
   428  
   429  	return nil
   430  }
   431  
   432  func initScheduler(ctx context.Context, cache internalcache.Cache, queue internalqueue.SchedulingQueue,
   433  	client kubernetes.Interface, informerFactory informers.SharedInformerFactory) (*Scheduler, framework.Framework, error) {
   434  	logger := klog.FromContext(ctx)
   435  	registerPluginFuncs := []tf.RegisterPluginFunc{
   436  		tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
   437  		tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
   438  	}
   439  	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
   440  	fwk, err := tf.NewFramework(ctx,
   441  		registerPluginFuncs,
   442  		testSchedulerName,
   443  		frameworkruntime.WithClientSet(client),
   444  		frameworkruntime.WithInformerFactory(informerFactory),
   445  		frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
   446  	)
   447  	if err != nil {
   448  		return nil, nil, err
   449  	}
   450  
   451  	s := &Scheduler{
   452  		Cache:           cache,
   453  		client:          client,
   454  		StopEverything:  ctx.Done(),
   455  		SchedulingQueue: queue,
   456  		Profiles:        profile.Map{testSchedulerName: fwk},
   457  		logger:          logger,
   458  	}
   459  	s.applyDefaultHandlers()
   460  
   461  	return s, fwk, nil
   462  }
   463  
   464  func TestInitPluginsWithIndexers(t *testing.T) {
   465  	tests := []struct {
   466  		name string
   467  		// the plugin registration ordering must not matter, being map traversal random
   468  		entrypoints map[string]frameworkruntime.PluginFactory
   469  		wantErr     string
   470  	}{
   471  		{
   472  			name: "register indexer, no conflicts",
   473  			entrypoints: map[string]frameworkruntime.PluginFactory{
   474  				"AddIndexer": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
   475  					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
   476  					err := podInformer.Informer().AddIndexers(cache.Indexers{
   477  						"nodeName": indexByPodSpecNodeName,
   478  					})
   479  					return &TestPlugin{name: "AddIndexer"}, err
   480  				},
   481  			},
   482  		},
   483  		{
   484  			name: "register the same indexer name multiple times, conflict",
   485  			// order of registration doesn't matter
   486  			entrypoints: map[string]frameworkruntime.PluginFactory{
   487  				"AddIndexer1": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
   488  					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
   489  					err := podInformer.Informer().AddIndexers(cache.Indexers{
   490  						"nodeName": indexByPodSpecNodeName,
   491  					})
   492  					return &TestPlugin{name: "AddIndexer1"}, err
   493  				},
   494  				"AddIndexer2": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
   495  					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
   496  					err := podInformer.Informer().AddIndexers(cache.Indexers{
   497  						"nodeName": indexByPodAnnotationNodeName,
   498  					})
   499  					return &TestPlugin{name: "AddIndexer1"}, err
   500  				},
   501  			},
   502  			wantErr: "indexer conflict",
   503  		},
   504  		{
   505  			name: "register the same indexer body with different names, no conflicts",
   506  			// order of registration doesn't matter
   507  			entrypoints: map[string]frameworkruntime.PluginFactory{
   508  				"AddIndexer1": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
   509  					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
   510  					err := podInformer.Informer().AddIndexers(cache.Indexers{
   511  						"nodeName1": indexByPodSpecNodeName,
   512  					})
   513  					return &TestPlugin{name: "AddIndexer1"}, err
   514  				},
   515  				"AddIndexer2": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
   516  					podInformer := handle.SharedInformerFactory().Core().V1().Pods()
   517  					err := podInformer.Informer().AddIndexers(cache.Indexers{
   518  						"nodeName2": indexByPodAnnotationNodeName,
   519  					})
   520  					return &TestPlugin{name: "AddIndexer2"}, err
   521  				},
   522  			},
   523  		},
   524  	}
   525  
   526  	for _, tt := range tests {
   527  		t.Run(tt.name, func(t *testing.T) {
   528  			fakeInformerFactory := NewInformerFactory(&fake.Clientset{}, 0*time.Second)
   529  
   530  			var registerPluginFuncs []tf.RegisterPluginFunc
   531  			for name, entrypoint := range tt.entrypoints {
   532  				registerPluginFuncs = append(registerPluginFuncs,
   533  					// anything supported by TestPlugin is fine
   534  					tf.RegisterFilterPlugin(name, entrypoint),
   535  				)
   536  			}
   537  			// we always need this
   538  			registerPluginFuncs = append(registerPluginFuncs,
   539  				tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
   540  				tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
   541  			)
   542  			_, ctx := ktesting.NewTestContext(t)
   543  			ctx, cancel := context.WithCancel(ctx)
   544  			defer cancel()
   545  			_, err := tf.NewFramework(ctx, registerPluginFuncs, "test", frameworkruntime.WithInformerFactory(fakeInformerFactory))
   546  
   547  			if len(tt.wantErr) > 0 {
   548  				if err == nil || !strings.Contains(err.Error(), tt.wantErr) {
   549  					t.Errorf("got error %q, want %q", err, tt.wantErr)
   550  				}
   551  				return
   552  			}
   553  			if err != nil {
   554  				t.Fatalf("Failed to create scheduler: %v", err)
   555  			}
   556  		})
   557  	}
   558  }
   559  
   560  func indexByPodSpecNodeName(obj interface{}) ([]string, error) {
   561  	pod, ok := obj.(*v1.Pod)
   562  	if !ok {
   563  		return []string{}, nil
   564  	}
   565  	if len(pod.Spec.NodeName) == 0 {
   566  		return []string{}, nil
   567  	}
   568  	return []string{pod.Spec.NodeName}, nil
   569  }
   570  
   571  func indexByPodAnnotationNodeName(obj interface{}) ([]string, error) {
   572  	pod, ok := obj.(*v1.Pod)
   573  	if !ok {
   574  		return []string{}, nil
   575  	}
   576  	if len(pod.Annotations) == 0 {
   577  		return []string{}, nil
   578  	}
   579  	nodeName, ok := pod.Annotations["node-name"]
   580  	if !ok {
   581  		return []string{}, nil
   582  	}
   583  	return []string{nodeName}, nil
   584  }
   585  
   586  const (
   587  	filterWithoutEnqueueExtensions = "filterWithoutEnqueueExtensions"
   588  	fakeNode                       = "fakeNode"
   589  	fakePod                        = "fakePod"
   590  	emptyEventsToRegister          = "emptyEventsToRegister"
   591  	queueSort                      = "no-op-queue-sort-plugin"
   592  	fakeBind                       = "bind-plugin"
   593  	emptyEventExtensions           = "emptyEventExtensions"
   594  )
   595  
   596  func Test_buildQueueingHintMap(t *testing.T) {
   597  	tests := []struct {
   598  		name                string
   599  		plugins             []framework.Plugin
   600  		want                map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction
   601  		featuregateDisabled bool
   602  	}{
   603  		{
   604  			name:    "filter without EnqueueExtensions plugin",
   605  			plugins: []framework.Plugin{&filterWithoutEnqueueExtensionsPlugin{}},
   606  			want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
   607  				{Resource: framework.Pod, ActionType: framework.All}: {
   608  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   609  				},
   610  				{Resource: framework.Node, ActionType: framework.All}: {
   611  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   612  				},
   613  				{Resource: framework.CSINode, ActionType: framework.All}: {
   614  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   615  				},
   616  				{Resource: framework.CSIDriver, ActionType: framework.All}: {
   617  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   618  				},
   619  				{Resource: framework.CSIStorageCapacity, ActionType: framework.All}: {
   620  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   621  				},
   622  				{Resource: framework.PersistentVolume, ActionType: framework.All}: {
   623  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   624  				},
   625  				{Resource: framework.StorageClass, ActionType: framework.All}: {
   626  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   627  				},
   628  				{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: {
   629  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   630  				},
   631  				{Resource: framework.PodSchedulingContext, ActionType: framework.All}: {
   632  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   633  				},
   634  				{Resource: framework.ResourceClaim, ActionType: framework.All}: {
   635  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   636  				},
   637  				{Resource: framework.ResourceClass, ActionType: framework.All}: {
   638  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   639  				},
   640  				{Resource: framework.ResourceClaimParameters, ActionType: framework.All}: {
   641  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   642  				},
   643  				{Resource: framework.ResourceClassParameters, ActionType: framework.All}: {
   644  					{PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
   645  				},
   646  			},
   647  		},
   648  		{
   649  			name:    "node and pod plugin",
   650  			plugins: []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}},
   651  			want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
   652  				{Resource: framework.Pod, ActionType: framework.Add}: {
   653  					{PluginName: fakePod, QueueingHintFn: fakePodPluginQueueingFn},
   654  				},
   655  				{Resource: framework.Node, ActionType: framework.Add}: {
   656  					{PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn},
   657  				},
   658  				{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: {
   659  					{PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
   660  				},
   661  			},
   662  		},
   663  		{
   664  			name:                "node and pod plugin (featuregate is disabled)",
   665  			plugins:             []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}},
   666  			featuregateDisabled: true,
   667  			want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
   668  				{Resource: framework.Pod, ActionType: framework.Add}: {
   669  					{PluginName: fakePod, QueueingHintFn: defaultQueueingHintFn}, // default queueing hint due to disabled feature gate.
   670  				},
   671  				{Resource: framework.Node, ActionType: framework.Add}: {
   672  					{PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // default queueing hint due to disabled feature gate.
   673  				},
   674  				{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: {
   675  					{PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
   676  				},
   677  			},
   678  		},
   679  		{
   680  			name:    "register plugin with empty event",
   681  			plugins: []framework.Plugin{&emptyEventPlugin{}},
   682  			want:    map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{},
   683  		},
   684  		{
   685  			name:    "register plugins including emptyEventPlugin",
   686  			plugins: []framework.Plugin{&emptyEventPlugin{}, &fakeNodePlugin{}},
   687  			want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
   688  				{Resource: framework.Pod, ActionType: framework.Add}: {
   689  					{PluginName: fakePod, QueueingHintFn: fakePodPluginQueueingFn},
   690  				},
   691  				{Resource: framework.Node, ActionType: framework.Add}: {
   692  					{PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn},
   693  				},
   694  				{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: {
   695  					{PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
   696  				},
   697  			},
   698  		},
   699  	}
   700  
   701  	for _, tt := range tests {
   702  		t.Run(tt.name, func(t *testing.T) {
   703  			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, !tt.featuregateDisabled)()
   704  			logger, ctx := ktesting.NewTestContext(t)
   705  			ctx, cancel := context.WithCancel(ctx)
   706  			defer cancel()
   707  			registry := frameworkruntime.Registry{}
   708  			cfgPls := &schedulerapi.Plugins{}
   709  			plugins := append(tt.plugins, &fakebindPlugin{}, &fakeQueueSortPlugin{})
   710  			for _, pl := range plugins {
   711  				tmpPl := pl
   712  				if err := registry.Register(pl.Name(), func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
   713  					return tmpPl, nil
   714  				}); err != nil {
   715  					t.Fatalf("fail to register filter plugin (%s)", pl.Name())
   716  				}
   717  				cfgPls.MultiPoint.Enabled = append(cfgPls.MultiPoint.Enabled, schedulerapi.Plugin{Name: pl.Name()})
   718  			}
   719  
   720  			profile := schedulerapi.KubeSchedulerProfile{Plugins: cfgPls}
   721  			fwk, err := newFramework(ctx, registry, profile)
   722  			if err != nil {
   723  				t.Fatal(err)
   724  			}
   725  
   726  			exts := fwk.EnqueueExtensions()
   727  			// need to sort to make the test result stable.
   728  			sort.Slice(exts, func(i, j int) bool {
   729  				return exts[i].Name() < exts[j].Name()
   730  			})
   731  
   732  			got := buildQueueingHintMap(exts)
   733  
   734  			for e, fns := range got {
   735  				wantfns, ok := tt.want[e]
   736  				if !ok {
   737  					t.Errorf("got unexpected event %v", e)
   738  					continue
   739  				}
   740  				if len(fns) != len(wantfns) {
   741  					t.Errorf("got %v queueing hint functions, want %v", len(fns), len(wantfns))
   742  					continue
   743  				}
   744  				for i, fn := range fns {
   745  					if fn.PluginName != wantfns[i].PluginName {
   746  						t.Errorf("got plugin name %v, want %v", fn.PluginName, wantfns[i].PluginName)
   747  						continue
   748  					}
   749  					got, gotErr := fn.QueueingHintFn(logger, nil, nil, nil)
   750  					want, wantErr := wantfns[i].QueueingHintFn(logger, nil, nil, nil)
   751  					if got != want || gotErr != wantErr {
   752  						t.Errorf("got queueing hint function (%v) returning (%v, %v), expect it to return (%v, %v)", fn.PluginName, got, gotErr, want, wantErr)
   753  						continue
   754  					}
   755  				}
   756  			}
   757  		})
   758  	}
   759  }
   760  
   761  // Test_UnionedGVKs tests UnionedGVKs worked with buildQueueingHintMap.
   762  func Test_UnionedGVKs(t *testing.T) {
   763  	tests := []struct {
   764  		name    string
   765  		plugins schedulerapi.PluginSet
   766  		want    map[framework.GVK]framework.ActionType
   767  	}{
   768  		{
   769  			name: "filter without EnqueueExtensions plugin",
   770  			plugins: schedulerapi.PluginSet{
   771  				Enabled: []schedulerapi.Plugin{
   772  					{Name: filterWithoutEnqueueExtensions},
   773  					{Name: queueSort},
   774  					{Name: fakeBind},
   775  				},
   776  				Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
   777  			},
   778  			want: map[framework.GVK]framework.ActionType{
   779  				framework.Pod:                     framework.All,
   780  				framework.Node:                    framework.All,
   781  				framework.CSINode:                 framework.All,
   782  				framework.CSIDriver:               framework.All,
   783  				framework.CSIStorageCapacity:      framework.All,
   784  				framework.PersistentVolume:        framework.All,
   785  				framework.PersistentVolumeClaim:   framework.All,
   786  				framework.StorageClass:            framework.All,
   787  				framework.PodSchedulingContext:    framework.All,
   788  				framework.ResourceClaim:           framework.All,
   789  				framework.ResourceClass:           framework.All,
   790  				framework.ResourceClaimParameters: framework.All,
   791  				framework.ResourceClassParameters: framework.All,
   792  			},
   793  		},
   794  		{
   795  			name: "node plugin",
   796  			plugins: schedulerapi.PluginSet{
   797  				Enabled: []schedulerapi.Plugin{
   798  					{Name: fakeNode},
   799  					{Name: queueSort},
   800  					{Name: fakeBind},
   801  				},
   802  				Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
   803  			},
   804  			want: map[framework.GVK]framework.ActionType{
   805  				framework.Node: framework.Add | framework.UpdateNodeTaint, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
   806  			},
   807  		},
   808  		{
   809  			name: "pod plugin",
   810  			plugins: schedulerapi.PluginSet{
   811  				Enabled: []schedulerapi.Plugin{
   812  					{Name: fakePod},
   813  					{Name: queueSort},
   814  					{Name: fakeBind},
   815  				},
   816  				Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
   817  			},
   818  			want: map[framework.GVK]framework.ActionType{
   819  				framework.Pod: framework.Add,
   820  			},
   821  		},
   822  		{
   823  			name: "node and pod plugin",
   824  			plugins: schedulerapi.PluginSet{
   825  				Enabled: []schedulerapi.Plugin{
   826  					{Name: fakePod},
   827  					{Name: fakeNode},
   828  					{Name: queueSort},
   829  					{Name: fakeBind},
   830  				},
   831  				Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
   832  			},
   833  			want: map[framework.GVK]framework.ActionType{
   834  				framework.Pod:  framework.Add,
   835  				framework.Node: framework.Add | framework.UpdateNodeTaint, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
   836  			},
   837  		},
   838  		{
   839  			name: "empty EventsToRegister plugin",
   840  			plugins: schedulerapi.PluginSet{
   841  				Enabled: []schedulerapi.Plugin{
   842  					{Name: emptyEventsToRegister},
   843  					{Name: queueSort},
   844  					{Name: fakeBind},
   845  				},
   846  				Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
   847  			},
   848  			want: map[framework.GVK]framework.ActionType{},
   849  		},
   850  		{
   851  			name:    "plugins with default profile",
   852  			plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled},
   853  			want: map[framework.GVK]framework.ActionType{
   854  				framework.Pod:                   framework.All,
   855  				framework.Node:                  framework.All,
   856  				framework.CSINode:               framework.All - framework.Delete,
   857  				framework.CSIDriver:             framework.All - framework.Delete,
   858  				framework.CSIStorageCapacity:    framework.All - framework.Delete,
   859  				framework.PersistentVolume:      framework.All - framework.Delete,
   860  				framework.PersistentVolumeClaim: framework.All - framework.Delete,
   861  				framework.StorageClass:          framework.All - framework.Delete,
   862  			},
   863  		},
   864  	}
   865  	for _, tt := range tests {
   866  		t.Run(tt.name, func(t *testing.T) {
   867  			_, ctx := ktesting.NewTestContext(t)
   868  			ctx, cancel := context.WithCancel(ctx)
   869  			defer cancel()
   870  			registry := plugins.NewInTreeRegistry()
   871  
   872  			cfgPls := &schedulerapi.Plugins{MultiPoint: tt.plugins}
   873  			plugins := []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}, &filterWithoutEnqueueExtensionsPlugin{}, &emptyEventsToRegisterPlugin{}, &fakeQueueSortPlugin{}, &fakebindPlugin{}}
   874  			for _, pl := range plugins {
   875  				tmpPl := pl
   876  				if err := registry.Register(pl.Name(), func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
   877  					return tmpPl, nil
   878  				}); err != nil {
   879  					t.Fatalf("fail to register filter plugin (%s)", pl.Name())
   880  				}
   881  			}
   882  
   883  			profile := schedulerapi.KubeSchedulerProfile{Plugins: cfgPls, PluginConfig: defaults.PluginConfigsV1}
   884  			fwk, err := newFramework(ctx, registry, profile)
   885  			if err != nil {
   886  				t.Fatal(err)
   887  			}
   888  
   889  			queueingHintsPerProfile := internalqueue.QueueingHintMapPerProfile{
   890  				"default": buildQueueingHintMap(fwk.EnqueueExtensions()),
   891  			}
   892  			got := unionedGVKs(queueingHintsPerProfile)
   893  
   894  			if diff := cmp.Diff(tt.want, got); diff != "" {
   895  				t.Errorf("Unexpected eventToPlugin map (-want,+got):%s", diff)
   896  			}
   897  		})
   898  	}
   899  }
   900  
   901  func newFramework(ctx context.Context, r frameworkruntime.Registry, profile schedulerapi.KubeSchedulerProfile) (framework.Framework, error) {
   902  	return frameworkruntime.NewFramework(ctx, r, &profile,
   903  		frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(nil, nil)),
   904  		frameworkruntime.WithInformerFactory(informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)),
   905  	)
   906  }
   907  
   908  var _ framework.QueueSortPlugin = &fakeQueueSortPlugin{}
   909  
   910  // fakeQueueSortPlugin is a no-op implementation for QueueSort extension point.
   911  type fakeQueueSortPlugin struct{}
   912  
   913  func (pl *fakeQueueSortPlugin) Name() string {
   914  	return queueSort
   915  }
   916  
   917  func (pl *fakeQueueSortPlugin) Less(_, _ *framework.QueuedPodInfo) bool {
   918  	return false
   919  }
   920  
   921  var _ framework.BindPlugin = &fakebindPlugin{}
   922  
   923  // fakebindPlugin is a no-op implementation for Bind extension point.
   924  type fakebindPlugin struct{}
   925  
   926  func (t *fakebindPlugin) Name() string {
   927  	return fakeBind
   928  }
   929  
   930  func (t *fakebindPlugin) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
   931  	return nil
   932  }
   933  
   934  // filterWithoutEnqueueExtensionsPlugin implements Filter, but doesn't implement EnqueueExtensions.
   935  type filterWithoutEnqueueExtensionsPlugin struct{}
   936  
   937  func (*filterWithoutEnqueueExtensionsPlugin) Name() string { return filterWithoutEnqueueExtensions }
   938  
   939  func (*filterWithoutEnqueueExtensionsPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
   940  	return nil
   941  }
   942  
   943  var hintFromFakeNode = framework.QueueingHint(100)
   944  
   945  type fakeNodePlugin struct{}
   946  
   947  var fakeNodePluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
   948  	return hintFromFakeNode, nil
   949  }
   950  
   951  func (*fakeNodePlugin) Name() string { return fakeNode }
   952  
   953  func (*fakeNodePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
   954  	return nil
   955  }
   956  
   957  func (pl *fakeNodePlugin) EventsToRegister() []framework.ClusterEventWithHint {
   958  	return []framework.ClusterEventWithHint{
   959  		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, QueueingHintFn: fakeNodePluginQueueingFn},
   960  	}
   961  }
   962  
   963  var hintFromFakePod = framework.QueueingHint(101)
   964  
   965  type fakePodPlugin struct{}
   966  
   967  var fakePodPluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
   968  	return hintFromFakePod, nil
   969  }
   970  
   971  func (*fakePodPlugin) Name() string { return fakePod }
   972  
   973  func (*fakePodPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
   974  	return nil
   975  }
   976  
   977  func (pl *fakePodPlugin) EventsToRegister() []framework.ClusterEventWithHint {
   978  	return []framework.ClusterEventWithHint{
   979  		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add}, QueueingHintFn: fakePodPluginQueueingFn},
   980  	}
   981  }
   982  
   983  type emptyEventPlugin struct{}
   984  
   985  func (*emptyEventPlugin) Name() string { return emptyEventExtensions }
   986  
   987  func (*emptyEventPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
   988  	return nil
   989  }
   990  
   991  func (pl *emptyEventPlugin) EventsToRegister() []framework.ClusterEventWithHint {
   992  	return nil
   993  }
   994  
   995  // emptyEventsToRegisterPlugin implement interface framework.EnqueueExtensions, but returns nil from EventsToRegister.
   996  // This can simulate a plugin registered at scheduler setup, but does nothing
   997  // due to some disabled feature gate.
   998  type emptyEventsToRegisterPlugin struct{}
   999  
  1000  func (*emptyEventsToRegisterPlugin) Name() string { return emptyEventsToRegister }
  1001  
  1002  func (*emptyEventsToRegisterPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
  1003  	return nil
  1004  }
  1005  
  1006  func (*emptyEventsToRegisterPlugin) EventsToRegister() []framework.ClusterEventWithHint { return nil }
  1007  

View as plain text