17 package storage
18
19 import (
20 "context"
21 "fmt"
22 "sync"
23
24 "github.com/onsi/ginkgo/v2"
25 v1 "k8s.io/api/core/v1"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 utilerrors "k8s.io/apimachinery/pkg/util/errors"
28 "k8s.io/apimachinery/pkg/util/sets"
29 clientset "k8s.io/client-go/kubernetes"
30 "k8s.io/kubernetes/test/e2e/framework"
31 e2enode "k8s.io/kubernetes/test/e2e/framework/node"
32 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
33 e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
34 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
35 "k8s.io/kubernetes/test/e2e/storage/utils"
36 admissionapi "k8s.io/pod-security-admission/api"
37 )
38
39 var _ = utils.SIGDescribe("Multi-AZ Cluster Volumes", func() {
40 f := framework.NewDefaultFramework("multi-az")
41 f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
42 var zoneCount int
43 var err error
44 image := framework.ServeHostnameImage
45 ginkgo.BeforeEach(func(ctx context.Context) {
46 e2eskipper.SkipUnlessProviderIs("gce", "gke")
47 if zoneCount <= 0 {
48 zoneCount, err = getZoneCount(ctx, f.ClientSet)
49 framework.ExpectNoError(err)
50 }
51 ginkgo.By(fmt.Sprintf("Checking for multi-zone cluster. Zone count = %d", zoneCount))
52 msg := fmt.Sprintf("Zone count is %d, only run for multi-zone clusters, skipping test", zoneCount)
53 e2eskipper.SkipUnlessAtLeast(zoneCount, 2, msg)
54
55 })
56 ginkgo.It("should schedule pods in the same zones as statically provisioned PVs", func(ctx context.Context) {
57 PodsUseStaticPVsOrFail(ctx, f, (2*zoneCount)+1, image)
58 })
59 })
60
61
62 func getZoneCount(ctx context.Context, c clientset.Interface) (int, error) {
63 zoneNames, err := e2enode.GetSchedulableClusterZones(ctx, c)
64 if err != nil {
65 return -1, err
66 }
67 return len(zoneNames), nil
68 }
69
// staticPVTestConfig bundles the per-pod resources created by
// PodsUseStaticPVsOrFail so they can be cleaned up together.
type staticPVTestConfig struct {
	pvSource *v1.PersistentVolumeSource // provider-specific volume source backing the PV
	pv       *v1.PersistentVolume       // statically provisioned PV
	pvc      *v1.PersistentVolumeClaim  // claim bound to pv
	pod      *v1.Pod                    // pod mounting pvc
}
76
77
78
79 func PodsUseStaticPVsOrFail(ctx context.Context, f *framework.Framework, podCount int, image string) {
80 var err error
81 c := f.ClientSet
82 ns := f.Namespace.Name
83
84 zones, err := e2enode.GetSchedulableClusterZones(ctx, c)
85 framework.ExpectNoError(err)
86 zonelist := sets.List(zones)
87 ginkgo.By("Creating static PVs across zones")
88 configs := make([]*staticPVTestConfig, podCount)
89 for i := range configs {
90 configs[i] = &staticPVTestConfig{}
91 }
92
93 ginkgo.DeferCleanup(func(ctx context.Context) {
94 ginkgo.By("Cleaning up pods and PVs")
95 for _, config := range configs {
96 e2epod.DeletePodOrFail(ctx, c, ns, config.pod.Name)
97 }
98 var wg sync.WaitGroup
99 wg.Add(len(configs))
100 for i := range configs {
101 go func(config *staticPVTestConfig) {
102 defer ginkgo.GinkgoRecover()
103 defer wg.Done()
104 err := e2epod.WaitForPodNotFoundInNamespace(ctx, c, config.pod.Name, ns, f.Timeouts.PodDelete)
105 framework.ExpectNoError(err, "while waiting for pod to disappear")
106 errs := e2epv.PVPVCCleanup(ctx, c, ns, config.pv, config.pvc)
107 framework.ExpectNoError(utilerrors.NewAggregate(errs), "while cleaning up PVs and PVCs")
108 err = e2epv.DeletePVSource(ctx, config.pvSource)
109 framework.ExpectNoError(err, "while deleting PVSource")
110 }(configs[i])
111 }
112 wg.Wait()
113 })
114
115 for i, config := range configs {
116 zone := zonelist[i%len(zones)]
117 config.pvSource, err = e2epv.CreatePVSource(ctx, zone)
118 framework.ExpectNoError(err)
119
120 pvConfig := e2epv.PersistentVolumeConfig{
121 NamePrefix: "multizone-pv",
122 PVSource: *config.pvSource,
123 Prebind: nil,
124 }
125 className := ""
126 pvcConfig := e2epv.PersistentVolumeClaimConfig{StorageClassName: &className}
127
128 config.pv, config.pvc, err = e2epv.CreatePVPVC(ctx, c, f.Timeouts, pvConfig, pvcConfig, ns, true)
129 framework.ExpectNoError(err)
130 }
131
132 ginkgo.By("Waiting for all PVCs to be bound")
133 for _, config := range configs {
134 e2epv.WaitOnPVandPVC(ctx, c, f.Timeouts, ns, config.pv, config.pvc)
135 }
136
137 ginkgo.By("Creating pods for each static PV")
138 for _, config := range configs {
139 podConfig := e2epod.MakePod(ns, nil, []*v1.PersistentVolumeClaim{config.pvc}, f.NamespacePodSecurityLevel, "")
140 config.pod, err = c.CoreV1().Pods(ns).Create(ctx, podConfig, metav1.CreateOptions{})
141 framework.ExpectNoError(err)
142 }
143
144 ginkgo.By("Waiting for all pods to be running")
145 for _, config := range configs {
146 err = e2epod.WaitForPodRunningInNamespace(ctx, c, config.pod)
147 framework.ExpectNoError(err)
148 }
149 }
150