1
16
17 package statefulset
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "strings"
24 "testing"
25 "time"
26
27 apps "k8s.io/api/apps/v1"
28 v1 "k8s.io/api/core/v1"
29 apierrors "k8s.io/apimachinery/pkg/api/errors"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/runtime"
32 "k8s.io/apimachinery/pkg/types"
33 utilfeature "k8s.io/apiserver/pkg/util/feature"
34 "k8s.io/client-go/kubernetes/fake"
35 corelisters "k8s.io/client-go/listers/core/v1"
36 core "k8s.io/client-go/testing"
37 "k8s.io/client-go/tools/cache"
38 "k8s.io/client-go/tools/record"
39 featuregatetesting "k8s.io/component-base/featuregate/testing"
40 "k8s.io/klog/v2/ktesting"
41 _ "k8s.io/kubernetes/pkg/apis/apps/install"
42 _ "k8s.io/kubernetes/pkg/apis/core/install"
43 "k8s.io/kubernetes/pkg/features"
44 )
45
46 func TestStatefulPodControlCreatesPods(t *testing.T) {
47 recorder := record.NewFakeRecorder(10)
48 set := newStatefulSet(3)
49 pod := newStatefulSetPod(set, 0)
50 fakeClient := &fake.Clientset{}
51 claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
52 claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
53 control := NewStatefulPodControl(fakeClient, nil, claimLister, recorder)
54 fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
55 return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource)
56 })
57 fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
58 create := action.(core.CreateAction)
59 claimIndexer.Add(create.GetObject())
60 return true, create.GetObject(), nil
61 })
62 fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
63 create := action.(core.CreateAction)
64 return true, create.GetObject(), nil
65 })
66 if err := control.CreateStatefulPod(context.TODO(), set, pod); err != nil {
67 t.Errorf("StatefulPodControl failed to create Pod error: %s", err)
68 }
69 events := collectEvents(recorder.Events)
70 if eventCount := len(events); eventCount != 2 {
71 t.Errorf("Expected 2 events for successful create found %d", eventCount)
72 }
73 for i := range events {
74 if !strings.Contains(events[i], v1.EventTypeNormal) {
75 t.Errorf("Found unexpected non-normal event %s", events[i])
76 }
77 }
78 }
79
80 func TestStatefulPodControlCreatePodExists(t *testing.T) {
81 recorder := record.NewFakeRecorder(10)
82 set := newStatefulSet(3)
83 pod := newStatefulSetPod(set, 0)
84 fakeClient := &fake.Clientset{}
85 pvcs := getPersistentVolumeClaims(set, pod)
86 pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
87 for k := range pvcs {
88 pvc := pvcs[k]
89 pvcIndexer.Add(&pvc)
90 }
91 pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
92 control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
93 fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
94 create := action.(core.CreateAction)
95 return true, create.GetObject(), nil
96 })
97 fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
98 return true, pod, apierrors.NewAlreadyExists(action.GetResource().GroupResource(), pod.Name)
99 })
100 if err := control.CreateStatefulPod(context.TODO(), set, pod); !apierrors.IsAlreadyExists(err) {
101 t.Errorf("Failed to create Pod error: %s", err)
102 }
103 events := collectEvents(recorder.Events)
104 if eventCount := len(events); eventCount != 0 {
105 t.Errorf("Pod and PVC exist: got %d events, but want 0", eventCount)
106 for i := range events {
107 t.Log(events[i])
108 }
109 }
110 }
111
112 func TestStatefulPodControlCreatePodPvcCreateFailure(t *testing.T) {
113 recorder := record.NewFakeRecorder(10)
114 set := newStatefulSet(3)
115 pod := newStatefulSetPod(set, 0)
116 fakeClient := &fake.Clientset{}
117 pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
118 pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
119 control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
120 fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
121 return true, nil, apierrors.NewInternalError(errors.New("API server down"))
122 })
123 fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
124 create := action.(core.CreateAction)
125 return true, create.GetObject(), nil
126 })
127 if err := control.CreateStatefulPod(context.TODO(), set, pod); err == nil {
128 t.Error("Failed to produce error on PVC creation failure")
129 }
130 events := collectEvents(recorder.Events)
131 if eventCount := len(events); eventCount != 2 {
132 t.Errorf("PVC create failure: got %d events, but want 2", eventCount)
133 }
134 for i := range events {
135 if !strings.Contains(events[i], v1.EventTypeWarning) {
136 t.Errorf("Found unexpected non-warning event %s", events[i])
137 }
138 }
139 }
140 func TestStatefulPodControlCreatePodPVCDeleting(t *testing.T) {
141 recorder := record.NewFakeRecorder(10)
142 set := newStatefulSet(3)
143 pod := newStatefulSetPod(set, 0)
144 fakeClient := &fake.Clientset{}
145 pvcs := getPersistentVolumeClaims(set, pod)
146 pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
147 deleteTime := time.Date(2019, time.January, 1, 0, 0, 0, 0, time.UTC)
148 for k := range pvcs {
149 pvc := pvcs[k]
150 pvc.DeletionTimestamp = &metav1.Time{Time: deleteTime}
151 pvcIndexer.Add(&pvc)
152 }
153 pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
154 control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
155 fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
156 create := action.(core.CreateAction)
157 return true, create.GetObject(), nil
158 })
159 fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
160 create := action.(core.CreateAction)
161 return true, create.GetObject(), nil
162 })
163 if err := control.CreateStatefulPod(context.TODO(), set, pod); err == nil {
164 t.Error("Failed to produce error on deleting PVC")
165 }
166 events := collectEvents(recorder.Events)
167 if eventCount := len(events); eventCount != 1 {
168 t.Errorf("Deleting PVC: got %d events, but want 1", eventCount)
169 }
170 for i := range events {
171 if !strings.Contains(events[i], v1.EventTypeWarning) {
172 t.Errorf("Found unexpected non-warning event %s", events[i])
173 }
174 }
175 }
176
177 type fakeIndexer struct {
178 cache.Indexer
179 getError error
180 }
181
182 func (f *fakeIndexer) GetByKey(key string) (interface{}, bool, error) {
183 return nil, false, f.getError
184 }
185
186 func TestStatefulPodControlCreatePodPvcGetFailure(t *testing.T) {
187 recorder := record.NewFakeRecorder(10)
188 set := newStatefulSet(3)
189 pod := newStatefulSetPod(set, 0)
190 fakeClient := &fake.Clientset{}
191 pvcIndexer := &fakeIndexer{getError: errors.New("API server down")}
192 pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
193 control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
194 fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
195 return true, nil, apierrors.NewInternalError(errors.New("API server down"))
196 })
197 fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
198 create := action.(core.CreateAction)
199 return true, create.GetObject(), nil
200 })
201 if err := control.CreateStatefulPod(context.TODO(), set, pod); err == nil {
202 t.Error("Failed to produce error on PVC creation failure")
203 }
204 events := collectEvents(recorder.Events)
205 if eventCount := len(events); eventCount != 2 {
206 t.Errorf("PVC create failure: got %d events, but want 2", eventCount)
207 }
208 for i := range events {
209 if !strings.Contains(events[i], v1.EventTypeWarning) {
210 t.Errorf("Found unexpected non-warning event: %s", events[i])
211 }
212 }
213 }
214
215 func TestStatefulPodControlCreatePodFailed(t *testing.T) {
216 recorder := record.NewFakeRecorder(10)
217 set := newStatefulSet(3)
218 pod := newStatefulSetPod(set, 0)
219 fakeClient := &fake.Clientset{}
220 pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
221 pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
222 control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
223 fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
224 create := action.(core.CreateAction)
225 return true, create.GetObject(), nil
226 })
227 fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
228 return true, nil, apierrors.NewInternalError(errors.New("API server down"))
229 })
230 if err := control.CreateStatefulPod(context.TODO(), set, pod); err == nil {
231 t.Error("Failed to produce error on Pod creation failure")
232 }
233 events := collectEvents(recorder.Events)
234 if eventCount := len(events); eventCount != 2 {
235 t.Errorf("Pod create failed: got %d events, but want 2", eventCount)
236 } else if !strings.Contains(events[0], v1.EventTypeNormal) {
237 t.Errorf("Found unexpected non-normal event %s", events[0])
238
239 } else if !strings.Contains(events[1], v1.EventTypeWarning) {
240 t.Errorf("Found unexpected non-warning event %s", events[1])
241 }
242 }
243
244 func TestStatefulPodControlNoOpUpdate(t *testing.T) {
245 _, ctx := ktesting.NewTestContext(t)
246 recorder := record.NewFakeRecorder(10)
247 set := newStatefulSet(3)
248 pod := newStatefulSetPod(set, 0)
249 fakeClient := &fake.Clientset{}
250 claims := getPersistentVolumeClaims(set, pod)
251 indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
252 for k := range claims {
253 claim := claims[k]
254 indexer.Add(&claim)
255 }
256 claimLister := corelisters.NewPersistentVolumeClaimLister(indexer)
257 control := NewStatefulPodControl(fakeClient, nil, claimLister, recorder)
258 fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
259 t.Error("no-op update should not make any client invocation")
260 return true, nil, apierrors.NewInternalError(errors.New("if we are here we have a problem"))
261 })
262 if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
263 t.Errorf("Error returned on no-op update error: %s", err)
264 }
265 events := collectEvents(recorder.Events)
266 if eventCount := len(events); eventCount != 0 {
267 t.Errorf("no-op update: got %d events, but want 0", eventCount)
268 }
269 }
270
271 func TestStatefulPodControlUpdatesIdentity(t *testing.T) {
272 _, ctx := ktesting.NewTestContext(t)
273 recorder := record.NewFakeRecorder(10)
274 set := newStatefulSet(3)
275 pod := newStatefulSetPod(set, 0)
276 fakeClient := fake.NewSimpleClientset(set, pod)
277 indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
278 claimLister := corelisters.NewPersistentVolumeClaimLister(indexer)
279 control := NewStatefulPodControl(fakeClient, nil, claimLister, recorder)
280 var updated *v1.Pod
281 fakeClient.PrependReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
282 update := action.(core.UpdateAction)
283 updated = update.GetObject().(*v1.Pod)
284 return true, update.GetObject(), nil
285 })
286 pod.Name = "goo-0"
287 if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
288 t.Errorf("Successful update returned an error: %s", err)
289 }
290 events := collectEvents(recorder.Events)
291 if eventCount := len(events); eventCount != 1 {
292 t.Errorf("Pod update successful:got %d events,but want 1", eventCount)
293 } else if !strings.Contains(events[0], v1.EventTypeNormal) {
294 t.Errorf("Found unexpected non-normal event %s", events[0])
295 }
296 if !identityMatches(set, updated) {
297 t.Error("Name update failed identity does not match")
298 }
299 }
300
301 func TestStatefulPodControlUpdateIdentityFailure(t *testing.T) {
302 _, ctx := ktesting.NewTestContext(t)
303 recorder := record.NewFakeRecorder(10)
304 set := newStatefulSet(3)
305 pod := newStatefulSetPod(set, 0)
306 fakeClient := &fake.Clientset{}
307 podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
308 gooPod := newStatefulSetPod(set, 0)
309 gooPod.Name = "goo-0"
310 podIndexer.Add(gooPod)
311 podLister := corelisters.NewPodLister(podIndexer)
312 claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
313 claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
314 control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
315 fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
316 pod.Name = "goo-0"
317 return true, nil, apierrors.NewInternalError(errors.New("API server down"))
318 })
319 pod.Name = "goo-0"
320 if err := control.UpdateStatefulPod(ctx, set, pod); err == nil {
321 t.Error("Failed update does not generate an error")
322 }
323 events := collectEvents(recorder.Events)
324 if eventCount := len(events); eventCount != 1 {
325 t.Errorf("Pod update failed: got %d events, but want 1", eventCount)
326 } else if !strings.Contains(events[0], v1.EventTypeWarning) {
327 t.Errorf("Found unexpected non-warning event %s", events[0])
328 }
329 if identityMatches(set, pod) {
330 t.Error("Failed update mutated Pod identity")
331 }
332 }
333
334 func TestStatefulPodControlUpdatesPodStorage(t *testing.T) {
335 _, ctx := ktesting.NewTestContext(t)
336 recorder := record.NewFakeRecorder(10)
337 set := newStatefulSet(3)
338 pod := newStatefulSetPod(set, 0)
339 fakeClient := &fake.Clientset{}
340 pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
341 pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
342 control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
343 pvcs := getPersistentVolumeClaims(set, pod)
344 volumes := make([]v1.Volume, 0, len(pod.Spec.Volumes))
345 for i := range pod.Spec.Volumes {
346 if _, contains := pvcs[pod.Spec.Volumes[i].Name]; !contains {
347 volumes = append(volumes, pod.Spec.Volumes[i])
348 }
349 }
350 pod.Spec.Volumes = volumes
351 fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
352 update := action.(core.UpdateAction)
353 return true, update.GetObject(), nil
354 })
355 fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
356 update := action.(core.UpdateAction)
357 return true, update.GetObject(), nil
358 })
359 var updated *v1.Pod
360 fakeClient.PrependReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
361 update := action.(core.UpdateAction)
362 updated = update.GetObject().(*v1.Pod)
363 return true, update.GetObject(), nil
364 })
365 if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
366 t.Errorf("Successful update returned an error: %s", err)
367 }
368 events := collectEvents(recorder.Events)
369 if eventCount := len(events); eventCount != 2 {
370 t.Errorf("Pod storage update successful: got %d events, but want 2", eventCount)
371 }
372 for i := range events {
373 if !strings.Contains(events[i], v1.EventTypeNormal) {
374 t.Errorf("Found unexpected non-normal event %s", events[i])
375 }
376 }
377 if !storageMatches(set, updated) {
378 t.Error("Name update failed identity does not match")
379 }
380 }
381
382 func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) {
383 _, ctx := ktesting.NewTestContext(t)
384 recorder := record.NewFakeRecorder(10)
385 set := newStatefulSet(3)
386 pod := newStatefulSetPod(set, 0)
387 fakeClient := &fake.Clientset{}
388 pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
389 pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
390 control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
391 pvcs := getPersistentVolumeClaims(set, pod)
392 volumes := make([]v1.Volume, 0, len(pod.Spec.Volumes))
393 for i := range pod.Spec.Volumes {
394 if _, contains := pvcs[pod.Spec.Volumes[i].Name]; !contains {
395 volumes = append(volumes, pod.Spec.Volumes[i])
396 }
397 }
398 pod.Spec.Volumes = volumes
399 fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
400 update := action.(core.UpdateAction)
401 return true, update.GetObject(), nil
402 })
403 fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
404 return true, nil, apierrors.NewInternalError(errors.New("API server down"))
405 })
406 if err := control.UpdateStatefulPod(ctx, set, pod); err == nil {
407 t.Error("Failed Pod storage update did not return an error")
408 }
409 events := collectEvents(recorder.Events)
410 if eventCount := len(events); eventCount != 2 {
411 t.Errorf("Pod storage update failed: got %d events, but want 2", eventCount)
412 }
413 for i := range events {
414 if !strings.Contains(events[i], v1.EventTypeWarning) {
415 t.Errorf("Found unexpected non-normal event %s", events[i])
416 }
417 }
418 }
419
420 func TestStatefulPodControlUpdatePodConflictSuccess(t *testing.T) {
421 _, ctx := ktesting.NewTestContext(t)
422 recorder := record.NewFakeRecorder(10)
423 set := newStatefulSet(3)
424 pod := newStatefulSetPod(set, 0)
425 fakeClient := &fake.Clientset{}
426 podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
427 podLister := corelisters.NewPodLister(podIndexer)
428 claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
429 claimLister := corelisters.NewPersistentVolumeClaimLister(podIndexer)
430 gooPod := newStatefulSetPod(set, 0)
431 gooPod.Labels[apps.StatefulSetPodNameLabel] = "goo-starts"
432 podIndexer.Add(gooPod)
433 claims := getPersistentVolumeClaims(set, gooPod)
434 for k := range claims {
435 claim := claims[k]
436 claimIndexer.Add(&claim)
437 }
438 control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
439 conflict := false
440 fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
441 update := action.(core.UpdateAction)
442 if !conflict {
443 conflict = true
444 return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), pod.Name, errors.New("conflict"))
445 }
446 return true, update.GetObject(), nil
447
448 })
449 pod.Labels[apps.StatefulSetPodNameLabel] = "goo-0"
450 if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
451 t.Errorf("Successful update returned an error: %s", err)
452 }
453 events := collectEvents(recorder.Events)
454 if eventCount := len(events); eventCount != 1 {
455 t.Errorf("Pod update successful: got %d, but want 1", eventCount)
456 } else if !strings.Contains(events[0], v1.EventTypeNormal) {
457 t.Errorf("Found unexpected non-normal event %s", events[0])
458 }
459 if !identityMatches(set, pod) {
460 t.Error("Name update failed identity does not match")
461 }
462 }
463
464 func TestStatefulPodControlDeletesStatefulPod(t *testing.T) {
465 recorder := record.NewFakeRecorder(10)
466 set := newStatefulSet(3)
467 pod := newStatefulSetPod(set, 0)
468 fakeClient := &fake.Clientset{}
469 control := NewStatefulPodControl(fakeClient, nil, nil, recorder)
470 fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) {
471 return true, nil, nil
472 })
473 if err := control.DeleteStatefulPod(set, pod); err != nil {
474 t.Errorf("Error returned on successful delete: %s", err)
475 }
476 events := collectEvents(recorder.Events)
477 if eventCount := len(events); eventCount != 1 {
478 t.Errorf("delete successful: got %d events, but want 1", eventCount)
479 } else if !strings.Contains(events[0], v1.EventTypeNormal) {
480 t.Errorf("Found unexpected non-normal event %s", events[0])
481 }
482 }
483
484 func TestStatefulPodControlDeleteFailure(t *testing.T) {
485 recorder := record.NewFakeRecorder(10)
486 set := newStatefulSet(3)
487 pod := newStatefulSetPod(set, 0)
488 fakeClient := &fake.Clientset{}
489 control := NewStatefulPodControl(fakeClient, nil, nil, recorder)
490 fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) {
491 return true, nil, apierrors.NewInternalError(errors.New("API server down"))
492 })
493 if err := control.DeleteStatefulPod(set, pod); err == nil {
494 t.Error("Failed to return error on failed delete")
495 }
496 events := collectEvents(recorder.Events)
497 if eventCount := len(events); eventCount != 1 {
498 t.Errorf("delete failed: got %d events, but want 1", eventCount)
499 } else if !strings.Contains(events[0], v1.EventTypeWarning) {
500 t.Errorf("Found unexpected non-warning event %s", events[0])
501 }
502 }
503
504 func TestStatefulPodControlClaimsMatchDeletionPolcy(t *testing.T) {
505
506
507 _, ctx := ktesting.NewTestContext(t)
508 fakeClient := &fake.Clientset{}
509 indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
510 claimLister := corelisters.NewPersistentVolumeClaimLister(indexer)
511 set := newStatefulSet(3)
512 pod := newStatefulSetPod(set, 0)
513 claims := getPersistentVolumeClaims(set, pod)
514 for k := range claims {
515 claim := claims[k]
516 indexer.Add(&claim)
517 }
518 control := NewStatefulPodControl(fakeClient, nil, claimLister, &noopRecorder{})
519 set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
520 WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
521 WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
522 }
523 if matches, err := control.ClaimsMatchRetentionPolicy(ctx, set, pod); err != nil {
524 t.Errorf("Unexpected error for ClaimsMatchRetentionPolicy (retain): %v", err)
525 } else if !matches {
526 t.Error("Unexpected non-match for ClaimsMatchRetentionPolicy (retain)")
527 }
528 set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
529 WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
530 WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
531 }
532 if matches, err := control.ClaimsMatchRetentionPolicy(ctx, set, pod); err != nil {
533 t.Errorf("Unexpected error for ClaimsMatchRetentionPolicy (set deletion): %v", err)
534 } else if matches {
535 t.Error("Unexpected match for ClaimsMatchRetentionPolicy (set deletion)")
536 }
537 }
538
539 func TestStatefulPodControlUpdatePodClaimForRetentionPolicy(t *testing.T) {
540
541
542 testFn := func(t *testing.T) {
543 _, ctx := ktesting.NewTestContext(t)
544 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
545 fakeClient := &fake.Clientset{}
546 indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
547 claimLister := corelisters.NewPersistentVolumeClaimLister(indexer)
548 fakeClient.AddReactor("update", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
549 update := action.(core.UpdateAction)
550 indexer.Update(update.GetObject())
551 return true, update.GetObject(), nil
552 })
553 set := newStatefulSet(3)
554 set.GetObjectMeta().SetUID("set-123")
555 pod := newStatefulSetPod(set, 0)
556 claims := getPersistentVolumeClaims(set, pod)
557 for k := range claims {
558 claim := claims[k]
559 indexer.Add(&claim)
560 }
561 control := NewStatefulPodControl(fakeClient, nil, claimLister, &noopRecorder{})
562 set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
563 WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
564 WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
565 }
566 if err := control.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil {
567 t.Errorf("Unexpected error for UpdatePodClaimForRetentionPolicy (retain): %v", err)
568 }
569 expectRef := utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC)
570 for k := range claims {
571 claim, err := claimLister.PersistentVolumeClaims(claims[k].Namespace).Get(claims[k].Name)
572 if err != nil {
573 t.Errorf("Unexpected error getting Claim %s/%s: %v", claim.Namespace, claim.Name, err)
574 }
575 if hasOwnerRef(claim, set) != expectRef {
576 t.Errorf("Claim %s/%s bad set owner ref", claim.Namespace, claim.Name)
577 }
578 }
579 }
580 t.Run("StatefulSetAutoDeletePVCEnabled", func(t *testing.T) {
581 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
582 testFn(t)
583 })
584 t.Run("StatefulSetAutoDeletePVCDisabled", func(t *testing.T) {
585 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, false)()
586 testFn(t)
587 })
588 }
589
590 func TestPodClaimIsStale(t *testing.T) {
591 const missing = "missing"
592 const exists = "exists"
593 const stale = "stale"
594 const withRef = "with-ref"
595 testCases := []struct {
596 name string
597 claimStates []string
598 expected bool
599 skipPodUID bool
600 }{
601 {
602 name: "all missing",
603 claimStates: []string{missing, missing},
604 expected: false,
605 },
606 {
607 name: "no claims",
608 claimStates: []string{},
609 expected: false,
610 },
611 {
612 name: "exists",
613 claimStates: []string{missing, exists},
614 expected: false,
615 },
616 {
617 name: "all refs",
618 claimStates: []string{withRef, withRef},
619 expected: false,
620 },
621 {
622 name: "stale & exists",
623 claimStates: []string{stale, exists},
624 expected: true,
625 },
626 {
627 name: "stale & missing",
628 claimStates: []string{stale, missing},
629 expected: true,
630 },
631 {
632 name: "withRef & stale",
633 claimStates: []string{withRef, stale},
634 expected: true,
635 },
636 {
637 name: "withRef, no UID",
638 claimStates: []string{withRef},
639 skipPodUID: true,
640 expected: true,
641 },
642 }
643 for _, tc := range testCases {
644 set := apps.StatefulSet{}
645 set.Name = "set"
646 set.Namespace = "default"
647 set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
648 WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
649 WhenScaled: apps.DeletePersistentVolumeClaimRetentionPolicyType,
650 }
651 set.Spec.Selector = &metav1.LabelSelector{MatchLabels: map[string]string{"key": "value"}}
652 claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
653 for i, claimState := range tc.claimStates {
654 claim := v1.PersistentVolumeClaim{}
655 claim.Name = fmt.Sprintf("claim-%d", i)
656 set.Spec.VolumeClaimTemplates = append(set.Spec.VolumeClaimTemplates, claim)
657 claim.Name = fmt.Sprintf("%s-set-3", claim.Name)
658 claim.Namespace = set.Namespace
659 switch claimState {
660 case missing:
661
662 case exists:
663 claimIndexer.Add(&claim)
664 case stale:
665 claim.SetOwnerReferences([]metav1.OwnerReference{
666 {Name: "set-3", UID: types.UID("stale")},
667 })
668 claimIndexer.Add(&claim)
669 case withRef:
670 claim.SetOwnerReferences([]metav1.OwnerReference{
671 {Name: "set-3", UID: types.UID("123")},
672 })
673 claimIndexer.Add(&claim)
674 }
675 }
676 pod := v1.Pod{}
677 pod.Name = "set-3"
678 if !tc.skipPodUID {
679 pod.SetUID("123")
680 }
681 claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
682 control := NewStatefulPodControl(&fake.Clientset{}, nil, claimLister, &noopRecorder{})
683 expected := tc.expected
684
685 if stale, _ := control.PodClaimIsStale(&set, &pod); stale != expected {
686 t.Errorf("unexpected stale for %s", tc.name)
687 }
688 }
689 }
690
691 func TestStatefulPodControlRetainDeletionPolicyUpdate(t *testing.T) {
692 testFn := func(t *testing.T) {
693 _, ctx := ktesting.NewTestContext(t)
694 recorder := record.NewFakeRecorder(10)
695 set := newStatefulSet(1)
696 set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
697 WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
698 WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
699 }
700 pod := newStatefulSetPod(set, 0)
701 fakeClient := &fake.Clientset{}
702 podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
703 podLister := corelisters.NewPodLister(podIndexer)
704 claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
705 claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
706 podIndexer.Add(pod)
707 claims := getPersistentVolumeClaims(set, pod)
708 if len(claims) < 1 {
709 t.Errorf("Unexpected missing PVCs")
710 }
711 for k := range claims {
712 claim := claims[k]
713 setOwnerRef(&claim, set, &set.TypeMeta)
714 claimIndexer.Add(&claim)
715 }
716 control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
717 if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
718 t.Errorf("Successful update returned an error: %s", err)
719 }
720 for k := range claims {
721 claim := claims[k]
722 if hasOwnerRef(&claim, set) {
723 t.Errorf("ownerRef not removed: %s/%s", claim.Namespace, claim.Name)
724 }
725 }
726 events := collectEvents(recorder.Events)
727 if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
728 if eventCount := len(events); eventCount != 1 {
729 t.Errorf("delete failed: got %d events, but want 1", eventCount)
730 }
731 } else {
732 if len(events) != 0 {
733 t.Errorf("delete failed: expected no events, but got %v", events)
734 }
735 }
736 }
737 t.Run("StatefulSetAutoDeletePVCEnabled", func(t *testing.T) {
738 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
739 testFn(t)
740 })
741 t.Run("StatefulSetAutoDeletePVCDisabled", func(t *testing.T) {
742 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, false)()
743 testFn(t)
744 })
745 }
746
747 func TestStatefulPodControlRetentionPolicyUpdate(t *testing.T) {
748 _, ctx := ktesting.NewTestContext(t)
749
750 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
751
752 recorder := record.NewFakeRecorder(10)
753 set := newStatefulSet(1)
754 set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
755 WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
756 WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
757 }
758 pod := newStatefulSetPod(set, 0)
759 fakeClient := &fake.Clientset{}
760 podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
761 claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
762 podIndexer.Add(pod)
763 claims := getPersistentVolumeClaims(set, pod)
764 if len(claims) != 1 {
765 t.Errorf("Unexpected or missing PVCs")
766 }
767 var claim v1.PersistentVolumeClaim
768 for k := range claims {
769 claim = claims[k]
770 claimIndexer.Add(&claim)
771 }
772 fakeClient.AddReactor("update", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
773 update := action.(core.UpdateAction)
774 claimIndexer.Update(update.GetObject())
775 return true, update.GetObject(), nil
776 })
777 podLister := corelisters.NewPodLister(podIndexer)
778 claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
779 control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
780 if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
781 t.Errorf("Successful update returned an error: %s", err)
782 }
783 updatedClaim, err := claimLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
784 if err != nil {
785 t.Errorf("Error retrieving claim %s/%s: %v", claim.Namespace, claim.Name, err)
786 }
787 if !hasOwnerRef(updatedClaim, set) {
788 t.Errorf("ownerRef not added: %s/%s", claim.Namespace, claim.Name)
789 }
790 events := collectEvents(recorder.Events)
791 if eventCount := len(events); eventCount != 1 {
792 t.Errorf("update failed: got %d events, but want 1", eventCount)
793 }
794 }
795
796 func TestStatefulPodControlRetentionPolicyUpdateMissingClaims(t *testing.T) {
797 _, ctx := ktesting.NewTestContext(t)
798
799 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
800
801 recorder := record.NewFakeRecorder(10)
802 set := newStatefulSet(1)
803 set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
804 WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
805 WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
806 }
807 pod := newStatefulSetPod(set, 0)
808 fakeClient := &fake.Clientset{}
809 podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
810 podLister := corelisters.NewPodLister(podIndexer)
811 claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
812 claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
813 podIndexer.Add(pod)
814 fakeClient.AddReactor("update", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
815 update := action.(core.UpdateAction)
816 claimIndexer.Update(update.GetObject())
817 return true, update.GetObject(), nil
818 })
819 control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
820 if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
821 t.Error("Unexpected error on pod update when PVCs are missing")
822 }
823 claims := getPersistentVolumeClaims(set, pod)
824 if len(claims) != 1 {
825 t.Errorf("Unexpected or missing PVCs")
826 }
827 var claim v1.PersistentVolumeClaim
828 for k := range claims {
829 claim = claims[k]
830 claimIndexer.Add(&claim)
831 }
832
833 if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
834 t.Errorf("Expected update to succeed, saw error %v", err)
835 }
836 updatedClaim, err := claimLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
837 if err != nil {
838 t.Errorf("Error retrieving claim %s/%s: %v", claim.Namespace, claim.Name, err)
839 }
840 if !hasOwnerRef(updatedClaim, set) {
841 t.Errorf("ownerRef not added: %s/%s", claim.Namespace, claim.Name)
842 }
843 events := collectEvents(recorder.Events)
844 if eventCount := len(events); eventCount != 1 {
845 t.Errorf("update failed: got %d events, but want 2", eventCount)
846 }
847 if !strings.Contains(events[0], "SuccessfulUpdate") {
848 t.Errorf("expected first event to be a successful update: %s", events[1])
849 }
850 }
851
// collectEvents drains the events that can be received from source without
// blocking and returns them in receive order. It returns as soon as no event
// is immediately available or source has been closed. The result is always
// non-nil, even when no events were received.
func collectEvents(source <-chan string) []string {
	events := make([]string, 0)
	for {
		select {
		case event, ok := <-source:
			if !ok {
				// A closed channel is always ready to receive, so without
				// this check the loop would never reach the default case.
				return events
			}
			events = append(events, event)
		default:
			return events
		}
	}
}
865
View as plain text