1
2
3
4
19
20 package storage
21
22 import (
23 "context"
24
25 "github.com/onsi/ginkgo/v2"
26 "github.com/onsi/gomega"
27
28 "fmt"
29 "strings"
30 "time"
31
32 "encoding/json"
33
34 appsv1 "k8s.io/api/apps/v1"
35 v1 "k8s.io/api/core/v1"
36 storagev1 "k8s.io/api/storage/v1"
37 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38 "k8s.io/apimachinery/pkg/labels"
39 "k8s.io/apimachinery/pkg/types"
40 "k8s.io/apimachinery/pkg/util/sets"
41 "k8s.io/apimachinery/pkg/util/strategicpatch"
42 "k8s.io/apimachinery/pkg/util/wait"
43 clientset "k8s.io/client-go/kubernetes"
44 volumehelpers "k8s.io/cloud-provider/volume/helpers"
45 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
46 "k8s.io/kubernetes/test/e2e/framework"
47 e2enode "k8s.io/kubernetes/test/e2e/framework/node"
48 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
49 e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
50 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
51 "k8s.io/kubernetes/test/e2e/storage/testsuites"
52 "k8s.io/kubernetes/test/e2e/storage/utils"
53 imageutils "k8s.io/kubernetes/test/utils/image"
54 admissionapi "k8s.io/pod-security-admission/api"
55 )
56
57 const (
58 pvDeletionTimeout = 3 * time.Minute
59 statefulSetReadyTimeout = 3 * time.Minute
60 taintKeyPrefix = "zoneTaint_"
61 repdMinSize = "200Gi"
62 pvcName = "regional-pd-vol"
63 )
64
65 var _ = utils.SIGDescribe("Regional PD", func() {
66 f := framework.NewDefaultFramework("regional-pd")
67 f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
68
69
70 var c clientset.Interface
71 var ns string
72
73 ginkgo.BeforeEach(func(ctx context.Context) {
74 c = f.ClientSet
75 ns = f.Namespace.Name
76
77 e2eskipper.SkipUnlessProviderIs("gce", "gke")
78 e2eskipper.SkipUnlessMultizone(ctx, c)
79 })
80
81 ginkgo.Describe("RegionalPD", func() {
82 f.It("should provision storage", f.WithSlow(), func(ctx context.Context) {
83 testVolumeProvisioning(ctx, c, f.Timeouts, ns)
84 })
85
86 f.It("should provision storage with delayed binding", f.WithSlow(), func(ctx context.Context) {
87 testRegionalDelayedBinding(ctx, c, ns, 1 )
88 testRegionalDelayedBinding(ctx, c, ns, 3 )
89 })
90
91 f.It("should provision storage in the allowedTopologies", f.WithSlow(), func(ctx context.Context) {
92 testRegionalAllowedTopologies(ctx, c, ns)
93 })
94
95 f.It("should provision storage in the allowedTopologies with delayed binding", f.WithSlow(), func(ctx context.Context) {
96 testRegionalAllowedTopologiesWithDelayedBinding(ctx, c, ns, 1 )
97 testRegionalAllowedTopologiesWithDelayedBinding(ctx, c, ns, 3 )
98 })
99
100 f.It("should failover to a different zone when all nodes in one zone become unreachable", f.WithSlow(), f.WithDisruptive(), func(ctx context.Context) {
101 testZonalFailover(ctx, c, ns)
102 })
103 })
104 })
105
106 func testVolumeProvisioning(ctx context.Context, c clientset.Interface, t *framework.TimeoutContext, ns string) {
107 cloudZones := getTwoRandomZones(ctx, c)
108
109
110
111 tests := []testsuites.StorageClassTest{
112 {
113 Name: "HDD Regional PD on GCE/GKE",
114 CloudProviders: []string{"gce", "gke"},
115 Provisioner: "kubernetes.io/gce-pd",
116 Timeouts: framework.NewTimeoutContext(),
117 Parameters: map[string]string{
118 "type": "pd-standard",
119 "zones": strings.Join(cloudZones, ","),
120 "replication-type": "regional-pd",
121 },
122 ClaimSize: repdMinSize,
123 ExpectedSize: repdMinSize,
124 PvCheck: func(ctx context.Context, claim *v1.PersistentVolumeClaim) {
125 volume := testsuites.PVWriteReadSingleNodeCheck(ctx, c, t, claim, e2epod.NodeSelection{})
126 gomega.Expect(volume).NotTo(gomega.BeNil())
127
128 err := checkGCEPD(volume, "pd-standard")
129 framework.ExpectNoError(err, "checkGCEPD")
130 err = verifyZonesInPV(volume, sets.NewString(cloudZones...), true )
131 framework.ExpectNoError(err, "verifyZonesInPV")
132
133 },
134 },
135 {
136 Name: "HDD Regional PD with auto zone selection on GCE/GKE",
137 CloudProviders: []string{"gce", "gke"},
138 Provisioner: "kubernetes.io/gce-pd",
139 Timeouts: framework.NewTimeoutContext(),
140 Parameters: map[string]string{
141 "type": "pd-standard",
142 "replication-type": "regional-pd",
143 },
144 ClaimSize: repdMinSize,
145 ExpectedSize: repdMinSize,
146 PvCheck: func(ctx context.Context, claim *v1.PersistentVolumeClaim) {
147 volume := testsuites.PVWriteReadSingleNodeCheck(ctx, c, t, claim, e2epod.NodeSelection{})
148 gomega.Expect(volume).NotTo(gomega.BeNil())
149
150 err := checkGCEPD(volume, "pd-standard")
151 framework.ExpectNoError(err, "checkGCEPD")
152 zones, err := e2enode.GetClusterZones(ctx, c)
153 framework.ExpectNoError(err, "GetClusterZones")
154 err = verifyZonesInPV(volume, zones, false )
155 framework.ExpectNoError(err, "verifyZonesInPV")
156 },
157 },
158 }
159
160 for _, test := range tests {
161 test.Client = c
162 computedStorageClass := testsuites.SetupStorageClass(ctx, test.Client, newStorageClass(test, ns, "" ))
163 test.Class = computedStorageClass
164 test.Claim = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
165 ClaimSize: test.ClaimSize,
166 StorageClassName: &(test.Class.Name),
167 VolumeMode: &test.VolumeMode,
168 }, ns)
169
170 test.TestDynamicProvisioning(ctx)
171 }
172 }
173
174 func testZonalFailover(ctx context.Context, c clientset.Interface, ns string) {
175 cloudZones := getTwoRandomZones(ctx, c)
176 testSpec := testsuites.StorageClassTest{
177 Name: "Regional PD Failover on GCE/GKE",
178 CloudProviders: []string{"gce", "gke"},
179 Timeouts: framework.NewTimeoutContext(),
180 Provisioner: "kubernetes.io/gce-pd",
181 Parameters: map[string]string{
182 "type": "pd-standard",
183 "zones": strings.Join(cloudZones, ","),
184 "replication-type": "regional-pd",
185 },
186 ClaimSize: repdMinSize,
187 ExpectedSize: repdMinSize,
188 }
189 class := newStorageClass(testSpec, ns, "" )
190 claimTemplate := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
191 NamePrefix: pvcName,
192 ClaimSize: testSpec.ClaimSize,
193 StorageClassName: &(class.Name),
194 VolumeMode: &testSpec.VolumeMode,
195 }, ns)
196 statefulSet, service, regionalPDLabels := newStatefulSet(claimTemplate, ns)
197
198 ginkgo.By("creating a StorageClass " + class.Name)
199 _, err := c.StorageV1().StorageClasses().Create(ctx, class, metav1.CreateOptions{})
200 framework.ExpectNoError(err)
201 defer func() {
202 framework.Logf("deleting storage class %s", class.Name)
203 framework.ExpectNoError(c.StorageV1().StorageClasses().Delete(ctx, class.Name, metav1.DeleteOptions{}),
204 "Error deleting StorageClass %s", class.Name)
205 }()
206
207 ginkgo.By("creating a StatefulSet")
208 _, err = c.CoreV1().Services(ns).Create(ctx, service, metav1.CreateOptions{})
209 framework.ExpectNoError(err)
210 _, err = c.AppsV1().StatefulSets(ns).Create(ctx, statefulSet, metav1.CreateOptions{})
211 framework.ExpectNoError(err)
212
213 ginkgo.DeferCleanup(func(ctx context.Context) {
214 framework.Logf("deleting statefulset%q/%q", statefulSet.Namespace, statefulSet.Name)
215
216 framework.ExpectNoError(c.AppsV1().StatefulSets(ns).Delete(ctx, statefulSet.Name, metav1.DeleteOptions{}),
217 "Error deleting StatefulSet %s", statefulSet.Name)
218
219 framework.Logf("deleting claims in namespace %s", ns)
220 pvc := getPVC(ctx, c, ns, regionalPDLabels)
221 framework.ExpectNoError(c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(ctx, pvc.Name, metav1.DeleteOptions{}),
222 "Error deleting claim %s.", pvc.Name)
223 if pvc.Spec.VolumeName != "" {
224 err = e2epv.WaitForPersistentVolumeDeleted(ctx, c, pvc.Spec.VolumeName, framework.Poll, pvDeletionTimeout)
225 if err != nil {
226 framework.Logf("WARNING: PV %s is not yet deleted, and subsequent tests may be affected.", pvc.Spec.VolumeName)
227 }
228 }
229 })
230
231 err = waitForStatefulSetReplicasReady(ctx, statefulSet.Name, ns, c, framework.Poll, statefulSetReadyTimeout)
232 if err != nil {
233 pod := getPod(ctx, c, ns, regionalPDLabels)
234 if !podutil.IsPodReadyConditionTrue(pod.Status) {
235 framework.Failf("The statefulset pod %s was expected to be ready, instead has the following conditions: %v", pod.Name, pod.Status.Conditions)
236 }
237 framework.ExpectNoError(err)
238 }
239
240 pvc := getPVC(ctx, c, ns, regionalPDLabels)
241
242 ginkgo.By("getting zone information from pod")
243 pod := getPod(ctx, c, ns, regionalPDLabels)
244 nodeName := pod.Spec.NodeName
245 node, err := c.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
246 framework.ExpectNoError(err)
247 podZone := node.Labels[v1.LabelTopologyZone]
248
249 ginkgo.By("tainting nodes in the zone the pod is scheduled in")
250 selector := labels.SelectorFromSet(labels.Set(map[string]string{v1.LabelTopologyZone: podZone}))
251 nodesInZone, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
252 framework.ExpectNoError(err)
253 addTaint(ctx, c, ns, nodesInZone.Items, podZone)
254
255 ginkgo.By("deleting StatefulSet pod")
256 err = c.CoreV1().Pods(ns).Delete(ctx, pod.Name, metav1.DeleteOptions{})
257
258
259 ginkgo.By("verifying the pod is scheduled in a different zone.")
260 var otherZone string
261 if cloudZones[0] == podZone {
262 otherZone = cloudZones[1]
263 } else {
264 otherZone = cloudZones[0]
265 }
266 waitErr := wait.PollUntilContextTimeout(ctx, framework.Poll, statefulSetReadyTimeout, true, func(ctx context.Context) (bool, error) {
267 framework.Logf("Checking whether new pod is scheduled in zone %q", otherZone)
268 pod := getPod(ctx, c, ns, regionalPDLabels)
269 node, err := c.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
270 if err != nil {
271 return false, nil
272 }
273 newPodZone := node.Labels[v1.LabelTopologyZone]
274 return newPodZone == otherZone, nil
275 })
276 framework.ExpectNoError(waitErr, "Error waiting for pod to be scheduled in a different zone (%q): %v", otherZone, err)
277
278 err = waitForStatefulSetReplicasReady(ctx, statefulSet.Name, ns, c, 3*time.Second, framework.RestartPodReadyAgainTimeout)
279 if err != nil {
280 pod := getPod(ctx, c, ns, regionalPDLabels)
281 if !podutil.IsPodReadyConditionTrue(pod.Status) {
282 framework.Failf("The statefulset pod %s was expected to be ready, instead has the following conditions: %v", pod.Name, pod.Status.Conditions)
283 }
284 framework.ExpectNoError(err)
285 }
286
287 ginkgo.By("verifying the same PVC is used by the new pod")
288 gomega.Expect(getPVC(ctx, c, ns, regionalPDLabels).Name).To(gomega.Equal(pvc.Name), "The same PVC should be used after failover.")
289
290 ginkgo.By("verifying the container output has 2 lines, indicating the pod has been created twice using the same regional PD.")
291 logs, err := e2epod.GetPodLogs(ctx, c, ns, pod.Name, "")
292 framework.ExpectNoError(err,
293 "Error getting logs from pod %s in namespace %s", pod.Name, ns)
294 lineCount := len(strings.Split(strings.TrimSpace(logs), "\n"))
295 expectedLineCount := 2
296 gomega.Expect(lineCount).To(gomega.Equal(expectedLineCount), "Line count of the written file should be %d.", expectedLineCount)
297
298 }
299
300 func addTaint(ctx context.Context, c clientset.Interface, ns string, nodes []v1.Node, podZone string) {
301 for _, node := range nodes {
302 oldData, err := json.Marshal(node)
303 framework.ExpectNoError(err)
304
305 node.Spec.Taints = append(node.Spec.Taints, v1.Taint{
306 Key: taintKeyPrefix + ns,
307 Value: podZone,
308 Effect: v1.TaintEffectNoSchedule,
309 })
310
311 newData, err := json.Marshal(node)
312 framework.ExpectNoError(err)
313
314 patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
315 framework.ExpectNoError(err)
316
317 reversePatchBytes, err := strategicpatch.CreateTwoWayMergePatch(newData, oldData, v1.Node{})
318 framework.ExpectNoError(err)
319
320 _, err = c.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
321 framework.ExpectNoError(err)
322
323 nodeName := node.Name
324 ginkgo.DeferCleanup(func(ctx context.Context) {
325 framework.Logf("removing taint for node %q", nodeName)
326 _, err := c.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, reversePatchBytes, metav1.PatchOptions{})
327 framework.ExpectNoError(err)
328 })
329 }
330 }
331
332 func testRegionalDelayedBinding(ctx context.Context, c clientset.Interface, ns string, pvcCount int) {
333 test := testsuites.StorageClassTest{
334 Client: c,
335 Name: "Regional PD storage class with waitForFirstConsumer test on GCE",
336 Provisioner: "kubernetes.io/gce-pd",
337 Timeouts: framework.NewTimeoutContext(),
338 Parameters: map[string]string{
339 "type": "pd-standard",
340 "replication-type": "regional-pd",
341 },
342 ClaimSize: repdMinSize,
343 DelayBinding: true,
344 }
345
346 suffix := "delayed-regional"
347
348 test.Class = testsuites.SetupStorageClass(ctx, test.Client, newStorageClass(test, ns, suffix))
349 var claims []*v1.PersistentVolumeClaim
350 for i := 0; i < pvcCount; i++ {
351 claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
352 ClaimSize: test.ClaimSize,
353 StorageClassName: &(test.Class.Name),
354 VolumeMode: &test.VolumeMode,
355 }, ns)
356 claims = append(claims, claim)
357 }
358 pvs, node := test.TestBindingWaitForFirstConsumerMultiPVC(ctx, claims, nil , false )
359 if node == nil {
360 framework.Failf("unexpected nil node found")
361 }
362 zone, ok := node.Labels[v1.LabelTopologyZone]
363 if !ok {
364 framework.Failf("label %s not found on Node", v1.LabelTopologyZone)
365 }
366 for _, pv := range pvs {
367 checkZoneFromLabelAndAffinity(pv, zone, false)
368 }
369 }
370
371 func testRegionalAllowedTopologies(ctx context.Context, c clientset.Interface, ns string) {
372 test := testsuites.StorageClassTest{
373 Name: "Regional PD storage class with allowedTopologies test on GCE",
374 Provisioner: "kubernetes.io/gce-pd",
375 Timeouts: framework.NewTimeoutContext(),
376 Parameters: map[string]string{
377 "type": "pd-standard",
378 "replication-type": "regional-pd",
379 },
380 ClaimSize: repdMinSize,
381 ExpectedSize: repdMinSize,
382 }
383
384 suffix := "topo-regional"
385 test.Client = c
386 test.Class = testsuites.SetupStorageClass(ctx, test.Client, newStorageClass(test, ns, suffix))
387 zones := getTwoRandomZones(ctx, c)
388 addAllowedTopologiesToStorageClass(c, test.Class, zones)
389 test.Claim = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
390 NamePrefix: pvcName,
391 ClaimSize: test.ClaimSize,
392 StorageClassName: &(test.Class.Name),
393 VolumeMode: &test.VolumeMode,
394 }, ns)
395
396 pv := test.TestDynamicProvisioning(ctx)
397 checkZonesFromLabelAndAffinity(pv, sets.NewString(zones...), true)
398 }
399
400 func testRegionalAllowedTopologiesWithDelayedBinding(ctx context.Context, c clientset.Interface, ns string, pvcCount int) {
401 test := testsuites.StorageClassTest{
402 Client: c,
403 Timeouts: framework.NewTimeoutContext(),
404 Name: "Regional PD storage class with allowedTopologies and waitForFirstConsumer test on GCE",
405 Provisioner: "kubernetes.io/gce-pd",
406 Parameters: map[string]string{
407 "type": "pd-standard",
408 "replication-type": "regional-pd",
409 },
410 ClaimSize: repdMinSize,
411 DelayBinding: true,
412 }
413
414 suffix := "topo-delayed-regional"
415 test.Class = testsuites.SetupStorageClass(ctx, test.Client, newStorageClass(test, ns, suffix))
416 topoZones := getTwoRandomZones(ctx, c)
417 addAllowedTopologiesToStorageClass(c, test.Class, topoZones)
418 var claims []*v1.PersistentVolumeClaim
419 for i := 0; i < pvcCount; i++ {
420 claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
421 ClaimSize: test.ClaimSize,
422 StorageClassName: &(test.Class.Name),
423 VolumeMode: &test.VolumeMode,
424 }, ns)
425 claims = append(claims, claim)
426 }
427 pvs, node := test.TestBindingWaitForFirstConsumerMultiPVC(ctx, claims, nil , false )
428 if node == nil {
429 framework.Failf("unexpected nil node found")
430 }
431 nodeZone, ok := node.Labels[v1.LabelTopologyZone]
432 if !ok {
433 framework.Failf("label %s not found on Node", v1.LabelTopologyZone)
434 }
435 zoneFound := false
436 for _, zone := range topoZones {
437 if zone == nodeZone {
438 zoneFound = true
439 break
440 }
441 }
442 if !zoneFound {
443 framework.Failf("zones specified in AllowedTopologies: %v does not contain zone of node where PV got provisioned: %s", topoZones, nodeZone)
444 }
445 for _, pv := range pvs {
446 checkZonesFromLabelAndAffinity(pv, sets.NewString(topoZones...), true)
447 }
448 }
449
450 func getPVC(ctx context.Context, c clientset.Interface, ns string, pvcLabels map[string]string) *v1.PersistentVolumeClaim {
451 selector := labels.Set(pvcLabels).AsSelector()
452 options := metav1.ListOptions{LabelSelector: selector.String()}
453 pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(ctx, options)
454 framework.ExpectNoError(err)
455 gomega.Expect(pvcList.Items).To(gomega.HaveLen(1), "There should be exactly 1 PVC matched.")
456
457 return &pvcList.Items[0]
458 }
459
460 func getPod(ctx context.Context, c clientset.Interface, ns string, podLabels map[string]string) *v1.Pod {
461 selector := labels.Set(podLabels).AsSelector()
462 options := metav1.ListOptions{LabelSelector: selector.String()}
463 podList, err := c.CoreV1().Pods(ns).List(ctx, options)
464 framework.ExpectNoError(err)
465 gomega.Expect(podList.Items).To(gomega.HaveLen(1), "There should be exactly 1 pod matched.")
466
467 return &podList.Items[0]
468 }
469
470 func addAllowedTopologiesToStorageClass(c clientset.Interface, sc *storagev1.StorageClass, zones []string) {
471 term := v1.TopologySelectorTerm{
472 MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
473 {
474 Key: v1.LabelTopologyZone,
475 Values: zones,
476 },
477 },
478 }
479 sc.AllowedTopologies = append(sc.AllowedTopologies, term)
480 }
481
482
483 func newStatefulSet(claimTemplate *v1.PersistentVolumeClaim, ns string) (sts *appsv1.StatefulSet, svc *v1.Service, labels map[string]string) {
484 var replicas int32 = 1
485 labels = map[string]string{"app": "regional-pd-workload"}
486
487 svc = &v1.Service{
488 ObjectMeta: metav1.ObjectMeta{
489 Name: "regional-pd-service",
490 Namespace: ns,
491 Labels: labels,
492 },
493 Spec: v1.ServiceSpec{
494 Ports: []v1.ServicePort{{
495 Port: 80,
496 Name: "web",
497 }},
498 ClusterIP: v1.ClusterIPNone,
499 Selector: labels,
500 },
501 }
502
503 sts = &appsv1.StatefulSet{
504 ObjectMeta: metav1.ObjectMeta{
505 Name: "regional-pd-sts",
506 Namespace: ns,
507 },
508 Spec: appsv1.StatefulSetSpec{
509 Selector: &metav1.LabelSelector{
510 MatchLabels: labels,
511 },
512 ServiceName: svc.Name,
513 Replicas: &replicas,
514 Template: *newPodTemplate(labels),
515 VolumeClaimTemplates: []v1.PersistentVolumeClaim{*claimTemplate},
516 },
517 }
518
519 return
520 }
521
522 func newPodTemplate(labels map[string]string) *v1.PodTemplateSpec {
523 return &v1.PodTemplateSpec{
524 ObjectMeta: metav1.ObjectMeta{
525 Labels: labels,
526 },
527 Spec: v1.PodSpec{
528 Containers: []v1.Container{
529
530
531 {
532 Name: "busybox",
533 Image: imageutils.GetE2EImage(imageutils.BusyBox),
534 Command: []string{"sh", "-c"},
535 Args: []string{
536 "echo ${POD_NAME} >> /mnt/data/regional-pd/pods.txt;" +
537 "cat /mnt/data/regional-pd/pods.txt;" +
538 "sleep 3600;",
539 },
540 Env: []v1.EnvVar{{
541 Name: "POD_NAME",
542 ValueFrom: &v1.EnvVarSource{
543 FieldRef: &v1.ObjectFieldSelector{
544 FieldPath: "metadata.name",
545 },
546 },
547 }},
548 Ports: []v1.ContainerPort{{
549 ContainerPort: 80,
550 Name: "web",
551 }},
552 VolumeMounts: []v1.VolumeMount{{
553 Name: pvcName,
554 MountPath: "/mnt/data/regional-pd",
555 }},
556 },
557 },
558 },
559 }
560 }
561
562 func getTwoRandomZones(ctx context.Context, c clientset.Interface) []string {
563 zones, err := e2enode.GetClusterZones(ctx, c)
564 framework.ExpectNoError(err)
565 gomega.Expect(zones.Len()).To(gomega.BeNumerically(">=", 2),
566 "The test should only be run in multizone clusters.")
567
568 zone1, _ := zones.PopAny()
569 zone2, _ := zones.PopAny()
570 return []string{zone1, zone2}
571 }
572
573
574
575 func verifyZonesInPV(volume *v1.PersistentVolume, zones sets.String, match bool) error {
576 pvZones, err := volumehelpers.LabelZonesToSet(volume.Labels[v1.LabelTopologyZone])
577 if err != nil {
578 return err
579 }
580
581 if match && zones.Equal(pvZones) || !match && zones.IsSuperset(pvZones) {
582 return nil
583 }
584
585 return fmt.Errorf("Zones in StorageClass are %v, but zones in PV are %v", zones, pvZones)
586
587 }
588
589 func checkZoneFromLabelAndAffinity(pv *v1.PersistentVolume, zone string, matchZone bool) {
590 checkZonesFromLabelAndAffinity(pv, sets.NewString(zone), matchZone)
591 }
592
593
594
595
596 func checkZonesFromLabelAndAffinity(pv *v1.PersistentVolume, zones sets.String, matchZones bool) {
597 ginkgo.By("checking PV's zone label and node affinity terms match expected zone")
598 if pv == nil {
599 framework.Failf("nil pv passed")
600 }
601 pvLabel, ok := pv.Labels[v1.LabelTopologyZone]
602 if !ok {
603 framework.Failf("label %s not found on PV", v1.LabelTopologyZone)
604 }
605
606 zonesFromLabel, err := volumehelpers.LabelZonesToSet(pvLabel)
607 if err != nil {
608 framework.Failf("unable to parse zone labels %s: %v", pvLabel, err)
609 }
610 if matchZones && !zonesFromLabel.Equal(zones) {
611 framework.Failf("value[s] of %s label for PV: %v does not match expected zone[s]: %v", v1.LabelTopologyZone, zonesFromLabel, zones)
612 }
613 if !matchZones && !zonesFromLabel.IsSuperset(zones) {
614 framework.Failf("value[s] of %s label for PV: %v does not contain expected zone[s]: %v", v1.LabelTopologyZone, zonesFromLabel, zones)
615 }
616 if pv.Spec.NodeAffinity == nil {
617 framework.Failf("node affinity not found in PV spec %v", pv.Spec)
618 }
619 if len(pv.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
620 framework.Failf("node selector terms not found in PV spec %v", pv.Spec)
621 }
622
623 for _, term := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
624 keyFound := false
625 for _, r := range term.MatchExpressions {
626 if r.Key != v1.LabelTopologyZone {
627 continue
628 }
629 keyFound = true
630 zonesFromNodeAffinity := sets.NewString(r.Values...)
631 if matchZones && !zonesFromNodeAffinity.Equal(zones) {
632 framework.Failf("zones from NodeAffinity of PV: %v does not equal expected zone[s]: %v", zonesFromNodeAffinity, zones)
633 }
634 if !matchZones && !zonesFromNodeAffinity.IsSuperset(zones) {
635 framework.Failf("zones from NodeAffinity of PV: %v does not contain expected zone[s]: %v", zonesFromNodeAffinity, zones)
636 }
637 break
638 }
639 if !keyFound {
640 framework.Failf("label %s not found in term %v", v1.LabelTopologyZone, term)
641 }
642 }
643 }
644
645
646 func waitForStatefulSetReplicasReady(ctx context.Context, statefulSetName, ns string, c clientset.Interface, Poll, timeout time.Duration) error {
647 framework.Logf("Waiting up to %v for StatefulSet %s to have all replicas ready", timeout, statefulSetName)
648 for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) {
649 sts, err := c.AppsV1().StatefulSets(ns).Get(ctx, statefulSetName, metav1.GetOptions{})
650 if err != nil {
651 framework.Logf("Get StatefulSet %s failed, ignoring for %v: %v", statefulSetName, Poll, err)
652 continue
653 }
654 if sts.Status.ReadyReplicas == *sts.Spec.Replicas {
655 framework.Logf("All %d replicas of StatefulSet %s are ready. (%v)", sts.Status.ReadyReplicas, statefulSetName, time.Since(start))
656 return nil
657 }
658 framework.Logf("StatefulSet %s found but there are %d ready replicas and %d total replicas.", statefulSetName, sts.Status.ReadyReplicas, *sts.Spec.Replicas)
659 }
660 return fmt.Errorf("StatefulSet %s still has unready pods within %v", statefulSetName, timeout)
661 }
662
View as plain text