1
16
17 package statefulset
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "math/rand"
24 "reflect"
25 "runtime"
26 "sort"
27 "strconv"
28 "strings"
29 "sync"
30 "testing"
31 "time"
32
33 apps "k8s.io/api/apps/v1"
34 v1 "k8s.io/api/core/v1"
35 apierrors "k8s.io/apimachinery/pkg/api/errors"
36 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37 "k8s.io/apimachinery/pkg/labels"
38 "k8s.io/apimachinery/pkg/types"
39 utilerrors "k8s.io/apimachinery/pkg/util/errors"
40 "k8s.io/apimachinery/pkg/util/intstr"
41 utilfeature "k8s.io/apiserver/pkg/util/feature"
42 "k8s.io/client-go/informers"
43 appsinformers "k8s.io/client-go/informers/apps/v1"
44 clientset "k8s.io/client-go/kubernetes"
45 "k8s.io/client-go/kubernetes/fake"
46 appslisters "k8s.io/client-go/listers/apps/v1"
47 corelisters "k8s.io/client-go/listers/core/v1"
48 "k8s.io/client-go/tools/cache"
49 featuregatetesting "k8s.io/component-base/featuregate/testing"
50 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
51 "k8s.io/kubernetes/pkg/controller"
52 "k8s.io/kubernetes/pkg/controller/history"
53 "k8s.io/kubernetes/pkg/features"
54 )
55
// invariantFunc is the signature of the checks the scenarios in this file run
// against a StatefulSet and the fake object manager backing the test. The
// scenarios report any non-nil error it returns through t.Error.
type invariantFunc func(set *apps.StatefulSet, om *fakeObjectManager) error
57
58 func setupController(client clientset.Interface) (*fakeObjectManager, *fakeStatefulSetStatusUpdater, StatefulSetControlInterface) {
59 informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
60 om := newFakeObjectManager(informerFactory)
61 spc := NewStatefulPodControlFromManager(om, &noopRecorder{})
62 ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
63 recorder := &noopRecorder{}
64 ssc := NewDefaultStatefulSetControl(spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder)
65
66
67
68
69
70
71
72
73
74 if sets, err := client.AppsV1().StatefulSets("").List(context.TODO(), metav1.ListOptions{}); err != nil {
75 panic(err)
76 } else {
77 for _, set := range sets.Items {
78 if err := om.setsIndexer.Update(&set); err != nil {
79 panic(err)
80 }
81 }
82 }
83
84 return om, ssu, ssc
85 }
86
87 func burst(set *apps.StatefulSet) *apps.StatefulSet {
88 set.Spec.PodManagementPolicy = apps.ParallelPodManagement
89 return set
90 }
91
92 func setMinReadySeconds(set *apps.StatefulSet, minReadySeconds int32) *apps.StatefulSet {
93 set.Spec.MinReadySeconds = minReadySeconds
94 return set
95 }
96
97 func runTestOverPVCRetentionPolicies(t *testing.T, testName string, testFn func(*testing.T, *apps.StatefulSetPersistentVolumeClaimRetentionPolicy)) {
98 subtestName := "StatefulSetAutoDeletePVCDisabled"
99 if testName != "" {
100 subtestName = fmt.Sprintf("%s/%s", testName, subtestName)
101 }
102 t.Run(subtestName, func(t *testing.T) {
103 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, false)()
104 testFn(t, &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
105 WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
106 WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
107 })
108 })
109
110 for _, policy := range []*apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
111 {
112 WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
113 WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
114 },
115 {
116 WhenScaled: apps.DeletePersistentVolumeClaimRetentionPolicyType,
117 WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
118 },
119 {
120 WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
121 WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
122 },
123 {
124 WhenScaled: apps.DeletePersistentVolumeClaimRetentionPolicyType,
125 WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
126 },
127
128 nil,
129 } {
130 subtestName := pvcDeletePolicyString(policy) + "/StatefulSetAutoDeletePVCEnabled"
131 if testName != "" {
132 subtestName = fmt.Sprintf("%s/%s", testName, subtestName)
133 }
134 t.Run(subtestName, func(t *testing.T) {
135 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
136 testFn(t, policy)
137 })
138 }
139 }
140
141 func pvcDeletePolicyString(policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) string {
142 if policy == nil {
143 return "nullPolicy"
144 }
145 const retain = apps.RetainPersistentVolumeClaimRetentionPolicyType
146 const delete = apps.DeletePersistentVolumeClaimRetentionPolicyType
147 switch {
148 case policy.WhenScaled == retain && policy.WhenDeleted == retain:
149 return "Retain"
150 case policy.WhenScaled == retain && policy.WhenDeleted == delete:
151 return "SetDeleteOnly"
152 case policy.WhenScaled == delete && policy.WhenDeleted == retain:
153 return "ScaleDownOnly"
154 case policy.WhenScaled == delete && policy.WhenDeleted == delete:
155 return "Delete"
156 }
157 return "invalid"
158 }
159
160 func TestStatefulSetControl(t *testing.T) {
161 simpleSetFn := func() *apps.StatefulSet { return newStatefulSet(3) }
162 largeSetFn := func() *apps.StatefulSet { return newStatefulSet(5) }
163
164 testCases := []struct {
165 fn func(*testing.T, *apps.StatefulSet, invariantFunc)
166 obj func() *apps.StatefulSet
167 }{
168 {CreatesPods, simpleSetFn},
169 {ScalesUp, simpleSetFn},
170 {ScalesDown, simpleSetFn},
171 {ReplacesPods, largeSetFn},
172 {RecreatesFailedPod, simpleSetFn},
173 {RecreatesSucceededPod, simpleSetFn},
174 {CreatePodFailure, simpleSetFn},
175 {UpdatePodFailure, simpleSetFn},
176 {UpdateSetStatusFailure, simpleSetFn},
177 {PodRecreateDeleteFailure, simpleSetFn},
178 {NewRevisionDeletePodFailure, simpleSetFn},
179 {RecreatesPVCForPendingPod, simpleSetFn},
180 }
181
182 for _, testCase := range testCases {
183 fnName := runtime.FuncForPC(reflect.ValueOf(testCase.fn).Pointer()).Name()
184 if i := strings.LastIndex(fnName, "."); i != -1 {
185 fnName = fnName[i+1:]
186 }
187 testObj := testCase.obj
188 testFn := testCase.fn
189 runTestOverPVCRetentionPolicies(
190 t,
191 fmt.Sprintf("%s/Monotonic", fnName),
192 func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
193 set := testObj()
194 set.Spec.PersistentVolumeClaimRetentionPolicy = policy
195 testFn(t, set, assertMonotonicInvariants)
196 },
197 )
198 runTestOverPVCRetentionPolicies(
199 t,
200 fmt.Sprintf("%s/Burst", fnName),
201 func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
202 set := burst(testObj())
203 set.Spec.PersistentVolumeClaimRetentionPolicy = policy
204 testFn(t, set, assertBurstInvariants)
205 },
206 )
207 }
208 }
209
210 func CreatesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
211 client := fake.NewSimpleClientset(set)
212 om, _, ssc := setupController(client)
213
214 if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
215 t.Errorf("Failed to turn up StatefulSet : %s", err)
216 }
217 var err error
218 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
219 if err != nil {
220 t.Fatalf("Error getting updated StatefulSet: %v", err)
221 }
222 if set.Status.Replicas != 3 {
223 t.Error("Failed to scale statefulset to 3 replicas")
224 }
225 if set.Status.ReadyReplicas != 3 {
226 t.Error("Failed to set ReadyReplicas correctly")
227 }
228 if set.Status.UpdatedReplicas != 3 {
229 t.Error("Failed to set UpdatedReplicas correctly")
230 }
231
232 if utilfeature.DefaultFeatureGate.Enabled(features.PodIndexLabel) {
233 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
234 if err != nil {
235 t.Error(err)
236 }
237 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
238 if err != nil {
239 t.Error(err)
240 }
241 if len(pods) != 3 {
242 t.Errorf("Expected 3 pods, got %d", len(pods))
243 }
244 for _, pod := range pods {
245 podIndexFromLabel, exists := pod.Labels[apps.PodIndexLabel]
246 if !exists {
247 t.Errorf("Missing pod index label: %s", apps.PodIndexLabel)
248 continue
249 }
250 podIndexFromName := strconv.Itoa(getOrdinal(pod))
251 if podIndexFromLabel != podIndexFromName {
252 t.Errorf("Pod index label value (%s) does not match pod index in pod name (%s)", podIndexFromLabel, podIndexFromName)
253 }
254 }
255 }
256 }
257
258 func ScalesUp(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
259 client := fake.NewSimpleClientset(set)
260 om, _, ssc := setupController(client)
261
262 if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
263 t.Errorf("Failed to turn up StatefulSet : %s", err)
264 }
265 *set.Spec.Replicas = 4
266 if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
267 t.Errorf("Failed to scale StatefulSet : %s", err)
268 }
269 var err error
270 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
271 if err != nil {
272 t.Fatalf("Error getting updated StatefulSet: %v", err)
273 }
274 if set.Status.Replicas != 4 {
275 t.Error("Failed to scale statefulset to 4 replicas")
276 }
277 if set.Status.ReadyReplicas != 4 {
278 t.Error("Failed to set readyReplicas correctly")
279 }
280 if set.Status.UpdatedReplicas != 4 {
281 t.Error("Failed to set updatedReplicas correctly")
282 }
283 }
284
285 func ScalesDown(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
286 client := fake.NewSimpleClientset(set)
287 om, _, ssc := setupController(client)
288
289 if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
290 t.Errorf("Failed to turn up StatefulSet : %s", err)
291 }
292 var err error
293 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
294 if err != nil {
295 t.Fatalf("Error getting updated StatefulSet: %v", err)
296 }
297 *set.Spec.Replicas = 0
298 if err := scaleDownStatefulSetControl(set, ssc, om, invariants); err != nil {
299 t.Errorf("Failed to scale StatefulSet : %s", err)
300 }
301
302
303 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
304 if err != nil {
305 t.Fatalf("Error getting updated StatefulSet: %v", err)
306 }
307 if set.Status.Replicas != 0 {
308 t.Error("Failed to scale statefulset to 0 replicas")
309 }
310 if set.Status.ReadyReplicas != 0 {
311 t.Error("Failed to set readyReplicas correctly")
312 }
313 if set.Status.UpdatedReplicas != 0 {
314 t.Error("Failed to set updatedReplicas correctly")
315 }
316 }
317
318 func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
319 client := fake.NewSimpleClientset(set)
320 om, _, ssc := setupController(client)
321
322 if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
323 t.Errorf("Failed to turn up StatefulSet : %s", err)
324 }
325 var err error
326 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
327 if err != nil {
328 t.Fatalf("Error getting updated StatefulSet: %v", err)
329 }
330 if set.Status.Replicas != 5 {
331 t.Error("Failed to scale statefulset to 5 replicas")
332 }
333 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
334 if err != nil {
335 t.Error(err)
336 }
337 claims, err := om.claimsLister.PersistentVolumeClaims(set.Namespace).List(selector)
338 if err != nil {
339 t.Error(err)
340 }
341 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
342 if err != nil {
343 t.Error(err)
344 }
345 for _, pod := range pods {
346 podClaims := getPersistentVolumeClaims(set, pod)
347 for _, claim := range claims {
348 if _, found := podClaims[claim.Name]; found {
349 if hasOwnerRef(claim, pod) {
350 t.Errorf("Unexpected ownerRef on %s", claim.Name)
351 }
352 }
353 }
354 }
355 sort.Sort(ascendingOrdinal(pods))
356 om.podsIndexer.Delete(pods[0])
357 om.podsIndexer.Delete(pods[2])
358 om.podsIndexer.Delete(pods[4])
359 for i := 0; i < 5; i += 2 {
360 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
361 if err != nil {
362 t.Error(err)
363 }
364 if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
365 t.Errorf("Failed to update StatefulSet : %s", err)
366 }
367 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
368 if err != nil {
369 t.Fatalf("Error getting updated StatefulSet: %v", err)
370 }
371 if pods, err = om.setPodRunning(set, i); err != nil {
372 t.Error(err)
373 }
374 if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
375 t.Errorf("Failed to update StatefulSet : %s", err)
376 }
377 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
378 if err != nil {
379 t.Fatalf("Error getting updated StatefulSet: %v", err)
380 }
381 if _, err = om.setPodReady(set, i); err != nil {
382 t.Error(err)
383 }
384 }
385 pods, err = om.podsLister.Pods(set.Namespace).List(selector)
386 if err != nil {
387 t.Error(err)
388 }
389 if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
390 t.Errorf("Failed to update StatefulSet : %s", err)
391 }
392 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
393 if err != nil {
394 t.Fatalf("Error getting updated StatefulSet: %v", err)
395 }
396 if e, a := int32(5), set.Status.Replicas; e != a {
397 t.Errorf("Expected to scale to %d, got %d", e, a)
398 }
399 }
400
401 func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, phase v1.PodPhase) {
402 client := fake.NewSimpleClientset()
403 om, _, ssc := setupController(client)
404 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
405 if err != nil {
406 t.Error(err)
407 }
408 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
409 if err != nil {
410 t.Error(err)
411 }
412 if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
413 t.Errorf("Error updating StatefulSet %s", err)
414 }
415 if err := invariants(set, om); err != nil {
416 t.Error(err)
417 }
418 pods, err = om.podsLister.Pods(set.Namespace).List(selector)
419 if err != nil {
420 t.Error(err)
421 }
422 pods[0].Status.Phase = phase
423 om.podsIndexer.Update(pods[0])
424 if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
425 t.Errorf("Error updating StatefulSet %s", err)
426 }
427 if err := invariants(set, om); err != nil {
428 t.Error(err)
429 }
430 pods, err = om.podsLister.Pods(set.Namespace).List(selector)
431 if err != nil {
432 t.Error(err)
433 }
434 if isCreated(pods[0]) {
435 t.Error("StatefulSet did not recreate failed Pod")
436 }
437
438 }
439
440 func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
441 recreatesPod(t, set, invariants, v1.PodFailed)
442 }
443
444 func RecreatesSucceededPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
445 recreatesPod(t, set, invariants, v1.PodSucceeded)
446 }
447
448 func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
449 client := fake.NewSimpleClientset(set)
450 om, _, ssc := setupController(client)
451 om.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
452
453 if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) {
454 t.Errorf("StatefulSetControl did not return InternalError found %s", err)
455 }
456
457 var err error
458 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
459 if err != nil {
460 t.Fatalf("Error getting updated StatefulSet: %v", err)
461 }
462 if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
463 t.Errorf("Failed to turn up StatefulSet : %s", err)
464 }
465 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
466 if err != nil {
467 t.Fatalf("Error getting updated StatefulSet: %v", err)
468 }
469 if set.Status.Replicas != 3 {
470 t.Error("Failed to scale StatefulSet to 3 replicas")
471 }
472 if set.Status.ReadyReplicas != 3 {
473 t.Error("Failed to set readyReplicas correctly")
474 }
475 if set.Status.UpdatedReplicas != 3 {
476 t.Error("Failed to updatedReplicas correctly")
477 }
478 }
479
480 func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
481 client := fake.NewSimpleClientset(set)
482 om, _, ssc := setupController(client)
483 om.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
484
485
486 if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
487 t.Fatalf("Unexpected error: %v", err)
488 }
489 var err error
490 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
491 if err != nil {
492 t.Fatalf("Error getting updated StatefulSet: %v", err)
493 }
494 if set.Status.Replicas != 3 {
495 t.Error("Failed to scale StatefulSet to 3 replicas")
496 }
497 if set.Status.ReadyReplicas != 3 {
498 t.Error("Failed to set readyReplicas correctly")
499 }
500 if set.Status.UpdatedReplicas != 3 {
501 t.Error("Failed to set updatedReplicas correctly")
502 }
503
504
505 pods, err := om.podsLister.List(labels.Everything())
506 if err != nil {
507 t.Fatalf("Error listing pods: %v", err)
508 }
509 if len(pods) != 3 {
510 t.Fatalf("Expected 3 pods, got %d", len(pods))
511 }
512 sort.Sort(ascendingOrdinal(pods))
513 pods[0].Name = "goo-0"
514 om.podsIndexer.Update(pods[0])
515
516
517 if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil && isOrHasInternalError(err) {
518 t.Errorf("StatefulSetControl did not return InternalError found %s", err)
519 }
520 }
521
522 func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
523 client := fake.NewSimpleClientset(set)
524 om, ssu, ssc := setupController(client)
525 ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
526
527 if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) {
528 t.Errorf("StatefulSetControl did not return InternalError found %s", err)
529 }
530
531 var err error
532 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
533 if err != nil {
534 t.Fatalf("Error getting updated StatefulSet: %v", err)
535 }
536 if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
537 t.Errorf("Failed to turn up StatefulSet : %s", err)
538 }
539 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
540 if err != nil {
541 t.Fatalf("Error getting updated StatefulSet: %v", err)
542 }
543 if set.Status.Replicas != 3 {
544 t.Error("Failed to scale StatefulSet to 3 replicas")
545 }
546 if set.Status.ReadyReplicas != 3 {
547 t.Error("Failed to set readyReplicas to 3")
548 }
549 if set.Status.UpdatedReplicas != 3 {
550 t.Error("Failed to set updatedReplicas to 3")
551 }
552 }
553
554 func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
555 client := fake.NewSimpleClientset(set)
556 om, _, ssc := setupController(client)
557
558 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
559 if err != nil {
560 t.Error(err)
561 }
562 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
563 if err != nil {
564 t.Error(err)
565 }
566 if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
567 t.Errorf("Error updating StatefulSet %s", err)
568 }
569 if err := invariants(set, om); err != nil {
570 t.Error(err)
571 }
572 pods, err = om.podsLister.Pods(set.Namespace).List(selector)
573 if err != nil {
574 t.Error(err)
575 }
576 pods[0].Status.Phase = v1.PodFailed
577 om.podsIndexer.Update(pods[0])
578 om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
579 if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil && isOrHasInternalError(err) {
580 t.Errorf("StatefulSet failed to %s", err)
581 }
582 if err := invariants(set, om); err != nil {
583 t.Error(err)
584 }
585 if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
586 t.Errorf("Error updating StatefulSet %s", err)
587 }
588 if err := invariants(set, om); err != nil {
589 t.Error(err)
590 }
591 pods, err = om.podsLister.Pods(set.Namespace).List(selector)
592 if err != nil {
593 t.Error(err)
594 }
595 if isCreated(pods[0]) {
596 t.Error("StatefulSet did not recreate failed Pod")
597 }
598 }
599
600 func NewRevisionDeletePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
601 client := fake.NewSimpleClientset(set)
602 om, _, ssc := setupController(client)
603 if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
604 t.Errorf("Failed to turn up StatefulSet : %s", err)
605 }
606 var err error
607 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
608 if err != nil {
609 t.Fatalf("Error getting updated StatefulSet: %v", err)
610 }
611 if set.Status.Replicas != 3 {
612 t.Error("Failed to scale StatefulSet to 3 replicas")
613 }
614 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
615 if err != nil {
616 t.Error(err)
617 }
618 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
619 if err != nil {
620 t.Error(err)
621 }
622
623
624 updateSet := set.DeepCopy()
625 updateSet.Spec.Template.Spec.Containers[0].Image = "nginx-new"
626 if err := om.setsIndexer.Update(updateSet); err != nil {
627 t.Error("Failed to update StatefulSet")
628 }
629 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
630 if err != nil {
631 t.Fatalf("Error getting updated StatefulSet: %v", err)
632 }
633
634
635 om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
636 _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
637 if err == nil {
638 t.Error("Expected err in update StatefulSet when deleting a pod")
639 }
640
641 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
642 if err != nil {
643 t.Fatalf("Error getting updated StatefulSet: %v", err)
644 }
645 if err := invariants(set, om); err != nil {
646 t.Error(err)
647 }
648 if set.Status.CurrentReplicas != 3 {
649 t.Fatalf("Failed pod deletion should not update CurrentReplicas: want 3, got %d", set.Status.CurrentReplicas)
650 }
651 if set.Status.CurrentRevision == set.Status.UpdateRevision {
652 t.Error("Failed to create new revision")
653 }
654
655
656 om.SetDeleteStatefulPodError(nil, 0)
657 status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
658 if err != nil {
659 t.Fatalf("Unexpected err in update StatefulSet: %v", err)
660 }
661 if status.CurrentReplicas != 2 {
662 t.Fatalf("Pod deletion should update CurrentReplicas: want 2, got %d", status.CurrentReplicas)
663 }
664 if err := invariants(set, om); err != nil {
665 t.Error(err)
666 }
667 }
668
669 func emptyInvariants(set *apps.StatefulSet, om *fakeObjectManager) error {
670 return nil
671 }
672
673 func TestStatefulSetControlWithStartOrdinal(t *testing.T) {
674 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetStartOrdinal, true)()
675
676 simpleSetFn := func() *apps.StatefulSet {
677 statefulSet := newStatefulSet(3)
678 statefulSet.Spec.Ordinals = &apps.StatefulSetOrdinals{Start: int32(2)}
679 return statefulSet
680 }
681
682 testCases := []struct {
683 fn func(*testing.T, *apps.StatefulSet, invariantFunc)
684 obj func() *apps.StatefulSet
685 }{
686 {CreatesPodsWithStartOrdinal, simpleSetFn},
687 }
688
689 for _, testCase := range testCases {
690 testObj := testCase.obj
691 testFn := testCase.fn
692
693 set := testObj()
694 testFn(t, set, emptyInvariants)
695 }
696 }
697
698 func CreatesPodsWithStartOrdinal(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
699 client := fake.NewSimpleClientset(set)
700 om, _, ssc := setupController(client)
701
702 if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
703 t.Errorf("Failed to turn up StatefulSet : %s", err)
704 }
705 var err error
706 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
707 if err != nil {
708 t.Fatalf("Error getting updated StatefulSet: %v", err)
709 }
710 if set.Status.Replicas != 3 {
711 t.Error("Failed to scale statefulset to 3 replicas")
712 }
713 if set.Status.ReadyReplicas != 3 {
714 t.Error("Failed to set ReadyReplicas correctly")
715 }
716 if set.Status.UpdatedReplicas != 3 {
717 t.Error("Failed to set UpdatedReplicas correctly")
718 }
719 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
720 if err != nil {
721 t.Error(err)
722 }
723 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
724 if err != nil {
725 t.Error(err)
726 }
727 sort.Sort(ascendingOrdinal(pods))
728 for i, pod := range pods {
729 expectedOrdinal := 2 + i
730 actualPodOrdinal := getOrdinal(pod)
731 if actualPodOrdinal != expectedOrdinal {
732 t.Errorf("Expected pod ordinal %d. Got %d", expectedOrdinal, actualPodOrdinal)
733 }
734 }
735 }
736
737 func RecreatesPVCForPendingPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
738 client := fake.NewSimpleClientset()
739 om, _, ssc := setupController(client)
740 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
741 if err != nil {
742 t.Error(err)
743 }
744 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
745 if err != nil {
746 t.Error(err)
747 }
748 if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
749 t.Errorf("Error updating StatefulSet %s", err)
750 }
751 if err := invariants(set, om); err != nil {
752 t.Error(err)
753 }
754 pods, err = om.podsLister.Pods(set.Namespace).List(selector)
755 if err != nil {
756 t.Error(err)
757 }
758 for _, claim := range getPersistentVolumeClaims(set, pods[0]) {
759 om.claimsIndexer.Delete(&claim)
760 }
761 pods[0].Status.Phase = v1.PodPending
762 om.podsIndexer.Update(pods[0])
763 if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
764 t.Errorf("Error updating StatefulSet %s", err)
765 }
766
767 if err := invariants(set, om); err != nil {
768 t.Error(err)
769 }
770 _, err = om.podsLister.Pods(set.Namespace).List(selector)
771 if err != nil {
772 t.Error(err)
773 }
774 }
775
776 func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
777 runTestOverPVCRetentionPolicies(
778 t, "", func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
779 set := newStatefulSet(3)
780 set.Spec.PersistentVolumeClaimRetentionPolicy = policy
781 invariants := assertMonotonicInvariants
782 client := fake.NewSimpleClientset(set)
783 om, _, ssc := setupController(client)
784
785 if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
786 t.Errorf("Failed to turn up StatefulSet : %s", err)
787 }
788 var err error
789 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
790 if err != nil {
791 t.Fatalf("Error getting updated StatefulSet: %v", err)
792 }
793 *set.Spec.Replicas = 0
794 om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
795 if err := scaleDownStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) {
796 t.Errorf("StatefulSetControl failed to throw error on delete %s", err)
797 }
798 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
799 if err != nil {
800 t.Fatalf("Error getting updated StatefulSet: %v", err)
801 }
802 if err := scaleDownStatefulSetControl(set, ssc, om, invariants); err != nil {
803 t.Errorf("Failed to turn down StatefulSet %s", err)
804 }
805 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
806 if err != nil {
807 t.Fatalf("Error getting updated StatefulSet: %v", err)
808 }
809 if set.Status.Replicas != 0 {
810 t.Error("Failed to scale statefulset to 0 replicas")
811 }
812 if set.Status.ReadyReplicas != 0 {
813 t.Error("Failed to set readyReplicas to 0")
814 }
815 if set.Status.UpdatedReplicas != 0 {
816 t.Error("Failed to set updatedReplicas to 0")
817 }
818 })
819 }
820
821 func TestStatefulSetControl_getSetRevisions(t *testing.T) {
822 type testcase struct {
823 name string
824 existing []*apps.ControllerRevision
825 set *apps.StatefulSet
826 expectedCount int
827 expectedCurrent *apps.ControllerRevision
828 expectedUpdate *apps.ControllerRevision
829 err bool
830 }
831
832 testFn := func(test *testcase, t *testing.T) {
833 client := fake.NewSimpleClientset()
834 informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
835 spc := NewStatefulPodControlFromManager(newFakeObjectManager(informerFactory), &noopRecorder{})
836 ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
837 recorder := &noopRecorder{}
838 ssc := defaultStatefulSetControl{spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder}
839
840 stop := make(chan struct{})
841 defer close(stop)
842 informerFactory.Start(stop)
843 cache.WaitForCacheSync(
844 stop,
845 informerFactory.Apps().V1().StatefulSets().Informer().HasSynced,
846 informerFactory.Core().V1().Pods().Informer().HasSynced,
847 informerFactory.Apps().V1().ControllerRevisions().Informer().HasSynced,
848 )
849 test.set.Status.CollisionCount = new(int32)
850 for i := range test.existing {
851 ssc.controllerHistory.CreateControllerRevision(test.set, test.existing[i], test.set.Status.CollisionCount)
852 }
853 revisions, err := ssc.ListRevisions(test.set)
854 if err != nil {
855 t.Fatal(err)
856 }
857 current, update, _, err := ssc.getStatefulSetRevisions(test.set, revisions)
858 if err != nil {
859 t.Fatalf("error getting statefulset revisions:%v", err)
860 }
861 revisions, err = ssc.ListRevisions(test.set)
862 if err != nil {
863 t.Fatal(err)
864 }
865 if len(revisions) != test.expectedCount {
866 t.Errorf("%s: want %d revisions got %d", test.name, test.expectedCount, len(revisions))
867 }
868 if test.err && err == nil {
869 t.Errorf("%s: expected error", test.name)
870 }
871 if !test.err && !history.EqualRevision(current, test.expectedCurrent) {
872 t.Errorf("%s: for current want %v got %v", test.name, test.expectedCurrent, current)
873 }
874 if !test.err && !history.EqualRevision(update, test.expectedUpdate) {
875 t.Errorf("%s: for update want %v got %v", test.name, test.expectedUpdate, update)
876 }
877 if !test.err && test.expectedCurrent != nil && current != nil && test.expectedCurrent.Revision != current.Revision {
878 t.Errorf("%s: for current revision want %d got %d", test.name, test.expectedCurrent.Revision, current.Revision)
879 }
880 if !test.err && test.expectedUpdate != nil && update != nil && test.expectedUpdate.Revision != update.Revision {
881 t.Errorf("%s: for update revision want %d got %d", test.name, test.expectedUpdate.Revision, update.Revision)
882 }
883 }
884
885 updateRevision := func(cr *apps.ControllerRevision, revision int64) *apps.ControllerRevision {
886 clone := cr.DeepCopy()
887 clone.Revision = revision
888 return clone
889 }
890
891 runTestOverPVCRetentionPolicies(
892 t, "", func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
893 set := newStatefulSet(3)
894 set.Spec.PersistentVolumeClaimRetentionPolicy = policy
895 set.Status.CollisionCount = new(int32)
896 rev0 := newRevisionOrDie(set, 1)
897 set1 := set.DeepCopy()
898 set1.Spec.Template.Spec.Containers[0].Image = "foo"
899 set1.Status.CurrentRevision = rev0.Name
900 set1.Status.CollisionCount = new(int32)
901 rev1 := newRevisionOrDie(set1, 2)
902 set2 := set1.DeepCopy()
903 set2.Spec.Template.Labels["new"] = "label"
904 set2.Status.CurrentRevision = rev0.Name
905 set2.Status.CollisionCount = new(int32)
906 rev2 := newRevisionOrDie(set2, 3)
907 tests := []testcase{
908 {
909 name: "creates initial revision",
910 existing: nil,
911 set: set,
912 expectedCount: 1,
913 expectedCurrent: rev0,
914 expectedUpdate: rev0,
915 err: false,
916 },
917 {
918 name: "creates revision on update",
919 existing: []*apps.ControllerRevision{rev0},
920 set: set1,
921 expectedCount: 2,
922 expectedCurrent: rev0,
923 expectedUpdate: rev1,
924 err: false,
925 },
926 {
927 name: "must not recreate a new revision of same set",
928 existing: []*apps.ControllerRevision{rev0, rev1},
929 set: set1,
930 expectedCount: 2,
931 expectedCurrent: rev0,
932 expectedUpdate: rev1,
933 err: false,
934 },
935 {
936 name: "must rollback to a previous revision",
937 existing: []*apps.ControllerRevision{rev0, rev1, rev2},
938 set: set1,
939 expectedCount: 3,
940 expectedCurrent: rev0,
941 expectedUpdate: updateRevision(rev1, 4),
942 err: false,
943 },
944 }
945 for i := range tests {
946 testFn(&tests[i], t)
947 }
948 })
949 }
950
// setupPodManagementPolicy sets set.Spec.PodManagementPolicy to
// podManagementPolicy and returns the same (mutated) set so the call can be
// chained with a constructor.
func setupPodManagementPolicy(podManagementPolicy apps.PodManagementPolicyType, set *apps.StatefulSet) *apps.StatefulSet {
	set.Spec.PodManagementPolicy = podManagementPolicy
	return set
}
955
956 func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) {
957 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MaxUnavailableStatefulSet, true)()
958
959 simpleParallelVerificationFn := func(
960 set *apps.StatefulSet,
961 spc *fakeObjectManager,
962 ssc StatefulSetControlInterface,
963 pods []*v1.Pod,
964 totalPods int,
965 selector labels.Selector,
966 ) []*v1.Pod {
967
968 if len(pods) != totalPods {
969 t.Fatalf("Expected create pods 4/5, got pods %v", len(pods))
970 }
971
972
973 spc.setPodRunning(set, 4)
974 spc.setPodRunning(set, 5)
975 originalPods, _ := spc.setPodReady(set, 4)
976 sort.Sort(ascendingOrdinal(originalPods))
977 if _, err := ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
978 t.Fatal(err)
979 }
980 pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
981 if err != nil {
982 t.Fatal(err)
983 }
984 sort.Sort(ascendingOrdinal(pods))
985
986 if !reflect.DeepEqual(pods, append(originalPods[:3], originalPods[4:]...)) {
987 t.Fatalf("Expected pods %v, got pods %v", append(originalPods[:3], originalPods[4:]...), pods)
988 }
989
990
991 if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
992 t.Fatal(err)
993 }
994 pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
995 if err != nil {
996 t.Fatal(err)
997 }
998 if len(pods) != totalPods {
999 t.Fatalf("Expected create pods 2/3, got pods %v", pods)
1000 }
1001
1002 return pods
1003 }
1004 simpleOrderedVerificationFn := func(
1005 set *apps.StatefulSet,
1006 spc *fakeObjectManager,
1007 ssc StatefulSetControlInterface,
1008 pods []*v1.Pod,
1009 totalPods int,
1010 selector labels.Selector,
1011 ) []*v1.Pod {
1012
1013 if len(pods) != 5 {
1014 t.Fatalf("Expected create pods 5, got pods %v", len(pods))
1015 }
1016 spc.setPodRunning(set, 4)
1017 pods, _ = spc.setPodReady(set, 4)
1018
1019
1020 if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
1021 t.Fatal(err)
1022 }
1023 pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
1024 if err != nil {
1025 t.Fatal(err)
1026 }
1027
1028 if len(pods) != totalPods {
1029 t.Fatalf("Expected create pods 4, got pods %v", len(pods))
1030 }
1031
1032 spc.setPodRunning(set, 5)
1033 originalPods, _ := spc.setPodReady(set, 5)
1034 sort.Sort(ascendingOrdinal(originalPods))
1035 if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
1036 t.Fatal(err)
1037 }
1038 pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
1039 if err != nil {
1040 t.Fatal(err)
1041 }
1042 sort.Sort(ascendingOrdinal(pods))
1043
1044
1045 if !reflect.DeepEqual(pods, append(originalPods[:3], originalPods[4:]...)) {
1046 t.Fatalf("Expected pods %v, got pods %v", append(originalPods[:3], originalPods[4:]...), pods)
1047 }
1048
1049
1050 if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
1051 t.Fatal(err)
1052 }
1053 pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
1054 if err != nil {
1055 t.Fatal(err)
1056 }
1057 if len(pods) != totalPods {
1058 t.Fatalf("Expected create pods 2/3, got pods %v", pods)
1059 }
1060
1061 return pods
1062 }
1063 testCases := []struct {
1064 policyType apps.PodManagementPolicyType
1065 verifyFn func(
1066 set *apps.StatefulSet,
1067 spc *fakeObjectManager,
1068 ssc StatefulSetControlInterface,
1069 pods []*v1.Pod,
1070 totalPods int,
1071 selector labels.Selector,
1072 ) []*v1.Pod
1073 }{
1074 {apps.OrderedReadyPodManagement, simpleOrderedVerificationFn},
1075 {apps.ParallelPodManagement, simpleParallelVerificationFn},
1076 }
1077 for _, tc := range testCases {
1078
1079 var totalPods int32 = 6
1080 var partition int32 = 3
1081 var maxUnavailable = intstr.FromInt32(2)
1082 set := setupPodManagementPolicy(tc.policyType, newStatefulSet(totalPods))
1083 set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
1084 Type: apps.RollingUpdateStatefulSetStrategyType,
1085 RollingUpdate: func() *apps.RollingUpdateStatefulSetStrategy {
1086 return &apps.RollingUpdateStatefulSetStrategy{
1087 Partition: &partition,
1088 MaxUnavailable: &maxUnavailable,
1089 }
1090 }(),
1091 }
1092
1093 client := fake.NewSimpleClientset()
1094 spc, _, ssc := setupController(client)
1095 if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil {
1096 t.Fatal(err)
1097 }
1098 set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
1099 if err != nil {
1100 t.Fatal(err)
1101 }
1102
1103
1104 set.Spec.Template.Spec.Containers[0].Image = "foo"
1105
1106 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
1107 if err != nil {
1108 t.Fatal(err)
1109 }
1110 originalPods, err := spc.podsLister.Pods(set.Namespace).List(selector)
1111 if err != nil {
1112 t.Fatal(err)
1113 }
1114 sort.Sort(ascendingOrdinal(originalPods))
1115
1116
1117 if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
1118 t.Fatal(err)
1119 }
1120 pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
1121 if err != nil {
1122 t.Fatal(err)
1123 }
1124
1125 sort.Sort(ascendingOrdinal(pods))
1126
1127
1128 if !reflect.DeepEqual(pods, originalPods[:4]) {
1129 t.Fatalf("Expected pods %v, got pods %v", originalPods[:4], pods)
1130 }
1131
1132
1133 if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
1134 t.Fatal(err)
1135 }
1136 pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
1137 if err != nil {
1138 t.Fatal(err)
1139 }
1140
1141 tc.verifyFn(set, spc, ssc, pods, int(totalPods), selector)
1142
1143
1144 spc.setPodRunning(set, 3)
1145 spc.setPodRunning(set, 5)
1146 spc.setPodReady(set, 5)
1147 originalPods, _ = spc.setPodReady(set, 3)
1148 sort.Sort(ascendingOrdinal(originalPods))
1149 if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
1150 t.Fatal(err)
1151 }
1152 pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
1153 if err != nil {
1154 t.Fatal(err)
1155 }
1156 sort.Sort(ascendingOrdinal(pods))
1157 if !reflect.DeepEqual(pods, originalPods) {
1158 t.Fatalf("Expected pods %v, got pods %v", originalPods, pods)
1159 }
1160 }
1161
1162 }
1163
1164 func setupForInvariant(t *testing.T) (*apps.StatefulSet, *fakeObjectManager, StatefulSetControlInterface, intstr.IntOrString, int32) {
1165 var totalPods int32 = 6
1166 set := newStatefulSet(totalPods)
1167
1168 var partition int32 = 3
1169 var maxUnavailable = intstr.FromInt32(2)
1170 set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
1171 Type: apps.RollingUpdateStatefulSetStrategyType,
1172 RollingUpdate: func() *apps.RollingUpdateStatefulSetStrategy {
1173 return &apps.RollingUpdateStatefulSetStrategy{
1174 Partition: &partition,
1175 MaxUnavailable: &maxUnavailable,
1176 }
1177 }(),
1178 }
1179
1180 client := fake.NewSimpleClientset()
1181 spc, _, ssc := setupController(client)
1182 if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil {
1183 t.Fatal(err)
1184 }
1185 set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
1186 if err != nil {
1187 t.Fatal(err)
1188 }
1189
1190 return set, spc, ssc, maxUnavailable, totalPods
1191 }
1192
1193 func TestStatefulSetControlRollingUpdateWithMaxUnavailableInOrderedModeVerifyInvariant(t *testing.T) {
1194
1195
1196
1197
1198 testCases := []struct {
1199 ordinalOfPodToTerminate []int
1200 }{
1201
1202 {[]int{}},
1203 {[]int{5}},
1204 {[]int{3}},
1205 {[]int{4}},
1206 {[]int{5, 4}},
1207 {[]int{5, 3}},
1208 {[]int{4, 3}},
1209 {[]int{5, 4, 3}},
1210 {[]int{2}},
1211 {[]int{1}},
1212 }
1213 for _, tc := range testCases {
1214 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MaxUnavailableStatefulSet, true)()
1215 set, spc, ssc, maxUnavailable, totalPods := setupForInvariant(t)
1216 t.Run(fmt.Sprintf("terminating pod at ordinal %d", tc.ordinalOfPodToTerminate), func(t *testing.T) {
1217 status := apps.StatefulSetStatus{Replicas: int32(totalPods)}
1218 updateRevision := &apps.ControllerRevision{}
1219
1220 for i := 0; i < len(tc.ordinalOfPodToTerminate); i++ {
1221
1222 _, err := spc.addTerminatingPod(set, tc.ordinalOfPodToTerminate[i])
1223 if err != nil {
1224 t.Fatal(err)
1225 }
1226 }
1227
1228 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
1229 if err != nil {
1230 t.Fatal(err)
1231 }
1232
1233 originalPods, err := spc.podsLister.Pods(set.Namespace).List(selector)
1234 if err != nil {
1235 t.Fatal(err)
1236 }
1237
1238 sort.Sort(ascendingOrdinal(originalPods))
1239
1240
1241 set.Spec.Template.Spec.Containers[0].Image = "foo"
1242
1243
1244
1245 if _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, originalPods, updateRevision, status); err != nil {
1246 t.Fatal(err)
1247 }
1248 pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
1249 if err != nil {
1250 t.Fatal(err)
1251 }
1252
1253 sort.Sort(ascendingOrdinal(pods))
1254
1255 expecteddPodsToBeDeleted := maxUnavailable.IntValue() - len(tc.ordinalOfPodToTerminate)
1256 if expecteddPodsToBeDeleted < 0 {
1257 expecteddPodsToBeDeleted = 0
1258 }
1259
1260 expectedPodsAfterUpdate := int(totalPods) - expecteddPodsToBeDeleted
1261
1262 if len(pods) != expectedPodsAfterUpdate {
1263 t.Errorf("Expected pods %v, got pods %v", expectedPodsAfterUpdate, len(pods))
1264 }
1265
1266 })
1267 }
1268 }
1269
1270 func TestStatefulSetControlRollingUpdate(t *testing.T) {
1271 type testcase struct {
1272 name string
1273 invariants func(set *apps.StatefulSet, om *fakeObjectManager) error
1274 initial func() *apps.StatefulSet
1275 update func(set *apps.StatefulSet) *apps.StatefulSet
1276 validate func(set *apps.StatefulSet, pods []*v1.Pod) error
1277 }
1278
1279 testFn := func(test *testcase, t *testing.T) {
1280 set := test.initial()
1281 client := fake.NewSimpleClientset(set)
1282 om, _, ssc := setupController(client)
1283 if err := scaleUpStatefulSetControl(set, ssc, om, test.invariants); err != nil {
1284 t.Fatalf("%s: %s", test.name, err)
1285 }
1286 set, err := om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
1287 if err != nil {
1288 t.Fatalf("%s: %s", test.name, err)
1289 }
1290 set = test.update(set)
1291 if err := updateStatefulSetControl(set, ssc, om, assertUpdateInvariants); err != nil {
1292 t.Fatalf("%s: %s", test.name, err)
1293 }
1294 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
1295 if err != nil {
1296 t.Fatalf("%s: %s", test.name, err)
1297 }
1298 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
1299 if err != nil {
1300 t.Fatalf("%s: %s", test.name, err)
1301 }
1302 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
1303 if err != nil {
1304 t.Fatalf("%s: %s", test.name, err)
1305 }
1306 if err := test.validate(set, pods); err != nil {
1307 t.Fatalf("%s: %s", test.name, err)
1308 }
1309 }
1310
1311 tests := []testcase{
1312 {
1313 name: "monotonic image update",
1314 invariants: assertMonotonicInvariants,
1315 initial: func() *apps.StatefulSet {
1316 return newStatefulSet(3)
1317 },
1318 update: func(set *apps.StatefulSet) *apps.StatefulSet {
1319 set.Spec.Template.Spec.Containers[0].Image = "foo"
1320 return set
1321 },
1322 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1323 sort.Sort(ascendingOrdinal(pods))
1324 for i := range pods {
1325 if pods[i].Spec.Containers[0].Image != "foo" {
1326 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1327 }
1328 }
1329 return nil
1330 },
1331 },
1332 {
1333 name: "monotonic image update and scale up",
1334 invariants: assertMonotonicInvariants,
1335 initial: func() *apps.StatefulSet {
1336 return newStatefulSet(3)
1337 },
1338 update: func(set *apps.StatefulSet) *apps.StatefulSet {
1339 *set.Spec.Replicas = 5
1340 set.Spec.Template.Spec.Containers[0].Image = "foo"
1341 return set
1342 },
1343 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1344 sort.Sort(ascendingOrdinal(pods))
1345 for i := range pods {
1346 if pods[i].Spec.Containers[0].Image != "foo" {
1347 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1348 }
1349 }
1350 return nil
1351 },
1352 },
1353 {
1354 name: "monotonic image update and scale down",
1355 invariants: assertMonotonicInvariants,
1356 initial: func() *apps.StatefulSet {
1357 return newStatefulSet(5)
1358 },
1359 update: func(set *apps.StatefulSet) *apps.StatefulSet {
1360 *set.Spec.Replicas = 3
1361 set.Spec.Template.Spec.Containers[0].Image = "foo"
1362 return set
1363 },
1364 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1365 sort.Sort(ascendingOrdinal(pods))
1366 for i := range pods {
1367 if pods[i].Spec.Containers[0].Image != "foo" {
1368 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1369 }
1370 }
1371 return nil
1372 },
1373 },
1374 {
1375 name: "burst image update",
1376 invariants: assertBurstInvariants,
1377 initial: func() *apps.StatefulSet {
1378 return burst(newStatefulSet(3))
1379 },
1380 update: func(set *apps.StatefulSet) *apps.StatefulSet {
1381 set.Spec.Template.Spec.Containers[0].Image = "foo"
1382 return set
1383 },
1384 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1385 sort.Sort(ascendingOrdinal(pods))
1386 for i := range pods {
1387 if pods[i].Spec.Containers[0].Image != "foo" {
1388 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1389 }
1390 }
1391 return nil
1392 },
1393 },
1394 {
1395 name: "burst image update and scale up",
1396 invariants: assertBurstInvariants,
1397 initial: func() *apps.StatefulSet {
1398 return burst(newStatefulSet(3))
1399 },
1400 update: func(set *apps.StatefulSet) *apps.StatefulSet {
1401 *set.Spec.Replicas = 5
1402 set.Spec.Template.Spec.Containers[0].Image = "foo"
1403 return set
1404 },
1405 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1406 sort.Sort(ascendingOrdinal(pods))
1407 for i := range pods {
1408 if pods[i].Spec.Containers[0].Image != "foo" {
1409 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1410 }
1411 }
1412 return nil
1413 },
1414 },
1415 {
1416 name: "burst image update and scale down",
1417 invariants: assertBurstInvariants,
1418 initial: func() *apps.StatefulSet {
1419 return burst(newStatefulSet(5))
1420 },
1421 update: func(set *apps.StatefulSet) *apps.StatefulSet {
1422 *set.Spec.Replicas = 3
1423 set.Spec.Template.Spec.Containers[0].Image = "foo"
1424 return set
1425 },
1426 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1427 sort.Sort(ascendingOrdinal(pods))
1428 for i := range pods {
1429 if pods[i].Spec.Containers[0].Image != "foo" {
1430 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1431 }
1432 }
1433 return nil
1434 },
1435 },
1436 }
1437 for i := range tests {
1438 testFn(&tests[i], t)
1439 }
1440 }
1441
1442 func TestStatefulSetControlOnDeleteUpdate(t *testing.T) {
1443 type testcase struct {
1444 name string
1445 invariants func(set *apps.StatefulSet, om *fakeObjectManager) error
1446 initial func() *apps.StatefulSet
1447 update func(set *apps.StatefulSet) *apps.StatefulSet
1448 validateUpdate func(set *apps.StatefulSet, pods []*v1.Pod) error
1449 validateRestart func(set *apps.StatefulSet, pods []*v1.Pod) error
1450 }
1451
1452 originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image
1453
1454 testFn := func(t *testing.T, test *testcase, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
1455 set := test.initial()
1456 set.Spec.PersistentVolumeClaimRetentionPolicy = policy
1457 set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{Type: apps.OnDeleteStatefulSetStrategyType}
1458 client := fake.NewSimpleClientset(set)
1459 om, _, ssc := setupController(client)
1460 if err := scaleUpStatefulSetControl(set, ssc, om, test.invariants); err != nil {
1461 t.Fatalf("%s: %s", test.name, err)
1462 }
1463 set, err := om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
1464 if err != nil {
1465 t.Fatalf("%s: %s", test.name, err)
1466 }
1467 set = test.update(set)
1468 if err := updateStatefulSetControl(set, ssc, om, assertUpdateInvariants); err != nil {
1469 t.Fatalf("%s: %s", test.name, err)
1470 }
1471
1472
1473 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
1474 if err != nil {
1475 t.Fatalf("%s: %s", test.name, err)
1476 }
1477 claims, err := om.claimsLister.PersistentVolumeClaims(set.Namespace).List(selector)
1478 if err != nil {
1479 t.Fatalf("%s: %s", test.name, err)
1480 }
1481 for _, claim := range claims {
1482 for _, ref := range claim.GetOwnerReferences() {
1483 if strings.HasPrefix(ref.Name, "foo-") {
1484 om.claimsIndexer.Delete(claim)
1485 break
1486 }
1487 }
1488 }
1489
1490 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
1491 if err != nil {
1492 t.Fatalf("%s: %s", test.name, err)
1493 }
1494 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
1495 if err != nil {
1496 t.Fatalf("%s: %s", test.name, err)
1497 }
1498 if err := test.validateUpdate(set, pods); err != nil {
1499 for i := range pods {
1500 t.Log(pods[i].Name)
1501 }
1502 t.Fatalf("%s: %s", test.name, err)
1503
1504 }
1505 claims, err = om.claimsLister.PersistentVolumeClaims(set.Namespace).List(selector)
1506 if err != nil {
1507 t.Fatalf("%s: %s", test.name, err)
1508 }
1509 for _, claim := range claims {
1510 for _, ref := range claim.GetOwnerReferences() {
1511 if strings.HasPrefix(ref.Name, "foo-") {
1512 t.Fatalf("Unexpected pod reference on %s: %v", claim.Name, claim.GetOwnerReferences())
1513 }
1514 }
1515 }
1516
1517 replicas := *set.Spec.Replicas
1518 *set.Spec.Replicas = 0
1519 if err := scaleDownStatefulSetControl(set, ssc, om, test.invariants); err != nil {
1520 t.Fatalf("%s: %s", test.name, err)
1521 }
1522 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
1523 if err != nil {
1524 t.Fatalf("%s: %s", test.name, err)
1525 }
1526 *set.Spec.Replicas = replicas
1527
1528 claims, err = om.claimsLister.PersistentVolumeClaims(set.Namespace).List(selector)
1529 if err != nil {
1530 t.Fatalf("%s: %s", test.name, err)
1531 }
1532 for _, claim := range claims {
1533 for _, ref := range claim.GetOwnerReferences() {
1534 if strings.HasPrefix(ref.Name, "foo-") {
1535 t.Fatalf("Unexpected pod reference on %s: %v", claim.Name, claim.GetOwnerReferences())
1536 }
1537 }
1538 }
1539
1540 if err := scaleUpStatefulSetControl(set, ssc, om, test.invariants); err != nil {
1541 t.Fatalf("%s: %s", test.name, err)
1542 }
1543 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
1544 if err != nil {
1545 t.Fatalf("%s: %s", test.name, err)
1546 }
1547 pods, err = om.podsLister.Pods(set.Namespace).List(selector)
1548 if err != nil {
1549 t.Fatalf("%s: %s", test.name, err)
1550 }
1551 if err := test.validateRestart(set, pods); err != nil {
1552 t.Fatalf("%s: %s", test.name, err)
1553 }
1554 }
1555
1556 tests := []testcase{
1557 {
1558 name: "monotonic image update",
1559 invariants: assertMonotonicInvariants,
1560 initial: func() *apps.StatefulSet {
1561 return newStatefulSet(3)
1562 },
1563 update: func(set *apps.StatefulSet) *apps.StatefulSet {
1564 set.Spec.Template.Spec.Containers[0].Image = "foo"
1565 return set
1566 },
1567 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1568 sort.Sort(ascendingOrdinal(pods))
1569 for i := range pods {
1570 if pods[i].Spec.Containers[0].Image != originalImage {
1571 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
1572 }
1573 }
1574 return nil
1575 },
1576 validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1577 sort.Sort(ascendingOrdinal(pods))
1578 for i := range pods {
1579 if pods[i].Spec.Containers[0].Image != "foo" {
1580 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1581 }
1582 }
1583 return nil
1584 },
1585 },
1586 {
1587 name: "monotonic image update and scale up",
1588 invariants: assertMonotonicInvariants,
1589 initial: func() *apps.StatefulSet {
1590 return newStatefulSet(3)
1591 },
1592 update: func(set *apps.StatefulSet) *apps.StatefulSet {
1593 *set.Spec.Replicas = 5
1594 set.Spec.Template.Spec.Containers[0].Image = "foo"
1595 return set
1596 },
1597 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1598 sort.Sort(ascendingOrdinal(pods))
1599 for i := range pods {
1600 if i < 3 && pods[i].Spec.Containers[0].Image != originalImage {
1601 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
1602 }
1603 if i >= 3 && pods[i].Spec.Containers[0].Image != "foo" {
1604 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1605 }
1606 }
1607 return nil
1608 },
1609 validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1610 sort.Sort(ascendingOrdinal(pods))
1611 for i := range pods {
1612 if pods[i].Spec.Containers[0].Image != "foo" {
1613 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1614 }
1615 }
1616 return nil
1617 },
1618 },
1619 {
1620 name: "monotonic image update and scale down",
1621 invariants: assertMonotonicInvariants,
1622 initial: func() *apps.StatefulSet {
1623 return newStatefulSet(5)
1624 },
1625 update: func(set *apps.StatefulSet) *apps.StatefulSet {
1626 *set.Spec.Replicas = 3
1627 set.Spec.Template.Spec.Containers[0].Image = "foo"
1628 return set
1629 },
1630 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1631 sort.Sort(ascendingOrdinal(pods))
1632 for i := range pods {
1633 if pods[i].Spec.Containers[0].Image != originalImage {
1634 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
1635 }
1636 }
1637 return nil
1638 },
1639 validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1640 sort.Sort(ascendingOrdinal(pods))
1641 for i := range pods {
1642 if pods[i].Spec.Containers[0].Image != "foo" {
1643 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1644 }
1645 }
1646 return nil
1647 },
1648 },
1649 {
1650 name: "burst image update",
1651 invariants: assertBurstInvariants,
1652 initial: func() *apps.StatefulSet {
1653 return burst(newStatefulSet(3))
1654 },
1655 update: func(set *apps.StatefulSet) *apps.StatefulSet {
1656 set.Spec.Template.Spec.Containers[0].Image = "foo"
1657 return set
1658 },
1659 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1660 sort.Sort(ascendingOrdinal(pods))
1661 for i := range pods {
1662 if pods[i].Spec.Containers[0].Image != originalImage {
1663 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
1664 }
1665 }
1666 return nil
1667 },
1668 validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1669 sort.Sort(ascendingOrdinal(pods))
1670 for i := range pods {
1671 if pods[i].Spec.Containers[0].Image != "foo" {
1672 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1673 }
1674 }
1675 return nil
1676 },
1677 },
1678 {
1679 name: "burst image update and scale up",
1680 invariants: assertBurstInvariants,
1681 initial: func() *apps.StatefulSet {
1682 return burst(newStatefulSet(3))
1683 },
1684 update: func(set *apps.StatefulSet) *apps.StatefulSet {
1685 *set.Spec.Replicas = 5
1686 set.Spec.Template.Spec.Containers[0].Image = "foo"
1687 return set
1688 },
1689 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1690 sort.Sort(ascendingOrdinal(pods))
1691 for i := range pods {
1692 if i < 3 && pods[i].Spec.Containers[0].Image != originalImage {
1693 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
1694 }
1695 if i >= 3 && pods[i].Spec.Containers[0].Image != "foo" {
1696 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1697 }
1698 }
1699 return nil
1700 },
1701 validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1702 sort.Sort(ascendingOrdinal(pods))
1703 for i := range pods {
1704 if pods[i].Spec.Containers[0].Image != "foo" {
1705 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1706 }
1707 }
1708 return nil
1709 },
1710 },
1711 {
1712 name: "burst image update and scale down",
1713 invariants: assertBurstInvariants,
1714 initial: func() *apps.StatefulSet {
1715 return burst(newStatefulSet(5))
1716 },
1717 update: func(set *apps.StatefulSet) *apps.StatefulSet {
1718 *set.Spec.Replicas = 3
1719 set.Spec.Template.Spec.Containers[0].Image = "foo"
1720 return set
1721 },
1722 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1723 sort.Sort(ascendingOrdinal(pods))
1724 for i := range pods {
1725 if pods[i].Spec.Containers[0].Image != originalImage {
1726 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
1727 }
1728 }
1729 return nil
1730 },
1731 validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1732 sort.Sort(ascendingOrdinal(pods))
1733 for i := range pods {
1734 if pods[i].Spec.Containers[0].Image != "foo" {
1735 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1736 }
1737 }
1738 return nil
1739 },
1740 },
1741 }
1742 runTestOverPVCRetentionPolicies(t, "", func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
1743 for i := range tests {
1744 testFn(t, &tests[i], policy)
1745 }
1746 })
1747 }
1748
1749 func TestStatefulSetControlRollingUpdateWithPartition(t *testing.T) {
1750 type testcase struct {
1751 name string
1752 partition int32
1753 invariants func(set *apps.StatefulSet, om *fakeObjectManager) error
1754 initial func() *apps.StatefulSet
1755 update func(set *apps.StatefulSet) *apps.StatefulSet
1756 validate func(set *apps.StatefulSet, pods []*v1.Pod) error
1757 }
1758
1759 testFn := func(t *testing.T, test *testcase, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
1760 set := test.initial()
1761 set.Spec.PersistentVolumeClaimRetentionPolicy = policy
1762 set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
1763 Type: apps.RollingUpdateStatefulSetStrategyType,
1764 RollingUpdate: func() *apps.RollingUpdateStatefulSetStrategy {
1765 return &apps.RollingUpdateStatefulSetStrategy{Partition: &test.partition}
1766 }(),
1767 }
1768 client := fake.NewSimpleClientset(set)
1769 om, _, ssc := setupController(client)
1770 if err := scaleUpStatefulSetControl(set, ssc, om, test.invariants); err != nil {
1771 t.Fatalf("%s: %s", test.name, err)
1772 }
1773 set, err := om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
1774 if err != nil {
1775 t.Fatalf("%s: %s", test.name, err)
1776 }
1777 set = test.update(set)
1778 if err := updateStatefulSetControl(set, ssc, om, assertUpdateInvariants); err != nil {
1779 t.Fatalf("%s: %s", test.name, err)
1780 }
1781 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
1782 if err != nil {
1783 t.Fatalf("%s: %s", test.name, err)
1784 }
1785 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
1786 if err != nil {
1787 t.Fatalf("%s: %s", test.name, err)
1788 }
1789 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
1790 if err != nil {
1791 t.Fatalf("%s: %s", test.name, err)
1792 }
1793 if err := test.validate(set, pods); err != nil {
1794 t.Fatalf("%s: %s", test.name, err)
1795 }
1796 }
1797
1798 originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image
1799
1800 tests := []testcase{
1801 {
1802 name: "monotonic image update",
1803 invariants: assertMonotonicInvariants,
1804 partition: 2,
1805 initial: func() *apps.StatefulSet {
1806 return newStatefulSet(3)
1807 },
1808 update: func(set *apps.StatefulSet) *apps.StatefulSet {
1809 set.Spec.Template.Spec.Containers[0].Image = "foo"
1810 return set
1811 },
1812 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1813 sort.Sort(ascendingOrdinal(pods))
1814 for i := range pods {
1815 if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
1816 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
1817 }
1818 if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
1819 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1820 }
1821 }
1822 return nil
1823 },
1824 },
1825 {
1826 name: "monotonic image update and scale up",
1827 partition: 2,
1828 invariants: assertMonotonicInvariants,
1829 initial: func() *apps.StatefulSet {
1830 return newStatefulSet(3)
1831 },
1832 update: func(set *apps.StatefulSet) *apps.StatefulSet {
1833 *set.Spec.Replicas = 5
1834 set.Spec.Template.Spec.Containers[0].Image = "foo"
1835 return set
1836 },
1837 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1838 sort.Sort(ascendingOrdinal(pods))
1839 for i := range pods {
1840 if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
1841 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
1842 }
1843 if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
1844 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1845 }
1846 }
1847 return nil
1848 },
1849 },
1850 {
1851 name: "burst image update",
1852 partition: 2,
1853 invariants: assertBurstInvariants,
1854 initial: func() *apps.StatefulSet {
1855 return burst(newStatefulSet(3))
1856 },
1857 update: func(set *apps.StatefulSet) *apps.StatefulSet {
1858 set.Spec.Template.Spec.Containers[0].Image = "foo"
1859 return set
1860 },
1861 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1862 sort.Sort(ascendingOrdinal(pods))
1863 for i := range pods {
1864 if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
1865 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
1866 }
1867 if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
1868 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1869 }
1870 }
1871 return nil
1872 },
1873 },
1874 {
1875 name: "burst image update and scale up",
1876 invariants: assertBurstInvariants,
1877 partition: 2,
1878 initial: func() *apps.StatefulSet {
1879 return burst(newStatefulSet(3))
1880 },
1881 update: func(set *apps.StatefulSet) *apps.StatefulSet {
1882 *set.Spec.Replicas = 5
1883 set.Spec.Template.Spec.Containers[0].Image = "foo"
1884 return set
1885 },
1886 validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
1887 sort.Sort(ascendingOrdinal(pods))
1888 for i := range pods {
1889 if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
1890 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
1891 }
1892 if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
1893 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
1894 }
1895 }
1896 return nil
1897 },
1898 },
1899 }
1900 runTestOverPVCRetentionPolicies(t, "", func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
1901 for i := range tests {
1902 testFn(t, &tests[i], policy)
1903 }
1904 })
1905 }
1906
1907 func TestStatefulSetHonorRevisionHistoryLimit(t *testing.T) {
1908 runTestOverPVCRetentionPolicies(t, "", func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
1909 invariants := assertMonotonicInvariants
1910 set := newStatefulSet(3)
1911 set.Spec.PersistentVolumeClaimRetentionPolicy = policy
1912 client := fake.NewSimpleClientset(set)
1913 om, ssu, ssc := setupController(client)
1914
1915 if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
1916 t.Errorf("Failed to turn up StatefulSet : %s", err)
1917 }
1918 var err error
1919 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
1920 if err != nil {
1921 t.Fatalf("Error getting updated StatefulSet: %v", err)
1922 }
1923
1924 for i := 0; i < int(*set.Spec.RevisionHistoryLimit)+5; i++ {
1925 set.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("foo-%d", i)
1926 ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
1927 updateStatefulSetControl(set, ssc, om, assertUpdateInvariants)
1928 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
1929 if err != nil {
1930 t.Fatalf("Error getting updated StatefulSet: %v", err)
1931 }
1932 revisions, err := ssc.ListRevisions(set)
1933 if err != nil {
1934 t.Fatalf("Error listing revisions: %v", err)
1935 }
1936
1937
1938 if len(revisions) > int(*set.Spec.RevisionHistoryLimit)+2 {
1939 t.Fatalf("%s: %d greater than limit %d", "", len(revisions), *set.Spec.RevisionHistoryLimit)
1940 }
1941 }
1942 })
1943 }
1944
1945 func TestStatefulSetControlLimitsHistory(t *testing.T) {
1946 type testcase struct {
1947 name string
1948 invariants func(set *apps.StatefulSet, om *fakeObjectManager) error
1949 initial func() *apps.StatefulSet
1950 }
1951
1952 testFn := func(t *testing.T, test *testcase) {
1953 set := test.initial()
1954 client := fake.NewSimpleClientset(set)
1955 om, _, ssc := setupController(client)
1956 if err := scaleUpStatefulSetControl(set, ssc, om, test.invariants); err != nil {
1957 t.Fatalf("%s: %s", test.name, err)
1958 }
1959 set, err := om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
1960 if err != nil {
1961 t.Fatalf("%s: %s", test.name, err)
1962 }
1963 for i := 0; i < 10; i++ {
1964 set.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("foo-%d", i)
1965 if err := updateStatefulSetControl(set, ssc, om, assertUpdateInvariants); err != nil {
1966 t.Fatalf("%s: %s", test.name, err)
1967 }
1968 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
1969 if err != nil {
1970 t.Fatalf("%s: %s", test.name, err)
1971 }
1972 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
1973 if err != nil {
1974 t.Fatalf("%s: %s", test.name, err)
1975 }
1976 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
1977 if err != nil {
1978 t.Fatalf("%s: %s", test.name, err)
1979 }
1980 _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
1981 if err != nil {
1982 t.Fatalf("%s: %s", test.name, err)
1983 }
1984 revisions, err := ssc.ListRevisions(set)
1985 if err != nil {
1986 t.Fatalf("%s: %s", test.name, err)
1987 }
1988 if len(revisions) > int(*set.Spec.RevisionHistoryLimit)+2 {
1989 t.Fatalf("%s: %d greater than limit %d", test.name, len(revisions), *set.Spec.RevisionHistoryLimit)
1990 }
1991 }
1992 }
1993
1994 tests := []testcase{
1995 {
1996 name: "monotonic update",
1997 invariants: assertMonotonicInvariants,
1998 initial: func() *apps.StatefulSet {
1999 return newStatefulSet(3)
2000 },
2001 },
2002 {
2003 name: "burst update",
2004 invariants: assertBurstInvariants,
2005 initial: func() *apps.StatefulSet {
2006 return burst(newStatefulSet(3))
2007 },
2008 },
2009 }
2010 for i := range tests {
2011 testFn(t, &tests[i])
2012 }
2013 }
2014
2015 func TestStatefulSetControlRollback(t *testing.T) {
2016 type testcase struct {
2017 name string
2018 invariants func(set *apps.StatefulSet, om *fakeObjectManager) error
2019 initial func() *apps.StatefulSet
2020 update func(set *apps.StatefulSet) *apps.StatefulSet
2021 validateUpdate func(set *apps.StatefulSet, pods []*v1.Pod) error
2022 validateRollback func(set *apps.StatefulSet, pods []*v1.Pod) error
2023 }
2024
2025 originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image
2026
2027 testFn := func(t *testing.T, test *testcase) {
2028 set := test.initial()
2029 client := fake.NewSimpleClientset(set)
2030 om, _, ssc := setupController(client)
2031 if err := scaleUpStatefulSetControl(set, ssc, om, test.invariants); err != nil {
2032 t.Fatalf("%s: %s", test.name, err)
2033 }
2034 set, err := om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
2035 if err != nil {
2036 t.Fatalf("%s: %s", test.name, err)
2037 }
2038 set = test.update(set)
2039 if err := updateStatefulSetControl(set, ssc, om, assertUpdateInvariants); err != nil {
2040 t.Fatalf("%s: %s", test.name, err)
2041 }
2042 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
2043 if err != nil {
2044 t.Fatalf("%s: %s", test.name, err)
2045 }
2046 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
2047 if err != nil {
2048 t.Fatalf("%s: %s", test.name, err)
2049 }
2050 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
2051 if err != nil {
2052 t.Fatalf("%s: %s", test.name, err)
2053 }
2054 if err := test.validateUpdate(set, pods); err != nil {
2055 t.Fatalf("%s: %s", test.name, err)
2056 }
2057 revisions, err := ssc.ListRevisions(set)
2058 if err != nil {
2059 t.Fatalf("%s: %s", test.name, err)
2060 }
2061 history.SortControllerRevisions(revisions)
2062 set, err = ApplyRevision(set, revisions[0])
2063 if err != nil {
2064 t.Fatalf("%s: %s", test.name, err)
2065 }
2066 if err := updateStatefulSetControl(set, ssc, om, assertUpdateInvariants); err != nil {
2067 t.Fatalf("%s: %s", test.name, err)
2068 }
2069 if err != nil {
2070 t.Fatalf("%s: %s", test.name, err)
2071 }
2072 pods, err = om.podsLister.Pods(set.Namespace).List(selector)
2073 if err != nil {
2074 t.Fatalf("%s: %s", test.name, err)
2075 }
2076 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
2077 if err != nil {
2078 t.Fatalf("%s: %s", test.name, err)
2079 }
2080 if err := test.validateRollback(set, pods); err != nil {
2081 t.Fatalf("%s: %s", test.name, err)
2082 }
2083 }
2084
2085 tests := []testcase{
2086 {
2087 name: "monotonic image update",
2088 invariants: assertMonotonicInvariants,
2089 initial: func() *apps.StatefulSet {
2090 return newStatefulSet(3)
2091 },
2092 update: func(set *apps.StatefulSet) *apps.StatefulSet {
2093 set.Spec.Template.Spec.Containers[0].Image = "foo"
2094 return set
2095 },
2096 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
2097 sort.Sort(ascendingOrdinal(pods))
2098 for i := range pods {
2099 if pods[i].Spec.Containers[0].Image != "foo" {
2100 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
2101 }
2102 }
2103 return nil
2104 },
2105 validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
2106 sort.Sort(ascendingOrdinal(pods))
2107 for i := range pods {
2108 if pods[i].Spec.Containers[0].Image != originalImage {
2109 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
2110 }
2111 }
2112 return nil
2113 },
2114 },
2115 {
2116 name: "monotonic image update and scale up",
2117 invariants: assertMonotonicInvariants,
2118 initial: func() *apps.StatefulSet {
2119 return newStatefulSet(3)
2120 },
2121 update: func(set *apps.StatefulSet) *apps.StatefulSet {
2122 *set.Spec.Replicas = 5
2123 set.Spec.Template.Spec.Containers[0].Image = "foo"
2124 return set
2125 },
2126 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
2127 sort.Sort(ascendingOrdinal(pods))
2128 for i := range pods {
2129 if pods[i].Spec.Containers[0].Image != "foo" {
2130 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
2131 }
2132 }
2133 return nil
2134 },
2135 validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
2136 sort.Sort(ascendingOrdinal(pods))
2137 for i := range pods {
2138 if pods[i].Spec.Containers[0].Image != originalImage {
2139 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
2140 }
2141 }
2142 return nil
2143 },
2144 },
2145 {
2146 name: "monotonic image update and scale down",
2147 invariants: assertMonotonicInvariants,
2148 initial: func() *apps.StatefulSet {
2149 return newStatefulSet(5)
2150 },
2151 update: func(set *apps.StatefulSet) *apps.StatefulSet {
2152 *set.Spec.Replicas = 3
2153 set.Spec.Template.Spec.Containers[0].Image = "foo"
2154 return set
2155 },
2156 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
2157 sort.Sort(ascendingOrdinal(pods))
2158 for i := range pods {
2159 if pods[i].Spec.Containers[0].Image != "foo" {
2160 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
2161 }
2162 }
2163 return nil
2164 },
2165 validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
2166 sort.Sort(ascendingOrdinal(pods))
2167 for i := range pods {
2168 if pods[i].Spec.Containers[0].Image != originalImage {
2169 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
2170 }
2171 }
2172 return nil
2173 },
2174 },
2175 {
2176 name: "burst image update",
2177 invariants: assertBurstInvariants,
2178 initial: func() *apps.StatefulSet {
2179 return burst(newStatefulSet(3))
2180 },
2181 update: func(set *apps.StatefulSet) *apps.StatefulSet {
2182 set.Spec.Template.Spec.Containers[0].Image = "foo"
2183 return set
2184 },
2185 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
2186 sort.Sort(ascendingOrdinal(pods))
2187 for i := range pods {
2188 if pods[i].Spec.Containers[0].Image != "foo" {
2189 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
2190 }
2191 }
2192 return nil
2193 },
2194 validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
2195 sort.Sort(ascendingOrdinal(pods))
2196 for i := range pods {
2197 if pods[i].Spec.Containers[0].Image != originalImage {
2198 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
2199 }
2200 }
2201 return nil
2202 },
2203 },
2204 {
2205 name: "burst image update and scale up",
2206 invariants: assertBurstInvariants,
2207 initial: func() *apps.StatefulSet {
2208 return burst(newStatefulSet(3))
2209 },
2210 update: func(set *apps.StatefulSet) *apps.StatefulSet {
2211 *set.Spec.Replicas = 5
2212 set.Spec.Template.Spec.Containers[0].Image = "foo"
2213 return set
2214 },
2215 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
2216 sort.Sort(ascendingOrdinal(pods))
2217 for i := range pods {
2218 if pods[i].Spec.Containers[0].Image != "foo" {
2219 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
2220 }
2221 }
2222 return nil
2223 },
2224 validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
2225 sort.Sort(ascendingOrdinal(pods))
2226 for i := range pods {
2227 if pods[i].Spec.Containers[0].Image != originalImage {
2228 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
2229 }
2230 }
2231 return nil
2232 },
2233 },
2234 {
2235 name: "burst image update and scale down",
2236 invariants: assertBurstInvariants,
2237 initial: func() *apps.StatefulSet {
2238 return burst(newStatefulSet(5))
2239 },
2240 update: func(set *apps.StatefulSet) *apps.StatefulSet {
2241 *set.Spec.Replicas = 3
2242 set.Spec.Template.Spec.Containers[0].Image = "foo"
2243 return set
2244 },
2245 validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
2246 sort.Sort(ascendingOrdinal(pods))
2247 for i := range pods {
2248 if pods[i].Spec.Containers[0].Image != "foo" {
2249 return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
2250 }
2251 }
2252 return nil
2253 },
2254 validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
2255 sort.Sort(ascendingOrdinal(pods))
2256 for i := range pods {
2257 if pods[i].Spec.Containers[0].Image != originalImage {
2258 return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
2259 }
2260 }
2261 return nil
2262 },
2263 },
2264 }
2265 for i := range tests {
2266 testFn(t, &tests[i])
2267 }
2268 }
2269
2270 func TestStatefulSetAvailability(t *testing.T) {
2271 tests := []struct {
2272 name string
2273 inputSTS *apps.StatefulSet
2274 expectedActiveReplicas int32
2275 readyDuration time.Duration
2276 }{
2277 {
2278 name: "replicas running for required time, when minReadySeconds is enabled",
2279 inputSTS: setMinReadySeconds(newStatefulSet(1), int32(3600)),
2280 readyDuration: -120 * time.Minute,
2281 expectedActiveReplicas: int32(1),
2282 },
2283 {
2284 name: "replicas not running for required time, when minReadySeconds is enabled",
2285 inputSTS: setMinReadySeconds(newStatefulSet(1), int32(3600)),
2286 readyDuration: -30 * time.Minute,
2287 expectedActiveReplicas: int32(0),
2288 },
2289 }
2290 for _, test := range tests {
2291 set := test.inputSTS
2292 client := fake.NewSimpleClientset(set)
2293 spc, _, ssc := setupController(client)
2294 if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil {
2295 t.Fatalf("%s: %s", test.name, err)
2296 }
2297 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
2298 if err != nil {
2299 t.Fatalf("%s: %s", test.name, err)
2300 }
2301 _, err = spc.podsLister.Pods(set.Namespace).List(selector)
2302 if err != nil {
2303 t.Fatalf("%s: %s", test.name, err)
2304 }
2305 set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
2306 if err != nil {
2307 t.Fatalf("%s: %s", test.name, err)
2308 }
2309 pods, err := spc.setPodAvailable(set, 0, time.Now().Add(test.readyDuration))
2310 if err != nil {
2311 t.Fatalf("%s: %s", test.name, err)
2312 }
2313 status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
2314 if err != nil {
2315 t.Fatalf("%s: %s", test.name, err)
2316 }
2317 if status.AvailableReplicas != test.expectedActiveReplicas {
2318 t.Fatalf("expected %d active replicas got %d", test.expectedActiveReplicas, status.AvailableReplicas)
2319 }
2320 }
2321 }
2322
2323 func TestStatefulSetStatusUpdate(t *testing.T) {
2324 var (
2325 syncErr = fmt.Errorf("sync error")
2326 statusErr = fmt.Errorf("status error")
2327 )
2328
2329 testCases := []struct {
2330 desc string
2331
2332 hasSyncErr bool
2333 hasStatusErr bool
2334
2335 expectedErr error
2336 }{
2337 {
2338 desc: "no error",
2339 hasSyncErr: false,
2340 hasStatusErr: false,
2341 expectedErr: nil,
2342 },
2343 {
2344 desc: "sync error",
2345 hasSyncErr: true,
2346 hasStatusErr: false,
2347 expectedErr: syncErr,
2348 },
2349 {
2350 desc: "status error",
2351 hasSyncErr: false,
2352 hasStatusErr: true,
2353 expectedErr: statusErr,
2354 },
2355 {
2356 desc: "sync and status error",
2357 hasSyncErr: true,
2358 hasStatusErr: true,
2359 expectedErr: syncErr,
2360 },
2361 }
2362
2363 for _, tc := range testCases {
2364 t.Run(tc.desc, func(t *testing.T) {
2365 set := newStatefulSet(3)
2366 client := fake.NewSimpleClientset(set)
2367 om, ssu, ssc := setupController(client)
2368
2369 if tc.hasSyncErr {
2370 om.SetCreateStatefulPodError(syncErr, 0)
2371 }
2372 if tc.hasStatusErr {
2373 ssu.SetUpdateStatefulSetStatusError(statusErr, 0)
2374 }
2375
2376 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
2377 if err != nil {
2378 t.Error(err)
2379 }
2380 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
2381 if err != nil {
2382 t.Error(err)
2383 }
2384 _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
2385 if ssu.updateStatusTracker.requests != 1 {
2386 t.Errorf("Did not update status")
2387 }
2388 if !errors.Is(err, tc.expectedErr) {
2389 t.Errorf("Expected error: %v, got: %v", tc.expectedErr, err)
2390 }
2391 })
2392 }
2393 }
2394
// requestTracker records calls made to a fake operation and optionally holds
// an error to hand back once enough calls have been counted.
type requestTracker struct {
	// The embedded mutex guards requests, err, after and delay.
	sync.Mutex
	requests int   // number of calls counted by inc
	err      error // error to report once requests >= after; nil disables it
	after    int   // call count from which err becomes ready

	// parallelLock guards parallel and maxParallel.
	parallelLock sync.Mutex
	parallel     int // incremented by inc, zeroed by reset
	maxParallel  int // highest value parallel has reached

	// delay, when non-zero, is slept by inc while the embedded mutex is held.
	delay time.Duration
}

// errorReady reports whether an error is configured and at least `after`
// requests have been counted.
func (rt *requestTracker) errorReady() bool {
	rt.Lock()
	defer rt.Unlock()
	if rt.err == nil {
		return false
	}
	return rt.requests >= rt.after
}

// inc counts one more request. It first bumps the parallel counter (raising
// maxParallel if needed), then increments requests and sleeps for delay, if
// one is set, while still holding the embedded mutex.
func (rt *requestTracker) inc() {
	rt.parallelLock.Lock()
	rt.parallel++
	if rt.parallel > rt.maxParallel {
		rt.maxParallel = rt.parallel
	}
	rt.parallelLock.Unlock()

	rt.Lock()
	defer rt.Unlock()
	rt.requests++
	if rt.delay == 0 {
		return
	}
	time.Sleep(rt.delay)
}

// reset zeroes the parallel counter and clears err, after and delay. The
// requests and maxParallel counters are left untouched.
func (rt *requestTracker) reset() {
	rt.parallelLock.Lock()
	rt.parallel = 0
	rt.parallelLock.Unlock()

	rt.Lock()
	rt.err, rt.after, rt.delay = nil, 0, 0
	rt.Unlock()
}

// getErr returns the configured error under the embedded mutex.
func (rt *requestTracker) getErr() error {
	rt.Lock()
	configured := rt.err
	rt.Unlock()
	return configured
}

// newRequestTracker returns a tracker seeded with the given request count,
// error and threshold; every other field starts at its zero value.
func newRequestTracker(requests int, err error, after int) requestTracker {
	return requestTracker{requests: requests, err: err, after: after}
}
2455
// fakeObjectManager is an in-memory object manager for tests. It serves pods,
// claims and sets from listers and writes them straight into the matching
// indexers. Per-operation requestTrackers count calls and can carry an
// injected error.
//
// The constructor in this file initialises the struct positionally, so the
// field order must not change.
type fakeObjectManager struct {
	// Listers used for reads.
	podsLister   corelisters.PodLister
	claimsLister corelisters.PersistentVolumeClaimLister
	setsLister   appslisters.StatefulSetLister
	// Indexers used for writes.
	podsIndexer      cache.Indexer
	claimsIndexer    cache.Indexer
	setsIndexer      cache.Indexer
	revisionsIndexer cache.Indexer
	// Call trackers; their error is configured through the Set*StatefulPodError methods.
	createPodTracker requestTracker
	updatePodTracker requestTracker
	deletePodTracker requestTracker
}
2468
2469 func newFakeObjectManager(informerFactory informers.SharedInformerFactory) *fakeObjectManager {
2470 podInformer := informerFactory.Core().V1().Pods()
2471 claimInformer := informerFactory.Core().V1().PersistentVolumeClaims()
2472 setInformer := informerFactory.Apps().V1().StatefulSets()
2473 revisionInformer := informerFactory.Apps().V1().ControllerRevisions()
2474
2475 return &fakeObjectManager{
2476 podInformer.Lister(),
2477 claimInformer.Lister(),
2478 setInformer.Lister(),
2479 podInformer.Informer().GetIndexer(),
2480 claimInformer.Informer().GetIndexer(),
2481 setInformer.Informer().GetIndexer(),
2482 revisionInformer.Informer().GetIndexer(),
2483 newRequestTracker(0, nil, 0),
2484 newRequestTracker(0, nil, 0),
2485 newRequestTracker(0, nil, 0),
2486 }
2487 }
2488
2489 func (om *fakeObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error {
2490 defer om.createPodTracker.inc()
2491 if om.createPodTracker.errorReady() {
2492 defer om.createPodTracker.reset()
2493 return om.createPodTracker.getErr()
2494 }
2495 pod.SetUID(types.UID(pod.Name + "-uid"))
2496 return om.podsIndexer.Update(pod)
2497 }
2498
2499 func (om *fakeObjectManager) GetPod(namespace, podName string) (*v1.Pod, error) {
2500 return om.podsLister.Pods(namespace).Get(podName)
2501 }
2502
2503 func (om *fakeObjectManager) UpdatePod(pod *v1.Pod) error {
2504 return om.podsIndexer.Update(pod)
2505 }
2506
2507 func (om *fakeObjectManager) DeletePod(pod *v1.Pod) error {
2508 defer om.deletePodTracker.inc()
2509 if om.deletePodTracker.errorReady() {
2510 defer om.deletePodTracker.reset()
2511 return om.deletePodTracker.getErr()
2512 }
2513 if key, err := controller.KeyFunc(pod); err != nil {
2514 return err
2515 } else if obj, found, err := om.podsIndexer.GetByKey(key); err != nil {
2516 return err
2517 } else if found {
2518 return om.podsIndexer.Delete(obj)
2519 }
2520 return nil
2521 }
2522
2523 func (om *fakeObjectManager) CreateClaim(claim *v1.PersistentVolumeClaim) error {
2524 om.claimsIndexer.Update(claim)
2525 return nil
2526 }
2527
2528 func (om *fakeObjectManager) GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error) {
2529 return om.claimsLister.PersistentVolumeClaims(namespace).Get(claimName)
2530 }
2531
2532 func (om *fakeObjectManager) UpdateClaim(claim *v1.PersistentVolumeClaim) error {
2533
2534 refs := claim.GetOwnerReferences()
2535 for _, ref := range refs {
2536 if ref.APIVersion == "" || ref.Kind == "" || ref.Name == "" {
2537 return fmt.Errorf("invalid ownerRefs: %s %v", claim.Name, refs)
2538 }
2539 }
2540 om.claimsIndexer.Update(claim)
2541 return nil
2542 }
2543
2544 func (om *fakeObjectManager) SetCreateStatefulPodError(err error, after int) {
2545 om.createPodTracker.err = err
2546 om.createPodTracker.after = after
2547 }
2548
2549 func (om *fakeObjectManager) SetUpdateStatefulPodError(err error, after int) {
2550 om.updatePodTracker.err = err
2551 om.updatePodTracker.after = after
2552 }
2553
2554 func (om *fakeObjectManager) SetDeleteStatefulPodError(err error, after int) {
2555 om.deletePodTracker.err = err
2556 om.deletePodTracker.after = after
2557 }
2558
2559 func findPodByOrdinal(pods []*v1.Pod, ordinal int) *v1.Pod {
2560 for _, pod := range pods {
2561 if getOrdinal(pod) == ordinal {
2562 return pod.DeepCopy()
2563 }
2564 }
2565
2566 return nil
2567 }
2568
2569 func (om *fakeObjectManager) setPodPending(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
2570 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
2571 if err != nil {
2572 return nil, err
2573 }
2574 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
2575 if err != nil {
2576 return nil, err
2577 }
2578 pod := findPodByOrdinal(pods, ordinal)
2579 if pod == nil {
2580 return nil, fmt.Errorf("setPodPending: pod ordinal %d not found", ordinal)
2581 }
2582 pod.Status.Phase = v1.PodPending
2583 fakeResourceVersion(pod)
2584 om.podsIndexer.Update(pod)
2585 return om.podsLister.Pods(set.Namespace).List(selector)
2586 }
2587
2588 func (om *fakeObjectManager) setPodRunning(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
2589 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
2590 if err != nil {
2591 return nil, err
2592 }
2593 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
2594 if err != nil {
2595 return nil, err
2596 }
2597 pod := findPodByOrdinal(pods, ordinal)
2598 if pod == nil {
2599 return nil, fmt.Errorf("setPodRunning: pod ordinal %d not found", ordinal)
2600 }
2601 pod.Status.Phase = v1.PodRunning
2602 fakeResourceVersion(pod)
2603 om.podsIndexer.Update(pod)
2604 return om.podsLister.Pods(set.Namespace).List(selector)
2605 }
2606
2607 func (om *fakeObjectManager) setPodReady(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
2608 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
2609 if err != nil {
2610 return nil, err
2611 }
2612 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
2613 if err != nil {
2614 return nil, err
2615 }
2616 pod := findPodByOrdinal(pods, ordinal)
2617 if pod == nil {
2618 return nil, fmt.Errorf("setPodReady: pod ordinal %d not found", ordinal)
2619 }
2620 condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
2621 podutil.UpdatePodCondition(&pod.Status, &condition)
2622 fakeResourceVersion(pod)
2623 om.podsIndexer.Update(pod)
2624 return om.podsLister.Pods(set.Namespace).List(selector)
2625 }
2626
2627 func (om *fakeObjectManager) setPodAvailable(set *apps.StatefulSet, ordinal int, lastTransitionTime time.Time) ([]*v1.Pod, error) {
2628 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
2629 if err != nil {
2630 return nil, err
2631 }
2632 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
2633 if err != nil {
2634 return nil, err
2635 }
2636 pod := findPodByOrdinal(pods, ordinal)
2637 if pod == nil {
2638 return nil, fmt.Errorf("setPodAvailable: pod ordinal %d not found", ordinal)
2639 }
2640 condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: lastTransitionTime}}
2641 _, existingCondition := podutil.GetPodCondition(&pod.Status, condition.Type)
2642 if existingCondition != nil {
2643 existingCondition.Status = v1.ConditionTrue
2644 existingCondition.LastTransitionTime = metav1.Time{Time: lastTransitionTime}
2645 } else {
2646 existingCondition = &v1.PodCondition{
2647 Type: v1.PodReady,
2648 Status: v1.ConditionTrue,
2649 LastTransitionTime: metav1.Time{Time: lastTransitionTime},
2650 }
2651 pod.Status.Conditions = append(pod.Status.Conditions, *existingCondition)
2652 }
2653 podutil.UpdatePodCondition(&pod.Status, &condition)
2654 fakeResourceVersion(pod)
2655 om.podsIndexer.Update(pod)
2656 return om.podsLister.Pods(set.Namespace).List(selector)
2657 }
2658
2659 func (om *fakeObjectManager) addTerminatingPod(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
2660 pod := newStatefulSetPod(set, ordinal)
2661 pod.SetUID(types.UID(pod.Name + "-uid"))
2662 pod.Status.Phase = v1.PodRunning
2663 deleted := metav1.NewTime(time.Now())
2664 pod.DeletionTimestamp = &deleted
2665 condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
2666 fakeResourceVersion(pod)
2667 podutil.UpdatePodCondition(&pod.Status, &condition)
2668 om.podsIndexer.Update(pod)
2669 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
2670 if err != nil {
2671 return nil, err
2672 }
2673 return om.podsLister.Pods(set.Namespace).List(selector)
2674 }
2675
2676 func (om *fakeObjectManager) setPodTerminated(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
2677 pod := newStatefulSetPod(set, ordinal)
2678 deleted := metav1.NewTime(time.Now())
2679 pod.DeletionTimestamp = &deleted
2680 fakeResourceVersion(pod)
2681 om.podsIndexer.Update(pod)
2682 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
2683 if err != nil {
2684 return nil, err
2685 }
2686 return om.podsLister.Pods(set.Namespace).List(selector)
2687 }
2688
2689 var _ StatefulPodControlObjectManager = &fakeObjectManager{}
2690
// fakeStatefulSetStatusUpdater is a test status updater that writes the new
// status straight into the StatefulSet indexer. Its tracker counts calls and
// can carry an injected error.
//
// The constructor in this file initialises the struct positionally, so the
// field order must not change.
type fakeStatefulSetStatusUpdater struct {
	setsLister          appslisters.StatefulSetLister
	setsIndexer         cache.Indexer
	updateStatusTracker requestTracker
}
2696
2697 func newFakeStatefulSetStatusUpdater(setInformer appsinformers.StatefulSetInformer) *fakeStatefulSetStatusUpdater {
2698 return &fakeStatefulSetStatusUpdater{
2699 setInformer.Lister(),
2700 setInformer.Informer().GetIndexer(),
2701 newRequestTracker(0, nil, 0),
2702 }
2703 }
2704
2705 func (ssu *fakeStatefulSetStatusUpdater) UpdateStatefulSetStatus(ctx context.Context, set *apps.StatefulSet, status *apps.StatefulSetStatus) error {
2706 defer ssu.updateStatusTracker.inc()
2707 if ssu.updateStatusTracker.errorReady() {
2708 defer ssu.updateStatusTracker.reset()
2709 return ssu.updateStatusTracker.err
2710 }
2711 set.Status = *status
2712 ssu.setsIndexer.Update(set)
2713 return nil
2714 }
2715
2716 func (ssu *fakeStatefulSetStatusUpdater) SetUpdateStatefulSetStatusError(err error, after int) {
2717 ssu.updateStatusTracker.err = err
2718 ssu.updateStatusTracker.after = after
2719 }
2720
2721 var _ StatefulSetStatusUpdaterInterface = &fakeStatefulSetStatusUpdater{}
2722
2723 func assertMonotonicInvariants(set *apps.StatefulSet, om *fakeObjectManager) error {
2724 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
2725 if err != nil {
2726 return err
2727 }
2728 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
2729 if err != nil {
2730 return err
2731 }
2732 sort.Sort(ascendingOrdinal(pods))
2733 for idx := 0; idx < len(pods); idx++ {
2734 if idx > 0 && isRunningAndReady(pods[idx]) && !isRunningAndReady(pods[idx-1]) {
2735 return fmt.Errorf("successor %s is Running and Ready while %s is not", pods[idx].Name, pods[idx-1].Name)
2736 }
2737
2738 if ord := idx + getStartOrdinal(set); getOrdinal(pods[idx]) != ord {
2739 return fmt.Errorf("pods %s deployed in the wrong order %d", pods[idx].Name, ord)
2740 }
2741
2742 if !storageMatches(set, pods[idx]) {
2743 return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[idx].Name, set.Name)
2744 }
2745
2746 for _, claim := range getPersistentVolumeClaims(set, pods[idx]) {
2747 claim, _ := om.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
2748 if err := checkClaimInvarients(set, pods[idx], claim); err != nil {
2749 return err
2750 }
2751 }
2752
2753 if !identityMatches(set, pods[idx]) {
2754 return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ", pods[idx].Name, set.Name)
2755 }
2756 }
2757 return nil
2758 }
2759
2760 func assertBurstInvariants(set *apps.StatefulSet, om *fakeObjectManager) error {
2761 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
2762 if err != nil {
2763 return err
2764 }
2765 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
2766 if err != nil {
2767 return err
2768 }
2769 sort.Sort(ascendingOrdinal(pods))
2770 for _, pod := range pods {
2771 if !storageMatches(set, pod) {
2772 return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pod.Name, set.Name)
2773 }
2774
2775 for _, claim := range getPersistentVolumeClaims(set, pod) {
2776 claim, err := om.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
2777 if err != nil {
2778 return err
2779 }
2780 if err := checkClaimInvarients(set, pod, claim); err != nil {
2781 return err
2782 }
2783 }
2784
2785 if !identityMatches(set, pod) {
2786 return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ",
2787 pod.Name,
2788 set.Name)
2789 }
2790 }
2791 return nil
2792 }
2793
2794 func assertUpdateInvariants(set *apps.StatefulSet, om *fakeObjectManager) error {
2795 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
2796 if err != nil {
2797 return err
2798 }
2799 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
2800 if err != nil {
2801 return err
2802 }
2803 sort.Sort(ascendingOrdinal(pods))
2804 for _, pod := range pods {
2805
2806 if !storageMatches(set, pod) {
2807 return fmt.Errorf("pod %s does not match the storage specification of StatefulSet %s ", pod.Name, set.Name)
2808 }
2809
2810 for _, claim := range getPersistentVolumeClaims(set, pod) {
2811 claim, err := om.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
2812 if err != nil {
2813 return err
2814 }
2815 if err := checkClaimInvarients(set, pod, claim); err != nil {
2816 return err
2817 }
2818 }
2819
2820 if !identityMatches(set, pod) {
2821 return fmt.Errorf("pod %s does not match the identity specification of StatefulSet %s ", pod.Name, set.Name)
2822 }
2823 }
2824 if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
2825 return nil
2826 }
2827 if set.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType {
2828 for i := 0; i < int(set.Status.CurrentReplicas) && i < len(pods); i++ {
2829 if want, got := set.Status.CurrentRevision, getPodRevision(pods[i]); want != got {
2830 return fmt.Errorf("pod %s want current revision %s got %s", pods[i].Name, want, got)
2831 }
2832 }
2833 for i, j := len(pods)-1, 0; j < int(set.Status.UpdatedReplicas); i, j = i-1, j+1 {
2834 if want, got := set.Status.UpdateRevision, getPodRevision(pods[i]); want != got {
2835 return fmt.Errorf("pod %s want update revision %s got %s", pods[i].Name, want, got)
2836 }
2837 }
2838 }
2839 return nil
2840 }
2841
2842 func checkClaimInvarients(set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim) error {
2843 policy := apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
2844 WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
2845 WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
2846 }
2847 if set.Spec.PersistentVolumeClaimRetentionPolicy != nil && utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
2848 policy = *set.Spec.PersistentVolumeClaimRetentionPolicy
2849 }
2850 claimShouldBeRetained := policy.WhenScaled == apps.RetainPersistentVolumeClaimRetentionPolicyType
2851 if claim == nil {
2852 if claimShouldBeRetained {
2853 return fmt.Errorf("claim for Pod %s was not created", pod.Name)
2854 }
2855 return nil
2856 }
2857
2858 if pod.Status.Phase != v1.PodRunning || !podutil.IsPodReady(pod) {
2859
2860 return nil
2861 }
2862
2863 const retain = apps.RetainPersistentVolumeClaimRetentionPolicyType
2864 const delete = apps.DeletePersistentVolumeClaimRetentionPolicyType
2865 switch {
2866 case policy.WhenScaled == retain && policy.WhenDeleted == retain:
2867 if hasOwnerRef(claim, set) {
2868 return fmt.Errorf("claim %s has unexpected owner ref on %s for StatefulSet retain", claim.Name, set.Name)
2869 }
2870 if hasOwnerRef(claim, pod) {
2871 return fmt.Errorf("claim %s has unexpected owner ref on pod %s for StatefulSet retain", claim.Name, pod.Name)
2872 }
2873 case policy.WhenScaled == retain && policy.WhenDeleted == delete:
2874 if !hasOwnerRef(claim, set) {
2875 return fmt.Errorf("claim %s does not have owner ref on %s for StatefulSet deletion", claim.Name, set.Name)
2876 }
2877 if hasOwnerRef(claim, pod) {
2878 return fmt.Errorf("claim %s has unexpected owner ref on pod %s for StatefulSet deletion", claim.Name, pod.Name)
2879 }
2880 case policy.WhenScaled == delete && policy.WhenDeleted == retain:
2881 if hasOwnerRef(claim, set) {
2882 return fmt.Errorf("claim %s has unexpected owner ref on %s for scaledown only", claim.Name, set.Name)
2883 }
2884 if !podInOrdinalRange(pod, set) && !hasOwnerRef(claim, pod) {
2885 return fmt.Errorf("claim %s does not have owner ref on condemned pod %s for scaledown delete", claim.Name, pod.Name)
2886 }
2887 if podInOrdinalRange(pod, set) && hasOwnerRef(claim, pod) {
2888 return fmt.Errorf("claim %s has unexpected owner ref on condemned pod %s for scaledown delete", claim.Name, pod.Name)
2889 }
2890 case policy.WhenScaled == delete && policy.WhenDeleted == delete:
2891 if !podInOrdinalRange(pod, set) {
2892 if !hasOwnerRef(claim, pod) || hasOwnerRef(claim, set) {
2893 return fmt.Errorf("condemned claim %s has bad owner refs: %v", claim.Name, claim.GetOwnerReferences())
2894 }
2895 } else {
2896 if hasOwnerRef(claim, pod) || !hasOwnerRef(claim, set) {
2897 return fmt.Errorf("live claim %s has bad owner refs: %v", claim.Name, claim.GetOwnerReferences())
2898 }
2899 }
2900 }
2901 return nil
2902 }
2903
2904 func fakeResourceVersion(object interface{}) {
2905 obj, isObj := object.(metav1.Object)
2906 if !isObj {
2907 return
2908 }
2909 if version := obj.GetResourceVersion(); version == "" {
2910 obj.SetResourceVersion("1")
2911 } else if intValue, err := strconv.ParseInt(version, 10, 32); err == nil {
2912 obj.SetResourceVersion(strconv.FormatInt(intValue+1, 10))
2913 }
2914 }
2915
2916 func TestParallelScale(t *testing.T) {
2917 for _, tc := range []struct {
2918 desc string
2919 replicas int32
2920 desiredReplicas int32
2921 }{
2922 {
2923 desc: "scale up from 3 to 30",
2924 replicas: 3,
2925 desiredReplicas: 30,
2926 },
2927 {
2928 desc: "scale down from 10 to 1",
2929 replicas: 10,
2930 desiredReplicas: 1,
2931 },
2932
2933 {
2934 desc: "scale down to 0",
2935 replicas: 501,
2936 desiredReplicas: 0,
2937 },
2938 {
2939 desc: "scale up from 0",
2940 replicas: 0,
2941 desiredReplicas: 1000,
2942 },
2943 } {
2944 t.Run(tc.desc, func(t *testing.T) {
2945 set := burst(newStatefulSet(0))
2946 set.Spec.VolumeClaimTemplates[0].ObjectMeta.Labels = map[string]string{"test": "test"}
2947 parallelScale(t, set, tc.replicas, tc.desiredReplicas, assertBurstInvariants)
2948 })
2949 }
2950
2951 }
2952
2953 func parallelScale(t *testing.T, set *apps.StatefulSet, replicas, desiredReplicas int32, invariants invariantFunc) {
2954 var err error
2955 diff := desiredReplicas - replicas
2956 client := fake.NewSimpleClientset(set)
2957 om, _, ssc := setupController(client)
2958 om.createPodTracker.delay = time.Millisecond
2959
2960 *set.Spec.Replicas = replicas
2961 if err := parallelScaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
2962 t.Errorf("Failed to turn up StatefulSet : %s", err)
2963 }
2964 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
2965 if err != nil {
2966 t.Fatalf("Error getting updated StatefulSet: %v", err)
2967 }
2968 if set.Status.Replicas != replicas {
2969 t.Errorf("want %v, got %v replicas", replicas, set.Status.Replicas)
2970 }
2971
2972 fn := parallelScaleUpStatefulSetControl
2973 if diff < 0 {
2974 fn = parallelScaleDownStatefulSetControl
2975 }
2976 *set.Spec.Replicas = desiredReplicas
2977 if err := fn(set, ssc, om, invariants); err != nil {
2978 t.Errorf("Failed to scale StatefulSet : %s", err)
2979 }
2980
2981 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
2982 if err != nil {
2983 t.Fatalf("Error getting updated StatefulSet: %v", err)
2984 }
2985
2986 if set.Status.Replicas != desiredReplicas {
2987 t.Errorf("Failed to scale statefulset to %v replicas, got %v replicas", desiredReplicas, set.Status.Replicas)
2988 }
2989
2990 if (diff < -1 || diff > 1) && om.createPodTracker.maxParallel <= 1 {
2991 t.Errorf("want max parallel requests > 1, got %v", om.createPodTracker.maxParallel)
2992 }
2993 }
2994
2995 func parallelScaleUpStatefulSetControl(set *apps.StatefulSet,
2996 ssc StatefulSetControlInterface,
2997 om *fakeObjectManager,
2998 invariants invariantFunc) error {
2999 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
3000 if err != nil {
3001 return err
3002 }
3003
3004
3005
3006
3007 maxLoops := 2
3008 loops := maxLoops
3009 for set.Status.Replicas < *set.Spec.Replicas {
3010 if loops < 1 {
3011 return fmt.Errorf("after %v loops: want %v, got replicas %v", maxLoops, *set.Spec.Replicas, set.Status.Replicas)
3012 }
3013 loops--
3014 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
3015 if err != nil {
3016 return err
3017 }
3018 sort.Sort(ascendingOrdinal(pods))
3019
3020 ordinals := []int{}
3021 for _, pod := range pods {
3022 if pod.Status.Phase == "" {
3023 ordinals = append(ordinals, getOrdinal(pod))
3024 }
3025 }
3026
3027 for _, ord := range ordinals {
3028 if pods, err = om.setPodPending(set, ord); err != nil {
3029 return err
3030 }
3031 }
3032
3033
3034 _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
3035 if err != nil {
3036 return err
3037 }
3038 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
3039 if err != nil {
3040 return err
3041 }
3042 if err := invariants(set, om); err != nil {
3043 return err
3044 }
3045 }
3046 return invariants(set, om)
3047 }
3048
3049 func parallelScaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, om *fakeObjectManager, invariants invariantFunc) error {
3050 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
3051 if err != nil {
3052 return err
3053 }
3054
3055
3056
3057
3058 maxLoops := 2
3059 loops := maxLoops
3060 for set.Status.Replicas > *set.Spec.Replicas {
3061 if loops < 1 {
3062 return fmt.Errorf("after %v loops: want %v replicas, got %v", maxLoops, *set.Spec.Replicas, set.Status.Replicas)
3063 }
3064 loops--
3065 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
3066 if err != nil {
3067 return err
3068 }
3069 sort.Sort(ascendingOrdinal(pods))
3070 if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
3071 return err
3072 }
3073 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
3074 if err != nil {
3075 return err
3076 }
3077 if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
3078 return err
3079 }
3080 }
3081
3082 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
3083 if err != nil {
3084 return err
3085 }
3086 if err := invariants(set, om); err != nil {
3087 return err
3088 }
3089
3090 return nil
3091 }
3092
3093 func scaleUpStatefulSetControl(set *apps.StatefulSet,
3094 ssc StatefulSetControlInterface,
3095 om *fakeObjectManager,
3096 invariants invariantFunc) error {
3097 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
3098 if err != nil {
3099 return err
3100 }
3101 for set.Status.ReadyReplicas < *set.Spec.Replicas {
3102 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
3103 if err != nil {
3104 return err
3105 }
3106 sort.Sort(ascendingOrdinal(pods))
3107
3108
3109 for _, pod := range pods {
3110 if pod.Status.Phase == "" {
3111 if pods, err = om.setPodPending(set, getOrdinal(pod)); err != nil {
3112 return err
3113 }
3114 break
3115 }
3116 }
3117
3118
3119 if len(pods) > 0 {
3120 idx := int(rand.Int63n(int64(len(pods))))
3121 pod := pods[idx]
3122 switch pod.Status.Phase {
3123 case v1.PodPending:
3124 if pods, err = om.setPodRunning(set, getOrdinal(pod)); err != nil {
3125 return err
3126 }
3127 case v1.PodRunning:
3128 if pods, err = om.setPodReady(set, getOrdinal(pod)); err != nil {
3129 return err
3130 }
3131 default:
3132 continue
3133 }
3134 }
3135
3136 _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
3137 if err != nil {
3138 return err
3139 }
3140 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
3141 if err != nil {
3142 return err
3143 }
3144 if err := invariants(set, om); err != nil {
3145 return err
3146 }
3147 }
3148 return invariants(set, om)
3149 }
3150
3151 func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, om *fakeObjectManager, invariants invariantFunc) error {
3152 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
3153 if err != nil {
3154 return err
3155 }
3156
3157 for set.Status.Replicas > *set.Spec.Replicas {
3158 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
3159 if err != nil {
3160 return err
3161 }
3162 sort.Sort(ascendingOrdinal(pods))
3163 if idx := len(pods) - 1; idx >= 0 {
3164 if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
3165 return err
3166 }
3167 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
3168 if err != nil {
3169 return err
3170 }
3171 if pods, err = om.addTerminatingPod(set, getOrdinal(pods[idx])); err != nil {
3172 return err
3173 }
3174 if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
3175 return err
3176 }
3177 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
3178 if err != nil {
3179 return err
3180 }
3181 pods, err = om.podsLister.Pods(set.Namespace).List(selector)
3182 if err != nil {
3183 return err
3184 }
3185 sort.Sort(ascendingOrdinal(pods))
3186
3187 if len(pods) > 0 {
3188 om.podsIndexer.Delete(pods[len(pods)-1])
3189 }
3190 }
3191 if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
3192 return err
3193 }
3194 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
3195 if err != nil {
3196 return err
3197 }
3198
3199 if err := invariants(set, om); err != nil {
3200 return err
3201 }
3202 }
3203
3204 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
3205 if err != nil {
3206 return err
3207 }
3208 currentPods := map[string]bool{}
3209 for _, pod := range pods {
3210 currentPods[pod.Name] = true
3211 }
3212 claims, err := om.claimsLister.PersistentVolumeClaims(set.Namespace).List(selector)
3213 if err != nil {
3214 return err
3215 }
3216 for _, claim := range claims {
3217 claimPodName := getClaimPodName(set, claim)
3218 if claimPodName == "" {
3219 continue
3220 }
3221 if _, found := currentPods[claimPodName]; found {
3222 continue
3223 }
3224 for _, refs := range claim.GetOwnerReferences() {
3225 if refs.Name == claimPodName {
3226 om.claimsIndexer.Delete(claim)
3227 break
3228 }
3229 }
3230 }
3231
3232 return invariants(set, om)
3233 }
3234
3235 func updateComplete(set *apps.StatefulSet, pods []*v1.Pod) bool {
3236 sort.Sort(ascendingOrdinal(pods))
3237 if len(pods) != int(*set.Spec.Replicas) {
3238 return false
3239 }
3240 if set.Status.ReadyReplicas != *set.Spec.Replicas {
3241 return false
3242 }
3243
3244 switch set.Spec.UpdateStrategy.Type {
3245 case apps.OnDeleteStatefulSetStrategyType:
3246 return true
3247 case apps.RollingUpdateStatefulSetStrategyType:
3248 if set.Spec.UpdateStrategy.RollingUpdate == nil || *set.Spec.UpdateStrategy.RollingUpdate.Partition <= 0 {
3249 if set.Status.CurrentReplicas < *set.Spec.Replicas {
3250 return false
3251 }
3252 for i := range pods {
3253 if getPodRevision(pods[i]) != set.Status.CurrentRevision {
3254 return false
3255 }
3256 }
3257 } else {
3258 partition := int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
3259 if len(pods) < partition {
3260 return false
3261 }
3262 for i := partition; i < len(pods); i++ {
3263 if getPodRevision(pods[i]) != set.Status.UpdateRevision {
3264 return false
3265 }
3266 }
3267 }
3268 }
3269 return true
3270 }
3271
3272 func updateStatefulSetControl(set *apps.StatefulSet,
3273 ssc StatefulSetControlInterface,
3274 om *fakeObjectManager,
3275 invariants invariantFunc) error {
3276 selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
3277 if err != nil {
3278 return err
3279 }
3280 pods, err := om.podsLister.Pods(set.Namespace).List(selector)
3281 if err != nil {
3282 return err
3283 }
3284 if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
3285 return err
3286 }
3287
3288 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
3289 if err != nil {
3290 return err
3291 }
3292 pods, err = om.podsLister.Pods(set.Namespace).List(selector)
3293 if err != nil {
3294 return err
3295 }
3296 for !updateComplete(set, pods) {
3297 pods, err = om.podsLister.Pods(set.Namespace).List(selector)
3298 if err != nil {
3299 return err
3300 }
3301 sort.Sort(ascendingOrdinal(pods))
3302 initialized := false
3303 for _, pod := range pods {
3304 if pod.Status.Phase == "" {
3305 if pods, err = om.setPodPending(set, getOrdinal(pod)); err != nil {
3306 return err
3307 }
3308 break
3309 }
3310 }
3311 if initialized {
3312 continue
3313 }
3314
3315 if len(pods) > 0 {
3316 idx := int(rand.Int63n(int64(len(pods))))
3317 pod := pods[idx]
3318 switch pod.Status.Phase {
3319 case v1.PodPending:
3320 if pods, err = om.setPodRunning(set, getOrdinal(pod)); err != nil {
3321 return err
3322 }
3323 case v1.PodRunning:
3324 if pods, err = om.setPodReady(set, getOrdinal(pod)); err != nil {
3325 return err
3326 }
3327 default:
3328 continue
3329 }
3330 }
3331
3332 if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
3333 return err
3334 }
3335 set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
3336 if err != nil {
3337 return err
3338 }
3339 if err := invariants(set, om); err != nil {
3340 return err
3341 }
3342 pods, err = om.podsLister.Pods(set.Namespace).List(selector)
3343 if err != nil {
3344 return err
3345 }
3346 }
3347 return invariants(set, om)
3348 }
3349
3350 func newRevisionOrDie(set *apps.StatefulSet, revision int64) *apps.ControllerRevision {
3351 rev, err := newRevision(set, revision, set.Status.CollisionCount)
3352 if err != nil {
3353 panic(err)
3354 }
3355 return rev
3356 }
3357
3358 func isOrHasInternalError(err error) bool {
3359 agg, ok := err.(utilerrors.Aggregate)
3360 return !ok && !apierrors.IsInternalError(err) || ok && len(agg.Errors()) > 0 && !apierrors.IsInternalError(agg.Errors()[0])
3361 }
3362
View as plain text