...

Source file src/k8s.io/kubernetes/test/integration/volumescheduling/volume_binding_test.go

Documentation: k8s.io/kubernetes/test/integration/volumescheduling

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package volumescheduling
    18  
    19  // This file tests the VolumeScheduling feature.
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"strconv"
    25  	"strings"
    26  	"testing"
    27  	"time"
    28  
    29  	"k8s.io/klog/v2"
    30  
    31  	v1 "k8s.io/api/core/v1"
    32  	storagev1 "k8s.io/api/storage/v1"
    33  	"k8s.io/apimachinery/pkg/api/resource"
    34  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    35  	"k8s.io/apimachinery/pkg/util/rand"
    36  	"k8s.io/apimachinery/pkg/util/sets"
    37  	"k8s.io/apimachinery/pkg/util/wait"
    38  	"k8s.io/client-go/informers"
    39  	clientset "k8s.io/client-go/kubernetes"
    40  	"k8s.io/client-go/util/workqueue"
    41  	"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
    42  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
    43  	"k8s.io/kubernetes/pkg/volume"
    44  	volumetest "k8s.io/kubernetes/pkg/volume/testing"
    45  	testutil "k8s.io/kubernetes/test/integration/util"
    46  	imageutils "k8s.io/kubernetes/test/utils/image"
    47  )
    48  
    49  type testConfig struct {
    50  	client   clientset.Interface
    51  	ns       string
    52  	stop     <-chan struct{}
    53  	teardown func()
    54  }
    55  
    56  var (
    57  	// Delete API objects immediately
    58  	deletePeriod = int64(0)
    59  	deleteOption = metav1.DeleteOptions{GracePeriodSeconds: &deletePeriod}
    60  
    61  	modeWait      = storagev1.VolumeBindingWaitForFirstConsumer
    62  	modeImmediate = storagev1.VolumeBindingImmediate
    63  
    64  	classWait         = "wait"
    65  	classImmediate    = "immediate"
    66  	classDynamic      = "dynamic"
    67  	classTopoMismatch = "topomismatch"
    68  
    69  	sharedClasses = map[string]*storagev1.StorageClass{
    70  		classImmediate: makeStorageClass(classImmediate, &modeImmediate),
    71  		classWait:      makeStorageClass(classWait, &modeWait),
    72  	}
    73  )
    74  
    75  const (
    76  	node1                 = "node-1"
    77  	node2                 = "node-2"
    78  	podLimit              = 50
    79  	volsPerPod            = 3
    80  	nodeAffinityLabelKey  = "kubernetes.io/hostname"
    81  	provisionerPluginName = "mock-provisioner.kubernetes.io"
    82  )
    83  
    84  type testPV struct {
    85  	name        string
    86  	scName      string
    87  	preboundPVC string
    88  	node        string
    89  }
    90  
    91  type testPVC struct {
    92  	name       string
    93  	scName     string
    94  	preboundPV string
    95  }
    96  
    97  func TestVolumeBinding(t *testing.T) {
    98  	config := setupCluster(t, "volume-scheduling-", 2, 0, 0)
    99  	defer config.teardown()
   100  
   101  	cases := map[string]struct {
   102  		pod  *v1.Pod
   103  		pvs  []*testPV
   104  		pvcs []*testPVC
   105  		// Create these, but they should not be bound in the end
   106  		unboundPvcs []*testPVC
   107  		unboundPvs  []*testPV
   108  		shouldFail  bool
   109  	}{
   110  		"immediate can bind": {
   111  			pod:  makePod("pod-i-canbind", config.ns, []string{"pvc-i-canbind"}),
   112  			pvs:  []*testPV{{"pv-i-canbind", classImmediate, "", node1}},
   113  			pvcs: []*testPVC{{"pvc-i-canbind", classImmediate, ""}},
   114  		},
   115  		"immediate cannot bind": {
   116  			pod:         makePod("pod-i-cannotbind", config.ns, []string{"pvc-i-cannotbind"}),
   117  			unboundPvcs: []*testPVC{{"pvc-i-cannotbind", classImmediate, ""}},
   118  			shouldFail:  true,
   119  		},
   120  		"immediate pvc prebound": {
   121  			pod:  makePod("pod-i-pvc-prebound", config.ns, []string{"pvc-i-prebound"}),
   122  			pvs:  []*testPV{{"pv-i-pvc-prebound", classImmediate, "", node1}},
   123  			pvcs: []*testPVC{{"pvc-i-prebound", classImmediate, "pv-i-pvc-prebound"}},
   124  		},
   125  		"immediate pv prebound": {
   126  			pod:  makePod("pod-i-pv-prebound", config.ns, []string{"pvc-i-pv-prebound"}),
   127  			pvs:  []*testPV{{"pv-i-prebound", classImmediate, "pvc-i-pv-prebound", node1}},
   128  			pvcs: []*testPVC{{"pvc-i-pv-prebound", classImmediate, ""}},
   129  		},
   130  		"wait can bind": {
   131  			pod:  makePod("pod-w-canbind", config.ns, []string{"pvc-w-canbind"}),
   132  			pvs:  []*testPV{{"pv-w-canbind", classWait, "", node1}},
   133  			pvcs: []*testPVC{{"pvc-w-canbind", classWait, ""}},
   134  		},
   135  		"wait cannot bind": {
   136  			pod:         makePod("pod-w-cannotbind", config.ns, []string{"pvc-w-cannotbind"}),
   137  			unboundPvcs: []*testPVC{{"pvc-w-cannotbind", classWait, ""}},
   138  			shouldFail:  true,
   139  		},
   140  		"wait pvc prebound": {
   141  			pod:  makePod("pod-w-pvc-prebound", config.ns, []string{"pvc-w-prebound"}),
   142  			pvs:  []*testPV{{"pv-w-pvc-prebound", classWait, "", node1}},
   143  			pvcs: []*testPVC{{"pvc-w-prebound", classWait, "pv-w-pvc-prebound"}},
   144  		},
   145  		"wait pv prebound": {
   146  			pod:  makePod("pod-w-pv-prebound", config.ns, []string{"pvc-w-pv-prebound"}),
   147  			pvs:  []*testPV{{"pv-w-prebound", classWait, "pvc-w-pv-prebound", node1}},
   148  			pvcs: []*testPVC{{"pvc-w-pv-prebound", classWait, ""}},
   149  		},
   150  		"wait can bind two": {
   151  			pod: makePod("pod-w-canbind-2", config.ns, []string{"pvc-w-canbind-2", "pvc-w-canbind-3"}),
   152  			pvs: []*testPV{
   153  				{"pv-w-canbind-2", classWait, "", node2},
   154  				{"pv-w-canbind-3", classWait, "", node2},
   155  			},
   156  			pvcs: []*testPVC{
   157  				{"pvc-w-canbind-2", classWait, ""},
   158  				{"pvc-w-canbind-3", classWait, ""},
   159  			},
   160  			unboundPvs: []*testPV{
   161  				{"pv-w-canbind-5", classWait, "", node1},
   162  			},
   163  		},
   164  		"wait cannot bind two": {
   165  			pod: makePod("pod-w-cannotbind-2", config.ns, []string{"pvc-w-cannotbind-1", "pvc-w-cannotbind-2"}),
   166  			unboundPvcs: []*testPVC{
   167  				{"pvc-w-cannotbind-1", classWait, ""},
   168  				{"pvc-w-cannotbind-2", classWait, ""},
   169  			},
   170  			unboundPvs: []*testPV{
   171  				{"pv-w-cannotbind-1", classWait, "", node2},
   172  				{"pv-w-cannotbind-2", classWait, "", node1},
   173  			},
   174  			shouldFail: true,
   175  		},
   176  		"mix immediate and wait": {
   177  			pod: makePod("pod-mix-bound", config.ns, []string{"pvc-w-canbind-4", "pvc-i-canbind-2"}),
   178  			pvs: []*testPV{
   179  				{"pv-w-canbind-4", classWait, "", node1},
   180  				{"pv-i-canbind-2", classImmediate, "", node1},
   181  			},
   182  			pvcs: []*testPVC{
   183  				{"pvc-w-canbind-4", classWait, ""},
   184  				{"pvc-i-canbind-2", classImmediate, ""},
   185  			},
   186  		},
   187  	}
   188  
   189  	for name, test := range cases {
   190  		klog.Infof("Running test %v", name)
   191  
   192  		// Create two StorageClasses
   193  		suffix := rand.String(4)
   194  		classes := map[string]*storagev1.StorageClass{}
   195  		classes[classImmediate] = makeStorageClass(fmt.Sprintf("immediate-%v", suffix), &modeImmediate)
   196  		classes[classWait] = makeStorageClass(fmt.Sprintf("wait-%v", suffix), &modeWait)
   197  		for _, sc := range classes {
   198  			if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
   199  				t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
   200  			}
   201  		}
   202  
   203  		// Create PVs
   204  		for _, pvConfig := range test.pvs {
   205  			pv := makePV(pvConfig.name, classes[pvConfig.scName].Name, pvConfig.preboundPVC, config.ns, pvConfig.node)
   206  			if _, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
   207  				t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
   208  			}
   209  		}
   210  
   211  		for _, pvConfig := range test.unboundPvs {
   212  			pv := makePV(pvConfig.name, classes[pvConfig.scName].Name, pvConfig.preboundPVC, config.ns, pvConfig.node)
   213  			if _, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
   214  				t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
   215  			}
   216  		}
   217  
   218  		// Wait for PVs to become available to avoid race condition in PV controller
   219  		// https://github.com/kubernetes/kubernetes/issues/85320
   220  		for _, pvConfig := range test.pvs {
   221  			if err := waitForPVPhase(config.client, pvConfig.name, v1.VolumeAvailable); err != nil {
   222  				t.Fatalf("PersistentVolume %q failed to become available: %v", pvConfig.name, err)
   223  			}
   224  		}
   225  
   226  		for _, pvConfig := range test.unboundPvs {
   227  			if err := waitForPVPhase(config.client, pvConfig.name, v1.VolumeAvailable); err != nil {
   228  				t.Fatalf("PersistentVolume %q failed to become available: %v", pvConfig.name, err)
   229  			}
   230  		}
   231  
   232  		// Create PVCs
   233  		for _, pvcConfig := range test.pvcs {
   234  			pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
   235  			if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
   236  				t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
   237  			}
   238  		}
   239  		for _, pvcConfig := range test.unboundPvcs {
   240  			pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
   241  			if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
   242  				t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
   243  			}
   244  		}
   245  
   246  		// Create Pod
   247  		if _, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), test.pod, metav1.CreateOptions{}); err != nil {
   248  			t.Fatalf("Failed to create Pod %q: %v", test.pod.Name, err)
   249  		}
   250  		if test.shouldFail {
   251  			if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
   252  				t.Errorf("Pod %q was not unschedulable: %v", test.pod.Name, err)
   253  			}
   254  		} else {
   255  			if err := waitForPodToSchedule(config.client, test.pod); err != nil {
   256  				t.Errorf("Failed to schedule Pod %q: %v", test.pod.Name, err)
   257  			}
   258  		}
   259  
   260  		// Validate PVC/PV binding
   261  		for _, pvc := range test.pvcs {
   262  			validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimBound, false)
   263  		}
   264  		for _, pvc := range test.unboundPvcs {
   265  			validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimPending, false)
   266  		}
   267  		for _, pv := range test.pvs {
   268  			validatePVPhase(t, config.client, pv.name, v1.VolumeBound)
   269  		}
   270  		for _, pv := range test.unboundPvs {
   271  			validatePVPhase(t, config.client, pv.name, v1.VolumeAvailable)
   272  		}
   273  
   274  		// Force delete objects, but they still may not be immediately removed
   275  		deleteTestObjects(config.client, config.ns, deleteOption)
   276  	}
   277  }
   278  
   279  // TestVolumeBindingRescheduling tests scheduler will retry scheduling when needed.
   280  func TestVolumeBindingRescheduling(t *testing.T) {
   281  	config := setupCluster(t, "volume-scheduling-", 2, 0, 0)
   282  	defer config.teardown()
   283  
   284  	storageClassName := "local-storage"
   285  
   286  	cases := map[string]struct {
   287  		pod        *v1.Pod
   288  		pvcs       []*testPVC
   289  		pvs        []*testPV
   290  		trigger    func(config *testConfig)
   291  		shouldFail bool
   292  	}{
   293  		"reschedule on WaitForFirstConsumer dynamic storage class add": {
   294  			pod: makePod("pod-reschedule-onclassadd-dynamic", config.ns, []string{"pvc-reschedule-onclassadd-dynamic"}),
   295  			pvcs: []*testPVC{
   296  				{"pvc-reschedule-onclassadd-dynamic", "", ""},
   297  			},
   298  			trigger: func(config *testConfig) {
   299  				sc := makeDynamicProvisionerStorageClass(storageClassName, &modeWait, nil)
   300  				if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
   301  					t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
   302  				}
   303  			},
   304  			shouldFail: false,
   305  		},
   306  		"reschedule on WaitForFirstConsumer static storage class add": {
   307  			pod: makePod("pod-reschedule-onclassadd-static", config.ns, []string{"pvc-reschedule-onclassadd-static"}),
   308  			pvcs: []*testPVC{
   309  				{"pvc-reschedule-onclassadd-static", "", ""},
   310  			},
   311  			trigger: func(config *testConfig) {
   312  				sc := makeStorageClass(storageClassName, &modeWait)
   313  				if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
   314  					t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
   315  				}
   316  				// Create pv for this class to mock static provisioner behavior.
   317  				pv := makePV("pv-reschedule-onclassadd-static", storageClassName, "", "", node1)
   318  				if pv, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
   319  					t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
   320  				}
   321  			},
   322  			shouldFail: false,
   323  		},
   324  		"reschedule on delay binding PVC add": {
   325  			pod: makePod("pod-reschedule-onpvcadd", config.ns, []string{"pvc-reschedule-onpvcadd"}),
   326  			pvs: []*testPV{
   327  				{
   328  					name:   "pv-reschedule-onpvcadd",
   329  					scName: classWait,
   330  					node:   node1,
   331  				},
   332  			},
   333  			trigger: func(config *testConfig) {
   334  				pvc := makePVC("pvc-reschedule-onpvcadd", config.ns, &classWait, "")
   335  				if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
   336  					t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
   337  				}
   338  			},
   339  			shouldFail: false,
   340  		},
   341  	}
   342  
   343  	for name, test := range cases {
   344  		klog.Infof("Running test %v", name)
   345  
   346  		if test.pod == nil {
   347  			t.Fatal("pod is required for this test")
   348  		}
   349  
   350  		// Create unbound pvc
   351  		for _, pvcConfig := range test.pvcs {
   352  			pvc := makePVC(pvcConfig.name, config.ns, &storageClassName, "")
   353  			if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
   354  				t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
   355  			}
   356  		}
   357  
   358  		// Create PVs
   359  		for _, pvConfig := range test.pvs {
   360  			pv := makePV(pvConfig.name, sharedClasses[pvConfig.scName].Name, pvConfig.preboundPVC, config.ns, pvConfig.node)
   361  			if _, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
   362  				t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
   363  			}
   364  		}
   365  
   366  		// Create pod
   367  		if _, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), test.pod, metav1.CreateOptions{}); err != nil {
   368  			t.Fatalf("Failed to create Pod %q: %v", test.pod.Name, err)
   369  		}
   370  
   371  		// Wait for pod is unschedulable.
   372  		klog.Infof("Waiting for pod is unschedulable")
   373  		if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
   374  			t.Errorf("Failed as Pod %s was not unschedulable: %v", test.pod.Name, err)
   375  		}
   376  
   377  		// Trigger
   378  		test.trigger(config)
   379  
   380  		// Wait for pod is scheduled or unschedulable.
   381  		if !test.shouldFail {
   382  			klog.Infof("Waiting for pod is scheduled")
   383  			if err := waitForPodToSchedule(config.client, test.pod); err != nil {
   384  				t.Errorf("Failed to schedule Pod %q: %v", test.pod.Name, err)
   385  			}
   386  		} else {
   387  			klog.Infof("Waiting for pod is unschedulable")
   388  			if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
   389  				t.Errorf("Failed as Pod %s was not unschedulable: %v", test.pod.Name, err)
   390  			}
   391  		}
   392  
   393  		// Force delete objects, but they still may not be immediately removed
   394  		deleteTestObjects(config.client, config.ns, deleteOption)
   395  	}
   396  }
   397  
   398  // TestVolumeBindingStress creates <podLimit> pods, each with <volsPerPod> unbound or prebound PVCs.
   399  // PVs are precreated.
   400  func TestVolumeBindingStress(t *testing.T) {
   401  	testVolumeBindingStress(t, 0, false, 0)
   402  }
   403  
   404  // Like TestVolumeBindingStress but with scheduler resync. In real cluster,
   405  // scheduler will schedule failed pod frequently due to various events, e.g.
   406  // service/node update events.
   407  // This is useful to detect possible race conditions.
   408  func TestVolumeBindingStressWithSchedulerResync(t *testing.T) {
   409  	testVolumeBindingStress(t, time.Second, false, 0)
   410  }
   411  
   412  // Like TestVolumeBindingStress but with fast dynamic provisioning
   413  func TestVolumeBindingDynamicStressFast(t *testing.T) {
   414  	testVolumeBindingStress(t, 0, true, 0)
   415  }
   416  
   417  // Like TestVolumeBindingStress but with slow dynamic provisioning
   418  func TestVolumeBindingDynamicStressSlow(t *testing.T) {
   419  	testVolumeBindingStress(t, 0, true, 10)
   420  }
   421  
   422  func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration, dynamic bool, provisionDelaySeconds int) {
   423  	config := setupCluster(t, "volume-binding-stress-", 1, schedulerResyncPeriod, provisionDelaySeconds)
   424  	defer config.teardown()
   425  
   426  	// Set max volume limit to the number of PVCs the test will create
   427  	// TODO: remove when max volume limit allows setting through storageclass
   428  	t.Setenv(nodevolumelimits.KubeMaxPDVols, fmt.Sprintf("%v", podLimit*volsPerPod))
   429  
   430  	scName := &classWait
   431  	if dynamic {
   432  		scName = &classDynamic
   433  		sc := makeDynamicProvisionerStorageClass(*scName, &modeWait, nil)
   434  		if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
   435  			t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
   436  		}
   437  	}
   438  
   439  	klog.Infof("Start creating PVs and PVCs")
   440  	// Create enough PVs and PVCs for all the pods
   441  	podVolumesCount := podLimit * volsPerPod
   442  	pvs := make([]*v1.PersistentVolume, podVolumesCount)
   443  	pvcs := make([]*v1.PersistentVolumeClaim, podVolumesCount)
   444  	workqueue.ParallelizeUntil(context.TODO(), 16, podVolumesCount, func(i int) {
   445  		var (
   446  			pv      *v1.PersistentVolume
   447  			pvc     *v1.PersistentVolumeClaim
   448  			pvName  = fmt.Sprintf("pv-stress-%v", i)
   449  			pvcName = fmt.Sprintf("pvc-stress-%v", i)
   450  		)
   451  		// Don't create pvs for dynamic provisioning test
   452  		if !dynamic {
   453  			if rand.Int()%2 == 0 {
   454  				// static unbound pvs
   455  				pv = makePV(pvName, *scName, "", "", node1)
   456  			} else {
   457  				// static prebound pvs
   458  				pv = makePV(pvName, classImmediate, pvcName, config.ns, node1)
   459  			}
   460  			if pv, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
   461  				t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
   462  			}
   463  			pvs[i] = pv
   464  		}
   465  		if pv != nil && pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Name == pvcName {
   466  			pvc = makePVC(pvcName, config.ns, &classImmediate, pv.Name)
   467  		} else {
   468  			pvc = makePVC(pvcName, config.ns, scName, "")
   469  		}
   470  		if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
   471  			t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
   472  		}
   473  		pvcs[i] = pvc
   474  	})
   475  
   476  	klog.Infof("Start creating Pods")
   477  	pods := make([]*v1.Pod, podLimit)
   478  	workqueue.ParallelizeUntil(context.TODO(), 16, podLimit, func(i int) {
   479  		// Generate string of all the PVCs for the pod
   480  		podPvcs := []string{}
   481  		for j := i * volsPerPod; j < (i+1)*volsPerPod; j++ {
   482  			podPvcs = append(podPvcs, pvcs[j].Name)
   483  		}
   484  
   485  		pod := makePod(fmt.Sprintf("pod%03d", i), config.ns, podPvcs)
   486  		if pod, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
   487  			t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
   488  		}
   489  		pods[i] = pod
   490  	})
   491  
   492  	klog.Infof("Start validating pod scheduled")
   493  	// Validate Pods scheduled
   494  	workqueue.ParallelizeUntil(context.TODO(), 16, len(pods), func(i int) {
   495  		pod := pods[i]
   496  		// Use increased timeout for stress test because there is a higher chance of
   497  		// PV sync error
   498  		if err := waitForPodToScheduleWithTimeout(config.client, pod, 2*time.Minute); err != nil {
   499  			t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err)
   500  		}
   501  	})
   502  
   503  	klog.Infof("Start validating PVCs scheduled")
   504  	// Validate PVC/PV binding
   505  	workqueue.ParallelizeUntil(context.TODO(), 16, len(pvcs), func(i int) {
   506  		validatePVCPhase(t, config.client, pvcs[i].Name, config.ns, v1.ClaimBound, dynamic)
   507  	})
   508  
   509  	// Don't validate pv for dynamic provisioning test
   510  	if !dynamic {
   511  		klog.Infof("Start validating PVs scheduled")
   512  		workqueue.ParallelizeUntil(context.TODO(), 16, len(pvs), func(i int) {
   513  			validatePVPhase(t, config.client, pvs[i].Name, v1.VolumeBound)
   514  		})
   515  	}
   516  }
   517  
   518  func testVolumeBindingWithAffinity(t *testing.T, anti bool, numNodes, numPods, numPVsFirstNode int) {
   519  	config := setupCluster(t, "volume-pod-affinity-", numNodes, 0, 0)
   520  	defer config.teardown()
   521  
   522  	pods := []*v1.Pod{}
   523  	pvcs := []*v1.PersistentVolumeClaim{}
   524  
   525  	// Create PVs for the first node
   526  	for i := 0; i < numPVsFirstNode; i++ {
   527  		pv := makePV(fmt.Sprintf("pv-node1-%v", i), classWait, "", "", node1)
   528  		if pv, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
   529  			t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
   530  		}
   531  	}
   532  
   533  	// Create 1 PV per Node for the remaining nodes
   534  	for i := 2; i <= numNodes; i++ {
   535  		pv := makePV(fmt.Sprintf("pv-node%v-0", i), classWait, "", "", fmt.Sprintf("node-%v", i))
   536  		if pv, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
   537  			t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
   538  		}
   539  	}
   540  
   541  	// Create pods
   542  	for i := 0; i < numPods; i++ {
   543  		// Create one pvc per pod
   544  		pvc := makePVC(fmt.Sprintf("pvc-%v", i), config.ns, &classWait, "")
   545  		if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
   546  			t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
   547  		}
   548  		pvcs = append(pvcs, pvc)
   549  
   550  		// Create pod with pod affinity
   551  		pod := makePod(fmt.Sprintf("pod%03d", i), config.ns, []string{pvc.Name})
   552  		pod.Spec.Affinity = &v1.Affinity{}
   553  		affinityTerms := []v1.PodAffinityTerm{
   554  			{
   555  				LabelSelector: &metav1.LabelSelector{
   556  					MatchExpressions: []metav1.LabelSelectorRequirement{
   557  						{
   558  							Key:      "app",
   559  							Operator: metav1.LabelSelectorOpIn,
   560  							Values:   []string{"volume-binding-test"},
   561  						},
   562  					},
   563  				},
   564  				TopologyKey: nodeAffinityLabelKey,
   565  			},
   566  		}
   567  		if anti {
   568  			pod.Spec.Affinity.PodAntiAffinity = &v1.PodAntiAffinity{
   569  				RequiredDuringSchedulingIgnoredDuringExecution: affinityTerms,
   570  			}
   571  		} else {
   572  			pod.Spec.Affinity.PodAffinity = &v1.PodAffinity{
   573  				RequiredDuringSchedulingIgnoredDuringExecution: affinityTerms,
   574  			}
   575  		}
   576  
   577  		if pod, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
   578  			t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
   579  		}
   580  		pods = append(pods, pod)
   581  	}
   582  
   583  	// Validate Pods scheduled
   584  	scheduledNodes := sets.NewString()
   585  	for _, pod := range pods {
   586  		if err := waitForPodToSchedule(config.client, pod); err != nil {
   587  			t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err)
   588  		} else {
   589  			// Keep track of all the nodes that the Pods were scheduled on
   590  			pod, err = config.client.CoreV1().Pods(config.ns).Get(context.TODO(), pod.Name, metav1.GetOptions{})
   591  			if err != nil {
   592  				t.Fatalf("Failed to get Pod %q: %v", pod.Name, err)
   593  			}
   594  			if pod.Spec.NodeName == "" {
   595  				t.Fatalf("Pod %q node name unset after scheduling", pod.Name)
   596  			}
   597  			scheduledNodes.Insert(pod.Spec.NodeName)
   598  		}
   599  	}
   600  
   601  	// Validate the affinity policy
   602  	if anti {
   603  		// The pods should have been spread across different nodes
   604  		if scheduledNodes.Len() != numPods {
   605  			t.Errorf("Pods were scheduled across %v nodes instead of %v", scheduledNodes.Len(), numPods)
   606  		}
   607  	} else {
   608  		// The pods should have been scheduled on 1 node
   609  		if scheduledNodes.Len() != 1 {
   610  			t.Errorf("Pods were scheduled across %v nodes instead of %v", scheduledNodes.Len(), 1)
   611  		}
   612  	}
   613  
   614  	// Validate PVC binding
   615  	for _, pvc := range pvcs {
   616  		validatePVCPhase(t, config.client, pvc.Name, config.ns, v1.ClaimBound, false)
   617  	}
   618  }
   619  
   620  func TestVolumeBindingWithAntiAffinity(t *testing.T) {
   621  	numNodes := 10
   622  	// Create as many pods as number of nodes
   623  	numPods := numNodes
   624  	// Create many more PVs on node1 to increase chance of selecting node1
   625  	numPVsFirstNode := 10 * numNodes
   626  
   627  	testVolumeBindingWithAffinity(t, true, numNodes, numPods, numPVsFirstNode)
   628  }
   629  
   630  func TestVolumeBindingWithAffinity(t *testing.T) {
   631  	numPods := 10
   632  	// Create many more nodes to increase chance of selecting a PV on a different node than node1
   633  	numNodes := 10 * numPods
   634  	// Create numPods PVs on the first node
   635  	numPVsFirstNode := numPods
   636  
   637  	testVolumeBindingWithAffinity(t, true, numNodes, numPods, numPVsFirstNode)
   638  }
   639  
   640  func TestPVAffinityConflict(t *testing.T) {
   641  	config := setupCluster(t, "volume-scheduling-", 3, 0, 0)
   642  	defer config.teardown()
   643  
   644  	pv := makePV("local-pv", classImmediate, "", "", node1)
   645  	pvc := makePVC("local-pvc", config.ns, &classImmediate, "")
   646  
   647  	// Create PV
   648  	if _, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
   649  		t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
   650  	}
   651  
   652  	// Create PVC
   653  	if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
   654  		t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
   655  	}
   656  
   657  	// Wait for PVC bound
   658  	if err := waitForPVCBound(config.client, pvc); err != nil {
   659  		t.Fatalf("PVC %q failed to bind: %v", pvc.Name, err)
   660  	}
   661  
   662  	nodeMarkers := []interface{}{
   663  		markNodeAffinity,
   664  		markNodeSelector,
   665  	}
   666  	for i := 0; i < len(nodeMarkers); i++ {
   667  		podName := "local-pod-" + strconv.Itoa(i+1)
   668  		pod := makePod(podName, config.ns, []string{"local-pvc"})
   669  		nodeMarkers[i].(func(*v1.Pod, string))(pod, "node-2")
   670  		// Create Pod
   671  		if _, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
   672  			t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
   673  		}
   674  		// Give time to scheduler to attempt to schedule pod
   675  		if err := waitForPodUnschedulable(config.client, pod); err != nil {
   676  			t.Errorf("Failed as Pod %s was not unschedulable: %v", pod.Name, err)
   677  		}
   678  		// Check pod conditions
   679  		p, err := config.client.CoreV1().Pods(config.ns).Get(context.TODO(), podName, metav1.GetOptions{})
   680  		if err != nil {
   681  			t.Fatalf("Failed to access Pod %s status: %v", podName, err)
   682  		}
   683  		if strings.Compare(string(p.Status.Phase), "Pending") != 0 {
   684  			t.Fatalf("Failed as Pod %s was in: %s state and not in expected: Pending state", podName, p.Status.Phase)
   685  		}
   686  		if strings.Compare(p.Status.Conditions[0].Reason, "Unschedulable") != 0 {
   687  			t.Fatalf("Failed as Pod %s reason was: %s but expected: Unschedulable", podName, p.Status.Conditions[0].Reason)
   688  		}
   689  		if !strings.Contains(p.Status.Conditions[0].Message, "node(s) didn't match Pod's node affinity") {
   690  			t.Fatalf("Failed as Pod's %s failure message does not contain expected message: node(s) didn't match Pod's node affinity. Got message %q", podName, p.Status.Conditions[0].Message)
   691  		}
   692  		// Deleting test pod
   693  		if err := config.client.CoreV1().Pods(config.ns).Delete(context.TODO(), podName, metav1.DeleteOptions{}); err != nil {
   694  			t.Fatalf("Failed to delete Pod %s: %v", podName, err)
   695  		}
   696  	}
   697  }
   698  
   699  func TestVolumeProvision(t *testing.T) {
   700  	config := setupCluster(t, "volume-scheduling", 1, 0, 0)
   701  	defer config.teardown()
   702  
   703  	type testcaseType struct {
   704  		pod             *v1.Pod
   705  		pvs             []*testPV
   706  		boundPvcs       []*testPVC
   707  		provisionedPvcs []*testPVC
   708  		// Create these, but they should not be bound in the end
   709  		unboundPvcs []*testPVC
   710  		shouldFail  bool
   711  	}
   712  
   713  	cases := map[string]testcaseType{
   714  		"wait provisioned": {
   715  			pod:             makePod("pod-pvc-canprovision", config.ns, []string{"pvc-canprovision"}),
   716  			provisionedPvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
   717  		},
   718  		"topolgy unsatisfied": {
   719  			pod:         makePod("pod-pvc-topomismatch", config.ns, []string{"pvc-topomismatch"}),
   720  			unboundPvcs: []*testPVC{{"pvc-topomismatch", classTopoMismatch, ""}},
   721  			shouldFail:  true,
   722  		},
   723  		"wait one bound, one provisioned": {
   724  			pod:             makePod("pod-pvc-canbind-or-provision", config.ns, []string{"pvc-w-canbind", "pvc-canprovision"}),
   725  			pvs:             []*testPV{{"pv-w-canbind", classWait, "", node1}},
   726  			boundPvcs:       []*testPVC{{"pvc-w-canbind", classWait, ""}},
   727  			provisionedPvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
   728  		},
   729  		"one immediate pv prebound, one wait provisioned": {
   730  			pod:             makePod("pod-i-pv-prebound-w-provisioned", config.ns, []string{"pvc-i-pv-prebound", "pvc-canprovision"}),
   731  			pvs:             []*testPV{{"pv-i-prebound", classImmediate, "pvc-i-pv-prebound", node1}},
   732  			boundPvcs:       []*testPVC{{"pvc-i-pv-prebound", classImmediate, ""}},
   733  			provisionedPvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
   734  		},
   735  		"wait one pv prebound, one provisioned": {
   736  			pod:             makePod("pod-w-pv-prebound-w-provisioned", config.ns, []string{"pvc-w-pv-prebound", "pvc-canprovision"}),
   737  			pvs:             []*testPV{{"pv-w-prebound", classWait, "pvc-w-pv-prebound", node1}},
   738  			boundPvcs:       []*testPVC{{"pvc-w-pv-prebound", classWait, ""}},
   739  			provisionedPvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
   740  		},
   741  		"immediate provisioned by controller": {
   742  			pod: makePod("pod-i-unbound", config.ns, []string{"pvc-controller-provisioned"}),
   743  			// A pvc of immediate binding mode is expected to be provisioned by controller,
   744  			// we treat it as "bound" here because it is supposed to be in same state
   745  			// with bound claims, i.e. in bound status and has no selectedNode annotation.
   746  			boundPvcs: []*testPVC{{"pvc-controller-provisioned", classImmediate, ""}},
   747  		},
   748  	}
   749  
   750  	run := func(t *testing.T, test testcaseType) {
   751  		t.Log("Creating StorageClass")
   752  		suffix := rand.String(4)
   753  		classes := map[string]*storagev1.StorageClass{}
   754  		classes[classImmediate] = makeDynamicProvisionerStorageClass(fmt.Sprintf("immediate-%v", suffix), &modeImmediate, nil)
   755  		classes[classWait] = makeDynamicProvisionerStorageClass(fmt.Sprintf("wait-%v", suffix), &modeWait, nil)
   756  		topo := []v1.TopologySelectorTerm{
   757  			{
   758  				MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
   759  					{
   760  						Key:    nodeAffinityLabelKey,
   761  						Values: []string{node2},
   762  					},
   763  				},
   764  			},
   765  		}
   766  		classes[classTopoMismatch] = makeDynamicProvisionerStorageClass(fmt.Sprintf("topomismatch-%v", suffix), &modeWait, topo)
   767  		for _, sc := range classes {
   768  			if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
   769  				t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
   770  			}
   771  		}
   772  
   773  		t.Log("Creating PVs")
   774  		for _, pvConfig := range test.pvs {
   775  			pv := makePV(pvConfig.name, classes[pvConfig.scName].Name, pvConfig.preboundPVC, config.ns, pvConfig.node)
   776  			if _, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
   777  				t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
   778  			}
   779  			if err := waitForPVPhase(config.client, pvConfig.name, v1.VolumeAvailable); err != nil {
   780  				t.Fatalf("PersistentVolume %q failed to become available: %v", pvConfig.name, err)
   781  			}
   782  		}
   783  
   784  		t.Log("Creating PVCs")
   785  		for _, pvcConfig := range test.boundPvcs {
   786  			pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
   787  			if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
   788  				t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
   789  			}
   790  		}
   791  
   792  		t.Log("Creating unbound PVCs")
   793  		for _, pvcConfig := range test.unboundPvcs {
   794  			pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
   795  			if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
   796  				t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
   797  			}
   798  		}
   799  
   800  		t.Log("Creating unbound PVCs which should be dynamically provisioned")
   801  		for _, pvcConfig := range test.provisionedPvcs {
   802  			pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
   803  			if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
   804  				t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
   805  			}
   806  		}
   807  
   808  		t.Log("Creating the pod to schedule")
   809  		if _, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), test.pod, metav1.CreateOptions{}); err != nil {
   810  			t.Fatalf("Failed to create Pod %q: %v", test.pod.Name, err)
   811  		}
   812  		if test.shouldFail {
   813  			if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
   814  				t.Errorf("Pod %q was not unschedulable: %v", test.pod.Name, err)
   815  			}
   816  		} else {
   817  			if err := waitForPodToSchedule(config.client, test.pod); err != nil {
   818  				t.Errorf("Failed to schedule Pod %q: %v", test.pod.Name, err)
   819  			}
   820  		}
   821  
   822  		t.Log("Validating PVC/PV binding")
   823  		for _, pvc := range test.boundPvcs {
   824  			validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimBound, false)
   825  		}
   826  		for _, pvc := range test.unboundPvcs {
   827  			validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimPending, false)
   828  		}
   829  		for _, pvc := range test.provisionedPvcs {
   830  			validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimBound, true)
   831  		}
   832  		for _, pv := range test.pvs {
   833  			validatePVPhase(t, config.client, pv.name, v1.VolumeBound)
   834  		}
   835  
   836  		// Force delete objects, but they still may not be immediately removed
   837  		t.Log("Deleting test objects")
   838  		deleteTestObjects(config.client, config.ns, deleteOption)
   839  	}
   840  
   841  	for name, test := range cases {
   842  		t.Run(name, func(t *testing.T) { run(t, test) })
   843  	}
   844  }
   845  
   846  // TestCapacity covers different scenarios involving CSIStorageCapacity objects.
   847  func TestCapacity(t *testing.T) {
   848  	config := setupCluster(t, "volume-scheduling", 1, 0, 0)
   849  	defer config.teardown()
   850  
   851  	type testcaseType struct {
   852  		pod               *v1.Pod
   853  		pvcs              []*testPVC
   854  		haveCapacity      bool
   855  		capacitySupported bool
   856  	}
   857  
   858  	cases := map[string]testcaseType{
   859  		"baseline": {
   860  			pod:  makePod("pod-pvc-canprovision", config.ns, []string{"pvc-canprovision"}),
   861  			pvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
   862  		},
   863  		"out of space": {
   864  			pod:               makePod("pod-pvc-canprovision", config.ns, []string{"pvc-canprovision"}),
   865  			pvcs:              []*testPVC{{"pvc-canprovision", classWait, ""}},
   866  			capacitySupported: true,
   867  		},
   868  		"with space": {
   869  			pod:               makePod("pod-pvc-canprovision", config.ns, []string{"pvc-canprovision"}),
   870  			pvcs:              []*testPVC{{"pvc-canprovision", classWait, ""}},
   871  			capacitySupported: true,
   872  			haveCapacity:      true,
   873  		},
   874  		"ignored": {
   875  			pod:          makePod("pod-pvc-canprovision", config.ns, []string{"pvc-canprovision"}),
   876  			pvcs:         []*testPVC{{"pvc-canprovision", classWait, ""}},
   877  			haveCapacity: true,
   878  		},
   879  	}
   880  
   881  	run := func(t *testing.T, test testcaseType) {
   882  		// Create StorageClasses
   883  		suffix := rand.String(4)
   884  		classes := map[string]*storagev1.StorageClass{}
   885  		classes[classImmediate] = makeDynamicProvisionerStorageClass(fmt.Sprintf("immediate-%v", suffix), &modeImmediate, nil)
   886  		classes[classWait] = makeDynamicProvisionerStorageClass(fmt.Sprintf("wait-%v", suffix), &modeWait, nil)
   887  		topo := []v1.TopologySelectorTerm{
   888  			{
   889  				MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
   890  					{
   891  						Key:    nodeAffinityLabelKey,
   892  						Values: []string{node2},
   893  					},
   894  				},
   895  			},
   896  		}
   897  		classes[classTopoMismatch] = makeDynamicProvisionerStorageClass(fmt.Sprintf("topomismatch-%v", suffix), &modeWait, topo)
   898  		for _, sc := range classes {
   899  			if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
   900  				t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
   901  			}
   902  		}
   903  
   904  		// The provisioner isn't actually a CSI driver, but
   905  		// that doesn't matter here.
   906  		if test.capacitySupported {
   907  			if _, err := config.client.StorageV1().CSIDrivers().Create(context.TODO(),
   908  				&storagev1.CSIDriver{
   909  					ObjectMeta: metav1.ObjectMeta{
   910  						Name: provisionerPluginName,
   911  					},
   912  					Spec: storagev1.CSIDriverSpec{
   913  						StorageCapacity: &test.capacitySupported,
   914  					},
   915  				},
   916  				metav1.CreateOptions{}); err != nil {
   917  				t.Fatalf("Failed to create CSIDriver: %v", err)
   918  			}
   919  
   920  			// kube-scheduler may need some time before it gets the CSIDriver object.
   921  			// Without it, scheduling will happen without considering capacity, which
   922  			// is not what we want to test.
   923  			time.Sleep(5 * time.Second)
   924  		}
   925  
   926  		// Create CSIStorageCapacity
   927  		if test.haveCapacity {
   928  			if _, err := config.client.StorageV1().CSIStorageCapacities("default").Create(context.TODO(),
   929  				&storagev1.CSIStorageCapacity{
   930  					ObjectMeta: metav1.ObjectMeta{
   931  						GenerateName: "foo-",
   932  					},
   933  					StorageClassName: classes[classWait].Name,
   934  					NodeTopology:     &metav1.LabelSelector{},
   935  					// More than the 5Gi used in makePVC.
   936  					Capacity: resource.NewQuantity(6*1024*1024*1024, resource.BinarySI),
   937  				},
   938  				metav1.CreateOptions{}); err != nil {
   939  				t.Fatalf("Failed to create CSIStorageCapacity: %v", err)
   940  			}
   941  		}
   942  
   943  		// Create PVCs
   944  		for _, pvcConfig := range test.pvcs {
   945  			pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
   946  			if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
   947  				t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
   948  			}
   949  		}
   950  
   951  		// Create Pod
   952  		if _, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), test.pod, metav1.CreateOptions{}); err != nil {
   953  			t.Fatalf("Failed to create Pod %q: %v", test.pod.Name, err)
   954  		}
   955  
   956  		// Lack of capacity prevents pod scheduling and binding.
   957  		shouldFail := test.capacitySupported && !test.haveCapacity
   958  		if shouldFail {
   959  			if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
   960  				t.Errorf("Pod %q was not unschedulable: %v", test.pod.Name, err)
   961  			}
   962  		} else {
   963  			if err := waitForPodToSchedule(config.client, test.pod); err != nil {
   964  				t.Errorf("Failed to schedule Pod %q: %v", test.pod.Name, err)
   965  			}
   966  		}
   967  
   968  		// Validate
   969  		for _, pvc := range test.pvcs {
   970  			if shouldFail {
   971  				validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimPending, false)
   972  			} else {
   973  				validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimBound, true)
   974  			}
   975  		}
   976  
   977  		// Force delete objects, but they still may not be immediately removed
   978  		deleteTestObjects(config.client, config.ns, deleteOption)
   979  	}
   980  
   981  	for name, test := range cases {
   982  		t.Run(name, func(t *testing.T) { run(t, test) })
   983  	}
   984  }
   985  
   986  // TestRescheduleProvisioning validate that PV controller will remove
   987  // selectedNode annotation from a claim to reschedule volume provision
   988  // on provision failure.
   989  func TestRescheduleProvisioning(t *testing.T) {
   990  	testCtx := testutil.InitTestAPIServer(t, "reschedule-volume-provision", nil)
   991  
   992  	clientset := testCtx.ClientSet
   993  	ns := testCtx.NS.Name
   994  
   995  	defer func() {
   996  		deleteTestObjects(clientset, ns, metav1.DeleteOptions{})
   997  	}()
   998  
   999  	ctrl, informerFactory, err := initPVController(t, testCtx, 0)
  1000  	if err != nil {
  1001  		t.Fatalf("Failed to create PV controller: %v", err)
  1002  	}
  1003  
  1004  	// Prepare node and storage class.
  1005  	testNode := makeNode(1)
  1006  	if _, err := clientset.CoreV1().Nodes().Create(context.TODO(), testNode, metav1.CreateOptions{}); err != nil {
  1007  		t.Fatalf("Failed to create Node %q: %v", testNode.Name, err)
  1008  	}
  1009  	scName := "fail-provision"
  1010  	sc := makeDynamicProvisionerStorageClass(scName, &modeWait, nil)
  1011  	// Expect the storage class fail to provision.
  1012  	sc.Parameters[volumetest.ExpectProvisionFailureKey] = ""
  1013  	if _, err := clientset.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
  1014  		t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
  1015  	}
  1016  
  1017  	// Create a pvc with selected node annotation.
  1018  	pvcName := "pvc-fail-to-provision"
  1019  	pvc := makePVC(pvcName, ns, &scName, "")
  1020  	pvc.Annotations = map[string]string{"volume.kubernetes.io/selected-node": node1}
  1021  	pvc, err = clientset.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), pvc, metav1.CreateOptions{})
  1022  	if err != nil {
  1023  		t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
  1024  	}
  1025  	// Validate selectedNode annotation exists on created claim.
  1026  	selectedNodeAnn, exist := pvc.Annotations["volume.kubernetes.io/selected-node"]
  1027  	if !exist || selectedNodeAnn != node1 {
  1028  		t.Fatalf("Created pvc is not annotated as expected")
  1029  	}
  1030  
  1031  	// Start controller.
  1032  	go ctrl.Run(testCtx.Ctx)
  1033  	informerFactory.Start(testCtx.Ctx.Done())
  1034  	informerFactory.WaitForCacheSync(testCtx.Ctx.Done())
  1035  
  1036  	// Validate that the annotation is removed by controller for provision reschedule.
  1037  	if err := waitForProvisionAnn(clientset, pvc, false); err != nil {
  1038  		t.Errorf("Expect to reschedule provision for PVC %v/%v, but still found selected-node annotation on it", ns, pvcName)
  1039  	}
  1040  }
  1041  
  1042  func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod time.Duration, provisionDelaySeconds int) *testConfig {
  1043  	testCtx := testutil.InitTestSchedulerWithOptions(t, testutil.InitTestAPIServer(t, nsName, nil), resyncPeriod)
  1044  	testutil.SyncSchedulerInformerFactory(testCtx)
  1045  	go testCtx.Scheduler.Run(testCtx.Ctx)
  1046  
  1047  	clientset := testCtx.ClientSet
  1048  	ns := testCtx.NS.Name
  1049  
  1050  	ctrl, informerFactory, err := initPVController(t, testCtx, provisionDelaySeconds)
  1051  	if err != nil {
  1052  		t.Fatalf("Failed to create PV controller: %v", err)
  1053  	}
  1054  	go ctrl.Run(testCtx.Ctx)
  1055  	// Start informer factory after all controllers are configured and running.
  1056  	informerFactory.Start(testCtx.Ctx.Done())
  1057  	informerFactory.WaitForCacheSync(testCtx.Ctx.Done())
  1058  
  1059  	// Create shared objects
  1060  	// Create nodes
  1061  	for i := 0; i < numberOfNodes; i++ {
  1062  		testNode := makeNode(i + 1)
  1063  		if _, err := clientset.CoreV1().Nodes().Create(context.TODO(), testNode, metav1.CreateOptions{}); err != nil {
  1064  			t.Fatalf("Failed to create Node %q: %v", testNode.Name, err)
  1065  		}
  1066  	}
  1067  
  1068  	// Create SCs
  1069  	for _, sc := range sharedClasses {
  1070  		if _, err := clientset.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
  1071  			t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
  1072  		}
  1073  	}
  1074  
  1075  	return &testConfig{
  1076  		client: clientset,
  1077  		ns:     ns,
  1078  		stop:   testCtx.Ctx.Done(),
  1079  		teardown: func() {
  1080  			klog.Infof("test cluster %q start to tear down", ns)
  1081  			deleteTestObjects(clientset, ns, metav1.DeleteOptions{})
  1082  		},
  1083  	}
  1084  }
  1085  
  1086  func initPVController(t *testing.T, testCtx *testutil.TestContext, provisionDelaySeconds int) (*persistentvolume.PersistentVolumeController, informers.SharedInformerFactory, error) {
  1087  	clientset := testCtx.ClientSet
  1088  	// Informers factory for controllers
  1089  	informerFactory := informers.NewSharedInformerFactory(clientset, 0)
  1090  
  1091  	// Start PV controller for volume binding.
  1092  	host := volumetest.NewFakeVolumeHost(t, "/tmp/fake", nil, nil)
  1093  	plugin := &volumetest.FakeVolumePlugin{
  1094  		PluginName:             provisionerPluginName,
  1095  		Host:                   host,
  1096  		Config:                 volume.VolumeConfig{},
  1097  		LastProvisionerOptions: volume.VolumeOptions{},
  1098  		ProvisionDelaySeconds:  provisionDelaySeconds,
  1099  		NewAttacherCallCount:   0,
  1100  		NewDetacherCallCount:   0,
  1101  		Mounters:               nil,
  1102  		Unmounters:             nil,
  1103  		Attachers:              nil,
  1104  		Detachers:              nil,
  1105  	}
  1106  	plugins := []volume.VolumePlugin{plugin}
  1107  
  1108  	params := persistentvolume.ControllerParameters{
  1109  		KubeClient: clientset,
  1110  		// Use a frequent resync period to retry API update conflicts due to
  1111  		// https://github.com/kubernetes/kubernetes/issues/85320
  1112  		SyncPeriod:                5 * time.Second,
  1113  		VolumePlugins:             plugins,
  1114  		Cloud:                     nil,
  1115  		ClusterName:               "volume-test-cluster",
  1116  		VolumeInformer:            informerFactory.Core().V1().PersistentVolumes(),
  1117  		ClaimInformer:             informerFactory.Core().V1().PersistentVolumeClaims(),
  1118  		ClassInformer:             informerFactory.Storage().V1().StorageClasses(),
  1119  		PodInformer:               informerFactory.Core().V1().Pods(),
  1120  		NodeInformer:              informerFactory.Core().V1().Nodes(),
  1121  		EnableDynamicProvisioning: true,
  1122  	}
  1123  	ctrl, err := persistentvolume.NewController(testCtx.Ctx, params)
  1124  	if err != nil {
  1125  		return nil, nil, err
  1126  	}
  1127  
  1128  	return ctrl, informerFactory, nil
  1129  }
  1130  
  1131  func deleteTestObjects(client clientset.Interface, ns string, option metav1.DeleteOptions) {
  1132  	client.CoreV1().Pods(ns).DeleteCollection(context.TODO(), option, metav1.ListOptions{})
  1133  	client.CoreV1().PersistentVolumeClaims(ns).DeleteCollection(context.TODO(), option, metav1.ListOptions{})
  1134  	client.CoreV1().PersistentVolumes().DeleteCollection(context.TODO(), option, metav1.ListOptions{})
  1135  	client.StorageV1().StorageClasses().DeleteCollection(context.TODO(), option, metav1.ListOptions{})
  1136  	client.StorageV1().CSIDrivers().DeleteCollection(context.TODO(), option, metav1.ListOptions{})
  1137  	client.StorageV1().CSIStorageCapacities("default").DeleteCollection(context.TODO(), option, metav1.ListOptions{})
  1138  }
  1139  
  1140  func makeStorageClass(name string, mode *storagev1.VolumeBindingMode) *storagev1.StorageClass {
  1141  	return &storagev1.StorageClass{
  1142  		ObjectMeta: metav1.ObjectMeta{
  1143  			Name: name,
  1144  		},
  1145  		Provisioner:       "kubernetes.io/no-provisioner",
  1146  		VolumeBindingMode: mode,
  1147  	}
  1148  }
  1149  
  1150  func makeDynamicProvisionerStorageClass(name string, mode *storagev1.VolumeBindingMode, allowedTopologies []v1.TopologySelectorTerm) *storagev1.StorageClass {
  1151  	return &storagev1.StorageClass{
  1152  		ObjectMeta: metav1.ObjectMeta{
  1153  			Name: name,
  1154  		},
  1155  		Provisioner:       provisionerPluginName,
  1156  		VolumeBindingMode: mode,
  1157  		AllowedTopologies: allowedTopologies,
  1158  		Parameters:        map[string]string{},
  1159  	}
  1160  }
  1161  
  1162  func makePV(name, scName, pvcName, ns, node string) *v1.PersistentVolume {
  1163  	pv := &v1.PersistentVolume{
  1164  		ObjectMeta: metav1.ObjectMeta{
  1165  			Name:        name,
  1166  			Annotations: map[string]string{},
  1167  		},
  1168  		Spec: v1.PersistentVolumeSpec{
  1169  			Capacity: v1.ResourceList{
  1170  				v1.ResourceName(v1.ResourceStorage): resource.MustParse("5Gi"),
  1171  			},
  1172  			AccessModes: []v1.PersistentVolumeAccessMode{
  1173  				v1.ReadWriteOnce,
  1174  			},
  1175  			StorageClassName: scName,
  1176  			PersistentVolumeSource: v1.PersistentVolumeSource{
  1177  				Local: &v1.LocalVolumeSource{
  1178  					Path: "/test-path",
  1179  				},
  1180  			},
  1181  		},
  1182  	}
  1183  
  1184  	if pvcName != "" {
  1185  		pv.Spec.ClaimRef = &v1.ObjectReference{Name: pvcName, Namespace: ns}
  1186  	}
  1187  
  1188  	if node != "" {
  1189  		pv.Spec.NodeAffinity = &v1.VolumeNodeAffinity{
  1190  			Required: &v1.NodeSelector{
  1191  				NodeSelectorTerms: []v1.NodeSelectorTerm{
  1192  					{
  1193  						MatchExpressions: []v1.NodeSelectorRequirement{
  1194  							{
  1195  								Key:      nodeAffinityLabelKey,
  1196  								Operator: v1.NodeSelectorOpIn,
  1197  								Values:   []string{node},
  1198  							},
  1199  						},
  1200  					},
  1201  				},
  1202  			},
  1203  		}
  1204  	}
  1205  
  1206  	return pv
  1207  }
  1208  
  1209  func makePVC(name, ns string, scName *string, volumeName string) *v1.PersistentVolumeClaim {
  1210  	return &v1.PersistentVolumeClaim{
  1211  		ObjectMeta: metav1.ObjectMeta{
  1212  			Name:      name,
  1213  			Namespace: ns,
  1214  		},
  1215  		Spec: v1.PersistentVolumeClaimSpec{
  1216  			AccessModes: []v1.PersistentVolumeAccessMode{
  1217  				v1.ReadWriteOnce,
  1218  			},
  1219  			Resources: v1.VolumeResourceRequirements{
  1220  				Requests: v1.ResourceList{
  1221  					v1.ResourceName(v1.ResourceStorage): resource.MustParse("5Gi"),
  1222  				},
  1223  			},
  1224  			StorageClassName: scName,
  1225  			VolumeName:       volumeName,
  1226  		},
  1227  	}
  1228  }
  1229  
  1230  func makePod(name, ns string, pvcs []string) *v1.Pod {
  1231  	volumes := []v1.Volume{}
  1232  	for i, pvc := range pvcs {
  1233  		volumes = append(volumes, v1.Volume{
  1234  			Name: fmt.Sprintf("vol%v", i),
  1235  			VolumeSource: v1.VolumeSource{
  1236  				PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
  1237  					ClaimName: pvc,
  1238  				},
  1239  			},
  1240  		})
  1241  	}
  1242  
  1243  	return &v1.Pod{
  1244  		ObjectMeta: metav1.ObjectMeta{
  1245  			Name:      name,
  1246  			Namespace: ns,
  1247  			Labels: map[string]string{
  1248  				"app": "volume-binding-test",
  1249  			},
  1250  		},
  1251  		Spec: v1.PodSpec{
  1252  			Containers: []v1.Container{
  1253  				{
  1254  					Name:    "write-pod",
  1255  					Image:   imageutils.GetE2EImage(imageutils.BusyBox),
  1256  					Command: []string{"/bin/sh"},
  1257  					Args:    []string{"-c", "while true; do sleep 1; done"},
  1258  				},
  1259  			},
  1260  			Volumes: volumes,
  1261  		},
  1262  	}
  1263  }
  1264  
  1265  // makeNode creates a node with the name "node-<index>"
  1266  func makeNode(index int) *v1.Node {
  1267  	name := fmt.Sprintf("node-%d", index)
  1268  	return &v1.Node{
  1269  		ObjectMeta: metav1.ObjectMeta{
  1270  			Name:   name,
  1271  			Labels: map[string]string{nodeAffinityLabelKey: name},
  1272  		},
  1273  		Spec: v1.NodeSpec{Unschedulable: false},
  1274  		Status: v1.NodeStatus{
  1275  			Capacity: v1.ResourceList{
  1276  				v1.ResourcePods: *resource.NewQuantity(podLimit, resource.DecimalSI),
  1277  			},
  1278  			Conditions: []v1.NodeCondition{
  1279  				{
  1280  					Type:              v1.NodeReady,
  1281  					Status:            v1.ConditionTrue,
  1282  					Reason:            fmt.Sprintf("schedulable condition"),
  1283  					LastHeartbeatTime: metav1.Time{Time: time.Now()},
  1284  				},
  1285  			},
  1286  		},
  1287  	}
  1288  }
  1289  
  1290  func validatePVCPhase(t *testing.T, client clientset.Interface, pvcName string, ns string, phase v1.PersistentVolumeClaimPhase, isProvisioned bool) {
  1291  	claim, err := client.CoreV1().PersistentVolumeClaims(ns).Get(context.TODO(), pvcName, metav1.GetOptions{})
  1292  	if err != nil {
  1293  		t.Errorf("Failed to get PVC %v/%v: %v", ns, pvcName, err)
  1294  	}
  1295  
  1296  	if claim.Status.Phase != phase {
  1297  		t.Errorf("PVC %v/%v phase not %v, got %v", ns, pvcName, phase, claim.Status.Phase)
  1298  	}
  1299  
  1300  	// Check whether the bound claim is provisioned/bound as expect.
  1301  	if phase == v1.ClaimBound {
  1302  		if err := validateProvisionAnn(claim, isProvisioned); err != nil {
  1303  			t.Errorf("Provisoning annotation on PVC %v/%v not as expected: %v", ns, pvcName, err)
  1304  		}
  1305  	}
  1306  }
  1307  
  1308  func validateProvisionAnn(claim *v1.PersistentVolumeClaim, volIsProvisioned bool) error {
  1309  	selectedNode, provisionAnnoExist := claim.Annotations["volume.kubernetes.io/selected-node"]
  1310  	if volIsProvisioned {
  1311  		if !provisionAnnoExist {
  1312  			return fmt.Errorf("PVC %v/%v expected to be provisioned, but no selected-node annotation found", claim.Namespace, claim.Name)
  1313  		}
  1314  		if selectedNode != node1 {
  1315  			return fmt.Errorf("PVC %v/%v expected to be annotated as %v, but got %v", claim.Namespace, claim.Name, node1, selectedNode)
  1316  		}
  1317  	}
  1318  	if !volIsProvisioned && provisionAnnoExist {
  1319  		return fmt.Errorf("PVC %v/%v not expected to be provisioned, but found selected-node annotation", claim.Namespace, claim.Name)
  1320  	}
  1321  
  1322  	return nil
  1323  }
  1324  
  1325  func waitForProvisionAnn(client clientset.Interface, pvc *v1.PersistentVolumeClaim, annShouldExist bool) error {
  1326  	return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
  1327  		claim, err := client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(context.TODO(), pvc.Name, metav1.GetOptions{})
  1328  		if err != nil {
  1329  			return false, err
  1330  		}
  1331  		if err := validateProvisionAnn(claim, annShouldExist); err == nil {
  1332  			return true, nil
  1333  		}
  1334  		return false, nil
  1335  	})
  1336  }
  1337  
  1338  func validatePVPhase(t *testing.T, client clientset.Interface, pvName string, phase v1.PersistentVolumePhase) {
  1339  	pv, err := client.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
  1340  	if err != nil {
  1341  		t.Errorf("Failed to get PV %v: %v", pvName, err)
  1342  	}
  1343  
  1344  	if pv.Status.Phase != phase {
  1345  		t.Errorf("PV %v phase not %v, got %v", pvName, phase, pv.Status.Phase)
  1346  	}
  1347  }
  1348  
  1349  func waitForPVPhase(client clientset.Interface, pvName string, phase v1.PersistentVolumePhase) error {
  1350  	return wait.PollImmediate(time.Second, 30*time.Second, func() (bool, error) {
  1351  		pv, err := client.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
  1352  		if err != nil {
  1353  			return false, err
  1354  		}
  1355  
  1356  		if pv.Status.Phase == phase {
  1357  			return true, nil
  1358  		}
  1359  		return false, nil
  1360  	})
  1361  }
  1362  
  1363  func waitForPVCBound(client clientset.Interface, pvc *v1.PersistentVolumeClaim) error {
  1364  	return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
  1365  		claim, err := client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(context.TODO(), pvc.Name, metav1.GetOptions{})
  1366  		if err != nil {
  1367  			return false, err
  1368  		}
  1369  		if claim.Status.Phase == v1.ClaimBound {
  1370  			return true, nil
  1371  		}
  1372  		return false, nil
  1373  	})
  1374  }
  1375  
  1376  func markNodeAffinity(pod *v1.Pod, node string) {
  1377  	affinity := &v1.Affinity{
  1378  		NodeAffinity: &v1.NodeAffinity{
  1379  			RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
  1380  				NodeSelectorTerms: []v1.NodeSelectorTerm{
  1381  					{
  1382  						MatchExpressions: []v1.NodeSelectorRequirement{
  1383  							{
  1384  								Key:      nodeAffinityLabelKey,
  1385  								Operator: v1.NodeSelectorOpIn,
  1386  								Values:   []string{node},
  1387  							},
  1388  						},
  1389  					},
  1390  				},
  1391  			},
  1392  		},
  1393  	}
  1394  	pod.Spec.Affinity = affinity
  1395  }
  1396  
  1397  func markNodeSelector(pod *v1.Pod, node string) {
  1398  	ns := map[string]string{
  1399  		nodeAffinityLabelKey: node,
  1400  	}
  1401  	pod.Spec.NodeSelector = ns
  1402  }
  1403  

View as plain text