...

Source file src/k8s.io/kubernetes/test/integration/scheduler/queue_test.go

Documentation: k8s.io/kubernetes/test/integration/scheduler

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package scheduler
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"testing"
    23  	"time"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    27  	apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
    28  	"k8s.io/apimachinery/pkg/api/errors"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    31  	"k8s.io/apimachinery/pkg/runtime"
    32  	"k8s.io/apimachinery/pkg/runtime/schema"
    33  	"k8s.io/apimachinery/pkg/types"
    34  	"k8s.io/apimachinery/pkg/util/sets"
    35  	"k8s.io/apimachinery/pkg/util/uuid"
    36  	"k8s.io/apimachinery/pkg/util/wait"
    37  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    38  	"k8s.io/client-go/dynamic"
    39  	"k8s.io/client-go/kubernetes"
    40  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    41  	"k8s.io/klog/v2"
    42  	configv1 "k8s.io/kube-scheduler/config/v1"
    43  	apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    44  	"k8s.io/kubernetes/pkg/features"
    45  	"k8s.io/kubernetes/pkg/scheduler"
    46  	configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
    47  	"k8s.io/kubernetes/pkg/scheduler/framework"
    48  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
    49  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
    50  	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
    51  	st "k8s.io/kubernetes/pkg/scheduler/testing"
    52  	testfwk "k8s.io/kubernetes/test/integration/framework"
    53  	testutils "k8s.io/kubernetes/test/integration/util"
    54  	imageutils "k8s.io/kubernetes/test/utils/image"
    55  	"k8s.io/utils/pointer"
    56  )
    57  
    58  func TestSchedulingGates(t *testing.T) {
    59  	tests := []struct {
    60  		name                  string
    61  		pods                  []*v1.Pod
    62  		want                  []string
    63  		rmPodsSchedulingGates []int
    64  		wantPostGatesRemoval  []string
    65  	}{
    66  		{
    67  			name: "regular pods",
    68  			pods: []*v1.Pod{
    69  				st.MakePod().Name("p1").Container("pause").Obj(),
    70  				st.MakePod().Name("p2").Container("pause").Obj(),
    71  			},
    72  			want: []string{"p1", "p2"},
    73  		},
    74  		{
    75  			name: "one pod carrying scheduling gates",
    76  			pods: []*v1.Pod{
    77  				st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(),
    78  				st.MakePod().Name("p2").Container("pause").Obj(),
    79  			},
    80  			want: []string{"p2"},
    81  		},
    82  		{
    83  			name: "two pod carrying scheduling gates, and remove gates of one pod",
    84  			pods: []*v1.Pod{
    85  				st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(),
    86  				st.MakePod().Name("p2").SchedulingGates([]string{"bar"}).Container("pause").Obj(),
    87  				st.MakePod().Name("p3").Container("pause").Obj(),
    88  			},
    89  			want:                  []string{"p3"},
    90  			rmPodsSchedulingGates: []int{1}, // remove gates of 'p2'
    91  			wantPostGatesRemoval:  []string{"p2"},
    92  		},
    93  	}
    94  
    95  	for _, tt := range tests {
    96  		t.Run(tt.name, func(t *testing.T) {
    97  			// Use zero backoff seconds to bypass backoffQ.
    98  			// It's intended to not start the scheduler's queue, and hence to
    99  			// not start any flushing logic. We will pop and schedule the Pods manually later.
   100  			testCtx := testutils.InitTestSchedulerWithOptions(
   101  				t,
   102  				testutils.InitTestAPIServer(t, "pod-scheduling-gates", nil),
   103  				0,
   104  				scheduler.WithPodInitialBackoffSeconds(0),
   105  				scheduler.WithPodMaxBackoffSeconds(0),
   106  			)
   107  			testutils.SyncSchedulerInformerFactory(testCtx)
   108  
   109  			cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
   110  			for _, p := range tt.pods {
   111  				p.Namespace = ns
   112  				if _, err := cs.CoreV1().Pods(ns).Create(ctx, p, metav1.CreateOptions{}); err != nil {
   113  					t.Fatalf("Failed to create Pod %q: %v", p.Name, err)
   114  				}
   115  			}
   116  
   117  			// Wait for the pods to be present in the scheduling queue.
   118  			if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
   119  				pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
   120  				return len(pendingPods) == len(tt.pods), nil
   121  			}); err != nil {
   122  				t.Fatal(err)
   123  			}
   124  
   125  			// Pop the expected pods out. They should be de-queueable.
   126  			for _, wantPod := range tt.want {
   127  				podInfo := testutils.NextPodOrDie(t, testCtx)
   128  				if got := podInfo.Pod.Name; got != wantPod {
   129  					t.Errorf("Want %v to be popped out, but got %v", wantPod, got)
   130  				}
   131  			}
   132  
   133  			if len(tt.rmPodsSchedulingGates) == 0 {
   134  				return
   135  			}
   136  			// Remove scheduling gates from the pod spec.
   137  			for _, idx := range tt.rmPodsSchedulingGates {
   138  				patch := `{"spec": {"schedulingGates": null}}`
   139  				podName := tt.pods[idx].Name
   140  				if _, err := cs.CoreV1().Pods(ns).Patch(ctx, podName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil {
   141  					t.Fatalf("Failed to patch pod %v: %v", podName, err)
   142  				}
   143  			}
   144  			// Pop the expected pods out. They should be de-queueable.
   145  			for _, wantPod := range tt.wantPostGatesRemoval {
   146  				podInfo := testutils.NextPodOrDie(t, testCtx)
   147  				if got := podInfo.Pod.Name; got != wantPod {
   148  					t.Errorf("Want %v to be popped out, but got %v", wantPod, got)
   149  				}
   150  			}
   151  		})
   152  	}
   153  }
   154  
   155  // TestCoreResourceEnqueue verify Pods failed by in-tree default plugins can be
   156  // moved properly upon their registered events.
   157  func TestCoreResourceEnqueue(t *testing.T) {
   158  	tests := []struct {
   159  		name string
   160  		// initialNode is the Node to be created at first.
   161  		initialNode *v1.Node
   162  		// initialPod is the Pod to be created at first if it's not empty.
   163  		initialPod *v1.Pod
   164  		// pods are the list of Pods to be created.
   165  		// All of them are expected to be unschedulable at first.
   166  		pods []*v1.Pod
   167  		// triggerFn is the function that triggers the event to move Pods.
   168  		triggerFn func(testCtx *testutils.TestContext) error
   169  		// wantRequeuedPods is the map of Pods that are expected to be requeued after triggerFn.
   170  		wantRequeuedPods sets.Set[string]
   171  	}{
   172  		{
   173  			name:        "Pod without a required toleration to a node isn't requeued to activeQ",
   174  			initialNode: st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj(),
   175  			pods: []*v1.Pod{
   176  				// - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin.
   177  				//   (TaintToleration plugin is evaluated before NodeResourcesFit plugin.)
   178  				// - Pod2 has the required toleration, but requests a large amount of CPU - will be rejected by the NodeResourcesFit plugin.
   179  				st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Obj(),
   180  				st.MakePod().Name("pod2").Toleration(v1.TaintNodeNotReady).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Obj(),
   181  			},
   182  			triggerFn: func(testCtx *testutils.TestContext) error {
   183  				// Trigger a NodeChange event by increasing CPU capacity.
   184  				// It makes Pod2 schedulable.
   185  				// Pod1 is not requeued because the Node is still unready and it doesn't have the required toleration.
   186  				if _, err := testCtx.ClientSet.CoreV1().Nodes().UpdateStatus(testCtx.Ctx, st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj(), metav1.UpdateOptions{}); err != nil {
   187  					return fmt.Errorf("failed to update the node: %w", err)
   188  				}
   189  				return nil
   190  			},
   191  			wantRequeuedPods: sets.New("pod2"),
   192  		},
   193  		{
   194  			name:        "Pod rejected by the PodAffinity plugin is requeued when a new Node is created and turned to ready",
   195  			initialNode: st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
   196  			initialPod:  st.MakePod().Label("anti", "anti").Name("pod1").PodAntiAffinityExists("anti", "node", st.PodAntiAffinityWithRequiredReq).Container("image").Node("fake-node").Obj(),
   197  			pods: []*v1.Pod{
   198  				// - Pod2 will be rejected by the PodAffinity plugin.
   199  				st.MakePod().Label("anti", "anti").Name("pod2").PodAntiAffinityExists("anti", "node", st.PodAntiAffinityWithRequiredReq).Container("image").Obj(),
   200  			},
   201  			triggerFn: func(testCtx *testutils.TestContext) error {
   202  				// Trigger a NodeCreated event.
   203  				// Note that this Node has a un-ready taint and pod2 should be requeued ideally because unschedulable plugins registered for pod2 is PodAffinity.
   204  				// However, due to preCheck, it's not requeueing pod2 to activeQ.
   205  				// It'll be fixed by the removal of preCheck in the future.
   206  				// https://github.com/kubernetes/kubernetes/issues/110175
   207  				node := st.MakeNode().Name("fake-node2").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj()
   208  				if _, err := testCtx.ClientSet.CoreV1().Nodes().Create(testCtx.Ctx, st.MakeNode().Name("fake-node2").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: "foo", Effect: v1.TaintEffectNoSchedule}}).Obj(), metav1.CreateOptions{}); err != nil {
   209  					return fmt.Errorf("failed to create a newnode: %w", err)
   210  				}
   211  
   212  				// As a mitigation of an issue described above, all plugins subscribing Node/Add event register UpdateNodeTaint too.
   213  				// So, this removal of taint moves pod2 to activeQ.
   214  				node.Spec.Taints = nil
   215  				if _, err := testCtx.ClientSet.CoreV1().Nodes().Update(testCtx.Ctx, node, metav1.UpdateOptions{}); err != nil {
   216  					return fmt.Errorf("failed to remove taints off the node: %w", err)
   217  				}
   218  				return nil
   219  			},
   220  			wantRequeuedPods: sets.New("pod2"),
   221  		},
   222  	}
   223  
   224  	for _, featureEnabled := range []bool{false, true} {
   225  		for _, tt := range tests {
   226  			t.Run(fmt.Sprintf("%s [SchedulerQueueingHints enabled: %v]", tt.name, featureEnabled), func(t *testing.T) {
   227  				defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, featureEnabled)()
   228  
   229  				// Use zero backoff seconds to bypass backoffQ.
   230  				// It's intended to not start the scheduler's queue, and hence to
   231  				// not start any flushing logic. We will pop and schedule the Pods manually later.
   232  				testCtx := testutils.InitTestSchedulerWithOptions(
   233  					t,
   234  					testutils.InitTestAPIServer(t, "core-res-enqueue", nil),
   235  					0,
   236  					scheduler.WithPodInitialBackoffSeconds(0),
   237  					scheduler.WithPodMaxBackoffSeconds(0),
   238  				)
   239  				testutils.SyncSchedulerInformerFactory(testCtx)
   240  
   241  				defer testCtx.Scheduler.SchedulingQueue.Close()
   242  
   243  				cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
   244  				// Create one Node with a taint.
   245  				if _, err := cs.CoreV1().Nodes().Create(ctx, tt.initialNode, metav1.CreateOptions{}); err != nil {
   246  					t.Fatalf("Failed to create an initial Node %q: %v", tt.initialNode.Name, err)
   247  				}
   248  
   249  				if tt.initialPod != nil {
   250  					if _, err := cs.CoreV1().Pods(ns).Create(ctx, tt.initialPod, metav1.CreateOptions{}); err != nil {
   251  						t.Fatalf("Failed to create an initial Pod %q: %v", tt.initialPod.Name, err)
   252  					}
   253  				}
   254  
   255  				for _, pod := range tt.pods {
   256  					if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
   257  						t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
   258  					}
   259  				}
   260  
   261  				// Wait for the tt.pods to be present in the scheduling queue.
   262  				if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
   263  					pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
   264  					return len(pendingPods) == len(tt.pods), nil
   265  				}); err != nil {
   266  					t.Fatal(err)
   267  				}
   268  
   269  				t.Log("Confirmed Pods in the scheduling queue, starting to schedule them")
   270  
   271  				// Pop all pods out. They should be unschedulable.
   272  				for i := 0; i < len(tt.pods); i++ {
   273  					testCtx.Scheduler.ScheduleOne(testCtx.Ctx)
   274  				}
   275  				// Wait for the tt.pods to be still present in the scheduling queue.
   276  				if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
   277  					pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
   278  					return len(pendingPods) == len(tt.pods), nil
   279  				}); err != nil {
   280  					t.Fatal(err)
   281  				}
   282  
   283  				t.Log("finished initial schedulings for all Pods, will trigger triggerFn")
   284  
   285  				err := tt.triggerFn(testCtx)
   286  				if err != nil {
   287  					t.Fatalf("Failed to trigger the event: %v", err)
   288  				}
   289  
   290  				t.Log("triggered tt.triggerFn, will check if tt.requeuedPods are requeued")
   291  
   292  				// Wait for the tt.pods to be still present in the scheduling queue.
   293  				var requeuedPods sets.Set[string]
   294  				if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
   295  					requeuedPods = sets.Set[string]{} // reset
   296  					for _, requeuedPod := range testCtx.Scheduler.SchedulingQueue.PodsInActiveQ() {
   297  						requeuedPods.Insert(requeuedPod.Name)
   298  					}
   299  
   300  					return requeuedPods.Equal(tt.wantRequeuedPods), nil
   301  				}); err != nil {
   302  					t.Fatalf("Expect Pods %v to be requeued, but %v are requeued actually", tt.wantRequeuedPods, requeuedPods)
   303  				}
   304  			})
   305  		}
   306  	}
   307  }
   308  
   309  var _ framework.FilterPlugin = &fakeCRPlugin{}
   310  var _ framework.EnqueueExtensions = &fakeCRPlugin{}
   311  
   312  type fakeCRPlugin struct{}
   313  
   314  func (f *fakeCRPlugin) Name() string {
   315  	return "fakeCRPlugin"
   316  }
   317  
   318  func (f *fakeCRPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
   319  	return framework.NewStatus(framework.Unschedulable, "always fail")
   320  }
   321  
   322  // EventsToRegister returns the possible events that may make a Pod
   323  // failed by this plugin schedulable.
   324  func (f *fakeCRPlugin) EventsToRegister() []framework.ClusterEventWithHint {
   325  	return []framework.ClusterEventWithHint{
   326  		{Event: framework.ClusterEvent{Resource: "foos.v1.example.com", ActionType: framework.All}},
   327  	}
   328  }
   329  
   330  // TestCustomResourceEnqueue constructs a fake plugin that registers custom resources
   331  // to verify Pods failed by this plugin can be moved properly upon CR events.
   332  func TestCustomResourceEnqueue(t *testing.T) {
   333  	// Start API Server with apiextensions supported.
   334  	server := apiservertesting.StartTestServerOrDie(
   335  		t, apiservertesting.NewDefaultTestServerOptions(),
   336  		[]string{"--disable-admission-plugins=ServiceAccount,TaintNodesByCondition", "--runtime-config=api/all=true"},
   337  		testfwk.SharedEtcd(),
   338  	)
   339  	testCtx := &testutils.TestContext{}
   340  	ctx, cancel := context.WithCancel(context.Background())
   341  	testCtx.Ctx = ctx
   342  	testCtx.CloseFn = func() {
   343  		cancel()
   344  		server.TearDownFn()
   345  	}
   346  
   347  	apiExtensionClient := apiextensionsclient.NewForConfigOrDie(server.ClientConfig)
   348  	dynamicClient := dynamic.NewForConfigOrDie(server.ClientConfig)
   349  
   350  	// Create a Foo CRD.
   351  	fooCRD := &apiextensionsv1.CustomResourceDefinition{
   352  		ObjectMeta: metav1.ObjectMeta{
   353  			Name: "foos.example.com",
   354  		},
   355  		Spec: apiextensionsv1.CustomResourceDefinitionSpec{
   356  			Group: "example.com",
   357  			Scope: apiextensionsv1.NamespaceScoped,
   358  			Names: apiextensionsv1.CustomResourceDefinitionNames{
   359  				Plural: "foos",
   360  				Kind:   "Foo",
   361  			},
   362  			Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
   363  				{
   364  					Name:    "v1",
   365  					Served:  true,
   366  					Storage: true,
   367  					Schema: &apiextensionsv1.CustomResourceValidation{
   368  						OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
   369  							Type: "object",
   370  							Properties: map[string]apiextensionsv1.JSONSchemaProps{
   371  								"field": {Type: "string"},
   372  							},
   373  						},
   374  					},
   375  				},
   376  			},
   377  		},
   378  	}
   379  	var err error
   380  	fooCRD, err = apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Create(testCtx.Ctx, fooCRD, metav1.CreateOptions{})
   381  	if err != nil {
   382  		t.Fatal(err)
   383  	}
   384  
   385  	registry := frameworkruntime.Registry{
   386  		"fakeCRPlugin": func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
   387  			return &fakeCRPlugin{}, nil
   388  		},
   389  	}
   390  	cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
   391  		Profiles: []configv1.KubeSchedulerProfile{{
   392  			SchedulerName: pointer.String(v1.DefaultSchedulerName),
   393  			Plugins: &configv1.Plugins{
   394  				Filter: configv1.PluginSet{
   395  					Enabled: []configv1.Plugin{
   396  						{Name: "fakeCRPlugin"},
   397  					},
   398  				},
   399  			},
   400  		}}})
   401  
   402  	testCtx.KubeConfig = server.ClientConfig
   403  	testCtx.ClientSet = kubernetes.NewForConfigOrDie(server.ClientConfig)
   404  	testCtx.NS, err = testCtx.ClientSet.CoreV1().Namespaces().Create(testCtx.Ctx, &v1.Namespace{
   405  		ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("cr-enqueue-%v", string(uuid.NewUUID()))}}, metav1.CreateOptions{})
   406  	if err != nil && !errors.IsAlreadyExists(err) {
   407  		t.Fatalf("Failed to integration test ns: %v", err)
   408  	}
   409  
   410  	// Use zero backoff seconds to bypass backoffQ.
   411  	// It's intended to not start the scheduler's queue, and hence to
   412  	// not start any flushing logic. We will pop and schedule the Pods manually later.
   413  	testCtx = testutils.InitTestSchedulerWithOptions(
   414  		t,
   415  		testCtx,
   416  		0,
   417  		scheduler.WithProfiles(cfg.Profiles...),
   418  		scheduler.WithFrameworkOutOfTreeRegistry(registry),
   419  		scheduler.WithPodInitialBackoffSeconds(0),
   420  		scheduler.WithPodMaxBackoffSeconds(0),
   421  	)
   422  	testutils.SyncSchedulerInformerFactory(testCtx)
   423  
   424  	defer testutils.CleanupTest(t, testCtx)
   425  
   426  	cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
   427  	logger := klog.FromContext(ctx)
   428  	// Create one Node.
   429  	node := st.MakeNode().Name("fake-node").Obj()
   430  	if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
   431  		t.Fatalf("Failed to create Node %q: %v", node.Name, err)
   432  	}
   433  
   434  	// Create a testing Pod.
   435  	pause := imageutils.GetPauseImageName()
   436  	pod := st.MakePod().Namespace(ns).Name("fake-pod").Container(pause).Obj()
   437  	if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
   438  		t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
   439  	}
   440  
   441  	// Wait for the testing Pod to be present in the scheduling queue.
   442  	if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
   443  		pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
   444  		return len(pendingPods) == 1, nil
   445  	}); err != nil {
   446  		t.Fatal(err)
   447  	}
   448  
   449  	// Pop fake-pod out. It should be unschedulable.
   450  	podInfo := testutils.NextPodOrDie(t, testCtx)
   451  	fwk, ok := testCtx.Scheduler.Profiles[podInfo.Pod.Spec.SchedulerName]
   452  	if !ok {
   453  		t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name)
   454  	}
   455  	// Schedule the Pod manually.
   456  	_, fitError := testCtx.Scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), podInfo.Pod)
   457  	// The fitError is expected to be non-nil as it failed the fakeCRPlugin plugin.
   458  	if fitError == nil {
   459  		t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)
   460  	}
   461  	testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, framework.NewStatus(framework.Unschedulable).WithError(fitError), nil, time.Now())
   462  
   463  	// Scheduling cycle is incremented from 0 to 1 after NextPod() is called, so
   464  	// pass a number larger than 1 to move Pod to unschedulablePods.
   465  	testCtx.Scheduler.SchedulingQueue.AddUnschedulableIfNotPresent(logger, podInfo, 10)
   466  
   467  	// Trigger a Custom Resource event.
   468  	// We expect this event to trigger moving the test Pod from unschedulablePods to activeQ.
   469  	crdGVR := schema.GroupVersionResource{Group: fooCRD.Spec.Group, Version: fooCRD.Spec.Versions[0].Name, Resource: "foos"}
   470  	crClient := dynamicClient.Resource(crdGVR).Namespace(ns)
   471  	if _, err := crClient.Create(ctx, &unstructured.Unstructured{
   472  		Object: map[string]interface{}{
   473  			"apiVersion": "example.com/v1",
   474  			"kind":       "Foo",
   475  			"metadata":   map[string]interface{}{"name": "foo1"},
   476  		},
   477  	}, metav1.CreateOptions{}); err != nil {
   478  		t.Fatalf("Unable to create cr: %v", err)
   479  	}
   480  
   481  	// Now we should be able to pop the Pod from activeQ again.
   482  	podInfo = testutils.NextPodOrDie(t, testCtx)
   483  	if podInfo.Attempts != 2 {
   484  		t.Errorf("Expected the Pod to be attempted 2 times, but got %v", podInfo.Attempts)
   485  	}
   486  }
   487  
   488  // TestRequeueByBindFailure verify Pods failed by bind plugin are
   489  // put back to the queue regardless of whether event happens or not.
   490  func TestRequeueByBindFailure(t *testing.T) {
   491  	fakeBind := &firstFailBindPlugin{}
   492  	registry := frameworkruntime.Registry{
   493  		"firstFailBindPlugin": func(ctx context.Context, o runtime.Object, fh framework.Handle) (framework.Plugin, error) {
   494  			binder, err := defaultbinder.New(ctx, nil, fh)
   495  			if err != nil {
   496  				return nil, err
   497  			}
   498  
   499  			fakeBind.defaultBinderPlugin = binder.(framework.BindPlugin)
   500  			return fakeBind, nil
   501  		},
   502  	}
   503  
   504  	cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
   505  		Profiles: []configv1.KubeSchedulerProfile{{
   506  			SchedulerName: pointer.String(v1.DefaultSchedulerName),
   507  			Plugins: &configv1.Plugins{
   508  				MultiPoint: configv1.PluginSet{
   509  					Enabled: []configv1.Plugin{
   510  						{Name: "firstFailBindPlugin"},
   511  					},
   512  					Disabled: []configv1.Plugin{
   513  						{Name: names.DefaultBinder},
   514  					},
   515  				},
   516  			},
   517  		}}})
   518  
   519  	// Use zero backoff seconds to bypass backoffQ.
   520  	testCtx := testutils.InitTestSchedulerWithOptions(
   521  		t,
   522  		testutils.InitTestAPIServer(t, "core-res-enqueue", nil),
   523  		0,
   524  		scheduler.WithPodInitialBackoffSeconds(0),
   525  		scheduler.WithPodMaxBackoffSeconds(0),
   526  		scheduler.WithProfiles(cfg.Profiles...),
   527  		scheduler.WithFrameworkOutOfTreeRegistry(registry),
   528  	)
   529  	testutils.SyncSchedulerInformerFactory(testCtx)
   530  
   531  	go testCtx.Scheduler.Run(testCtx.Ctx)
   532  
   533  	cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
   534  	node := st.MakeNode().Name("fake-node").Obj()
   535  	if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
   536  		t.Fatalf("Failed to create Node %q: %v", node.Name, err)
   537  	}
   538  	// create a pod.
   539  	pod := st.MakePod().Namespace(ns).Name("pod-1").Container(imageutils.GetPauseImageName()).Obj()
   540  	if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
   541  		t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
   542  	}
   543  
   544  	// 1. first binding try should fail.
   545  	// 2. The pod should be enqueued to activeQ/backoffQ without any event.
   546  	// 3. The pod should be scheduled in the second binding try.
   547  	// Here, waiting until (3).
   548  	err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, ns, pod.Name))
   549  	if err != nil {
   550  		t.Fatalf("Expect pod-1 to be scheduled by the bind plugin: %v", err)
   551  	}
   552  
   553  	// Make sure the first binding trial was failed, and this pod is scheduled at the second trial.
   554  	if fakeBind.counter != 1 {
   555  		t.Fatalf("Expect pod-1 to be scheduled by the bind plugin in the second binding try: %v", err)
   556  	}
   557  }
   558  
   559  // firstFailBindPlugin rejects the Pod in the first Bind call.
   560  type firstFailBindPlugin struct {
   561  	counter             int
   562  	defaultBinderPlugin framework.BindPlugin
   563  }
   564  
   565  func (*firstFailBindPlugin) Name() string {
   566  	return "firstFailBindPlugin"
   567  }
   568  
   569  func (p *firstFailBindPlugin) Bind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodename string) *framework.Status {
   570  	if p.counter == 0 {
   571  		// fail in the first Bind call.
   572  		p.counter++
   573  		return framework.NewStatus(framework.Error, "firstFailBindPlugin rejects the Pod")
   574  	}
   575  
   576  	return p.defaultBinderPlugin.Bind(ctx, state, pod, nodename)
   577  }
   578  
   579  // TestRequeueByPermitRejection verify Pods failed by permit plugins in the binding cycle are
   580  // put back to the queue, according to the correct scheduling cycle number.
   581  func TestRequeueByPermitRejection(t *testing.T) {
   582  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, true)()
   583  	queueingHintCalledCounter := 0
   584  	fakePermit := &fakePermitPlugin{}
   585  	registry := frameworkruntime.Registry{
   586  		fakePermitPluginName: func(ctx context.Context, o runtime.Object, fh framework.Handle) (framework.Plugin, error) {
   587  			fakePermit = &fakePermitPlugin{
   588  				frameworkHandler: fh,
   589  				schedulingHint: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
   590  					queueingHintCalledCounter++
   591  					return framework.Queue, nil
   592  				},
   593  			}
   594  			return fakePermit, nil
   595  		},
   596  	}
   597  	cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
   598  		Profiles: []configv1.KubeSchedulerProfile{{
   599  			SchedulerName: pointer.String(v1.DefaultSchedulerName),
   600  			Plugins: &configv1.Plugins{
   601  				MultiPoint: configv1.PluginSet{
   602  					Enabled: []configv1.Plugin{
   603  						{Name: fakePermitPluginName},
   604  					},
   605  				},
   606  			},
   607  		}}})
   608  
   609  	// Use zero backoff seconds to bypass backoffQ.
   610  	testCtx := testutils.InitTestSchedulerWithOptions(
   611  		t,
   612  		testutils.InitTestAPIServer(t, "core-res-enqueue", nil),
   613  		0,
   614  		scheduler.WithPodInitialBackoffSeconds(0),
   615  		scheduler.WithPodMaxBackoffSeconds(0),
   616  		scheduler.WithProfiles(cfg.Profiles...),
   617  		scheduler.WithFrameworkOutOfTreeRegistry(registry),
   618  	)
   619  	testutils.SyncSchedulerInformerFactory(testCtx)
   620  
   621  	go testCtx.Scheduler.Run(testCtx.Ctx)
   622  
   623  	cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
   624  	node := st.MakeNode().Name("fake-node").Obj()
   625  	if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
   626  		t.Fatalf("Failed to create Node %q: %v", node.Name, err)
   627  	}
   628  	// create a pod.
   629  	pod := st.MakePod().Namespace(ns).Name("pod-1").Container(imageutils.GetPauseImageName()).Obj()
   630  	if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
   631  		t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
   632  	}
   633  
   634  	// update node label. (causes the NodeUpdate event)
   635  	node.Labels = map[string]string{"updated": ""}
   636  	if _, err := cs.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{}); err != nil {
   637  		t.Fatalf("Failed to add labels to the node: %v", err)
   638  	}
   639  
   640  	// create a pod to increment the scheduling cycle number in the scheduling queue.
   641  	// We can make sure NodeUpdate event, that has happened in the previous scheduling cycle, makes Pod to be enqueued to activeQ via the scheduling queue.
   642  	pod = st.MakePod().Namespace(ns).Name("pod-2").Container(imageutils.GetPauseImageName()).Obj()
   643  	if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
   644  		t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
   645  	}
   646  
   647  	// reject pod-1 to simulate the failure in Permit plugins.
   648  	// This pod-1 should be enqueued to activeQ because the NodeUpdate event has happened.
   649  	fakePermit.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
   650  		if wp.GetPod().Name == "pod-1" {
   651  			wp.Reject(fakePermitPluginName, "fakePermitPlugin rejects the Pod")
   652  			return
   653  		}
   654  	})
   655  
   656  	// Wait for pod-2 to be scheduled.
   657  	err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (done bool, err error) {
   658  		fakePermit.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
   659  			if wp.GetPod().Name == "pod-2" {
   660  				wp.Allow(fakePermitPluginName)
   661  			}
   662  		})
   663  
   664  		return testutils.PodScheduled(cs, ns, "pod-2")(ctx)
   665  	})
   666  	if err != nil {
   667  		t.Fatalf("Expect pod-2 to be scheduled")
   668  	}
   669  
   670  	err = wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (done bool, err error) {
   671  		pod1Found := false
   672  		fakePermit.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
   673  			if wp.GetPod().Name == "pod-1" {
   674  				pod1Found = true
   675  				wp.Allow(fakePermitPluginName)
   676  			}
   677  		})
   678  		return pod1Found, nil
   679  	})
   680  	if err != nil {
   681  		t.Fatal("Expect pod-1 to be scheduled again")
   682  	}
   683  
   684  	if queueingHintCalledCounter != 1 {
   685  		t.Fatalf("Expected the scheduling hint to be called 1 time, but %v", queueingHintCalledCounter)
   686  	}
   687  }
   688  
   689  type fakePermitPlugin struct {
   690  	frameworkHandler framework.Handle
   691  	schedulingHint   framework.QueueingHintFn
   692  }
   693  
   694  const fakePermitPluginName = "fakePermitPlugin"
   695  
   696  func (p *fakePermitPlugin) Name() string {
   697  	return fakePermitPluginName
   698  }
   699  
   700  func (p *fakePermitPlugin) Permit(ctx context.Context, state *framework.CycleState, _ *v1.Pod, _ string) (*framework.Status, time.Duration) {
   701  	return framework.NewStatus(framework.Wait), wait.ForeverTestTimeout
   702  }
   703  
   704  func (p *fakePermitPlugin) EventsToRegister() []framework.ClusterEventWithHint {
   705  	return []framework.ClusterEventWithHint{
   706  		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}, QueueingHintFn: p.schedulingHint},
   707  	}
   708  }
   709  

View as plain text