20 package storage
22 import (
23 "context"
25 "github.com/onsi/ginkgo/v2"
26 "github.com/onsi/gomega"
28 "fmt"
29 "strings"
30 "time"
32 "encoding/json"
34 appsv1 "k8s.io/api/apps/v1"
35 v1 "k8s.io/api/core/v1"
36 storagev1 "k8s.io/api/storage/v1"
37 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38 "k8s.io/apimachinery/pkg/labels"
39 "k8s.io/apimachinery/pkg/types"
40 "k8s.io/apimachinery/pkg/util/sets"
41 "k8s.io/apimachinery/pkg/util/strategicpatch"
42 "k8s.io/apimachinery/pkg/util/wait"
43 clientset "k8s.io/client-go/kubernetes"
44 volumehelpers "k8s.io/cloud-provider/volume/helpers"
45 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
46 "k8s.io/kubernetes/test/e2e/framework"
47 e2enode "k8s.io/kubernetes/test/e2e/framework/node"
48 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
49 e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
50 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
51 "k8s.io/kubernetes/test/e2e/storage/testsuites"
52 "k8s.io/kubernetes/test/e2e/storage/utils"
53 imageutils "k8s.io/kubernetes/test/utils/image"
54 admissionapi "k8s.io/pod-security-admission/api"
55 )
const (
	// pvDeletionTimeout bounds how long the failover test's cleanup waits for
	// the bound PV to be deleted after its claim is removed.
	pvDeletionTimeout = 3 * time.Minute

	// statefulSetReadyTimeout bounds the wait for the StatefulSet replicas to
	// become ready and for the replacement pod to land in the other zone.
	statefulSetReadyTimeout = 3 * time.Minute

	// taintKeyPrefix is prepended to the test namespace to form the key of the
	// taint that addTaint puts on nodes.
	taintKeyPrefix = "zoneTaint_"

	// repdMinSize is the claim size (and expected provisioned size) used by
	// every regional PD test in this file.
	repdMinSize = "200Gi"

	// pvcName is used both as the claim name prefix and as the name of the
	// volume mount in the StatefulSet pod template.
	pvcName = "regional-pd-vol"
)
65 var _ = utils.SIGDescribe("Regional PD", func() {
66 f := framework.NewDefaultFramework("regional-pd")
67 f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
70 var c clientset.Interface
71 var ns string
73 ginkgo.BeforeEach(func(ctx context.Context) {
74 c = f.ClientSet
75 ns = f.Namespace.Name
77 e2eskipper.SkipUnlessProviderIs("gce", "gke")
78 e2eskipper.SkipUnlessMultizone(ctx, c)
79 })
81 ginkgo.Describe("RegionalPD", func() {
82 f.It("should provision storage", f.WithSlow(), func(ctx context.Context) {
83 testVolumeProvisioning(ctx, c, f.Timeouts, ns)
84 })
86 f.It("should provision storage with delayed binding", f.WithSlow(), func(ctx context.Context) {
87 testRegionalDelayedBinding(ctx, c, ns, 1 )
88 testRegionalDelayedBinding(ctx, c, ns, 3 )
89 })
91 f.It("should provision storage in the allowedTopologies", f.WithSlow(), func(ctx context.Context) {
92 testRegionalAllowedTopologies(ctx, c, ns)
93 })
95 f.It("should provision storage in the allowedTopologies with delayed binding", f.WithSlow(), func(ctx context.Context) {
96 testRegionalAllowedTopologiesWithDelayedBinding(ctx, c, ns, 1 )
97 testRegionalAllowedTopologiesWithDelayedBinding(ctx, c, ns, 3 )
98 })
100 f.It("should failover to a different zone when all nodes in one zone become unreachable", f.WithSlow(), f.WithDisruptive(), func(ctx context.Context) {
101 testZonalFailover(ctx, c, ns)
102 })
103 })
104 })
106 func testVolumeProvisioning(ctx context.Context, c clientset.Interface, t *framework.TimeoutContext, ns string) {
107 cloudZones := getTwoRandomZones(ctx, c)
111 tests := []testsuites.StorageClassTest{
112 {
113 Name: "HDD Regional PD on GCE/GKE",
114 CloudProviders: []string{"gce", "gke"},
115 Provisioner: "kubernetes.io/gce-pd",
116 Timeouts: framework.NewTimeoutContext(),
117 Parameters: map[string]string{
118 "type": "pd-standard",
119 "zones": strings.Join(cloudZones, ","),
120 "replication-type": "regional-pd",
121 },
122 ClaimSize: repdMinSize,
123 ExpectedSize: repdMinSize,
124 PvCheck: func(ctx context.Context, claim *v1.PersistentVolumeClaim) {
125 volume := testsuites.PVWriteReadSingleNodeCheck(ctx, c, t, claim, e2epod.NodeSelection{})
126 gomega.Expect(volume).NotTo(gomega.BeNil())
128 err := checkGCEPD(volume, "pd-standard")
129 framework.ExpectNoError(err, "checkGCEPD")
130 err = verifyZonesInPV(volume, sets.NewString(cloudZones...), true )
131 framework.ExpectNoError(err, "verifyZonesInPV")
133 },
134 },
135 {
136 Name: "HDD Regional PD with auto zone selection on GCE/GKE",
137 CloudProviders: []string{"gce", "gke"},
138 Provisioner: "kubernetes.io/gce-pd",
139 Timeouts: framework.NewTimeoutContext(),
140 Parameters: map[string]string{
141 "type": "pd-standard",
142 "replication-type": "regional-pd",
143 },
144 ClaimSize: repdMinSize,
145 ExpectedSize: repdMinSize,
146 PvCheck: func(ctx context.Context, claim *v1.PersistentVolumeClaim) {
147 volume := testsuites.PVWriteReadSingleNodeCheck(ctx, c, t, claim, e2epod.NodeSelection{})
148 gomega.Expect(volume).NotTo(gomega.BeNil())
150 err := checkGCEPD(volume, "pd-standard")
151 framework.ExpectNoError(err, "checkGCEPD")
152 zones, err := e2enode.GetClusterZones(ctx, c)
153 framework.ExpectNoError(err, "GetClusterZones")
154 err = verifyZonesInPV(volume, zones, false )
155 framework.ExpectNoError(err, "verifyZonesInPV")
156 },
157 },
158 }
160 for _, test := range tests {
161 test.Client = c
162 computedStorageClass := testsuites.SetupStorageClass(ctx, test.Client, newStorageClass(test, ns, "" ))
163 test.Class = computedStorageClass
164 test.Claim = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
165 ClaimSize: test.ClaimSize,
166 StorageClassName: &(test.Class.Name),
167 VolumeMode: &test.VolumeMode,
168 }, ns)
170 test.TestDynamicProvisioning(ctx)
171 }
172 }
174 func testZonalFailover(ctx context.Context, c clientset.Interface, ns string) {
175 cloudZones := getTwoRandomZones(ctx, c)
176 testSpec := testsuites.StorageClassTest{
177 Name: "Regional PD Failover on GCE/GKE",
178 CloudProviders: []string{"gce", "gke"},
179 Timeouts: framework.NewTimeoutContext(),
180 Provisioner: "kubernetes.io/gce-pd",
181 Parameters: map[string]string{
182 "type": "pd-standard",
183 "zones": strings.Join(cloudZones, ","),
184 "replication-type": "regional-pd",
185 },
186 ClaimSize: repdMinSize,
187 ExpectedSize: repdMinSize,
188 }
189 class := newStorageClass(testSpec, ns, "" )
190 claimTemplate := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
191 NamePrefix: pvcName,
192 ClaimSize: testSpec.ClaimSize,
193 StorageClassName: &(class.Name),
194 VolumeMode: &testSpec.VolumeMode,
195 }, ns)
196 statefulSet, service, regionalPDLabels := newStatefulSet(claimTemplate, ns)
198 ginkgo.By("creating a StorageClass " + class.Name)
199 _, err := c.StorageV1().StorageClasses().Create(ctx, class, metav1.CreateOptions{})
200 framework.ExpectNoError(err)
201 defer func() {
202 framework.Logf("deleting storage class %s", class.Name)
203 framework.ExpectNoError(c.StorageV1().StorageClasses().Delete(ctx, class.Name, metav1.DeleteOptions{}),
204 "Error deleting StorageClass %s", class.Name)
205 }()
207 ginkgo.By("creating a StatefulSet")
208 _, err = c.CoreV1().Services(ns).Create(ctx, service, metav1.CreateOptions{})
209 framework.ExpectNoError(err)
210 _, err = c.AppsV1().StatefulSets(ns).Create(ctx, statefulSet, metav1.CreateOptions{})
211 framework.ExpectNoError(err)
213 ginkgo.DeferCleanup(func(ctx context.Context) {
214 framework.Logf("deleting statefulset%q/%q", statefulSet.Namespace, statefulSet.Name)
216 framework.ExpectNoError(c.AppsV1().StatefulSets(ns).Delete(ctx, statefulSet.Name, metav1.DeleteOptions{}),
217 "Error deleting StatefulSet %s", statefulSet.Name)
219 framework.Logf("deleting claims in namespace %s", ns)
220 pvc := getPVC(ctx, c, ns, regionalPDLabels)
221 framework.ExpectNoError(c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(ctx, pvc.Name, metav1.DeleteOptions{}),
222 "Error deleting claim %s.", pvc.Name)
223 if pvc.Spec.VolumeName != "" {
224 err = e2epv.WaitForPersistentVolumeDeleted(ctx, c, pvc.Spec.VolumeName, framework.Poll, pvDeletionTimeout)
225 if err != nil {
226 framework.Logf("WARNING: PV %s is not yet deleted, and subsequent tests may be affected.", pvc.Spec.VolumeName)
227 }
228 }
229 })
231 err = waitForStatefulSetReplicasReady(ctx, statefulSet.Name, ns, c, framework.Poll, statefulSetReadyTimeout)
232 if err != nil {
233 pod := getPod(ctx, c, ns, regionalPDLabels)
234 if !podutil.IsPodReadyConditionTrue(pod.Status) {
235 framework.Failf("The statefulset pod %s was expected to be ready, instead has the following conditions: %v", pod.Name, pod.Status.Conditions)
236 }
237 framework.ExpectNoError(err)
238 }
240 pvc := getPVC(ctx, c, ns, regionalPDLabels)
242 ginkgo.By("getting zone information from pod")
243 pod := getPod(ctx, c, ns, regionalPDLabels)
244 nodeName := pod.Spec.NodeName
245 node, err := c.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
246 framework.ExpectNoError(err)
247 podZone := node.Labels[v1.LabelTopologyZone]
249 ginkgo.By("tainting nodes in the zone the pod is scheduled in")
250 selector := labels.SelectorFromSet(labels.Set(map[string]string{v1.LabelTopologyZone: podZone}))
251 nodesInZone, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
252 framework.ExpectNoError(err)
253 addTaint(ctx, c, ns, nodesInZone.Items, podZone)
255 ginkgo.By("deleting StatefulSet pod")
256 err = c.CoreV1().Pods(ns).Delete(ctx, pod.Name, metav1.DeleteOptions{})
259 ginkgo.By("verifying the pod is scheduled in a different zone.")
260 var otherZone string
261 if cloudZones[0] == podZone {
262 otherZone = cloudZones[1]
263 } else {
264 otherZone = cloudZones[0]
265 }
266 waitErr := wait.PollUntilContextTimeout(ctx, framework.Poll, statefulSetReadyTimeout, true, func(ctx context.Context) (bool, error) {
267 framework.Logf("Checking whether new pod is scheduled in zone %q", otherZone)
268 pod := getPod(ctx, c, ns, regionalPDLabels)
269 node, err := c.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
270 if err != nil {
271 return false, nil
272 }
273 newPodZone := node.Labels[v1.LabelTopologyZone]
274 return newPodZone == otherZone, nil
275 })
276 framework.ExpectNoError(waitErr, "Error waiting for pod to be scheduled in a different zone (%q): %v", otherZone, err)
278 err = waitForStatefulSetReplicasReady(ctx, statefulSet.Name, ns, c, 3*time.Second, framework.RestartPodReadyAgainTimeout)
279 if err != nil {
280 pod := getPod(ctx, c, ns, regionalPDLabels)
281 if !podutil.IsPodReadyConditionTrue(pod.Status) {
282 framework.Failf("The statefulset pod %s was expected to be ready, instead has the following conditions: %v", pod.Name, pod.Status.Conditions)
283 }
284 framework.ExpectNoError(err)
285 }
287 ginkgo.By("verifying the same PVC is used by the new pod")
288 gomega.Expect(getPVC(ctx, c, ns, regionalPDLabels).Name).To(gomega.Equal(pvc.Name), "The same PVC should be used after failover.")
290 ginkgo.By("verifying the container output has 2 lines, indicating the pod has been created twice using the same regional PD.")
291 logs, err := e2epod.GetPodLogs(ctx, c, ns, pod.Name, "")
292 framework.ExpectNoError(err,
293 "Error getting logs from pod %s in namespace %s", pod.Name, ns)
294 lineCount := len(strings.Split(strings.TrimSpace(logs), "\n"))
295 expectedLineCount := 2
296 gomega.Expect(lineCount).To(gomega.Equal(expectedLineCount), "Line count of the written file should be %d.", expectedLineCount)
298 }
300 func addTaint(ctx context.Context, c clientset.Interface, ns string, nodes []v1.Node, podZone string) {
301 for _, node := range nodes {
302 oldData, err := json.Marshal(node)
303 framework.ExpectNoError(err)
305 node.Spec.Taints = append(node.Spec.Taints, v1.Taint{
306 Key: taintKeyPrefix + ns,
307 Value: podZone,
308 Effect: v1.TaintEffectNoSchedule,
309 })
311 newData, err := json.Marshal(node)
312 framework.ExpectNoError(err)
314 patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
315 framework.ExpectNoError(err)
317 reversePatchBytes, err := strategicpatch.CreateTwoWayMergePatch(newData, oldData, v1.Node{})
318 framework.ExpectNoError(err)
320 _, err = c.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
321 framework.ExpectNoError(err)
323 nodeName := node.Name
324 ginkgo.DeferCleanup(func(ctx context.Context) {
325 framework.Logf("removing taint for node %q", nodeName)
326 _, err := c.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, reversePatchBytes, metav1.PatchOptions{})
327 framework.ExpectNoError(err)
328 })
329 }
330 }
332 func testRegionalDelayedBinding(ctx context.Context, c clientset.Interface, ns string, pvcCount int) {
333 test := testsuites.StorageClassTest{
334 Client: c,
335 Name: "Regional PD storage class with waitForFirstConsumer test on GCE",
336 Provisioner: "kubernetes.io/gce-pd",
337 Timeouts: framework.NewTimeoutContext(),
338 Parameters: map[string]string{
339 "type": "pd-standard",
340 "replication-type": "regional-pd",
341 },
342 ClaimSize: repdMinSize,
343 DelayBinding: true,
344 }
346 suffix := "delayed-regional"
348 test.Class = testsuites.SetupStorageClass(ctx, test.Client, newStorageClass(test, ns, suffix))
349 var claims []*v1.PersistentVolumeClaim
350 for i := 0; i < pvcCount; i++ {
351 claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
352 ClaimSize: test.ClaimSize,
353 StorageClassName: &(test.Class.Name),
354 VolumeMode: &test.VolumeMode,
355 }, ns)
356 claims = append(claims, claim)
357 }
358 pvs, node := test.TestBindingWaitForFirstConsumerMultiPVC(ctx, claims, nil , false )
359 if node == nil {
360 framework.Failf("unexpected nil node found")
361 }
362 zone, ok := node.Labels[v1.LabelTopologyZone]
363 if !ok {
364 framework.Failf("label %s not found on Node", v1.LabelTopologyZone)
365 }
366 for _, pv := range pvs {
367 checkZoneFromLabelAndAffinity(pv, zone, false)
368 }
369 }
371 func testRegionalAllowedTopologies(ctx context.Context, c clientset.Interface, ns string) {
372 test := testsuites.StorageClassTest{
373 Name: "Regional PD storage class with allowedTopologies test on GCE",
374 Provisioner: "kubernetes.io/gce-pd",
375 Timeouts: framework.NewTimeoutContext(),
376 Parameters: map[string]string{
377 "type": "pd-standard",
378 "replication-type": "regional-pd",
379 },
380 ClaimSize: repdMinSize,
381 ExpectedSize: repdMinSize,
382 }
384 suffix := "topo-regional"
385 test.Client = c
386 test.Class = testsuites.SetupStorageClass(ctx, test.Client, newStorageClass(test, ns, suffix))
387 zones := getTwoRandomZones(ctx, c)
388 addAllowedTopologiesToStorageClass(c, test.Class, zones)
389 test.Claim = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
390 NamePrefix: pvcName,
391 ClaimSize: test.ClaimSize,
392 StorageClassName: &(test.Class.Name),
393 VolumeMode: &test.VolumeMode,
394 }, ns)
396 pv := test.TestDynamicProvisioning(ctx)
397 checkZonesFromLabelAndAffinity(pv, sets.NewString(zones...), true)
398 }
400 func testRegionalAllowedTopologiesWithDelayedBinding(ctx context.Context, c clientset.Interface, ns string, pvcCount int) {
401 test := testsuites.StorageClassTest{
402 Client: c,
403 Timeouts: framework.NewTimeoutContext(),
404 Name: "Regional PD storage class with allowedTopologies and waitForFirstConsumer test on GCE",
405 Provisioner: "kubernetes.io/gce-pd",
406 Parameters: map[string]string{
407 "type": "pd-standard",
408 "replication-type": "regional-pd",
409 },
410 ClaimSize: repdMinSize,
411 DelayBinding: true,
412 }
414 suffix := "topo-delayed-regional"
415 test.Class = testsuites.SetupStorageClass(ctx, test.Client, newStorageClass(test, ns, suffix))
416 topoZones := getTwoRandomZones(ctx, c)
417 addAllowedTopologiesToStorageClass(c, test.Class, topoZones)
418 var claims []*v1.PersistentVolumeClaim
419 for i := 0; i < pvcCount; i++ {
420 claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
421 ClaimSize: test.ClaimSize,
422 StorageClassName: &(test.Class.Name),
423 VolumeMode: &test.VolumeMode,
424 }, ns)
425 claims = append(claims, claim)
426 }
427 pvs, node := test.TestBindingWaitForFirstConsumerMultiPVC(ctx, claims, nil , false )
428 if node == nil {
429 framework.Failf("unexpected nil node found")
430 }
431 nodeZone, ok := node.Labels[v1.LabelTopologyZone]
432 if !ok {
433 framework.Failf("label %s not found on Node", v1.LabelTopologyZone)
434 }
435 zoneFound := false
436 for _, zone := range topoZones {
437 if zone == nodeZone {
438 zoneFound = true
439 break
440 }
441 }
442 if !zoneFound {
443 framework.Failf("zones specified in AllowedTopologies: %v does not contain zone of node where PV got provisioned: %s", topoZones, nodeZone)
444 }
445 for _, pv := range pvs {
446 checkZonesFromLabelAndAffinity(pv, sets.NewString(topoZones...), true)
447 }
448 }
450 func getPVC(ctx context.Context, c clientset.Interface, ns string, pvcLabels map[string]string) *v1.PersistentVolumeClaim {
451 selector := labels.Set(pvcLabels).AsSelector()
452 options := metav1.ListOptions{LabelSelector: selector.String()}
453 pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(ctx, options)
454 framework.ExpectNoError(err)
455 gomega.Expect(pvcList.Items).To(gomega.HaveLen(1), "There should be exactly 1 PVC matched.")
457 return &pvcList.Items[0]
458 }
460 func getPod(ctx context.Context, c clientset.Interface, ns string, podLabels map[string]string) *v1.Pod {
461 selector := labels.Set(podLabels).AsSelector()
462 options := metav1.ListOptions{LabelSelector: selector.String()}
463 podList, err := c.CoreV1().Pods(ns).List(ctx, options)
464 framework.ExpectNoError(err)
465 gomega.Expect(podList.Items).To(gomega.HaveLen(1), "There should be exactly 1 pod matched.")
467 return &podList.Items[0]
468 }
470 func addAllowedTopologiesToStorageClass(c clientset.Interface, sc *storagev1.StorageClass, zones []string) {
471 term := v1.TopologySelectorTerm{
472 MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
473 {
474 Key: v1.LabelTopologyZone,
475 Values: zones,
476 },
477 },
478 }
479 sc.AllowedTopologies = append(sc.AllowedTopologies, term)
480 }
483 func newStatefulSet(claimTemplate *v1.PersistentVolumeClaim, ns string) (sts *appsv1.StatefulSet, svc *v1.Service, labels map[string]string) {
484 var replicas int32 = 1
485 labels = map[string]string{"app": "regional-pd-workload"}
487 svc = &v1.Service{
488 ObjectMeta: metav1.ObjectMeta{
489 Name: "regional-pd-service",
490 Namespace: ns,
491 Labels: labels,
492 },
493 Spec: v1.ServiceSpec{
494 Ports: []v1.ServicePort{{
495 Port: 80,
496 Name: "web",
497 }},
498 ClusterIP: v1.ClusterIPNone,
499 Selector: labels,
500 },
501 }
503 sts = &appsv1.StatefulSet{
504 ObjectMeta: metav1.ObjectMeta{
505 Name: "regional-pd-sts",
506 Namespace: ns,
507 },
508 Spec: appsv1.StatefulSetSpec{
509 Selector: &metav1.LabelSelector{
510 MatchLabels: labels,
511 },
512 ServiceName: svc.Name,
513 Replicas: &replicas,
514 Template: *newPodTemplate(labels),
515 VolumeClaimTemplates: []v1.PersistentVolumeClaim{*claimTemplate},
516 },
517 }
519 return
520 }
522 func newPodTemplate(labels map[string]string) *v1.PodTemplateSpec {
523 return &v1.PodTemplateSpec{
524 ObjectMeta: metav1.ObjectMeta{
525 Labels: labels,
526 },
527 Spec: v1.PodSpec{
528 Containers: []v1.Container{
531 {
532 Name: "busybox",
533 Image: imageutils.GetE2EImage(imageutils.BusyBox),
534 Command: []string{"sh", "-c"},
535 Args: []string{
536 "echo ${POD_NAME} >> /mnt/data/regional-pd/pods.txt;" +
537 "cat /mnt/data/regional-pd/pods.txt;" +
538 "sleep 3600;",
539 },
540 Env: []v1.EnvVar{{
541 Name: "POD_NAME",
542 ValueFrom: &v1.EnvVarSource{
543 FieldRef: &v1.ObjectFieldSelector{
544 FieldPath: "metadata.name",
545 },
546 },
547 }},
548 Ports: []v1.ContainerPort{{
549 ContainerPort: 80,
550 Name: "web",
551 }},
552 VolumeMounts: []v1.VolumeMount{{
553 Name: pvcName,
554 MountPath: "/mnt/data/regional-pd",
555 }},
556 },
557 },
558 },
559 }
560 }
562 func getTwoRandomZones(ctx context.Context, c clientset.Interface) []string {
563 zones, err := e2enode.GetClusterZones(ctx, c)
564 framework.ExpectNoError(err)
565 gomega.Expect(zones.Len()).To(gomega.BeNumerically(">=", 2),
566 "The test should only be run in multizone clusters.")
568 zone1, _ := zones.PopAny()
569 zone2, _ := zones.PopAny()
570 return []string{zone1, zone2}
571 }
575 func verifyZonesInPV(volume *v1.PersistentVolume, zones sets.String, match bool) error {
576 pvZones, err := volumehelpers.LabelZonesToSet(volume.Labels[v1.LabelTopologyZone])
577 if err != nil {
578 return err
579 }
581 if match && zones.Equal(pvZones) || !match && zones.IsSuperset(pvZones) {
582 return nil
583 }
585 return fmt.Errorf("Zones in StorageClass are %v, but zones in PV are %v", zones, pvZones)
587 }
589 func checkZoneFromLabelAndAffinity(pv *v1.PersistentVolume, zone string, matchZone bool) {
590 checkZonesFromLabelAndAffinity(pv, sets.NewString(zone), matchZone)
591 }
596 func checkZonesFromLabelAndAffinity(pv *v1.PersistentVolume, zones sets.String, matchZones bool) {
597 ginkgo.By("checking PV's zone label and node affinity terms match expected zone")
598 if pv == nil {
599 framework.Failf("nil pv passed")
600 }
601 pvLabel, ok := pv.Labels[v1.LabelTopologyZone]
602 if !ok {
603 framework.Failf("label %s not found on PV", v1.LabelTopologyZone)
604 }
606 zonesFromLabel, err := volumehelpers.LabelZonesToSet(pvLabel)
607 if err != nil {
608 framework.Failf("unable to parse zone labels %s: %v", pvLabel, err)
609 }
610 if matchZones && !zonesFromLabel.Equal(zones) {
611 framework.Failf("value[s] of %s label for PV: %v does not match expected zone[s]: %v", v1.LabelTopologyZone, zonesFromLabel, zones)
612 }
613 if !matchZones && !zonesFromLabel.IsSuperset(zones) {
614 framework.Failf("value[s] of %s label for PV: %v does not contain expected zone[s]: %v", v1.LabelTopologyZone, zonesFromLabel, zones)
615 }
616 if pv.Spec.NodeAffinity == nil {
617 framework.Failf("node affinity not found in PV spec %v", pv.Spec)
618 }
619 if len(pv.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
620 framework.Failf("node selector terms not found in PV spec %v", pv.Spec)
621 }
623 for _, term := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
624 keyFound := false
625 for _, r := range term.MatchExpressions {
626 if r.Key != v1.LabelTopologyZone {
627 continue
628 }
629 keyFound = true
630 zonesFromNodeAffinity := sets.NewString(r.Values...)
631 if matchZones && !zonesFromNodeAffinity.Equal(zones) {
632 framework.Failf("zones from NodeAffinity of PV: %v does not equal expected zone[s]: %v", zonesFromNodeAffinity, zones)
633 }
634 if !matchZones && !zonesFromNodeAffinity.IsSuperset(zones) {
635 framework.Failf("zones from NodeAffinity of PV: %v does not contain expected zone[s]: %v", zonesFromNodeAffinity, zones)
636 }
637 break
638 }
639 if !keyFound {
640 framework.Failf("label %s not found in term %v", v1.LabelTopologyZone, term)
641 }
642 }
643 }
646 func waitForStatefulSetReplicasReady(ctx context.Context, statefulSetName, ns string, c clientset.Interface, Poll, timeout time.Duration) error {
647 framework.Logf("Waiting up to %v for StatefulSet %s to have all replicas ready", timeout, statefulSetName)
648 for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) {
649 sts, err := c.AppsV1().StatefulSets(ns).Get(ctx, statefulSetName, metav1.GetOptions{})
650 if err != nil {
651 framework.Logf("Get StatefulSet %s failed, ignoring for %v: %v", statefulSetName, Poll, err)
652 continue
653 }
654 if sts.Status.ReadyReplicas == *sts.Spec.Replicas {
655 framework.Logf("All %d replicas of StatefulSet %s are ready. (%v)", sts.Status.ReadyReplicas, statefulSetName, time.Since(start))
656 return nil
657 }
658 framework.Logf("StatefulSet %s found but there are %d ready replicas and %d total replicas.", statefulSetName, sts.Status.ReadyReplicas, *sts.Spec.Replicas)
659 }
660 return fmt.Errorf("StatefulSet %s still has unready pods within %v", statefulSetName, timeout)
661 }
