
Source file src/k8s.io/kubernetes/test/e2e/storage/testsuites/topology.go

Documentation: k8s.io/kubernetes/test/e2e/storage/testsuites

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // This suite tests volume topology
    18  
    19  package testsuites
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"math/rand"
    25  
    26  	"github.com/onsi/ginkgo/v2"
    27  	"github.com/onsi/gomega"
    28  
    29  	v1 "k8s.io/api/core/v1"
    30  	storagev1 "k8s.io/api/storage/v1"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	clientset "k8s.io/client-go/kubernetes"
    33  	"k8s.io/kubernetes/test/e2e/framework"
    34  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    35  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    36  	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
    37  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    38  	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
    39  	storageutils "k8s.io/kubernetes/test/e2e/storage/utils"
    40  	admissionapi "k8s.io/pod-security-admission/api"
    41  )
    42  
    43  type topologyTestSuite struct {
    44  	tsInfo storageframework.TestSuiteInfo
    45  }
    46  
    47  type topologyTest struct {
    48  	config *storageframework.PerTestConfig
    49  
    50  	resource      storageframework.VolumeResource
    51  	pod           *v1.Pod
    52  	allTopologies []topology
    53  }
    54  
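        // topology is one set of driver topology labels observed on a node, keyed by
        // topology key, e.g. {"topology.kubernetes.io/zone": "zone-a"} (illustrative value).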
    55  type topology map[string]string
    56  
    57  // InitCustomTopologyTestSuite returns a topologyTestSuite that implements the TestSuite interface
    58  // using custom test patterns.
    59  func InitCustomTopologyTestSuite(patterns []storageframework.TestPattern) storageframework.TestSuite {
    60  	return &topologyTestSuite{
    61  		tsInfo: storageframework.TestSuiteInfo{
    62  			Name:         "topology",
    63  			TestPatterns: patterns,
    64  		},
    65  	}
    66  }
    67  
    68  // InitTopologyTestSuite returns a topologyTestSuite that implements the TestSuite interface
    69  // using the testsuite's default patterns.
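        //
        // A driver's e2e tests typically register this suite through the storage framework,
        // roughly as sketched below (csiTestSuites and curDriver are illustrative names from
        // a hypothetical caller, not part of this package):
        //
        //	var csiTestSuites = []func() storageframework.TestSuite{
        //		testsuites.InitTopologyTestSuite,
        //	}
        //
        //	// inside the driver's ginkgo Context:
        //	storageframework.DefineTestSuites(curDriver, csiTestSuites)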
    70  func InitTopologyTestSuite() storageframework.TestSuite {
    71  	patterns := []storageframework.TestPattern{
    72  		storageframework.TopologyImmediate,
    73  		storageframework.TopologyDelayed,
    74  	}
    75  	return InitCustomTopologyTestSuite(patterns)
    76  }
    77  
    78  func (t *topologyTestSuite) GetTestSuiteInfo() storageframework.TestSuiteInfo {
    79  	return t.tsInfo
    80  }
    81  
    82  func (t *topologyTestSuite) SkipUnsupportedTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
    83  	dInfo := driver.GetDriverInfo()
    84  	var ok bool
    85  	_, ok = driver.(storageframework.DynamicPVTestDriver)
    86  	if !ok {
    87  		e2eskipper.Skipf("Driver %s doesn't support %v -- skipping", dInfo.Name, pattern.VolType)
    88  	}
    89  
    90  	if !dInfo.Capabilities[storageframework.CapTopology] {
    91  		e2eskipper.Skipf("Driver %q does not support topology - skipping", dInfo.Name)
    92  	}
    93  }
    94  
    95  func (t *topologyTestSuite) DefineTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
    96  	var (
    97  		dInfo   = driver.GetDriverInfo()
    98  		dDriver storageframework.DynamicPVTestDriver
    99  		cs      clientset.Interface
   100  		err     error
   101  	)
   102  
   103  	// Beware that it also registers an AfterEach which renders f unusable. Any code using
   104  	// f must run inside an It or Context callback.
   105  	f := framework.NewFrameworkWithCustomTimeouts("topology", storageframework.GetDriverTimeouts(driver))
   106  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
   107  
   108  	init := func(ctx context.Context) *topologyTest {
   109  		dDriver, _ = driver.(storageframework.DynamicPVTestDriver)
   110  		l := &topologyTest{}
   111  
   112  		// Now do the more expensive test initialization.
   113  		l.config = driver.PrepareTest(ctx, f)
   114  
   115  		l.resource = storageframework.VolumeResource{
   116  			Config:  l.config,
   117  			Pattern: pattern,
   118  		}
   119  
   120  		// After driver is installed, check driver topologies on nodes
   121  		cs = f.ClientSet
   122  		keys := dInfo.TopologyKeys
   123  		if len(keys) == 0 {
   124  			e2eskipper.Skipf("Driver didn't provide topology keys -- skipping")
   125  		}
   126  
   127  		ginkgo.DeferCleanup(t.CleanupResources, cs, l)
   128  
   129  		if dInfo.NumAllowedTopologies == 0 {
   130  			// Any plugin that supports topology defaults to 1 topology
   131  			dInfo.NumAllowedTopologies = 1
   132  		}
   133  		// We collect 1 additional topology, if possible, for the conflicting topology test
   134  		// case, but it's not needed for the positive test
   135  		l.allTopologies, err = t.getCurrentTopologies(ctx, cs, keys, dInfo.NumAllowedTopologies+1)
   136  		framework.ExpectNoError(err, "failed to get current driver topologies")
   137  		if len(l.allTopologies) < dInfo.NumAllowedTopologies {
   138  			e2eskipper.Skipf("Not enough topologies in cluster -- skipping")
   139  		}
   140  
   141  		l.resource.Sc = dDriver.GetDynamicProvisionStorageClass(ctx, l.config, pattern.FsType)
   142  		gomega.Expect(l.resource.Sc).ToNot(gomega.BeNil(), "driver failed to provide a StorageClass")
   143  		l.resource.Sc.VolumeBindingMode = &pattern.BindingMode
   144  
   145  		testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
   146  		driverVolumeSizeRange := dDriver.GetDriverInfo().SupportedSizeRange
   147  		claimSize, err := storageutils.GetSizeRangesIntersection(testVolumeSizeRange, driverVolumeSizeRange)
   148  		framework.ExpectNoError(err, "determine intersection of test size range %+v and driver size range %+v", testVolumeSizeRange, driverVolumeSizeRange)
   149  		l.resource.Pvc = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
   150  			ClaimSize:        claimSize,
   151  			StorageClassName: &(l.resource.Sc.Name),
   152  		}, l.config.Framework.Namespace.Name)
   153  
   154  		migrationCheck := newMigrationOpCheck(ctx, f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName)
   155  		ginkgo.DeferCleanup(migrationCheck.validateMigrationVolumeOpCounts)
   156  
   157  		return l
   158  	}
   159  
   160  	ginkgo.It("should provision a volume and schedule a pod with AllowedTopologies", func(ctx context.Context) {
   161  		l := init(ctx)
   162  
   163  		// If possible, exclude one topology, otherwise allow them all
   164  		excludedIndex := -1
   165  		if len(l.allTopologies) > dInfo.NumAllowedTopologies {
   166  			excludedIndex = rand.Intn(len(l.allTopologies))
   167  		}
   168  		allowedTopologies := t.setAllowedTopologies(l.resource.Sc, l.allTopologies, excludedIndex)
   169  
   170  		t.createResources(ctx, cs, l, nil)
   171  
   172  		err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, cs, l.pod.Name, l.pod.Namespace, f.Timeouts.PodStart)
   173  		framework.ExpectNoError(err)
   174  
   175  		ginkgo.By("Verifying pod scheduled to correct node")
   176  		pod, err := cs.CoreV1().Pods(l.pod.Namespace).Get(ctx, l.pod.Name, metav1.GetOptions{})
   177  		framework.ExpectNoError(err)
   178  
   179  		node, err := cs.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
   180  		framework.ExpectNoError(err)
   181  
   182  		t.verifyNodeTopology(node, allowedTopologies)
   183  	})
   184  
   185  	ginkgo.It("should fail to schedule a pod which has topologies that conflict with AllowedTopologies", func(ctx context.Context) {
   186  		l := init(ctx)
   187  
   188  		if len(l.allTopologies) < dInfo.NumAllowedTopologies+1 {
   189  			e2eskipper.Skipf("Not enough topologies in cluster -- skipping")
   190  		}
   191  
   192  		// Exclude one topology
   193  		excludedIndex := rand.Intn(len(l.allTopologies))
   194  		t.setAllowedTopologies(l.resource.Sc, l.allTopologies, excludedIndex)
   195  
   196  		// Set pod nodeSelector to the excluded topology
   197  		exprs := []v1.NodeSelectorRequirement{}
   198  		for k, v := range l.allTopologies[excludedIndex] {
   199  			exprs = append(exprs, v1.NodeSelectorRequirement{
   200  				Key:      k,
   201  				Operator: v1.NodeSelectorOpIn,
   202  				Values:   []string{v},
   203  			})
   204  		}
   205  
   206  		affinity := &v1.Affinity{
   207  			NodeAffinity: &v1.NodeAffinity{
   208  				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
   209  					NodeSelectorTerms: []v1.NodeSelectorTerm{
   210  						{
   211  							MatchExpressions: exprs,
   212  						},
   213  					},
   214  				},
   215  			},
   216  		}
   217  		t.createResources(ctx, cs, l, affinity)
   218  
   219  		// Wait for pod to fail scheduling
   220  		// With delayed binding, the scheduler errors before provisioning
   221  		// With immediate binding, the volume gets provisioned but cannot be scheduled
   222  		err = e2epod.WaitForPodNameUnschedulableInNamespace(ctx, cs, l.pod.Name, l.pod.Namespace)
   223  		framework.ExpectNoError(err)
   224  	})
   225  }
   226  
   227  // getCurrentTopologies() goes through all Nodes and returns up to maxCount unique driver topologies
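        // For example (illustrative values only): with keys = ["topology.kubernetes.io/zone"]
        // and ready nodes in zones "zone-a", "zone-a" and "zone-b", it returns
        // [{"topology.kubernetes.io/zone": "zone-a"}, {"topology.kubernetes.io/zone": "zone-b"}].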
   228  func (t *topologyTestSuite) getCurrentTopologies(ctx context.Context, cs clientset.Interface, keys []string, maxCount int) ([]topology, error) {
   229  	nodes, err := e2enode.GetReadySchedulableNodes(ctx, cs)
   230  	if err != nil {
   231  		return nil, err
   232  	}
   233  
   234  	topos := []topology{}
   235  
   236  	// TODO: scale?
   237  	for _, n := range nodes.Items {
   238  		topo := map[string]string{}
   239  		for _, k := range keys {
   240  			v, ok := n.Labels[k]
   241  			if !ok {
   242  				return nil, fmt.Errorf("node %v missing topology label %v", n.Name, k)
   243  			}
   244  			topo[k] = v
   245  		}
   246  
   247  		found := false
   248  		for _, existingTopo := range topos {
   249  			if topologyEqual(existingTopo, topo) {
   250  				found = true
   251  				break
   252  			}
   253  		}
   254  		if !found {
   255  			framework.Logf("found topology %v", topo)
   256  			topos = append(topos, topo)
   257  		}
   258  		if len(topos) >= maxCount {
   259  			break
   260  		}
   261  	}
   262  	return topos, nil
   263  }
   264  
   265  // topologyEqual reports whether t1 and t2 have exactly the same keys and values (reflect.DeepEqual doesn't seem to work here).
   266  func topologyEqual(t1, t2 topology) bool {
   267  	if len(t1) != len(t2) {
   268  		return false
   269  	}
   270  	for k1, v1 := range t1 {
   271  		if v2, ok := t2[k1]; !ok || v1 != v2 {
   272  			return false
   273  		}
   274  	}
   275  	return true
   276  }
   277  
   278  // Set StorageClass.AllowedTopologies from topos while excluding the topology at excludedIndex.
   279  // excludedIndex can be -1 to specify nothing should be excluded.
   280  // Return the list of allowed topologies generated.
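        // For example (illustrative): with topos = [A, B, C] and excludedIndex = 1, the
        // StorageClass is restricted to topologies A and C and [A, C] is returned.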
   281  func (t *topologyTestSuite) setAllowedTopologies(sc *storagev1.StorageClass, topos []topology, excludedIndex int) []topology {
   282  	allowedTopologies := []topology{}
   283  	sc.AllowedTopologies = []v1.TopologySelectorTerm{}
   284  
   285  	for i := 0; i < len(topos); i++ {
   286  		if i != excludedIndex {
   287  			exprs := []v1.TopologySelectorLabelRequirement{}
   288  			for k, v := range topos[i] {
   289  				exprs = append(exprs, v1.TopologySelectorLabelRequirement{
   290  					Key:    k,
   291  					Values: []string{v},
   292  				})
   293  			}
   294  			sc.AllowedTopologies = append(sc.AllowedTopologies, v1.TopologySelectorTerm{MatchLabelExpressions: exprs})
   295  			allowedTopologies = append(allowedTopologies, topos[i])
   296  		}
   297  	}
   298  	return allowedTopologies
   299  }
   300  
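        // verifyNodeTopology passes if the node carries at least one label key/value pair
        // from one of the allowed topologies, and fails the test otherwise.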
   301  func (t *topologyTestSuite) verifyNodeTopology(node *v1.Node, allowedTopos []topology) {
   302  	for _, topo := range allowedTopos {
   303  		for k, v := range topo {
   304  			nodeV := node.Labels[k]
   305  			if nodeV == v {
   306  				return
   307  			}
   308  		}
   309  	}
   310  	framework.Failf("node %v topology labels %+v don't match allowed topologies %+v", node.Name, node.Labels, allowedTopos)
   311  }
   312  
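        // createResources creates the StorageClass, PVC and pod for a test case; the
        // optional affinity is applied to the pod (used to pin it to a specific topology).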
   313  func (t *topologyTestSuite) createResources(ctx context.Context, cs clientset.Interface, l *topologyTest, affinity *v1.Affinity) {
   314  	var err error
   315  	framework.Logf("Creating storage class object and pvc object for driver - sc: %v, pvc: %v", l.resource.Sc, l.resource.Pvc)
   316  
   317  	ginkgo.By("Creating sc")
   318  	l.resource.Sc, err = cs.StorageV1().StorageClasses().Create(ctx, l.resource.Sc, metav1.CreateOptions{})
   319  	framework.ExpectNoError(err)
   320  
   321  	ginkgo.By("Creating pvc")
   322  	l.resource.Pvc, err = cs.CoreV1().PersistentVolumeClaims(l.resource.Pvc.Namespace).Create(ctx, l.resource.Pvc, metav1.CreateOptions{})
   323  	framework.ExpectNoError(err)
   324  
   325  	ginkgo.By("Creating pod")
   326  	podConfig := e2epod.Config{
   327  		NS:            l.config.Framework.Namespace.Name,
   328  		PVCs:          []*v1.PersistentVolumeClaim{l.resource.Pvc},
   329  		NodeSelection: e2epod.NodeSelection{Affinity: affinity, Selector: l.config.ClientNodeSelection.Selector},
   330  		SeLinuxLabel:  e2epod.GetLinuxLabel(),
   331  		ImageID:       e2epod.GetDefaultTestImageID(),
   332  	}
   333  	l.pod, err = e2epod.MakeSecPod(&podConfig)
   334  	framework.ExpectNoError(err)
   335  	l.pod, err = cs.CoreV1().Pods(l.pod.Namespace).Create(ctx, l.pod, metav1.CreateOptions{})
   336  	framework.ExpectNoError(err)
   337  }
   338  
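        // CleanupResources deletes the test pod, if one was created, and then the
        // remaining volume resources (PVC, PV, StorageClass) via VolumeResource cleanup.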
   339  func (t *topologyTestSuite) CleanupResources(ctx context.Context, cs clientset.Interface, l *topologyTest) {
   340  	if l.pod != nil {
   341  		ginkgo.By("Deleting pod")
   342  		err := e2epod.DeletePodWithWait(ctx, cs, l.pod)
   343  		framework.ExpectNoError(err, "while deleting pod")
   344  	}
   345  
   346  	err := l.resource.CleanupResource(ctx)
   347  	framework.ExpectNoError(err, "while cleaning up resource")
   348  }
   349  
