1
16
17
18
19 package testsuites
20
21 import (
22 "context"
23 "sync"
24
25 "github.com/onsi/ginkgo/v2"
26
27 v1 "k8s.io/api/core/v1"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 errors "k8s.io/apimachinery/pkg/util/errors"
30 clientset "k8s.io/client-go/kubernetes"
31 "k8s.io/kubernetes/test/e2e/framework"
32 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
33 e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
34 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
35 storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
36 admissionapi "k8s.io/pod-security-admission/api"
37 )
38
39 type volumeStressTestSuite struct {
40 tsInfo storageframework.TestSuiteInfo
41 }
42
43 type volumeStressTest struct {
44 config *storageframework.PerTestConfig
45
46 migrationCheck *migrationOpCheck
47
48 volumes []*storageframework.VolumeResource
49 pods []*v1.Pod
50
51 wg sync.WaitGroup
52
53 testOptions storageframework.StressTestOptions
54 }
55
56 var _ storageframework.TestSuite = &volumeStressTestSuite{}
57
58
59
60 func InitCustomVolumeStressTestSuite(patterns []storageframework.TestPattern) storageframework.TestSuite {
61 return &volumeStressTestSuite{
62 tsInfo: storageframework.TestSuiteInfo{
63 Name: "volume-stress",
64 TestPatterns: patterns,
65 },
66 }
67 }
68
69
70
71 func InitVolumeStressTestSuite() storageframework.TestSuite {
72 patterns := []storageframework.TestPattern{
73 storageframework.DefaultFsDynamicPV,
74 storageframework.BlockVolModeDynamicPV,
75 }
76 return InitCustomVolumeStressTestSuite(patterns)
77 }
78
79 func (t *volumeStressTestSuite) GetTestSuiteInfo() storageframework.TestSuiteInfo {
80 return t.tsInfo
81 }
82
83 func (t *volumeStressTestSuite) SkipUnsupportedTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
84 dInfo := driver.GetDriverInfo()
85 if dInfo.StressTestOptions == nil {
86 e2eskipper.Skipf("Driver %s doesn't specify stress test options -- skipping", dInfo.Name)
87 }
88 if dInfo.StressTestOptions.NumPods <= 0 {
89 framework.Failf("NumPods in stress test options must be a positive integer, received: %d", dInfo.StressTestOptions.NumPods)
90 }
91 if dInfo.StressTestOptions.NumRestarts <= 0 {
92 framework.Failf("NumRestarts in stress test options must be a positive integer, received: %d", dInfo.StressTestOptions.NumRestarts)
93 }
94
95 if _, ok := driver.(storageframework.DynamicPVTestDriver); !ok {
96 e2eskipper.Skipf("Driver %s doesn't implement DynamicPVTestDriver -- skipping", dInfo.Name)
97 }
98 if !driver.GetDriverInfo().Capabilities[storageframework.CapBlock] && pattern.VolMode == v1.PersistentVolumeBlock {
99 e2eskipper.Skipf("Driver %q does not support block volume mode - skipping", dInfo.Name)
100 }
101 }
102
103 func (t *volumeStressTestSuite) DefineTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
104 var (
105 dInfo = driver.GetDriverInfo()
106 cs clientset.Interface
107 l *volumeStressTest
108 )
109
110
111
112 f := framework.NewFrameworkWithCustomTimeouts("stress", storageframework.GetDriverTimeouts(driver))
113 f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
114
115 init := func(ctx context.Context) {
116 cs = f.ClientSet
117 l = &volumeStressTest{}
118
119
120 l.config = driver.PrepareTest(ctx, f)
121 l.migrationCheck = newMigrationOpCheck(ctx, f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName)
122 l.volumes = []*storageframework.VolumeResource{}
123 l.pods = []*v1.Pod{}
124 l.testOptions = *dInfo.StressTestOptions
125 }
126
127 createPodsAndVolumes := func(ctx context.Context) {
128 for i := 0; i < l.testOptions.NumPods; i++ {
129 framework.Logf("Creating resources for pod %v/%v", i, l.testOptions.NumPods-1)
130 r := storageframework.CreateVolumeResource(ctx, driver, l.config, pattern, t.GetTestSuiteInfo().SupportedSizeRange)
131 l.volumes = append(l.volumes, r)
132 podConfig := e2epod.Config{
133 NS: f.Namespace.Name,
134 PVCs: []*v1.PersistentVolumeClaim{r.Pvc},
135 SeLinuxLabel: e2epv.SELinuxLabel,
136 }
137 pod, err := e2epod.MakeSecPod(&podConfig)
138 framework.ExpectNoError(err)
139
140 l.pods = append(l.pods, pod)
141 }
142 }
143
144 cleanup := func(ctx context.Context) {
145 framework.Logf("Stopping and waiting for all test routines to finish")
146 l.wg.Wait()
147
148 var (
149 errs []error
150 mu sync.Mutex
151 wg sync.WaitGroup
152 )
153
154 wg.Add(len(l.pods))
155 for _, pod := range l.pods {
156 go func(pod *v1.Pod) {
157 defer ginkgo.GinkgoRecover()
158 defer wg.Done()
159
160 framework.Logf("Deleting pod %v", pod.Name)
161 err := e2epod.DeletePodWithWait(ctx, cs, pod)
162 mu.Lock()
163 defer mu.Unlock()
164 errs = append(errs, err)
165 }(pod)
166 }
167 wg.Wait()
168
169 wg.Add(len(l.volumes))
170 for _, volume := range l.volumes {
171 go func(volume *storageframework.VolumeResource) {
172 defer ginkgo.GinkgoRecover()
173 defer wg.Done()
174
175 framework.Logf("Deleting volume %s", volume.Pvc.GetName())
176 err := volume.CleanupResource(ctx)
177 mu.Lock()
178 defer mu.Unlock()
179 errs = append(errs, err)
180 }(volume)
181 }
182 wg.Wait()
183
184 framework.ExpectNoError(errors.NewAggregate(errs), "while cleaning up resource")
185 l.migrationCheck.validateMigrationVolumeOpCounts(ctx)
186 }
187
188 f.It("multiple pods should access different volumes repeatedly", f.WithSlow(), f.WithSerial(), func(ctx context.Context) {
189 init(ctx)
190 ginkgo.DeferCleanup(cleanup)
191 createPodsAndVolumes(ctx)
192
193 for i := 0; i < l.testOptions.NumPods; i++ {
194 podIndex := i
195 l.wg.Add(1)
196 go func() {
197 defer ginkgo.GinkgoRecover()
198 defer l.wg.Done()
199 for j := 0; j < l.testOptions.NumRestarts; j++ {
200 select {
201 case <-ctx.Done():
202
203
204
205
206
207
208
209
210 return
211 default:
212 pod := l.pods[podIndex]
213 framework.Logf("Pod-%v [%v], Iteration %v/%v", podIndex, pod.Name, j, l.testOptions.NumRestarts-1)
214 _, err := cs.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
215 if err != nil {
216 framework.Failf("Failed to create pod-%v [%+v]. Error: %v", podIndex, pod, err)
217 }
218
219 err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, cs, pod.Name, pod.Namespace, f.Timeouts.PodStart)
220 if err != nil {
221 framework.Failf("Failed to wait for pod-%v [%+v] turn into running status. Error: %v", podIndex, pod, err)
222 }
223
224
225
226 err = e2epod.DeletePodWithWait(ctx, f.ClientSet, pod)
227 if err != nil {
228 framework.Failf("Failed to delete pod-%v [%+v]. Error: %v", podIndex, pod, err)
229 }
230 }
231 }
232 }()
233 }
234
235 l.wg.Wait()
236 })
237 }
238
View as plain text