...

Source file src/k8s.io/kubernetes/test/integration/scheduler/scheduler_test.go

Documentation: k8s.io/kubernetes/test/integration/scheduler

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package scheduler
    18  
    19  // This file tests the scheduler.
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"net/http"
    25  	"strings"
    26  	"sync/atomic"
    27  	"testing"
    28  	"time"
    29  
    30  	"github.com/google/go-cmp/cmp"
    31  	v1 "k8s.io/api/core/v1"
    32  	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
    33  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    34  	"k8s.io/apimachinery/pkg/api/resource"
    35  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    36  	"k8s.io/apimachinery/pkg/runtime"
    37  	"k8s.io/apimachinery/pkg/util/wait"
    38  	"k8s.io/apimachinery/pkg/watch"
    39  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    40  	clientset "k8s.io/client-go/kubernetes"
    41  	corelisters "k8s.io/client-go/listers/core/v1"
    42  	"k8s.io/client-go/tools/cache"
    43  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    44  	configv1 "k8s.io/kube-scheduler/config/v1"
    45  	"k8s.io/kubernetes/pkg/features"
    46  	"k8s.io/kubernetes/pkg/scheduler"
    47  	configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
    48  	st "k8s.io/kubernetes/pkg/scheduler/testing"
    49  	testutils "k8s.io/kubernetes/test/integration/util"
    50  	"k8s.io/kubernetes/test/utils/format"
    51  	"k8s.io/utils/pointer"
    52  )
    53  
    54  type nodeMutationFunc func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface)
    55  
    56  type nodeStateManager struct {
    57  	makeSchedulable   nodeMutationFunc
    58  	makeUnSchedulable nodeMutationFunc
    59  }
    60  
    61  func TestUnschedulableNodes(t *testing.T) {
    62  	testCtx := testutils.InitTestSchedulerWithNS(t, "unschedulable-nodes")
    63  
    64  	nodeLister := testCtx.InformerFactory.Core().V1().Nodes().Lister()
    65  	// NOTE: This test cannot run in parallel, because it is creating and deleting
    66  	// non-namespaced objects (Nodes).
    67  	defer testCtx.ClientSet.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
    68  
    69  	goodCondition := v1.NodeCondition{
    70  		Type:              v1.NodeReady,
    71  		Status:            v1.ConditionTrue,
    72  		Reason:            fmt.Sprintf("schedulable condition"),
    73  		LastHeartbeatTime: metav1.Time{Time: time.Now()},
    74  	}
    75  	// Create a new schedulable node, since we're first going to apply
    76  	// the unschedulable condition and verify that pods aren't scheduled.
    77  	node := &v1.Node{
    78  		ObjectMeta: metav1.ObjectMeta{Name: "node-scheduling-test-node"},
    79  		Spec:       v1.NodeSpec{Unschedulable: false},
    80  		Status: v1.NodeStatus{
    81  			Capacity: v1.ResourceList{
    82  				v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
    83  			},
    84  			Conditions: []v1.NodeCondition{goodCondition},
    85  		},
    86  	}
    87  	nodeKey, err := cache.MetaNamespaceKeyFunc(node)
    88  	if err != nil {
    89  		t.Fatalf("Couldn't retrieve key for node %v", node.Name)
    90  	}
    91  
    92  	// The test does the following for each nodeStateManager in this list:
    93  	//	1. Create a new node
    94  	//	2. Apply the makeUnSchedulable function
    95  	//	3. Create a new pod
    96  	//  4. Check that the pod doesn't get assigned to the node
    97  	//  5. Apply the schedulable function
    98  	//  6. Check that the pod *does* get assigned to the node
    99  	//  7. Delete the pod and node.
   100  
   101  	nodeModifications := []nodeStateManager{
   102  		// Test node.Spec.Unschedulable=true/false
   103  		{
   104  			makeUnSchedulable: func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) {
   105  				n.Spec.Unschedulable = true
   106  				if _, err := c.CoreV1().Nodes().Update(context.TODO(), n, metav1.UpdateOptions{}); err != nil {
   107  					t.Fatalf("Failed to update node with unschedulable=true: %v", err)
   108  				}
   109  				err = testutils.WaitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool {
   110  					// An unschedulable node should still be present in the store
   111  					// Nodes that are unschedulable or that are not ready or
   112  					// have their disk full (Node.Spec.Conditions) are excluded
   113  					// based on NodeConditionPredicate, a separate check
   114  					return node != nil && node.(*v1.Node).Spec.Unschedulable
   115  				})
   116  				if err != nil {
   117  					t.Fatalf("Failed to observe reflected update for setting unschedulable=true: %v", err)
   118  				}
   119  			},
   120  			makeSchedulable: func(t *testing.T, n *v1.Node, nodeLister corelisters.NodeLister, c clientset.Interface) {
   121  				n.Spec.Unschedulable = false
   122  				if _, err := c.CoreV1().Nodes().Update(context.TODO(), n, metav1.UpdateOptions{}); err != nil {
   123  					t.Fatalf("Failed to update node with unschedulable=false: %v", err)
   124  				}
   125  				err = testutils.WaitForReflection(t, nodeLister, nodeKey, func(node interface{}) bool {
   126  					return node != nil && node.(*v1.Node).Spec.Unschedulable == false
   127  				})
   128  				if err != nil {
   129  					t.Fatalf("Failed to observe reflected update for setting unschedulable=false: %v", err)
   130  				}
   131  			},
   132  		},
   133  	}
   134  
   135  	for i, mod := range nodeModifications {
   136  		unSchedNode, err := testutils.CreateNode(testCtx.ClientSet, node)
   137  		if err != nil {
   138  			t.Fatalf("Failed to create node: %v", err)
   139  		}
   140  
   141  		// Apply the unschedulable modification to the node, and wait for the reflection
   142  		mod.makeUnSchedulable(t, unSchedNode, nodeLister, testCtx.ClientSet)
   143  
   144  		// Create the new pod, note that this needs to happen post unschedulable
   145  		// modification or we have a race in the test.
   146  		myPod, err := testutils.CreatePausePodWithResource(testCtx.ClientSet, "node-scheduling-test-pod", testCtx.NS.Name, nil)
   147  		if err != nil {
   148  			t.Fatalf("Failed to create pod: %v", err)
   149  		}
   150  
   151  		// There are no schedulable nodes - the pod shouldn't be scheduled.
   152  		err = testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, myPod, 2*time.Second)
   153  		if err == nil {
   154  			t.Errorf("Test %d: Pod scheduled successfully on unschedulable nodes", i)
   155  		}
   156  		if !wait.Interrupted(err) {
   157  			t.Errorf("Test %d: failed while trying to confirm the pod does not get scheduled on the node: %v", i, err)
   158  		} else {
   159  			t.Logf("Test %d: Pod did not get scheduled on an unschedulable node", i)
   160  		}
   161  
   162  		// Apply the schedulable modification to the node, and wait for the reflection
   163  		schedNode, err := testCtx.ClientSet.CoreV1().Nodes().Get(context.TODO(), unSchedNode.Name, metav1.GetOptions{})
   164  		if err != nil {
   165  			t.Fatalf("Failed to get node: %v", err)
   166  		}
   167  		mod.makeSchedulable(t, schedNode, nodeLister, testCtx.ClientSet)
   168  
   169  		// Wait until the pod is scheduled.
   170  		if err := testutils.WaitForPodToSchedule(testCtx.ClientSet, myPod); err != nil {
   171  			t.Errorf("Test %d: failed to schedule a pod: %v", i, err)
   172  		} else {
   173  			t.Logf("Test %d: Pod got scheduled on a schedulable node", i)
   174  		}
   175  		// Clean up.
   176  		if err := testutils.DeletePod(testCtx.ClientSet, myPod.Name, myPod.Namespace); err != nil {
   177  			t.Errorf("Failed to delete pod: %v", err)
   178  		}
   179  		err = testCtx.ClientSet.CoreV1().Nodes().Delete(context.TODO(), schedNode.Name, metav1.DeleteOptions{})
   180  		if err != nil {
   181  			t.Errorf("Failed to delete node: %v", err)
   182  		}
   183  	}
   184  }
   185  
   186  func TestMultipleSchedulers(t *testing.T) {
   187  	// This integration tests the multi-scheduler feature in the following way:
   188  	// 1. create a default scheduler
   189  	// 2. create a node
   190  	// 3. create 3 pods: testPodNoAnnotation, testPodWithAnnotationFitsDefault and testPodWithAnnotationFitsFoo
   191  	//	  - note: the first two should be picked and scheduled by default scheduler while the last one should be
   192  	//	          picked by scheduler of name "foo-scheduler" which does not exist yet.
   193  	// 4. **check point-1**:
   194  	//	   - testPodNoAnnotation, testPodWithAnnotationFitsDefault should be scheduled
   195  	//	   - testPodWithAnnotationFitsFoo should NOT be scheduled
   196  	// 5. create a scheduler with name "foo-scheduler"
   197  	// 6. **check point-2**:
   198  	//     - testPodWithAnnotationFitsFoo should be scheduled
   199  
   200  	// 1. create and start default-scheduler
   201  	testCtx := testutils.InitTestSchedulerWithNS(t, "multi-scheduler")
   202  
   203  	// 2. create a node
   204  	node := &v1.Node{
   205  		ObjectMeta: metav1.ObjectMeta{Name: "node-multi-scheduler-test-node"},
   206  		Spec:       v1.NodeSpec{Unschedulable: false},
   207  		Status: v1.NodeStatus{
   208  			Capacity: v1.ResourceList{
   209  				v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
   210  			},
   211  		},
   212  	}
   213  	testutils.CreateNode(testCtx.ClientSet, node)
   214  
   215  	// 3. create 3 pods for testing
   216  	t.Logf("create 3 pods for testing")
   217  	testPod, err := testutils.CreatePausePodWithResource(testCtx.ClientSet, "pod-without-scheduler-name", testCtx.NS.Name, nil)
   218  	if err != nil {
   219  		t.Fatalf("Failed to create pod: %v", err)
   220  	}
   221  
   222  	defaultScheduler := "default-scheduler"
   223  	testPodFitsDefault, err := testutils.CreatePausePod(testCtx.ClientSet, testutils.InitPausePod(&testutils.PausePodConfig{Name: "pod-fits-default", Namespace: testCtx.NS.Name, SchedulerName: defaultScheduler}))
   224  	if err != nil {
   225  		t.Fatalf("Failed to create pod: %v", err)
   226  	}
   227  
   228  	fooScheduler := "foo-scheduler"
   229  	testPodFitsFoo, err := testutils.CreatePausePod(testCtx.ClientSet, testutils.InitPausePod(&testutils.PausePodConfig{Name: "pod-fits-foo", Namespace: testCtx.NS.Name, SchedulerName: fooScheduler}))
   230  	if err != nil {
   231  		t.Fatalf("Failed to create pod: %v", err)
   232  	}
   233  
   234  	// 4. **check point-1**:
   235  	//		- testPod, testPodFitsDefault should be scheduled
   236  	//		- testPodFitsFoo should NOT be scheduled
   237  	t.Logf("wait for pods scheduled")
   238  	if err := testutils.WaitForPodToSchedule(testCtx.ClientSet, testPod); err != nil {
   239  		t.Errorf("Test MultiScheduler: %s Pod not scheduled: %v", testPod.Name, err)
   240  	} else {
   241  		t.Logf("Test MultiScheduler: %s Pod scheduled", testPod.Name)
   242  	}
   243  
   244  	if err := testutils.WaitForPodToSchedule(testCtx.ClientSet, testPodFitsDefault); err != nil {
   245  		t.Errorf("Test MultiScheduler: %s Pod not scheduled: %v", testPodFitsDefault.Name, err)
   246  	} else {
   247  		t.Logf("Test MultiScheduler: %s Pod scheduled", testPodFitsDefault.Name)
   248  	}
   249  
   250  	if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, testPodFitsFoo, time.Second*5); err == nil {
   251  		t.Errorf("Test MultiScheduler: %s Pod got scheduled, %v", testPodFitsFoo.Name, err)
   252  	} else {
   253  		t.Logf("Test MultiScheduler: %s Pod not scheduled", testPodFitsFoo.Name)
   254  	}
   255  
   256  	// 5. create and start a scheduler with name "foo-scheduler"
   257  	cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
   258  		Profiles: []configv1.KubeSchedulerProfile{{
   259  			SchedulerName: pointer.String(fooScheduler),
   260  			PluginConfig: []configv1.PluginConfig{
   261  				{
   262  					Name: "VolumeBinding",
   263  					Args: runtime.RawExtension{
   264  						Object: &configv1.VolumeBindingArgs{
   265  							BindTimeoutSeconds: pointer.Int64(30),
   266  						},
   267  					},
   268  				},
   269  			}},
   270  		},
   271  	})
   272  	testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, scheduler.WithProfiles(cfg.Profiles...))
   273  	testutils.SyncSchedulerInformerFactory(testCtx)
   274  	go testCtx.Scheduler.Run(testCtx.Ctx)
   275  
   276  	//	6. **check point-2**:
   277  	//		- testPodWithAnnotationFitsFoo should be scheduled
   278  	err = testutils.WaitForPodToSchedule(testCtx.ClientSet, testPodFitsFoo)
   279  	if err != nil {
   280  		t.Errorf("Test MultiScheduler: %s Pod not scheduled, %v", testPodFitsFoo.Name, err)
   281  	} else {
   282  		t.Logf("Test MultiScheduler: %s Pod scheduled", testPodFitsFoo.Name)
   283  	}
   284  }
   285  
   286  func TestMultipleSchedulingProfiles(t *testing.T) {
   287  	cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
   288  		Profiles: []configv1.KubeSchedulerProfile{
   289  			{SchedulerName: pointer.String("default-scheduler")},
   290  			{SchedulerName: pointer.String("custom-scheduler")},
   291  		},
   292  	})
   293  
   294  	testCtx := testutils.InitTestSchedulerWithNS(t, "multi-scheduler", scheduler.WithProfiles(cfg.Profiles...))
   295  
   296  	node := &v1.Node{
   297  		ObjectMeta: metav1.ObjectMeta{Name: "node-multi-scheduler-test-node"},
   298  		Spec:       v1.NodeSpec{Unschedulable: false},
   299  		Status: v1.NodeStatus{
   300  			Capacity: v1.ResourceList{
   301  				v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
   302  			},
   303  		},
   304  	}
   305  	if _, err := testutils.CreateNode(testCtx.ClientSet, node); err != nil {
   306  		t.Fatal(err)
   307  	}
   308  
   309  	evs, err := testCtx.ClientSet.CoreV1().Events(testCtx.NS.Name).Watch(testCtx.Ctx, metav1.ListOptions{})
   310  	if err != nil {
   311  		t.Fatal(err)
   312  	}
   313  	defer evs.Stop()
   314  
   315  	for _, pc := range []*testutils.PausePodConfig{
   316  		{Name: "foo", Namespace: testCtx.NS.Name},
   317  		{Name: "bar", Namespace: testCtx.NS.Name, SchedulerName: "unknown-scheduler"},
   318  		{Name: "baz", Namespace: testCtx.NS.Name, SchedulerName: "default-scheduler"},
   319  		{Name: "zet", Namespace: testCtx.NS.Name, SchedulerName: "custom-scheduler"},
   320  	} {
   321  		if _, err := testutils.CreatePausePod(testCtx.ClientSet, testutils.InitPausePod(pc)); err != nil {
   322  			t.Fatal(err)
   323  		}
   324  	}
   325  
   326  	wantProfiles := map[string]string{
   327  		"foo": "default-scheduler",
   328  		"baz": "default-scheduler",
   329  		"zet": "custom-scheduler",
   330  	}
   331  
   332  	gotProfiles := make(map[string]string)
   333  	if err := wait.PollUntilContextTimeout(testCtx.Ctx, 100*time.Millisecond, 30*time.Second, false, func(ctx context.Context) (bool, error) {
   334  		var ev watch.Event
   335  		select {
   336  		case ev = <-evs.ResultChan():
   337  		case <-time.After(30 * time.Second):
   338  			return false, nil
   339  		}
   340  		e, ok := ev.Object.(*v1.Event)
   341  		if !ok || e.Reason != "Scheduled" {
   342  			return false, nil
   343  		}
   344  		gotProfiles[e.InvolvedObject.Name] = e.ReportingController
   345  		return len(gotProfiles) >= len(wantProfiles), nil
   346  	}); err != nil {
   347  		t.Errorf("waiting for scheduling events: %v", err)
   348  	}
   349  
   350  	if diff := cmp.Diff(wantProfiles, gotProfiles); diff != "" {
   351  		t.Errorf("pods scheduled by the wrong profile (-want, +got):\n%s", diff)
   352  	}
   353  }
   354  
   355  // This test will verify scheduler can work well regardless of whether kubelet is allocatable aware or not.
   356  func TestAllocatable(t *testing.T) {
   357  	testCtx := testutils.InitTestSchedulerWithNS(t, "allocatable")
   358  
   359  	// 2. create a node without allocatable awareness
   360  	nodeRes := map[v1.ResourceName]string{
   361  		v1.ResourcePods:   "32",
   362  		v1.ResourceCPU:    "30m",
   363  		v1.ResourceMemory: "30",
   364  	}
   365  	allocNode, err := testutils.CreateNode(testCtx.ClientSet, st.MakeNode().Name("node-allocatable-scheduler-test-node").Capacity(nodeRes).Obj())
   366  	if err != nil {
   367  		t.Fatalf("Failed to create node: %v", err)
   368  	}
   369  
   370  	// 3. create resource pod which requires less than Capacity
   371  	podName := "pod-test-allocatable"
   372  	podRes := &v1.ResourceList{
   373  		v1.ResourceCPU:    *resource.NewMilliQuantity(20, resource.DecimalSI),
   374  		v1.ResourceMemory: *resource.NewQuantity(20, resource.BinarySI),
   375  	}
   376  	testAllocPod, err := testutils.CreatePausePodWithResource(testCtx.ClientSet, podName, testCtx.NS.Name, podRes)
   377  	if err != nil {
   378  		t.Fatalf("Test allocatable unawareness failed to create pod: %v", err)
   379  	}
   380  
   381  	// 4. Test: this test pod should be scheduled since api-server will use Capacity as Allocatable
   382  	err = testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, testAllocPod, time.Second*5)
   383  	if err != nil {
   384  		t.Errorf("Test allocatable unawareness: %s Pod not scheduled: %v", testAllocPod.Name, err)
   385  	} else {
   386  		t.Logf("Test allocatable unawareness: %s Pod scheduled", testAllocPod.Name)
   387  	}
   388  
   389  	// 5. Change the node status to allocatable aware, note that Allocatable is less than Pod's requirement
   390  	allocNode.Status = v1.NodeStatus{
   391  		Capacity: v1.ResourceList{
   392  			v1.ResourcePods:   *resource.NewQuantity(32, resource.DecimalSI),
   393  			v1.ResourceCPU:    *resource.NewMilliQuantity(30, resource.DecimalSI),
   394  			v1.ResourceMemory: *resource.NewQuantity(30, resource.BinarySI),
   395  		},
   396  		Allocatable: v1.ResourceList{
   397  			v1.ResourcePods:   *resource.NewQuantity(32, resource.DecimalSI),
   398  			v1.ResourceCPU:    *resource.NewMilliQuantity(10, resource.DecimalSI),
   399  			v1.ResourceMemory: *resource.NewQuantity(10, resource.BinarySI),
   400  		},
   401  	}
   402  
   403  	if _, err := testCtx.ClientSet.CoreV1().Nodes().UpdateStatus(context.TODO(), allocNode, metav1.UpdateOptions{}); err != nil {
   404  		t.Fatalf("Failed to update node with Status.Allocatable: %v", err)
   405  	}
   406  
   407  	if err := testutils.DeletePod(testCtx.ClientSet, testAllocPod.Name, testCtx.NS.Name); err != nil {
   408  		t.Fatalf("Failed to remove the first pod: %v", err)
   409  	}
   410  
   411  	// 6. Make another pod with different name, same resource request
   412  	podName2 := "pod-test-allocatable2"
   413  	testAllocPod2, err := testutils.CreatePausePodWithResource(testCtx.ClientSet, podName2, testCtx.NS.Name, podRes)
   414  	if err != nil {
   415  		t.Fatalf("Test allocatable awareness failed to create pod: %v", err)
   416  	}
   417  
   418  	// 7. Test: this test pod should not be scheduled since it request more than Allocatable
   419  	if err := testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, testAllocPod2, time.Second*5); err == nil {
   420  		t.Errorf("Test allocatable awareness: %s Pod got scheduled unexpectedly, %v", testAllocPod2.Name, err)
   421  	} else {
   422  		t.Logf("Test allocatable awareness: %s Pod not scheduled as expected", testAllocPod2.Name)
   423  	}
   424  }
   425  
   426  // TestSchedulerInformers tests that scheduler receives informer events and updates its cache when
   427  // pods are scheduled by other schedulers.
   428  func TestSchedulerInformers(t *testing.T) {
   429  	// Initialize scheduler.
   430  	testCtx := testutils.InitTestSchedulerWithNS(t, "scheduler-informer")
   431  	cs := testCtx.ClientSet
   432  
   433  	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
   434  		v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
   435  		v1.ResourceMemory: *resource.NewQuantity(200, resource.BinarySI)},
   436  	}
   437  	defaultNodeRes := map[v1.ResourceName]string{
   438  		v1.ResourcePods:   "32",
   439  		v1.ResourceCPU:    "500m",
   440  		v1.ResourceMemory: "500",
   441  	}
   442  
   443  	type nodeConfig struct {
   444  		name string
   445  		res  map[v1.ResourceName]string
   446  	}
   447  
   448  	tests := []struct {
   449  		name                string
   450  		nodes               []*nodeConfig
   451  		existingPods        []*v1.Pod
   452  		pod                 *v1.Pod
   453  		preemptedPodIndexes map[int]struct{}
   454  	}{
   455  		{
   456  			name:  "Pod cannot be scheduled when node is occupied by pods scheduled by other schedulers",
   457  			nodes: []*nodeConfig{{name: "node-1", res: defaultNodeRes}},
   458  			existingPods: []*v1.Pod{
   459  				testutils.InitPausePod(&testutils.PausePodConfig{
   460  					Name:          "pod1",
   461  					Namespace:     testCtx.NS.Name,
   462  					Resources:     defaultPodRes,
   463  					Labels:        map[string]string{"foo": "bar"},
   464  					NodeName:      "node-1",
   465  					SchedulerName: "foo-scheduler",
   466  				}),
   467  				testutils.InitPausePod(&testutils.PausePodConfig{
   468  					Name:          "pod2",
   469  					Namespace:     testCtx.NS.Name,
   470  					Resources:     defaultPodRes,
   471  					Labels:        map[string]string{"foo": "bar"},
   472  					NodeName:      "node-1",
   473  					SchedulerName: "bar-scheduler",
   474  				}),
   475  			},
   476  			pod: testutils.InitPausePod(&testutils.PausePodConfig{
   477  				Name:      "unschedulable-pod",
   478  				Namespace: testCtx.NS.Name,
   479  				Resources: defaultPodRes,
   480  			}),
   481  			preemptedPodIndexes: map[int]struct{}{2: {}},
   482  		},
   483  	}
   484  
   485  	for _, test := range tests {
   486  		t.Run(test.name, func(t *testing.T) {
   487  			for _, nodeConf := range test.nodes {
   488  				_, err := testutils.CreateNode(cs, st.MakeNode().Name(nodeConf.name).Capacity(nodeConf.res).Obj())
   489  				if err != nil {
   490  					t.Fatalf("Error creating node %v: %v", nodeConf.name, err)
   491  				}
   492  			}
   493  			// Ensure nodes are present in scheduler cache.
   494  			if err := testutils.WaitForNodesInCache(testCtx.Scheduler, len(test.nodes)); err != nil {
   495  				t.Fatal(err)
   496  			}
   497  
   498  			pods := make([]*v1.Pod, len(test.existingPods))
   499  			var err error
   500  			// Create and run existingPods.
   501  			for i, p := range test.existingPods {
   502  				if pods[i], err = testutils.RunPausePod(cs, p); err != nil {
   503  					t.Fatalf("Error running pause pod: %v", err)
   504  				}
   505  			}
   506  			// Create the new "pod".
   507  			unschedulable, err := testutils.CreatePausePod(cs, test.pod)
   508  			if err != nil {
   509  				t.Errorf("Error while creating new pod: %v", err)
   510  			}
   511  			if err := testutils.WaitForPodUnschedulable(cs, unschedulable); err != nil {
   512  				t.Errorf("Pod %v got scheduled: %v", unschedulable.Name, err)
   513  			}
   514  
   515  			// Cleanup
   516  			pods = append(pods, unschedulable)
   517  			testutils.CleanupPods(testCtx.Ctx, cs, t, pods)
   518  			if err := cs.PolicyV1().PodDisruptionBudgets(testCtx.NS.Name).DeleteCollection(testCtx.Ctx, metav1.DeleteOptions{}, metav1.ListOptions{}); err != nil {
   519  				t.Errorf("error whiling deleting PDBs, error: %v", err)
   520  			}
   521  			if err := cs.CoreV1().Nodes().DeleteCollection(testCtx.Ctx, metav1.DeleteOptions{}, metav1.ListOptions{}); err != nil {
   522  				t.Errorf("error whiling deleting nodes, error: %v", err)
   523  			}
   524  		})
   525  	}
   526  }
   527  
   528  func TestNodeEvents(t *testing.T) {
   529  	// The test verifies that unschedulable pods are re-queued
   530  	// on node update events. The scenario we are testing is the following:
   531  	// 1. Create pod1 and node1 that is small enough to only fit pod1; pod1 schedules on node1
   532  	// 2. Create pod2, it should be unschedulable due to insufficient cpu
   533  	// 3. Create node2 with a taint, pod2 should still not schedule
   534  	// 4. Remove the taint from node2; pod2 should now schedule on node2
   535  
   536  	testCtx := testutils.InitTestSchedulerWithNS(t, "node-events")
   537  	defer testCtx.ClientSet.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
   538  
   539  	// 1.1 create pod1
   540  	pod1, err := testutils.CreatePausePodWithResource(testCtx.ClientSet, "pod1", testCtx.NS.Name, &v1.ResourceList{
   541  		v1.ResourceCPU: *resource.NewMilliQuantity(80, resource.DecimalSI),
   542  	})
   543  	if err != nil {
   544  		t.Fatalf("Failed to create pod: %v", err)
   545  	}
   546  
   547  	// 1.2 Create node1
   548  	node1, err := testutils.CreateNode(testCtx.ClientSet, st.MakeNode().
   549  		Name("node-events-test-node1").
   550  		Capacity(map[v1.ResourceName]string{
   551  			v1.ResourcePods:   "32",
   552  			v1.ResourceCPU:    "100m",
   553  			v1.ResourceMemory: "30",
   554  		}).Obj())
   555  	if err != nil {
   556  		t.Fatalf("Failed to create %s: %v", node1.Name, err)
   557  	}
   558  
   559  	// 1.3 verify pod1 is scheduled
   560  	err = testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, pod1, time.Second*5)
   561  	if err != nil {
   562  		t.Errorf("Pod %s didn't schedule: %v", pod1.Name, err)
   563  	}
   564  
   565  	// 2. create pod2
   566  	pod2, err := testutils.CreatePausePodWithResource(testCtx.ClientSet, "pod2", testCtx.NS.Name, &v1.ResourceList{
   567  		v1.ResourceCPU: *resource.NewMilliQuantity(40, resource.DecimalSI),
   568  	})
   569  	if err != nil {
   570  		t.Fatalf("Failed to create pod %v: %v", pod2.Name, err)
   571  	}
   572  
   573  	if err := testutils.WaitForPodUnschedulable(testCtx.ClientSet, pod2); err != nil {
   574  		t.Errorf("Pod %v got scheduled: %v", pod2.Name, err)
   575  	}
   576  
   577  	// 3.1 Create node2 with a taint
   578  	node2 := st.MakeNode().
   579  		Name("node-events-test-node2").
   580  		Capacity(map[v1.ResourceName]string{
   581  			v1.ResourcePods:   "32",
   582  			v1.ResourceCPU:    "100m",
   583  			v1.ResourceMemory: "30",
   584  		}).
   585  		Label("affinity-key", "affinity-value").
   586  		Taints([]v1.Taint{{Key: "taint-key", Effect: v1.TaintEffectNoSchedule}}).Obj()
   587  	node2, err = testutils.CreateNode(testCtx.ClientSet, node2)
   588  	if err != nil {
   589  		t.Fatalf("Failed to create %s: %v", node2.Name, err)
   590  	}
   591  	// make sure the scheduler received the node add event by creating a pod that only fits node2
   592  	plugPod := st.MakePod().Name("plug-pod").Namespace(testCtx.NS.Name).Container("pause").
   593  		Req(map[v1.ResourceName]string{v1.ResourceCPU: "40m"}).
   594  		NodeAffinityIn("affinity-key", []string{"affinity-value"}).
   595  		Toleration("taint-key").Obj()
   596  	plugPod, err = testCtx.ClientSet.CoreV1().Pods(plugPod.Namespace).Create(context.TODO(), plugPod, metav1.CreateOptions{})
   597  	if err != nil {
   598  		t.Fatalf("Failed to create pod %v: %v", plugPod.Name, err)
   599  	}
   600  	err = testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, plugPod, time.Second*5)
   601  	if err != nil {
   602  		t.Errorf("Pod %s didn't schedule: %v", plugPod.Name, err)
   603  	}
   604  
   605  	// 3.2 pod2 still unschedulable
   606  	if err := testutils.WaitForPodUnschedulable(testCtx.ClientSet, pod2); err != nil {
   607  		t.Errorf("Pod %v got scheduled: %v", pod2.Name, err)
   608  	}
   609  
   610  	// 4. Remove node taint, pod2 should schedule
   611  	node2.Spec.Taints = nil
   612  	node2, err = testutils.UpdateNode(testCtx.ClientSet, node2)
   613  	if err != nil {
   614  		t.Fatalf("Failed to update %s: %v", node2.Name, err)
   615  	}
   616  
   617  	err = testutils.WaitForPodToScheduleWithTimeout(testCtx.ClientSet, pod2, time.Second*5)
   618  	if err != nil {
   619  		t.Errorf("Pod %s didn't schedule: %v", pod2.Name, err)
   620  	}
   621  
   622  }
   623  
   624  // TestPodSchedulingContextSSA checks that the dynamicresources plugin falls
   625  // back to SSA successfully when the normal Update call encountered
   626  // a conflict.
   627  //
   628  // This is an integration test because:
   629  //   - Unit testing does not cover RBAC rules.
   630  //   - Triggering this particular race is harder in E2E testing
   631  //     and harder to verify (needs apiserver metrics and there's
   632  //     no standard API for those).
   633  func TestPodSchedulingContextSSA(t *testing.T) {
   634  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, true)()
   635  
   636  	testCtx := testutils.InitTestAPIServer(t, "podschedulingcontext-ssa", nil)
   637  	testCtx.DisableEventSink = true
   638  	testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0)
   639  	testutils.SyncSchedulerInformerFactory(testCtx)
   640  	go testCtx.Scheduler.Run(testCtx.SchedulerCtx)
   641  
   642  	// Set up enough objects that the scheduler will start trying to
   643  	// schedule the pod and create the PodSchedulingContext.
   644  	nodeRes := map[v1.ResourceName]string{
   645  		v1.ResourcePods:   "32",
   646  		v1.ResourceCPU:    "30m",
   647  		v1.ResourceMemory: "30",
   648  	}
   649  	for _, name := range []string{"node-a", "node-b"} {
   650  		if _, err := testutils.CreateNode(testCtx.ClientSet, st.MakeNode().Name(name).Capacity(nodeRes).Obj()); err != nil {
   651  			t.Fatalf("Failed to create node: %v", err)
   652  		}
   653  	}
   654  
   655  	defer func() {
   656  		if err := testCtx.ClientSet.ResourceV1alpha2().ResourceClasses().DeleteCollection(testCtx.Ctx, metav1.DeleteOptions{}, metav1.ListOptions{}); err != nil {
   657  			t.Errorf("Unexpected error deleting ResourceClasses: %v", err)
   658  		}
   659  	}()
   660  	class := &resourcev1alpha2.ResourceClass{
   661  		ObjectMeta: metav1.ObjectMeta{
   662  			Name: "my-class",
   663  		},
   664  		DriverName: "does-not-matter",
   665  	}
   666  	if _, err := testCtx.ClientSet.ResourceV1alpha2().ResourceClasses().Create(testCtx.Ctx, class, metav1.CreateOptions{}); err != nil {
   667  		t.Fatalf("Failed to create class: %v", err)
   668  	}
   669  
   670  	claim := &resourcev1alpha2.ResourceClaim{
   671  		ObjectMeta: metav1.ObjectMeta{
   672  			Name:      "my-claim",
   673  			Namespace: testCtx.NS.Name,
   674  		},
   675  		Spec: resourcev1alpha2.ResourceClaimSpec{
   676  			ResourceClassName: class.Name,
   677  		},
   678  	}
   679  	if _, err := testCtx.ClientSet.ResourceV1alpha2().ResourceClaims(claim.Namespace).Create(testCtx.Ctx, claim, metav1.CreateOptions{}); err != nil {
   680  		t.Fatalf("Failed to create claim: %v", err)
   681  	}
   682  
   683  	podConf := testutils.PausePodConfig{
   684  		Name:      "testpod",
   685  		Namespace: testCtx.NS.Name,
   686  	}
   687  	pod := testutils.InitPausePod(&podConf)
   688  	podClaimName := "myclaim"
   689  	pod.Spec.Containers[0].Resources.Claims = []v1.ResourceClaim{{Name: podClaimName}}
   690  	pod.Spec.ResourceClaims = []v1.PodResourceClaim{{Name: podClaimName, Source: v1.ClaimSource{ResourceClaimName: &claim.Name}}}
   691  	if _, err := testCtx.ClientSet.CoreV1().Pods(pod.Namespace).Create(testCtx.Ctx, pod, metav1.CreateOptions{}); err != nil {
   692  		t.Fatalf("Failed to create pod: %v", err)
   693  	}
   694  
   695  	// Check that the PodSchedulingContext exists and has a selected node.
   696  	var schedulingCtx *resourcev1alpha2.PodSchedulingContext
   697  	if err := wait.PollUntilContextTimeout(testCtx.Ctx, 10*time.Microsecond, 30*time.Second, true,
   698  		func(context.Context) (bool, error) {
   699  			var err error
   700  			schedulingCtx, err = testCtx.ClientSet.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Get(testCtx.Ctx, pod.Name, metav1.GetOptions{})
   701  			if apierrors.IsNotFound(err) {
   702  				return false, nil
   703  			}
   704  			if err == nil && schedulingCtx.Spec.SelectedNode != "" {
   705  				return true, nil
   706  			}
   707  			return false, err
   708  		}); err != nil {
   709  		t.Fatalf("Failed while waiting for PodSchedulingContext with selected node: %v\nLast PodSchedulingContext:\n%s", err, format.Object(schedulingCtx, 1))
   710  	}
   711  
   712  	// Force the plugin to use SSA.
   713  	var podSchedulingContextPatchCounter atomic.Int64
   714  	roundTrip := testutils.RoundTripWrapper(func(transport http.RoundTripper, req *http.Request) (*http.Response, error) {
   715  		if strings.HasPrefix(req.URL.Path, "/apis/resource.k8s.io/") &&
   716  			strings.HasSuffix(req.URL.Path, "/podschedulingcontexts/"+pod.Name) {
   717  			switch req.Method {
   718  			case http.MethodPut, http.MethodPost:
   719  				return &http.Response{
   720  					Status:     fmt.Sprintf("%d %s", http.StatusConflict, metav1.StatusReasonConflict),
   721  					StatusCode: http.StatusConflict,
   722  				}, nil
   723  			case http.MethodPatch:
   724  				podSchedulingContextPatchCounter.Add(1)
   725  			}
   726  		}
   727  		return transport.RoundTrip(req)
   728  	})
   729  	testCtx.RoundTrip.Store(&roundTrip)
   730  
   731  	// Now force the scheduler to update the PodSchedulingContext by setting UnsuitableNodes so that
   732  	// the selected node is not suitable.
   733  	schedulingCtx.Status.ResourceClaims = []resourcev1alpha2.ResourceClaimSchedulingStatus{{
   734  		Name:            podClaimName,
   735  		UnsuitableNodes: []string{schedulingCtx.Spec.SelectedNode},
   736  	}}
   737  
   738  	if _, err := testCtx.ClientSet.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).UpdateStatus(testCtx.Ctx, schedulingCtx, metav1.UpdateOptions{}); err != nil {
   739  		t.Fatalf("Unexpected PodSchedulingContext status update error: %v", err)
   740  	}
   741  
   742  	// We know that the scheduler has to use SSA because above we inject a conflict
   743  	// error whenever it tries to use a plain update. We just need to wait for it...
   744  	if err := wait.PollUntilContextTimeout(testCtx.Ctx, 10*time.Microsecond, time.Minute, true,
   745  		func(context.Context) (bool, error) {
   746  			return podSchedulingContextPatchCounter.Load() > 0, nil
   747  		}); err != nil {
   748  		t.Fatalf("Failed while waiting for PodSchedulingContext Patch: %v", err)
   749  	}
   750  }
   751  

View as plain text