package testsuites

import (
	"context"
	"fmt"
	"math/rand"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"

	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
	storageutils "k8s.io/kubernetes/test/e2e/storage/utils"
	admissionapi "k8s.io/pod-security-admission/api"
)

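// topologyTestSuite implements the storageframework.TestSuite interface for the
// volume topology tests.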
type topologyTestSuite struct {
	tsInfo storageframework.TestSuiteInfo
}

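// topologyTest holds the per-test configuration, the provisioned resources, and the
// topologies discovered in the cluster.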
type topologyTest struct {
	config *storageframework.PerTestConfig

	resource      storageframework.VolumeResource
	pod           *v1.Pod
	allTopologies []topology
}

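// topology is a single set of topology key/value labels taken from a node.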
type topology map[string]string

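// InitCustomTopologyTestSuite returns a topology TestSuite that runs with the given
// test patterns.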
func InitCustomTopologyTestSuite(patterns []storageframework.TestPattern) storageframework.TestSuite {
	return &topologyTestSuite{
		tsInfo: storageframework.TestSuiteInfo{
			Name:         "topology",
			TestPatterns: patterns,
		},
	}
}

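// InitTopologyTestSuite returns a topology TestSuite that runs with the default
// patterns: immediate and delayed volume binding.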
func InitTopologyTestSuite() storageframework.TestSuite {
	patterns := []storageframework.TestPattern{
		storageframework.TopologyImmediate,
		storageframework.TopologyDelayed,
	}
	return InitCustomTopologyTestSuite(patterns)
}

func (t *topologyTestSuite) GetTestSuiteInfo() storageframework.TestSuiteInfo {
	return t.tsInfo
}

func (t *topologyTestSuite) SkipUnsupportedTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
	dInfo := driver.GetDriverInfo()
	if _, ok := driver.(storageframework.DynamicPVTestDriver); !ok {
		e2eskipper.Skipf("Driver %s doesn't support %v -- skipping", dInfo.Name, pattern.VolType)
	}

	if !dInfo.Capabilities[storageframework.CapTopology] {
		e2eskipper.Skipf("Driver %q does not support topology -- skipping", dInfo.Name)
	}
}

func (t *topologyTestSuite) DefineTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
	var (
		dInfo   = driver.GetDriverInfo()
		dDriver storageframework.DynamicPVTestDriver
		cs      clientset.Interface
		err     error
	)

	// The framework registers its own Ginkgo setup and cleanup nodes, so it must be
	// created here, outside of init() and the It blocks below.
	f := framework.NewFrameworkWithCustomTimeouts("topology", storageframework.GetDriverTimeouts(driver))
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged

	init := func(ctx context.Context) *topologyTest {
		dDriver, _ = driver.(storageframework.DynamicPVTestDriver)
		l := &topologyTest{}

		// Now do the more expensive test initialization.
		l.config = driver.PrepareTest(ctx, f)

		l.resource = storageframework.VolumeResource{
			Config:  l.config,
			Pattern: pattern,
		}

		// After the driver is installed, check which topologies it reports on the nodes.
		cs = f.ClientSet
		keys := dInfo.TopologyKeys
		if len(keys) == 0 {
			e2eskipper.Skipf("Driver didn't provide topology keys -- skipping")
		}

		ginkgo.DeferCleanup(t.CleanupResources, cs, l)

		if dInfo.NumAllowedTopologies == 0 {
			// Any driver that supports topology has at least one topology, so default to 1.
			dInfo.NumAllowedTopologies = 1
		}

		// Collect one additional topology, if available, for the conflicting-topology test case.
		l.allTopologies, err = t.getCurrentTopologies(ctx, cs, keys, dInfo.NumAllowedTopologies+1)
		framework.ExpectNoError(err, "failed to get current driver topologies")
		if len(l.allTopologies) < dInfo.NumAllowedTopologies {
			e2eskipper.Skipf("Not enough topologies in cluster -- skipping")
		}

		l.resource.Sc = dDriver.GetDynamicProvisionStorageClass(ctx, l.config, pattern.FsType)
		gomega.Expect(l.resource.Sc).ToNot(gomega.BeNil(), "driver failed to provide a StorageClass")
		l.resource.Sc.VolumeBindingMode = &pattern.BindingMode

		testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
		driverVolumeSizeRange := dDriver.GetDriverInfo().SupportedSizeRange
		claimSize, err := storageutils.GetSizeRangesIntersection(testVolumeSizeRange, driverVolumeSizeRange)
		framework.ExpectNoError(err, "determine intersection of test size range %+v and driver size range %+v", testVolumeSizeRange, driverVolumeSizeRange)
		l.resource.Pvc = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
			ClaimSize:        claimSize,
			StorageClassName: &(l.resource.Sc.Name),
		}, l.config.Framework.Namespace.Name)

		migrationCheck := newMigrationOpCheck(ctx, f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName)
		ginkgo.DeferCleanup(migrationCheck.validateMigrationVolumeOpCounts)

		return l
	}

	ginkgo.It("should provision a volume and schedule a pod with AllowedTopologies", func(ctx context.Context) {
		l := init(ctx)

		// If possible, exclude one topology; otherwise allow them all.
		excludedIndex := -1
		if len(l.allTopologies) > dInfo.NumAllowedTopologies {
			excludedIndex = rand.Intn(len(l.allTopologies))
		}
		allowedTopologies := t.setAllowedTopologies(l.resource.Sc, l.allTopologies, excludedIndex)

		t.createResources(ctx, cs, l, nil)

		err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, cs, l.pod.Name, l.pod.Namespace, f.Timeouts.PodStart)
		framework.ExpectNoError(err)

		ginkgo.By("Verifying pod scheduled to correct node")
		pod, err := cs.CoreV1().Pods(l.pod.Namespace).Get(ctx, l.pod.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)

		node, err := cs.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
		framework.ExpectNoError(err)

		t.verifyNodeTopology(node, allowedTopologies)
	})

	ginkgo.It("should fail to schedule a pod which has topologies that conflict with AllowedTopologies", func(ctx context.Context) {
		l := init(ctx)

		if len(l.allTopologies) < dInfo.NumAllowedTopologies+1 {
			e2eskipper.Skipf("Not enough topologies in cluster -- skipping")
		}

		// Exclude one topology from the StorageClass.
		excludedIndex := rand.Intn(len(l.allTopologies))
		t.setAllowedTopologies(l.resource.Sc, l.allTopologies, excludedIndex)

		// Pin the pod to the excluded topology via node affinity so that it conflicts
		// with the StorageClass AllowedTopologies.
		exprs := []v1.NodeSelectorRequirement{}
		for k, v := range l.allTopologies[excludedIndex] {
			exprs = append(exprs, v1.NodeSelectorRequirement{
				Key:      k,
				Operator: v1.NodeSelectorOpIn,
				Values:   []string{v},
			})
		}

		affinity := &v1.Affinity{
			NodeAffinity: &v1.NodeAffinity{
				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						{
							MatchExpressions: exprs,
						},
					},
				},
			},
		}
		t.createResources(ctx, cs, l, affinity)

		// The pod can only run in the excluded topology, which the StorageClass does not
		// allow, so the pod must remain unschedulable.
		err = e2epod.WaitForPodNameUnschedulableInNamespace(ctx, cs, l.pod.Name, l.pod.Namespace)
		framework.ExpectNoError(err)
	})
}

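// getCurrentTopologies walks the ready schedulable nodes and returns up to maxCount
// unique topologies built from the given topology keys. It returns an error if a node
// is missing one of the keys.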
func (t *topologyTestSuite) getCurrentTopologies(ctx context.Context, cs clientset.Interface, keys []string, maxCount int) ([]topology, error) {
	nodes, err := e2enode.GetReadySchedulableNodes(ctx, cs)
	if err != nil {
		return nil, err
	}

	topos := []topology{}

	for _, n := range nodes.Items {
		topo := map[string]string{}
		for _, k := range keys {
			v, ok := n.Labels[k]
			if !ok {
				return nil, fmt.Errorf("node %v missing topology label %v", n.Name, k)
			}
			topo[k] = v
		}

		found := false
		for _, existingTopo := range topos {
			if topologyEqual(existingTopo, topo) {
				found = true
				break
			}
		}
		if !found {
			framework.Logf("found topology %v", topo)
			topos = append(topos, topo)
		}
		if len(topos) >= maxCount {
			break
		}
	}
	return topos, nil
}

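// topologyEqual reports whether t1 and t2 contain exactly the same key/value pairs.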
func topologyEqual(t1, t2 topology) bool {
	if len(t1) != len(t2) {
		return false
	}
	for k1, v1 := range t1 {
		if v2, ok := t2[k1]; !ok || v1 != v2 {
			return false
		}
	}
	return true
}

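// setAllowedTopologies sets sc.AllowedTopologies to every topology in topos except the
// one at excludedIndex (pass a negative index to allow all of them) and returns the
// topologies that were allowed.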
func (t *topologyTestSuite) setAllowedTopologies(sc *storagev1.StorageClass, topos []topology, excludedIndex int) []topology {
	allowedTopologies := []topology{}
	sc.AllowedTopologies = []v1.TopologySelectorTerm{}

	for i := 0; i < len(topos); i++ {
		if i != excludedIndex {
			exprs := []v1.TopologySelectorLabelRequirement{}
			for k, v := range topos[i] {
				exprs = append(exprs, v1.TopologySelectorLabelRequirement{
					Key:    k,
					Values: []string{v},
				})
			}
			sc.AllowedTopologies = append(sc.AllowedTopologies, v1.TopologySelectorTerm{MatchLabelExpressions: exprs})
			allowedTopologies = append(allowedTopologies, topos[i])
		}
	}
	return allowedTopologies
}

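// verifyNodeTopology fails the test unless the node carries at least one label value
// from the allowed topologies.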
func (t *topologyTestSuite) verifyNodeTopology(node *v1.Node, allowedTopos []topology) {
	for _, topo := range allowedTopos {
		for k, v := range topo {
			nodeV := node.Labels[k]
			if nodeV == v {
				return
			}
		}
	}
	framework.Failf("node %v topology labels %+v don't match allowed topologies %+v", node.Name, node.Labels, allowedTopos)
}

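// createResources creates the StorageClass, PVC, and pod used by the test, applying the
// optional node affinity to the pod.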
func (t *topologyTestSuite) createResources(ctx context.Context, cs clientset.Interface, l *topologyTest, affinity *v1.Affinity) {
	var err error
	framework.Logf("Creating storage class object and pvc object for driver - sc: %v, pvc: %v", l.resource.Sc, l.resource.Pvc)

	ginkgo.By("Creating sc")
	l.resource.Sc, err = cs.StorageV1().StorageClasses().Create(ctx, l.resource.Sc, metav1.CreateOptions{})
	framework.ExpectNoError(err)

	ginkgo.By("Creating pvc")
	l.resource.Pvc, err = cs.CoreV1().PersistentVolumeClaims(l.resource.Pvc.Namespace).Create(ctx, l.resource.Pvc, metav1.CreateOptions{})
	framework.ExpectNoError(err)

	ginkgo.By("Creating pod")
	podConfig := e2epod.Config{
		NS:            l.config.Framework.Namespace.Name,
		PVCs:          []*v1.PersistentVolumeClaim{l.resource.Pvc},
		NodeSelection: e2epod.NodeSelection{Affinity: affinity, Selector: l.config.ClientNodeSelection.Selector},
		SeLinuxLabel:  e2epod.GetLinuxLabel(),
		ImageID:       e2epod.GetDefaultTestImageID(),
	}
	l.pod, err = e2epod.MakeSecPod(&podConfig)
	framework.ExpectNoError(err)
	l.pod, err = cs.CoreV1().Pods(l.pod.Namespace).Create(ctx, l.pod, metav1.CreateOptions{})
	framework.ExpectNoError(err)
}

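// CleanupResources deletes the test pod, if one was created, and releases the remaining
// volume resources.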
func (t *topologyTestSuite) CleanupResources(ctx context.Context, cs clientset.Interface, l *topologyTest) {
	if l.pod != nil {
		ginkgo.By("Deleting pod")
		err := e2epod.DeletePodWithWait(ctx, cs, l.pod)
		framework.ExpectNoError(err, "while deleting pod")
	}

	err := l.resource.CleanupResource(ctx)
	framework.ExpectNoError(err, "while cleaning up resource")
}