1
16
17 package statefulset
18
19 import (
20 "bytes"
21 "context"
22 "encoding/json"
23 "fmt"
24 "sort"
25 "testing"
26
27 apps "k8s.io/api/apps/v1"
28 v1 "k8s.io/api/core/v1"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/labels"
31 "k8s.io/apimachinery/pkg/runtime"
32 "k8s.io/apimachinery/pkg/types"
33 "k8s.io/apimachinery/pkg/util/sets"
34 "k8s.io/apimachinery/pkg/util/strategicpatch"
35 utilfeature "k8s.io/apiserver/pkg/util/feature"
36 "k8s.io/client-go/informers"
37 "k8s.io/client-go/kubernetes/fake"
38 core "k8s.io/client-go/testing"
39 "k8s.io/client-go/tools/cache"
40 "k8s.io/client-go/tools/record"
41 featuregatetesting "k8s.io/component-base/featuregate/testing"
42 "k8s.io/klog/v2"
43 "k8s.io/klog/v2/ktesting"
44 "k8s.io/kubernetes/pkg/controller"
45 "k8s.io/kubernetes/pkg/controller/history"
46 "k8s.io/kubernetes/pkg/features"
47 )
48
// parentKind is the StatefulSet GroupVersionKind derived from the apps/v1
// scheme group version. Tests in this file pass it to
// history.NewControllerRevision as the parent kind argument.
var parentKind = apps.SchemeGroupVersion.WithKind("StatefulSet")
50
// alwaysReady unconditionally reports true. The fake controller constructor in
// this file assigns it to the controller's lister-synced hooks.
func alwaysReady() bool {
	return true
}
52
53 func TestStatefulSetControllerCreates(t *testing.T) {
54 set := newStatefulSet(3)
55 logger, ctx := ktesting.NewTestContext(t)
56 ssc, spc, om, _ := newFakeStatefulSetController(ctx, set)
57 if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
58 t.Errorf("Failed to turn up StatefulSet : %s", err)
59 }
60 if obj, _, err := om.setsIndexer.Get(set); err != nil {
61 t.Error(err)
62 } else {
63 set = obj.(*apps.StatefulSet)
64 }
65 if set.Status.Replicas != 3 {
66 t.Errorf("set.Status.Replicas = %v; want 3", set.Status.Replicas)
67 }
68 }
69
70 func TestStatefulSetControllerDeletes(t *testing.T) {
71 set := newStatefulSet(3)
72 logger, ctx := ktesting.NewTestContext(t)
73 ssc, spc, om, _ := newFakeStatefulSetController(ctx, set)
74 if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
75 t.Errorf("Failed to turn up StatefulSet : %s", err)
76 }
77 if obj, _, err := om.setsIndexer.Get(set); err != nil {
78 t.Error(err)
79 } else {
80 set = obj.(*apps.StatefulSet)
81 }
82 if set.Status.Replicas != 3 {
83 t.Errorf("set.Status.Replicas = %v; want 3", set.Status.Replicas)
84 }
85 *set.Spec.Replicas = 0
86 if err := scaleDownStatefulSetController(logger, set, ssc, spc, om); err != nil {
87 t.Errorf("Failed to turn down StatefulSet : %s", err)
88 }
89 if obj, _, err := om.setsIndexer.Get(set); err != nil {
90 t.Error(err)
91 } else {
92 set = obj.(*apps.StatefulSet)
93 }
94 if set.Status.Replicas != 0 {
95 t.Errorf("set.Status.Replicas = %v; want 0", set.Status.Replicas)
96 }
97 }
98
99 func TestStatefulSetControllerRespectsTermination(t *testing.T) {
100 set := newStatefulSet(3)
101 logger, ctx := ktesting.NewTestContext(t)
102 ssc, spc, om, _ := newFakeStatefulSetController(ctx, set)
103 if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
104 t.Errorf("Failed to turn up StatefulSet : %s", err)
105 }
106 if obj, _, err := om.setsIndexer.Get(set); err != nil {
107 t.Error(err)
108 } else {
109 set = obj.(*apps.StatefulSet)
110 }
111 if set.Status.Replicas != 3 {
112 t.Errorf("set.Status.Replicas = %v; want 3", set.Status.Replicas)
113 }
114 _, err := om.addTerminatingPod(set, 3)
115 if err != nil {
116 t.Error(err)
117 }
118 pods, err := om.addTerminatingPod(set, 4)
119 if err != nil {
120 t.Error(err)
121 }
122 ssc.syncStatefulSet(ctx, set, pods)
123 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
124 if err != nil {
125 t.Error(err)
126 }
127 pods, err = om.podsLister.Pods(set.Namespace).List(selector)
128 if err != nil {
129 t.Error(err)
130 }
131 if len(pods) != 5 {
132 t.Error("StatefulSet does not respect termination")
133 }
134 sort.Sort(ascendingOrdinal(pods))
135 spc.DeleteStatefulPod(set, pods[3])
136 spc.DeleteStatefulPod(set, pods[4])
137 *set.Spec.Replicas = 0
138 if err := scaleDownStatefulSetController(logger, set, ssc, spc, om); err != nil {
139 t.Errorf("Failed to turn down StatefulSet : %s", err)
140 }
141 if obj, _, err := om.setsIndexer.Get(set); err != nil {
142 t.Error(err)
143 } else {
144 set = obj.(*apps.StatefulSet)
145 }
146 if set.Status.Replicas != 0 {
147 t.Errorf("set.Status.Replicas = %v; want 0", set.Status.Replicas)
148 }
149 }
150
151 func TestStatefulSetControllerBlocksScaling(t *testing.T) {
152 logger, ctx := ktesting.NewTestContext(t)
153 set := newStatefulSet(3)
154 ssc, spc, om, _ := newFakeStatefulSetController(ctx, set)
155 if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
156 t.Errorf("Failed to turn up StatefulSet : %s", err)
157 }
158 if obj, _, err := om.setsIndexer.Get(set); err != nil {
159 t.Error(err)
160 } else {
161 set = obj.(*apps.StatefulSet)
162 }
163 if set.Status.Replicas != 3 {
164 t.Errorf("set.Status.Replicas = %v; want 3", set.Status.Replicas)
165 }
166 *set.Spec.Replicas = 5
167 fakeResourceVersion(set)
168 om.setsIndexer.Update(set)
169 _, err := om.setPodTerminated(set, 0)
170 if err != nil {
171 t.Error("Failed to set pod terminated at ordinal 0")
172 }
173 ssc.enqueueStatefulSet(set)
174 fakeWorker(ssc)
175 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
176 if err != nil {
177 t.Error(err)
178 }
179 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
180 if err != nil {
181 t.Error(err)
182 }
183 if len(pods) != 3 {
184 t.Error("StatefulSet does not block scaling")
185 }
186 sort.Sort(ascendingOrdinal(pods))
187 spc.DeleteStatefulPod(set, pods[0])
188 ssc.enqueueStatefulSet(set)
189 fakeWorker(ssc)
190 pods, err = om.podsLister.Pods(set.Namespace).List(selector)
191 if err != nil {
192 t.Error(err)
193 }
194 if len(pods) != 3 {
195 t.Error("StatefulSet does not resume when terminated Pod is removed")
196 }
197 }
198
199 func TestStatefulSetControllerDeletionTimestamp(t *testing.T) {
200 _, ctx := ktesting.NewTestContext(t)
201 set := newStatefulSet(3)
202 set.DeletionTimestamp = new(metav1.Time)
203 ssc, _, om, _ := newFakeStatefulSetController(ctx, set)
204
205 om.setsIndexer.Add(set)
206
207
208 ssc.enqueueStatefulSet(set)
209 fakeWorker(ssc)
210
211 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
212 if err != nil {
213 t.Fatal(err)
214 }
215 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
216 if err != nil {
217 t.Fatal(err)
218 }
219 if got, want := len(pods), 0; got != want {
220 t.Errorf("len(pods) = %v, want %v", got, want)
221 }
222 }
223
224 func TestStatefulSetControllerDeletionTimestampRace(t *testing.T) {
225 _, ctx := ktesting.NewTestContext(t)
226 set := newStatefulSet(3)
227
228 set.DeletionTimestamp = new(metav1.Time)
229 ssc, _, om, ssh := newFakeStatefulSetController(ctx, set)
230
231
232 set2 := *set
233 set2.DeletionTimestamp = nil
234 om.setsIndexer.Add(&set2)
235
236
237 pod := newStatefulSetPod(set, 1)
238 pod.OwnerReferences = nil
239 om.podsIndexer.Add(pod)
240 set.Status.CollisionCount = new(int32)
241 revision, err := newRevision(set, 1, set.Status.CollisionCount)
242 if err != nil {
243 t.Fatal(err)
244 }
245 revision.OwnerReferences = nil
246 _, err = ssh.CreateControllerRevision(set, revision, set.Status.CollisionCount)
247 if err != nil {
248 t.Fatal(err)
249 }
250
251
252 ssc.enqueueStatefulSet(set)
253 fakeWorker(ssc)
254
255 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
256 if err != nil {
257 t.Fatal(err)
258 }
259 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
260 if err != nil {
261 t.Fatal(err)
262 }
263 if got, want := len(pods), 1; got != want {
264 t.Errorf("len(pods) = %v, want %v", got, want)
265 }
266
267
268 for _, pod := range pods {
269 if len(pod.OwnerReferences) > 0 {
270 t.Errorf("unexpected pod owner references: %v", pod.OwnerReferences)
271 }
272 }
273
274
275 revisions, err := ssh.ListControllerRevisions(set, selector)
276 if err != nil {
277 t.Fatal(err)
278 }
279 if got, want := len(revisions), 1; got != want {
280 t.Errorf("len(revisions) = %v, want %v", got, want)
281 }
282 for _, revision := range revisions {
283 if len(revision.OwnerReferences) > 0 {
284 t.Errorf("unexpected revision owner references: %v", revision.OwnerReferences)
285 }
286 }
287 }
288
289 func TestStatefulSetControllerAddPod(t *testing.T) {
290 logger, ctx := ktesting.NewTestContext(t)
291 ssc, _, om, _ := newFakeStatefulSetController(ctx)
292 set1 := newStatefulSet(3)
293 set2 := newStatefulSet(3)
294 pod1 := newStatefulSetPod(set1, 0)
295 pod2 := newStatefulSetPod(set2, 0)
296 om.setsIndexer.Add(set1)
297 om.setsIndexer.Add(set2)
298
299 ssc.addPod(logger, pod1)
300 key, done := ssc.queue.Get()
301 if key == nil || done {
302 t.Error("failed to enqueue StatefulSet")
303 } else if key, ok := key.(string); !ok {
304 t.Error("key is not a string")
305 } else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key {
306 t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
307 }
308 ssc.queue.Done(key)
309
310 ssc.addPod(logger, pod2)
311 key, done = ssc.queue.Get()
312 if key == nil || done {
313 t.Error("failed to enqueue StatefulSet")
314 } else if key, ok := key.(string); !ok {
315 t.Error("key is not a string")
316 } else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key {
317 t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
318 }
319 ssc.queue.Done(key)
320 }
321
322 func TestStatefulSetControllerAddPodOrphan(t *testing.T) {
323 logger, ctx := ktesting.NewTestContext(t)
324 ssc, _, om, _ := newFakeStatefulSetController(ctx)
325 set1 := newStatefulSet(3)
326 set2 := newStatefulSet(3)
327 set2.Name = "foo2"
328 set3 := newStatefulSet(3)
329 set3.Name = "foo3"
330 set3.Spec.Selector.MatchLabels = map[string]string{"foo3": "bar"}
331 pod := newStatefulSetPod(set1, 0)
332 om.setsIndexer.Add(set1)
333 om.setsIndexer.Add(set2)
334 om.setsIndexer.Add(set3)
335
336
337 pod.OwnerReferences = nil
338 ssc.addPod(logger, pod)
339 if got, want := ssc.queue.Len(), 2; got != want {
340 t.Errorf("queue.Len() = %v, want %v", got, want)
341 }
342 }
343
344 func TestStatefulSetControllerAddPodNoSet(t *testing.T) {
345 logger, ctx := ktesting.NewTestContext(t)
346 ssc, _, _, _ := newFakeStatefulSetController(ctx)
347 set := newStatefulSet(3)
348 pod := newStatefulSetPod(set, 0)
349 ssc.addPod(logger, pod)
350 ssc.queue.ShutDown()
351 key, _ := ssc.queue.Get()
352 if key != nil {
353 t.Errorf("StatefulSet enqueued key for Pod with no Set %s", key)
354 }
355 }
356
357 func TestStatefulSetControllerUpdatePod(t *testing.T) {
358 logger, ctx := ktesting.NewTestContext(t)
359 ssc, _, om, _ := newFakeStatefulSetController(ctx)
360 set1 := newStatefulSet(3)
361 set2 := newStatefulSet(3)
362 set2.Name = "foo2"
363 pod1 := newStatefulSetPod(set1, 0)
364 pod2 := newStatefulSetPod(set2, 0)
365 om.setsIndexer.Add(set1)
366 om.setsIndexer.Add(set2)
367
368 prev := *pod1
369 fakeResourceVersion(pod1)
370 ssc.updatePod(logger, &prev, pod1)
371 key, done := ssc.queue.Get()
372 if key == nil || done {
373 t.Error("failed to enqueue StatefulSet")
374 } else if key, ok := key.(string); !ok {
375 t.Error("key is not a string")
376 } else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key {
377 t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
378 }
379
380 prev = *pod2
381 fakeResourceVersion(pod2)
382 ssc.updatePod(logger, &prev, pod2)
383 key, done = ssc.queue.Get()
384 if key == nil || done {
385 t.Error("failed to enqueue StatefulSet")
386 } else if key, ok := key.(string); !ok {
387 t.Error("key is not a string")
388 } else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key {
389 t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
390 }
391 }
392
393 func TestStatefulSetControllerUpdatePodWithNoSet(t *testing.T) {
394 logger, ctx := ktesting.NewTestContext(t)
395 ssc, _, _, _ := newFakeStatefulSetController(ctx)
396 set := newStatefulSet(3)
397 pod := newStatefulSetPod(set, 0)
398 prev := *pod
399 fakeResourceVersion(pod)
400 ssc.updatePod(logger, &prev, pod)
401 ssc.queue.ShutDown()
402 key, _ := ssc.queue.Get()
403 if key != nil {
404 t.Errorf("StatefulSet enqueued key for Pod with no Set %s", key)
405 }
406 }
407
408 func TestStatefulSetControllerUpdatePodWithSameVersion(t *testing.T) {
409 logger, ctx := ktesting.NewTestContext(t)
410 ssc, _, om, _ := newFakeStatefulSetController(ctx)
411 set := newStatefulSet(3)
412 pod := newStatefulSetPod(set, 0)
413 om.setsIndexer.Add(set)
414 ssc.updatePod(logger, pod, pod)
415 ssc.queue.ShutDown()
416 key, _ := ssc.queue.Get()
417 if key != nil {
418 t.Errorf("StatefulSet enqueued key for Pod with no Set %s", key)
419 }
420 }
421
422 func TestStatefulSetControllerUpdatePodOrphanWithNewLabels(t *testing.T) {
423 logger, ctx := ktesting.NewTestContext(t)
424 ssc, _, om, _ := newFakeStatefulSetController(ctx)
425 set := newStatefulSet(3)
426 pod := newStatefulSetPod(set, 0)
427 pod.OwnerReferences = nil
428 set2 := newStatefulSet(3)
429 set2.Name = "foo2"
430 om.setsIndexer.Add(set)
431 om.setsIndexer.Add(set2)
432 clone := *pod
433 clone.Labels = map[string]string{"foo2": "bar2"}
434 fakeResourceVersion(&clone)
435 ssc.updatePod(logger, &clone, pod)
436 if got, want := ssc.queue.Len(), 2; got != want {
437 t.Errorf("queue.Len() = %v, want %v", got, want)
438 }
439 }
440
441 func TestStatefulSetControllerUpdatePodChangeControllerRef(t *testing.T) {
442 logger, ctx := ktesting.NewTestContext(t)
443 ssc, _, om, _ := newFakeStatefulSetController(ctx)
444 set := newStatefulSet(3)
445 set2 := newStatefulSet(3)
446 set2.Name = "foo2"
447 pod := newStatefulSetPod(set, 0)
448 pod2 := newStatefulSetPod(set2, 0)
449 om.setsIndexer.Add(set)
450 om.setsIndexer.Add(set2)
451 clone := *pod
452 clone.OwnerReferences = pod2.OwnerReferences
453 fakeResourceVersion(&clone)
454 ssc.updatePod(logger, &clone, pod)
455 if got, want := ssc.queue.Len(), 2; got != want {
456 t.Errorf("queue.Len() = %v, want %v", got, want)
457 }
458 }
459
460 func TestStatefulSetControllerUpdatePodRelease(t *testing.T) {
461 logger, ctx := ktesting.NewTestContext(t)
462 ssc, _, om, _ := newFakeStatefulSetController(ctx)
463 set := newStatefulSet(3)
464 set2 := newStatefulSet(3)
465 set2.Name = "foo2"
466 pod := newStatefulSetPod(set, 0)
467 om.setsIndexer.Add(set)
468 om.setsIndexer.Add(set2)
469 clone := *pod
470 clone.OwnerReferences = nil
471 fakeResourceVersion(&clone)
472 ssc.updatePod(logger, pod, &clone)
473 if got, want := ssc.queue.Len(), 2; got != want {
474 t.Errorf("queue.Len() = %v, want %v", got, want)
475 }
476 }
477
478 func TestStatefulSetControllerDeletePod(t *testing.T) {
479 logger, ctx := ktesting.NewTestContext(t)
480 ssc, _, om, _ := newFakeStatefulSetController(ctx)
481 set1 := newStatefulSet(3)
482 set2 := newStatefulSet(3)
483 set2.Name = "foo2"
484 pod1 := newStatefulSetPod(set1, 0)
485 pod2 := newStatefulSetPod(set2, 0)
486 om.setsIndexer.Add(set1)
487 om.setsIndexer.Add(set2)
488
489 ssc.deletePod(logger, pod1)
490 key, done := ssc.queue.Get()
491 if key == nil || done {
492 t.Error("failed to enqueue StatefulSet")
493 } else if key, ok := key.(string); !ok {
494 t.Error("key is not a string")
495 } else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key {
496 t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
497 }
498
499 ssc.deletePod(logger, pod2)
500 key, done = ssc.queue.Get()
501 if key == nil || done {
502 t.Error("failed to enqueue StatefulSet")
503 } else if key, ok := key.(string); !ok {
504 t.Error("key is not a string")
505 } else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key {
506 t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
507 }
508 }
509
510 func TestStatefulSetControllerDeletePodOrphan(t *testing.T) {
511 logger, ctx := ktesting.NewTestContext(t)
512 ssc, _, om, _ := newFakeStatefulSetController(ctx)
513 set1 := newStatefulSet(3)
514 set2 := newStatefulSet(3)
515 set2.Name = "foo2"
516 pod1 := newStatefulSetPod(set1, 0)
517 om.setsIndexer.Add(set1)
518 om.setsIndexer.Add(set2)
519
520 pod1.OwnerReferences = nil
521 ssc.deletePod(logger, pod1)
522 if got, want := ssc.queue.Len(), 0; got != want {
523 t.Errorf("queue.Len() = %v, want %v", got, want)
524 }
525 }
526
527 func TestStatefulSetControllerDeletePodTombstone(t *testing.T) {
528 logger, ctx := ktesting.NewTestContext(t)
529 ssc, _, om, _ := newFakeStatefulSetController(ctx)
530 set := newStatefulSet(3)
531 pod := newStatefulSetPod(set, 0)
532 om.setsIndexer.Add(set)
533 tombstoneKey, _ := controller.KeyFunc(pod)
534 tombstone := cache.DeletedFinalStateUnknown{Key: tombstoneKey, Obj: pod}
535 ssc.deletePod(logger, tombstone)
536 key, done := ssc.queue.Get()
537 if key == nil || done {
538 t.Error("failed to enqueue StatefulSet")
539 } else if key, ok := key.(string); !ok {
540 t.Error("key is not a string")
541 } else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key {
542 t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
543 }
544 }
545
546 func TestStatefulSetControllerGetStatefulSetsForPod(t *testing.T) {
547 _, ctx := ktesting.NewTestContext(t)
548 ssc, _, om, _ := newFakeStatefulSetController(ctx)
549 set1 := newStatefulSet(3)
550 set2 := newStatefulSet(3)
551 set2.Name = "foo2"
552 pod := newStatefulSetPod(set1, 0)
553 om.setsIndexer.Add(set1)
554 om.setsIndexer.Add(set2)
555 om.podsIndexer.Add(pod)
556 sets := ssc.getStatefulSetsForPod(pod)
557 if got, want := len(sets), 2; got != want {
558 t.Errorf("len(sets) = %v, want %v", got, want)
559 }
560 }
561
562 func TestGetPodsForStatefulSetAdopt(t *testing.T) {
563 set := newStatefulSet(5)
564 pod1 := newStatefulSetPod(set, 1)
565
566 pod2 := newStatefulSetPod(set, 2)
567 pod2.OwnerReferences = nil
568
569 pod3 := newStatefulSetPod(set, 3)
570 pod3.OwnerReferences = nil
571 pod3.Labels = nil
572
573 pod4 := newStatefulSetPod(set, 4)
574 pod4.OwnerReferences = nil
575 pod4.Name = "x" + pod4.Name
576
577 _, ctx := ktesting.NewTestContext(t)
578 ssc, _, om, _ := newFakeStatefulSetController(ctx, set, pod1, pod2, pod3, pod4)
579
580 om.podsIndexer.Add(pod1)
581 om.podsIndexer.Add(pod2)
582 om.podsIndexer.Add(pod3)
583 om.podsIndexer.Add(pod4)
584 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
585 if err != nil {
586 t.Fatal(err)
587 }
588 pods, err := ssc.getPodsForStatefulSet(context.TODO(), set, selector)
589 if err != nil {
590 t.Fatalf("getPodsForStatefulSet() error: %v", err)
591 }
592 got := sets.NewString()
593 for _, pod := range pods {
594 got.Insert(pod.Name)
595 }
596
597 want := sets.NewString(pod1.Name, pod2.Name)
598 if !got.Equal(want) {
599 t.Errorf("getPodsForStatefulSet() = %v, want %v", got, want)
600 }
601 }
602
603 func TestAdoptOrphanRevisions(t *testing.T) {
604 ss1 := newStatefulSetWithLabels(3, "ss1", types.UID("ss1"), map[string]string{"foo": "bar"})
605 ss1.Status.CollisionCount = new(int32)
606 ss1Rev1, err := history.NewControllerRevision(ss1, parentKind, ss1.Spec.Template.Labels, rawTemplate(&ss1.Spec.Template), 1, ss1.Status.CollisionCount)
607 if err != nil {
608 t.Fatal(err)
609 }
610 ss1Rev1.Namespace = ss1.Namespace
611 ss1.Spec.Template.Annotations = make(map[string]string)
612 ss1.Spec.Template.Annotations["ss1"] = "ss1"
613 ss1Rev2, err := history.NewControllerRevision(ss1, parentKind, ss1.Spec.Template.Labels, rawTemplate(&ss1.Spec.Template), 2, ss1.Status.CollisionCount)
614 if err != nil {
615 t.Fatal(err)
616 }
617 ss1Rev2.Namespace = ss1.Namespace
618 ss1Rev2.OwnerReferences = []metav1.OwnerReference{}
619
620 _, ctx := ktesting.NewTestContext(t)
621 ssc, _, om, _ := newFakeStatefulSetController(ctx, ss1, ss1Rev1, ss1Rev2)
622
623 om.revisionsIndexer.Add(ss1Rev1)
624 om.revisionsIndexer.Add(ss1Rev2)
625
626 err = ssc.adoptOrphanRevisions(context.TODO(), ss1)
627 if err != nil {
628 t.Errorf("adoptOrphanRevisions() error: %v", err)
629 }
630
631 if revisions, err := ssc.control.ListRevisions(ss1); err != nil {
632 t.Errorf("ListRevisions() error: %v", err)
633 } else {
634 var adopted bool
635 for i := range revisions {
636 if revisions[i].Name == ss1Rev2.Name && metav1.GetControllerOf(revisions[i]) != nil {
637 adopted = true
638 }
639 }
640 if !adopted {
641 t.Error("adoptOrphanRevisions() not adopt orphan revisions")
642 }
643 }
644 }
645
646 func TestGetPodsForStatefulSetRelease(t *testing.T) {
647 _, ctx := ktesting.NewTestContext(t)
648 set := newStatefulSet(3)
649 ssc, _, om, _ := newFakeStatefulSetController(ctx, set)
650 pod1 := newStatefulSetPod(set, 1)
651
652 pod2 := newStatefulSetPod(set, 2)
653 pod2.Name = "x" + pod2.Name
654
655 pod3 := newStatefulSetPod(set, 3)
656 pod3.Labels = nil
657
658 pod4 := newStatefulSetPod(set, 4)
659 pod4.OwnerReferences = nil
660 pod4.Labels = nil
661
662 om.podsIndexer.Add(pod1)
663 om.podsIndexer.Add(pod2)
664 om.podsIndexer.Add(pod3)
665 om.podsIndexer.Add(pod4)
666 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
667 if err != nil {
668 t.Fatal(err)
669 }
670 pods, err := ssc.getPodsForStatefulSet(context.TODO(), set, selector)
671 if err != nil {
672 t.Fatalf("getPodsForStatefulSet() error: %v", err)
673 }
674 got := sets.NewString()
675 for _, pod := range pods {
676 got.Insert(pod.Name)
677 }
678
679
680 want := sets.NewString(pod1.Name)
681 if !got.Equal(want) {
682 t.Errorf("getPodsForStatefulSet() = %v, want %v", got, want)
683 }
684 }
685
686 func TestOrphanedPodsWithPVCDeletePolicy(t *testing.T) {
687 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
688
689 testFn := func(t *testing.T, scaledownPolicy, deletionPolicy apps.PersistentVolumeClaimRetentionPolicyType) {
690 set := newStatefulSet(4)
691 *set.Spec.Replicas = 2
692 set.Spec.PersistentVolumeClaimRetentionPolicy.WhenScaled = scaledownPolicy
693 set.Spec.PersistentVolumeClaimRetentionPolicy.WhenDeleted = deletionPolicy
694 _, ctx := ktesting.NewTestContext(t)
695 ssc, _, om, _ := newFakeStatefulSetController(ctx, set)
696 om.setsIndexer.Add(set)
697
698 pods := []*v1.Pod{}
699 pods = append(pods, newStatefulSetPod(set, 0))
700
701 pods = append(pods, newStatefulSetPod(set, 1))
702 pods[1].OwnerReferences = nil
703
704 pods = append(pods, newStatefulSetPod(set, 2))
705 pods[2].Name = "x" + pods[2].Name
706
707 ssc.kubeClient.(*fake.Clientset).PrependReactor("patch", "pods", func(action core.Action) (bool, runtime.Object, error) {
708 patch := action.(core.PatchAction).GetPatch()
709 target := action.(core.PatchAction).GetName()
710 var pod *v1.Pod
711 for _, p := range pods {
712 if p.Name == target {
713 pod = p
714 break
715 }
716 }
717 if pod == nil {
718 t.Fatalf("Can't find patch target %s", target)
719 }
720 original, err := json.Marshal(pod)
721 if err != nil {
722 t.Fatalf("failed to marshal original pod %s: %v", pod.Name, err)
723 }
724 updated, err := strategicpatch.StrategicMergePatch(original, patch, v1.Pod{})
725 if err != nil {
726 t.Fatalf("failed to apply strategic merge patch %q on node %s: %v", patch, pod.Name, err)
727 }
728 if err := json.Unmarshal(updated, pod); err != nil {
729 t.Fatalf("failed to unmarshal updated pod %s: %v", pod.Name, err)
730 }
731
732 return true, pod, nil
733 })
734
735 for _, pod := range pods {
736 om.podsIndexer.Add(pod)
737 claims := getPersistentVolumeClaims(set, pod)
738 for _, claim := range claims {
739 om.CreateClaim(&claim)
740 }
741 }
742
743 for i := range pods {
744 if _, err := om.setPodReady(set, i); err != nil {
745 t.Errorf("%d: %v", i, err)
746 }
747 if _, err := om.setPodRunning(set, i); err != nil {
748 t.Errorf("%d: %v", i, err)
749 }
750 }
751
752
753 ssc.enqueueStatefulSet(set)
754 fakeWorker(ssc)
755 *set.Spec.Replicas = 0
756 ssc.enqueueStatefulSet(set)
757 fakeWorker(ssc)
758
759 hasNamedOwnerRef := func(claim *v1.PersistentVolumeClaim, name string) bool {
760 for _, ownerRef := range claim.GetOwnerReferences() {
761 if ownerRef.Name == name {
762 return true
763 }
764 }
765 return false
766 }
767 verifyOwnerRefs := func(claim *v1.PersistentVolumeClaim, condemned bool) {
768 podName := getClaimPodName(set, claim)
769 const retain = apps.RetainPersistentVolumeClaimRetentionPolicyType
770 const delete = apps.DeletePersistentVolumeClaimRetentionPolicyType
771 switch {
772 case scaledownPolicy == retain && deletionPolicy == retain:
773 if hasNamedOwnerRef(claim, podName) || hasNamedOwnerRef(claim, set.Name) {
774 t.Errorf("bad claim ownerRefs: %s: %v", claim.Name, claim.GetOwnerReferences())
775 }
776 case scaledownPolicy == retain && deletionPolicy == delete:
777 if hasNamedOwnerRef(claim, podName) || !hasNamedOwnerRef(claim, set.Name) {
778 t.Errorf("bad claim ownerRefs: %s: %v", claim.Name, claim.GetOwnerReferences())
779 }
780 case scaledownPolicy == delete && deletionPolicy == retain:
781 if hasNamedOwnerRef(claim, podName) != condemned || hasNamedOwnerRef(claim, set.Name) {
782 t.Errorf("bad claim ownerRefs: %s: %v", claim.Name, claim.GetOwnerReferences())
783 }
784 case scaledownPolicy == delete && deletionPolicy == delete:
785 if hasNamedOwnerRef(claim, podName) != condemned || !hasNamedOwnerRef(claim, set.Name) {
786 t.Errorf("bad claim ownerRefs: %s: %v", claim.Name, claim.GetOwnerReferences())
787 }
788 }
789 }
790
791 claims, _ := om.claimsLister.PersistentVolumeClaims(set.Namespace).List(labels.Everything())
792 if len(claims) != len(pods) {
793 t.Errorf("Unexpected number of claims: %d", len(claims))
794 }
795 for _, claim := range claims {
796
797 switch claim.Name {
798 case "datadir-foo-0", "datadir-foo-1":
799 verifyOwnerRefs(claim, false)
800 case "datadir-foo-2":
801 if hasNamedOwnerRef(claim, getClaimPodName(set, claim)) || hasNamedOwnerRef(claim, set.Name) {
802 t.Errorf("unexpected ownerRefs for %s: %v", claim.Name, claim.GetOwnerReferences())
803 }
804 default:
805 t.Errorf("Unexpected claim %s", claim.Name)
806 }
807 }
808 }
809 policies := []apps.PersistentVolumeClaimRetentionPolicyType{
810 apps.RetainPersistentVolumeClaimRetentionPolicyType,
811 apps.DeletePersistentVolumeClaimRetentionPolicyType,
812 }
813 for _, scaledownPolicy := range policies {
814 for _, deletionPolicy := range policies {
815 testName := fmt.Sprintf("ScaleDown:%s/SetDeletion:%s", scaledownPolicy, deletionPolicy)
816 t.Run(testName, func(t *testing.T) { testFn(t, scaledownPolicy, deletionPolicy) })
817 }
818 }
819 }
820
821 func TestStaleOwnerRefOnScaleup(t *testing.T) {
822 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
823
824 for _, policy := range []*apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
825 {
826 WhenScaled: apps.DeletePersistentVolumeClaimRetentionPolicyType,
827 WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
828 },
829 {
830 WhenScaled: apps.DeletePersistentVolumeClaimRetentionPolicyType,
831 WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
832 },
833 } {
834 onPolicy := func(msg string, args ...interface{}) string {
835 return fmt.Sprintf(fmt.Sprintf("(%s) %s", policy, msg), args...)
836 }
837 set := newStatefulSet(3)
838 set.Spec.PersistentVolumeClaimRetentionPolicy = policy
839 logger, ctx := ktesting.NewTestContext(t)
840 ssc, spc, om, _ := newFakeStatefulSetController(ctx, set)
841 if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
842 t.Errorf(onPolicy("Failed to turn up StatefulSet : %s", err))
843 }
844 var err error
845 if set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name); err != nil {
846 t.Errorf(onPolicy("Could not get scaled up set: %v", err))
847 }
848 if set.Status.Replicas != 3 {
849 t.Errorf(onPolicy("set.Status.Replicas = %v; want 3", set.Status.Replicas))
850 }
851 *set.Spec.Replicas = 2
852 if err := scaleDownStatefulSetController(logger, set, ssc, spc, om); err != nil {
853 t.Errorf(onPolicy("Failed to scale down StatefulSet : msg, %s", err))
854 }
855 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
856 if err != nil {
857 t.Errorf(onPolicy("Could not get scaled down StatefulSet: %v", err))
858 }
859 if set.Status.Replicas != 2 {
860 t.Errorf(onPolicy("Failed to scale statefulset to 2 replicas"))
861 }
862
863 var claim *v1.PersistentVolumeClaim
864 claim, err = om.claimsLister.PersistentVolumeClaims(set.Namespace).Get("datadir-foo-2")
865 if err != nil {
866 t.Errorf(onPolicy("Could not find expected pvc datadir-foo-2"))
867 }
868 refs := claim.GetOwnerReferences()
869 if len(refs) != 1 {
870 t.Errorf(onPolicy("Expected only one refs: %v", refs))
871 }
872
873 for i := range refs {
874 if refs[i].Name == "foo-2" {
875 refs[i].UID = "stale"
876 break
877 }
878 }
879 claim.SetOwnerReferences(refs)
880 if err = om.claimsIndexer.Update(claim); err != nil {
881 t.Errorf(onPolicy("Could not update claim with new owner ref: %v", err))
882 }
883
884 *set.Spec.Replicas = 3
885
886 if err := scaleUpStatefulSetControllerBounded(logger, set, ssc, spc, om, 10); err != nil {
887 t.Errorf(onPolicy("Failed attempt to scale StatefulSet back up: %v", err))
888 }
889 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
890 if err != nil {
891 t.Errorf(onPolicy("Could not get scaled down StatefulSet: %v", err))
892 }
893 if set.Status.Replicas != 2 {
894 t.Errorf(onPolicy("Expected set to stay at two replicas"))
895 }
896
897 claim, err = om.claimsLister.PersistentVolumeClaims(set.Namespace).Get("datadir-foo-2")
898 if err != nil {
899 t.Errorf(onPolicy("Could not find expected pvc datadir-foo-2"))
900 }
901 refs = claim.GetOwnerReferences()
902 if len(refs) != 1 {
903 t.Errorf(onPolicy("Unexpected change to condemned pvc ownerRefs: %v", refs))
904 }
905 foundPodRef := false
906 for i := range refs {
907 if refs[i].UID == "stale" {
908 foundPodRef = true
909 break
910 }
911 }
912 if !foundPodRef {
913 t.Errorf(onPolicy("Claim ref unexpectedly changed: %v", refs))
914 }
915 if err = om.claimsIndexer.Delete(claim); err != nil {
916 t.Errorf(onPolicy("Could not delete stale pvc: %v", err))
917 }
918
919 if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
920 t.Errorf(onPolicy("Failed to scale StatefulSet back up: %v", err))
921 }
922 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
923 if err != nil {
924 t.Errorf(onPolicy("Could not get scaled down StatefulSet: %v", err))
925 }
926 if set.Status.Replicas != 3 {
927 t.Errorf(onPolicy("Failed to scale set back up once PVC was deleted"))
928 }
929 }
930 }
931
932 func newFakeStatefulSetController(ctx context.Context, initialObjects ...runtime.Object) (*StatefulSetController, *StatefulPodControl, *fakeObjectManager, history.Interface) {
933 client := fake.NewSimpleClientset(initialObjects...)
934 informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
935 om := newFakeObjectManager(informerFactory)
936 spc := NewStatefulPodControlFromManager(om, &noopRecorder{})
937 ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
938 ssc := NewStatefulSetController(
939 ctx,
940 informerFactory.Core().V1().Pods(),
941 informerFactory.Apps().V1().StatefulSets(),
942 informerFactory.Core().V1().PersistentVolumeClaims(),
943 informerFactory.Apps().V1().ControllerRevisions(),
944 client,
945 )
946 ssh := history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions())
947 ssc.podListerSynced = alwaysReady
948 ssc.setListerSynced = alwaysReady
949 recorder := record.NewFakeRecorder(10)
950 ssc.control = NewDefaultStatefulSetControl(spc, ssu, ssh, recorder)
951
952 return ssc, spc, om, ssh
953 }
954
955 func fakeWorker(ssc *StatefulSetController) {
956 if obj, done := ssc.queue.Get(); !done {
957 ssc.sync(context.TODO(), obj.(string))
958 ssc.queue.Done(obj)
959 }
960 }
961
962 func getPodAtOrdinal(pods []*v1.Pod, ordinal int) *v1.Pod {
963 if 0 > ordinal || ordinal >= len(pods) {
964 return nil
965 }
966 sort.Sort(ascendingOrdinal(pods))
967 return pods[ordinal]
968 }
969
970 func scaleUpStatefulSetController(logger klog.Logger, set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager) error {
971 return scaleUpStatefulSetControllerBounded(logger, set, ssc, spc, om, -1)
972 }
973
// scaleUpStatefulSetControllerBounded simulates scale-up of set by walking one
// pod at a time through Pending, Running and Ready and feeding the matching
// add/update events and a worker pass to ssc after each transition.
//
// The loop runs while set.Status.ReadyReplicas is below *set.Spec.Replicas and,
// when maxIterations is non-negative, for at most maxIterations rounds; a
// negative maxIterations removes the cap. The spc parameter is not used here.
//
// It returns the first error from the selector conversion, pod listing, pod
// state helpers, the set lookup, or assertMonotonicInvariants.
func scaleUpStatefulSetControllerBounded(logger klog.Logger, set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager, maxIterations int) error {
	// Put the set into the cache and run one sync for it before driving pods.
	om.setsIndexer.Add(set)
	ssc.enqueueStatefulSet(set)
	fakeWorker(ssc)
	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
	if err != nil {
		return err
	}
	iterations := 0
	for (maxIterations < 0 || iterations < maxIterations) && set.Status.ReadyReplicas < *set.Spec.Replicas {
		iterations++
		pods, err := om.podsLister.Pods(set.Namespace).List(selector)
		if err != nil {
			return err
		}
		// Work on the last of the currently listed pods.
		ord := len(pods) - 1
		if pods, err = om.setPodPending(set, ord); err != nil {
			return err
		}
		pod := getPodAtOrdinal(pods, ord)
		ssc.addPod(logger, pod)
		fakeWorker(ssc)
		// Snapshot the pod before each transition so updatePod receives
		// distinct old and new objects.
		pod = getPodAtOrdinal(pods, ord)
		prev := *pod
		if pods, err = om.setPodRunning(set, ord); err != nil {
			return err
		}
		pod = getPodAtOrdinal(pods, ord)
		ssc.updatePod(logger, &prev, pod)
		fakeWorker(ssc)
		pod = getPodAtOrdinal(pods, ord)
		prev = *pod
		if pods, err = om.setPodReady(set, ord); err != nil {
			return err
		}
		pod = getPodAtOrdinal(pods, ord)
		ssc.updatePod(logger, &prev, pod)
		fakeWorker(ssc)
		if err := assertMonotonicInvariants(set, om); err != nil {
			return err
		}
		// Reload the set from the cache so the loop condition sees the
		// latest status.
		obj, _, err := om.setsIndexer.Get(set)
		if err != nil {
			return err
		}
		set = obj.(*apps.StatefulSet)
	}
	return assertMonotonicInvariants(set, om)
}
1024
// scaleDownStatefulSetController simulates scale-down of set. It repeatedly
// marks a pod as terminating, delivers the update event, deletes the pod
// through spc, delivers the delete event, and runs a worker pass, until
// set.Status.Replicas is no longer above *set.Spec.Replicas.
//
// It returns the first error from the selector conversion, pod listing,
// addTerminatingPod, the set lookup, or assertMonotonicInvariants.
func scaleDownStatefulSetController(logger klog.Logger, set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager) error {
	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
	if err != nil {
		return err
	}
	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
	if err != nil {
		return err
	}
	// Start with the last of the currently listed pods.
	// NOTE(review): getPodAtOrdinal returns nil for an empty list, in which
	// case the dereference below would panic — callers appear to always scale
	// up first; confirm.
	ord := len(pods) - 1
	pod := getPodAtOrdinal(pods, ord)
	prev := *pod
	// Store the (caller-modified) set with a new resource version and sync it.
	fakeResourceVersion(set)
	om.setsIndexer.Add(set)
	ssc.enqueueStatefulSet(set)
	fakeWorker(ssc)
	pods, err = om.addTerminatingPod(set, ord)
	if err != nil {
		return err
	}
	pod = getPodAtOrdinal(pods, ord)
	ssc.updatePod(logger, &prev, pod)
	fakeWorker(ssc)
	spc.DeleteStatefulPod(set, pod)
	ssc.deletePod(logger, pod)
	fakeWorker(ssc)
	for set.Status.Replicas > *set.Spec.Replicas {
		pods, err = om.podsLister.Pods(set.Namespace).List(selector)
		if err != nil {
			return err
		}

		// NOTE(review): unlike the first step above, this uses len(pods)
		// rather than len(pods)-1, and shadows the outer ord — confirm this
		// is intended.
		ord := len(pods)
		pods, err = om.addTerminatingPod(set, ord)
		if err != nil {
			return err
		}
		pod = getPodAtOrdinal(pods, ord)
		// NOTE(review): prev is never reassigned, so it still holds the copy
		// of the pod captured before the loop.
		ssc.updatePod(logger, &prev, pod)
		fakeWorker(ssc)
		spc.DeleteStatefulPod(set, pod)
		ssc.deletePod(logger, pod)
		fakeWorker(ssc)
		// Reload the set from the cache so the loop condition sees the
		// latest status.
		obj, _, err := om.setsIndexer.Get(set)
		if err != nil {
			return err
		}
		set = obj.(*apps.StatefulSet)
	}
	return assertMonotonicInvariants(set, om)
}
1077
1078 func rawTemplate(template *v1.PodTemplateSpec) runtime.RawExtension {
1079 buf := new(bytes.Buffer)
1080 enc := json.NewEncoder(buf)
1081 if err := enc.Encode(template); err != nil {
1082 panic(err)
1083 }
1084 return runtime.RawExtension{Raw: buf.Bytes()}
1085 }
1086
View as plain text