...

Source file src/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set_control_test.go

Documentation: k8s.io/kubernetes/pkg/controller/statefulset

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package statefulset
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"math/rand"
    24  	"reflect"
    25  	"runtime"
    26  	"sort"
    27  	"strconv"
    28  	"strings"
    29  	"sync"
    30  	"testing"
    31  	"time"
    32  
    33  	apps "k8s.io/api/apps/v1"
    34  	v1 "k8s.io/api/core/v1"
    35  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    36  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    37  	"k8s.io/apimachinery/pkg/labels"
    38  	"k8s.io/apimachinery/pkg/types"
    39  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    40  	"k8s.io/apimachinery/pkg/util/intstr"
    41  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    42  	"k8s.io/client-go/informers"
    43  	appsinformers "k8s.io/client-go/informers/apps/v1"
    44  	clientset "k8s.io/client-go/kubernetes"
    45  	"k8s.io/client-go/kubernetes/fake"
    46  	appslisters "k8s.io/client-go/listers/apps/v1"
    47  	corelisters "k8s.io/client-go/listers/core/v1"
    48  	"k8s.io/client-go/tools/cache"
    49  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    50  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    51  	"k8s.io/kubernetes/pkg/controller"
    52  	"k8s.io/kubernetes/pkg/controller/history"
    53  	"k8s.io/kubernetes/pkg/features"
    54  )
    55  
    56  type invariantFunc func(set *apps.StatefulSet, om *fakeObjectManager) error
    57  
    58  func setupController(client clientset.Interface) (*fakeObjectManager, *fakeStatefulSetStatusUpdater, StatefulSetControlInterface) {
    59  	informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
    60  	om := newFakeObjectManager(informerFactory)
    61  	spc := NewStatefulPodControlFromManager(om, &noopRecorder{})
    62  	ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
    63  	recorder := &noopRecorder{}
    64  	ssc := NewDefaultStatefulSetControl(spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder)
    65  
    66  	// The informer is not started. The tests here manipulate the local cache (indexers) directly, and there is no waiting
    67  	// for client state to sync. In fact, because the client is not updated during tests, informer updates will break tests
    68  	// by unexpectedly deleting objects.
    69  	//
    70  	// TODO: It might be better to rewrite all these tests manipulate the client an explicitly sync to ensure consistent
    71  	// state, or to create a fake client that does not use a local cache.
    72  
    73  	// The client is passed initial sets, so we have to put them in the local setsIndexer cache.
    74  	if sets, err := client.AppsV1().StatefulSets("").List(context.TODO(), metav1.ListOptions{}); err != nil {
    75  		panic(err)
    76  	} else {
    77  		for _, set := range sets.Items {
    78  			if err := om.setsIndexer.Update(&set); err != nil {
    79  				panic(err)
    80  			}
    81  		}
    82  	}
    83  
    84  	return om, ssu, ssc
    85  }
    86  
    87  func burst(set *apps.StatefulSet) *apps.StatefulSet {
    88  	set.Spec.PodManagementPolicy = apps.ParallelPodManagement
    89  	return set
    90  }
    91  
    92  func setMinReadySeconds(set *apps.StatefulSet, minReadySeconds int32) *apps.StatefulSet {
    93  	set.Spec.MinReadySeconds = minReadySeconds
    94  	return set
    95  }
    96  
    97  func runTestOverPVCRetentionPolicies(t *testing.T, testName string, testFn func(*testing.T, *apps.StatefulSetPersistentVolumeClaimRetentionPolicy)) {
    98  	subtestName := "StatefulSetAutoDeletePVCDisabled"
    99  	if testName != "" {
   100  		subtestName = fmt.Sprintf("%s/%s", testName, subtestName)
   101  	}
   102  	t.Run(subtestName, func(t *testing.T) {
   103  		defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, false)()
   104  		testFn(t, &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
   105  			WhenScaled:  apps.RetainPersistentVolumeClaimRetentionPolicyType,
   106  			WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
   107  		})
   108  	})
   109  
   110  	for _, policy := range []*apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
   111  		{
   112  			WhenScaled:  apps.RetainPersistentVolumeClaimRetentionPolicyType,
   113  			WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
   114  		},
   115  		{
   116  			WhenScaled:  apps.DeletePersistentVolumeClaimRetentionPolicyType,
   117  			WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
   118  		},
   119  		{
   120  			WhenScaled:  apps.RetainPersistentVolumeClaimRetentionPolicyType,
   121  			WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
   122  		},
   123  		{
   124  			WhenScaled:  apps.DeletePersistentVolumeClaimRetentionPolicyType,
   125  			WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
   126  		},
   127  		// tests the case when no policy is set.
   128  		nil,
   129  	} {
   130  		subtestName := pvcDeletePolicyString(policy) + "/StatefulSetAutoDeletePVCEnabled"
   131  		if testName != "" {
   132  			subtestName = fmt.Sprintf("%s/%s", testName, subtestName)
   133  		}
   134  		t.Run(subtestName, func(t *testing.T) {
   135  			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
   136  			testFn(t, policy)
   137  		})
   138  	}
   139  }
   140  
   141  func pvcDeletePolicyString(policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) string {
   142  	if policy == nil {
   143  		return "nullPolicy"
   144  	}
   145  	const retain = apps.RetainPersistentVolumeClaimRetentionPolicyType
   146  	const delete = apps.DeletePersistentVolumeClaimRetentionPolicyType
   147  	switch {
   148  	case policy.WhenScaled == retain && policy.WhenDeleted == retain:
   149  		return "Retain"
   150  	case policy.WhenScaled == retain && policy.WhenDeleted == delete:
   151  		return "SetDeleteOnly"
   152  	case policy.WhenScaled == delete && policy.WhenDeleted == retain:
   153  		return "ScaleDownOnly"
   154  	case policy.WhenScaled == delete && policy.WhenDeleted == delete:
   155  		return "Delete"
   156  	}
   157  	return "invalid"
   158  }
   159  
   160  func TestStatefulSetControl(t *testing.T) {
   161  	simpleSetFn := func() *apps.StatefulSet { return newStatefulSet(3) }
   162  	largeSetFn := func() *apps.StatefulSet { return newStatefulSet(5) }
   163  
   164  	testCases := []struct {
   165  		fn  func(*testing.T, *apps.StatefulSet, invariantFunc)
   166  		obj func() *apps.StatefulSet
   167  	}{
   168  		{CreatesPods, simpleSetFn},
   169  		{ScalesUp, simpleSetFn},
   170  		{ScalesDown, simpleSetFn},
   171  		{ReplacesPods, largeSetFn},
   172  		{RecreatesFailedPod, simpleSetFn},
   173  		{RecreatesSucceededPod, simpleSetFn},
   174  		{CreatePodFailure, simpleSetFn},
   175  		{UpdatePodFailure, simpleSetFn},
   176  		{UpdateSetStatusFailure, simpleSetFn},
   177  		{PodRecreateDeleteFailure, simpleSetFn},
   178  		{NewRevisionDeletePodFailure, simpleSetFn},
   179  		{RecreatesPVCForPendingPod, simpleSetFn},
   180  	}
   181  
   182  	for _, testCase := range testCases {
   183  		fnName := runtime.FuncForPC(reflect.ValueOf(testCase.fn).Pointer()).Name()
   184  		if i := strings.LastIndex(fnName, "."); i != -1 {
   185  			fnName = fnName[i+1:]
   186  		}
   187  		testObj := testCase.obj
   188  		testFn := testCase.fn
   189  		runTestOverPVCRetentionPolicies(
   190  			t,
   191  			fmt.Sprintf("%s/Monotonic", fnName),
   192  			func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
   193  				set := testObj()
   194  				set.Spec.PersistentVolumeClaimRetentionPolicy = policy
   195  				testFn(t, set, assertMonotonicInvariants)
   196  			},
   197  		)
   198  		runTestOverPVCRetentionPolicies(
   199  			t,
   200  			fmt.Sprintf("%s/Burst", fnName),
   201  			func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
   202  				set := burst(testObj())
   203  				set.Spec.PersistentVolumeClaimRetentionPolicy = policy
   204  				testFn(t, set, assertBurstInvariants)
   205  			},
   206  		)
   207  	}
   208  }
   209  
   210  func CreatesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
   211  	client := fake.NewSimpleClientset(set)
   212  	om, _, ssc := setupController(client)
   213  
   214  	if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
   215  		t.Errorf("Failed to turn up StatefulSet : %s", err)
   216  	}
   217  	var err error
   218  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   219  	if err != nil {
   220  		t.Fatalf("Error getting updated StatefulSet: %v", err)
   221  	}
   222  	if set.Status.Replicas != 3 {
   223  		t.Error("Failed to scale statefulset to 3 replicas")
   224  	}
   225  	if set.Status.ReadyReplicas != 3 {
   226  		t.Error("Failed to set ReadyReplicas correctly")
   227  	}
   228  	if set.Status.UpdatedReplicas != 3 {
   229  		t.Error("Failed to set UpdatedReplicas correctly")
   230  	}
   231  	// Check all pods have correct pod index label.
   232  	if utilfeature.DefaultFeatureGate.Enabled(features.PodIndexLabel) {
   233  		selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
   234  		if err != nil {
   235  			t.Error(err)
   236  		}
   237  		pods, err := om.podsLister.Pods(set.Namespace).List(selector)
   238  		if err != nil {
   239  			t.Error(err)
   240  		}
   241  		if len(pods) != 3 {
   242  			t.Errorf("Expected 3 pods, got %d", len(pods))
   243  		}
   244  		for _, pod := range pods {
   245  			podIndexFromLabel, exists := pod.Labels[apps.PodIndexLabel]
   246  			if !exists {
   247  				t.Errorf("Missing pod index label: %s", apps.PodIndexLabel)
   248  				continue
   249  			}
   250  			podIndexFromName := strconv.Itoa(getOrdinal(pod))
   251  			if podIndexFromLabel != podIndexFromName {
   252  				t.Errorf("Pod index label value (%s) does not match pod index in pod name (%s)", podIndexFromLabel, podIndexFromName)
   253  			}
   254  		}
   255  	}
   256  }
   257  
   258  func ScalesUp(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
   259  	client := fake.NewSimpleClientset(set)
   260  	om, _, ssc := setupController(client)
   261  
   262  	if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
   263  		t.Errorf("Failed to turn up StatefulSet : %s", err)
   264  	}
   265  	*set.Spec.Replicas = 4
   266  	if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
   267  		t.Errorf("Failed to scale StatefulSet : %s", err)
   268  	}
   269  	var err error
   270  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   271  	if err != nil {
   272  		t.Fatalf("Error getting updated StatefulSet: %v", err)
   273  	}
   274  	if set.Status.Replicas != 4 {
   275  		t.Error("Failed to scale statefulset to 4 replicas")
   276  	}
   277  	if set.Status.ReadyReplicas != 4 {
   278  		t.Error("Failed to set readyReplicas correctly")
   279  	}
   280  	if set.Status.UpdatedReplicas != 4 {
   281  		t.Error("Failed to set updatedReplicas correctly")
   282  	}
   283  }
   284  
   285  func ScalesDown(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
   286  	client := fake.NewSimpleClientset(set)
   287  	om, _, ssc := setupController(client)
   288  
   289  	if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
   290  		t.Errorf("Failed to turn up StatefulSet : %s", err)
   291  	}
   292  	var err error
   293  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   294  	if err != nil {
   295  		t.Fatalf("Error getting updated StatefulSet: %v", err)
   296  	}
   297  	*set.Spec.Replicas = 0
   298  	if err := scaleDownStatefulSetControl(set, ssc, om, invariants); err != nil {
   299  		t.Errorf("Failed to scale StatefulSet : %s", err)
   300  	}
   301  
   302  	// Check updated set.
   303  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   304  	if err != nil {
   305  		t.Fatalf("Error getting updated StatefulSet: %v", err)
   306  	}
   307  	if set.Status.Replicas != 0 {
   308  		t.Error("Failed to scale statefulset to 0 replicas")
   309  	}
   310  	if set.Status.ReadyReplicas != 0 {
   311  		t.Error("Failed to set readyReplicas correctly")
   312  	}
   313  	if set.Status.UpdatedReplicas != 0 {
   314  		t.Error("Failed to set updatedReplicas correctly")
   315  	}
   316  }
   317  
   318  func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
   319  	client := fake.NewSimpleClientset(set)
   320  	om, _, ssc := setupController(client)
   321  
   322  	if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
   323  		t.Errorf("Failed to turn up StatefulSet : %s", err)
   324  	}
   325  	var err error
   326  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   327  	if err != nil {
   328  		t.Fatalf("Error getting updated StatefulSet: %v", err)
   329  	}
   330  	if set.Status.Replicas != 5 {
   331  		t.Error("Failed to scale statefulset to 5 replicas")
   332  	}
   333  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
   334  	if err != nil {
   335  		t.Error(err)
   336  	}
   337  	claims, err := om.claimsLister.PersistentVolumeClaims(set.Namespace).List(selector)
   338  	if err != nil {
   339  		t.Error(err)
   340  	}
   341  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
   342  	if err != nil {
   343  		t.Error(err)
   344  	}
   345  	for _, pod := range pods {
   346  		podClaims := getPersistentVolumeClaims(set, pod)
   347  		for _, claim := range claims {
   348  			if _, found := podClaims[claim.Name]; found {
   349  				if hasOwnerRef(claim, pod) {
   350  					t.Errorf("Unexpected ownerRef on %s", claim.Name)
   351  				}
   352  			}
   353  		}
   354  	}
   355  	sort.Sort(ascendingOrdinal(pods))
   356  	om.podsIndexer.Delete(pods[0])
   357  	om.podsIndexer.Delete(pods[2])
   358  	om.podsIndexer.Delete(pods[4])
   359  	for i := 0; i < 5; i += 2 {
   360  		pods, err := om.podsLister.Pods(set.Namespace).List(selector)
   361  		if err != nil {
   362  			t.Error(err)
   363  		}
   364  		if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
   365  			t.Errorf("Failed to update StatefulSet : %s", err)
   366  		}
   367  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   368  		if err != nil {
   369  			t.Fatalf("Error getting updated StatefulSet: %v", err)
   370  		}
   371  		if pods, err = om.setPodRunning(set, i); err != nil {
   372  			t.Error(err)
   373  		}
   374  		if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
   375  			t.Errorf("Failed to update StatefulSet : %s", err)
   376  		}
   377  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   378  		if err != nil {
   379  			t.Fatalf("Error getting updated StatefulSet: %v", err)
   380  		}
   381  		if _, err = om.setPodReady(set, i); err != nil {
   382  			t.Error(err)
   383  		}
   384  	}
   385  	pods, err = om.podsLister.Pods(set.Namespace).List(selector)
   386  	if err != nil {
   387  		t.Error(err)
   388  	}
   389  	if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
   390  		t.Errorf("Failed to update StatefulSet : %s", err)
   391  	}
   392  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   393  	if err != nil {
   394  		t.Fatalf("Error getting updated StatefulSet: %v", err)
   395  	}
   396  	if e, a := int32(5), set.Status.Replicas; e != a {
   397  		t.Errorf("Expected to scale to %d, got %d", e, a)
   398  	}
   399  }
   400  
   401  func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc, phase v1.PodPhase) {
   402  	client := fake.NewSimpleClientset()
   403  	om, _, ssc := setupController(client)
   404  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
   405  	if err != nil {
   406  		t.Error(err)
   407  	}
   408  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
   409  	if err != nil {
   410  		t.Error(err)
   411  	}
   412  	if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
   413  		t.Errorf("Error updating StatefulSet %s", err)
   414  	}
   415  	if err := invariants(set, om); err != nil {
   416  		t.Error(err)
   417  	}
   418  	pods, err = om.podsLister.Pods(set.Namespace).List(selector)
   419  	if err != nil {
   420  		t.Error(err)
   421  	}
   422  	pods[0].Status.Phase = phase
   423  	om.podsIndexer.Update(pods[0])
   424  	if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
   425  		t.Errorf("Error updating StatefulSet %s", err)
   426  	}
   427  	if err := invariants(set, om); err != nil {
   428  		t.Error(err)
   429  	}
   430  	pods, err = om.podsLister.Pods(set.Namespace).List(selector)
   431  	if err != nil {
   432  		t.Error(err)
   433  	}
   434  	if isCreated(pods[0]) {
   435  		t.Error("StatefulSet did not recreate failed Pod")
   436  	}
   437  
   438  }
   439  
   440  func RecreatesFailedPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
   441  	recreatesPod(t, set, invariants, v1.PodFailed)
   442  }
   443  
   444  func RecreatesSucceededPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
   445  	recreatesPod(t, set, invariants, v1.PodSucceeded)
   446  }
   447  
   448  func CreatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
   449  	client := fake.NewSimpleClientset(set)
   450  	om, _, ssc := setupController(client)
   451  	om.SetCreateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
   452  
   453  	if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) {
   454  		t.Errorf("StatefulSetControl did not return InternalError found %s", err)
   455  	}
   456  	// Update so set.Status is set for the next scaleUpStatefulSetControl call.
   457  	var err error
   458  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   459  	if err != nil {
   460  		t.Fatalf("Error getting updated StatefulSet: %v", err)
   461  	}
   462  	if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
   463  		t.Errorf("Failed to turn up StatefulSet : %s", err)
   464  	}
   465  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   466  	if err != nil {
   467  		t.Fatalf("Error getting updated StatefulSet: %v", err)
   468  	}
   469  	if set.Status.Replicas != 3 {
   470  		t.Error("Failed to scale StatefulSet to 3 replicas")
   471  	}
   472  	if set.Status.ReadyReplicas != 3 {
   473  		t.Error("Failed to set readyReplicas correctly")
   474  	}
   475  	if set.Status.UpdatedReplicas != 3 {
   476  		t.Error("Failed to updatedReplicas correctly")
   477  	}
   478  }
   479  
   480  func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
   481  	client := fake.NewSimpleClientset(set)
   482  	om, _, ssc := setupController(client)
   483  	om.SetUpdateStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
   484  
   485  	// have to have 1 successful loop first
   486  	if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
   487  		t.Fatalf("Unexpected error: %v", err)
   488  	}
   489  	var err error
   490  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   491  	if err != nil {
   492  		t.Fatalf("Error getting updated StatefulSet: %v", err)
   493  	}
   494  	if set.Status.Replicas != 3 {
   495  		t.Error("Failed to scale StatefulSet to 3 replicas")
   496  	}
   497  	if set.Status.ReadyReplicas != 3 {
   498  		t.Error("Failed to set readyReplicas correctly")
   499  	}
   500  	if set.Status.UpdatedReplicas != 3 {
   501  		t.Error("Failed to set updatedReplicas correctly")
   502  	}
   503  
   504  	// now mutate a pod's identity
   505  	pods, err := om.podsLister.List(labels.Everything())
   506  	if err != nil {
   507  		t.Fatalf("Error listing pods: %v", err)
   508  	}
   509  	if len(pods) != 3 {
   510  		t.Fatalf("Expected 3 pods, got %d", len(pods))
   511  	}
   512  	sort.Sort(ascendingOrdinal(pods))
   513  	pods[0].Name = "goo-0"
   514  	om.podsIndexer.Update(pods[0])
   515  
   516  	// now it should fail
   517  	if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil && isOrHasInternalError(err) {
   518  		t.Errorf("StatefulSetControl did not return InternalError found %s", err)
   519  	}
   520  }
   521  
   522  func UpdateSetStatusFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
   523  	client := fake.NewSimpleClientset(set)
   524  	om, ssu, ssc := setupController(client)
   525  	ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
   526  
   527  	if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) {
   528  		t.Errorf("StatefulSetControl did not return InternalError found %s", err)
   529  	}
   530  	// Update so set.Status is set for the next scaleUpStatefulSetControl call.
   531  	var err error
   532  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   533  	if err != nil {
   534  		t.Fatalf("Error getting updated StatefulSet: %v", err)
   535  	}
   536  	if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
   537  		t.Errorf("Failed to turn up StatefulSet : %s", err)
   538  	}
   539  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   540  	if err != nil {
   541  		t.Fatalf("Error getting updated StatefulSet: %v", err)
   542  	}
   543  	if set.Status.Replicas != 3 {
   544  		t.Error("Failed to scale StatefulSet to 3 replicas")
   545  	}
   546  	if set.Status.ReadyReplicas != 3 {
   547  		t.Error("Failed to set readyReplicas to 3")
   548  	}
   549  	if set.Status.UpdatedReplicas != 3 {
   550  		t.Error("Failed to set updatedReplicas to 3")
   551  	}
   552  }
   553  
   554  func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
   555  	client := fake.NewSimpleClientset(set)
   556  	om, _, ssc := setupController(client)
   557  
   558  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
   559  	if err != nil {
   560  		t.Error(err)
   561  	}
   562  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
   563  	if err != nil {
   564  		t.Error(err)
   565  	}
   566  	if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
   567  		t.Errorf("Error updating StatefulSet %s", err)
   568  	}
   569  	if err := invariants(set, om); err != nil {
   570  		t.Error(err)
   571  	}
   572  	pods, err = om.podsLister.Pods(set.Namespace).List(selector)
   573  	if err != nil {
   574  		t.Error(err)
   575  	}
   576  	pods[0].Status.Phase = v1.PodFailed
   577  	om.podsIndexer.Update(pods[0])
   578  	om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
   579  	if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil && isOrHasInternalError(err) {
   580  		t.Errorf("StatefulSet failed to %s", err)
   581  	}
   582  	if err := invariants(set, om); err != nil {
   583  		t.Error(err)
   584  	}
   585  	if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
   586  		t.Errorf("Error updating StatefulSet %s", err)
   587  	}
   588  	if err := invariants(set, om); err != nil {
   589  		t.Error(err)
   590  	}
   591  	pods, err = om.podsLister.Pods(set.Namespace).List(selector)
   592  	if err != nil {
   593  		t.Error(err)
   594  	}
   595  	if isCreated(pods[0]) {
   596  		t.Error("StatefulSet did not recreate failed Pod")
   597  	}
   598  }
   599  
   600  func NewRevisionDeletePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
   601  	client := fake.NewSimpleClientset(set)
   602  	om, _, ssc := setupController(client)
   603  	if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
   604  		t.Errorf("Failed to turn up StatefulSet : %s", err)
   605  	}
   606  	var err error
   607  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   608  	if err != nil {
   609  		t.Fatalf("Error getting updated StatefulSet: %v", err)
   610  	}
   611  	if set.Status.Replicas != 3 {
   612  		t.Error("Failed to scale StatefulSet to 3 replicas")
   613  	}
   614  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
   615  	if err != nil {
   616  		t.Error(err)
   617  	}
   618  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
   619  	if err != nil {
   620  		t.Error(err)
   621  	}
   622  
   623  	// trigger a new revision
   624  	updateSet := set.DeepCopy()
   625  	updateSet.Spec.Template.Spec.Containers[0].Image = "nginx-new"
   626  	if err := om.setsIndexer.Update(updateSet); err != nil {
   627  		t.Error("Failed to update StatefulSet")
   628  	}
   629  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   630  	if err != nil {
   631  		t.Fatalf("Error getting updated StatefulSet: %v", err)
   632  	}
   633  
   634  	// delete fails
   635  	om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
   636  	_, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
   637  	if err == nil {
   638  		t.Error("Expected err in update StatefulSet when deleting a pod")
   639  	}
   640  
   641  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   642  	if err != nil {
   643  		t.Fatalf("Error getting updated StatefulSet: %v", err)
   644  	}
   645  	if err := invariants(set, om); err != nil {
   646  		t.Error(err)
   647  	}
   648  	if set.Status.CurrentReplicas != 3 {
   649  		t.Fatalf("Failed pod deletion should not update CurrentReplicas: want 3, got %d", set.Status.CurrentReplicas)
   650  	}
   651  	if set.Status.CurrentRevision == set.Status.UpdateRevision {
   652  		t.Error("Failed to create new revision")
   653  	}
   654  
   655  	// delete works
   656  	om.SetDeleteStatefulPodError(nil, 0)
   657  	status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
   658  	if err != nil {
   659  		t.Fatalf("Unexpected err in update StatefulSet: %v", err)
   660  	}
   661  	if status.CurrentReplicas != 2 {
   662  		t.Fatalf("Pod deletion should update CurrentReplicas: want 2, got %d", status.CurrentReplicas)
   663  	}
   664  	if err := invariants(set, om); err != nil {
   665  		t.Error(err)
   666  	}
   667  }
   668  
   669  func emptyInvariants(set *apps.StatefulSet, om *fakeObjectManager) error {
   670  	return nil
   671  }
   672  
   673  func TestStatefulSetControlWithStartOrdinal(t *testing.T) {
   674  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetStartOrdinal, true)()
   675  
   676  	simpleSetFn := func() *apps.StatefulSet {
   677  		statefulSet := newStatefulSet(3)
   678  		statefulSet.Spec.Ordinals = &apps.StatefulSetOrdinals{Start: int32(2)}
   679  		return statefulSet
   680  	}
   681  
   682  	testCases := []struct {
   683  		fn  func(*testing.T, *apps.StatefulSet, invariantFunc)
   684  		obj func() *apps.StatefulSet
   685  	}{
   686  		{CreatesPodsWithStartOrdinal, simpleSetFn},
   687  	}
   688  
   689  	for _, testCase := range testCases {
   690  		testObj := testCase.obj
   691  		testFn := testCase.fn
   692  
   693  		set := testObj()
   694  		testFn(t, set, emptyInvariants)
   695  	}
   696  }
   697  
   698  func CreatesPodsWithStartOrdinal(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
   699  	client := fake.NewSimpleClientset(set)
   700  	om, _, ssc := setupController(client)
   701  
   702  	if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
   703  		t.Errorf("Failed to turn up StatefulSet : %s", err)
   704  	}
   705  	var err error
   706  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   707  	if err != nil {
   708  		t.Fatalf("Error getting updated StatefulSet: %v", err)
   709  	}
   710  	if set.Status.Replicas != 3 {
   711  		t.Error("Failed to scale statefulset to 3 replicas")
   712  	}
   713  	if set.Status.ReadyReplicas != 3 {
   714  		t.Error("Failed to set ReadyReplicas correctly")
   715  	}
   716  	if set.Status.UpdatedReplicas != 3 {
   717  		t.Error("Failed to set UpdatedReplicas correctly")
   718  	}
   719  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
   720  	if err != nil {
   721  		t.Error(err)
   722  	}
   723  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
   724  	if err != nil {
   725  		t.Error(err)
   726  	}
   727  	sort.Sort(ascendingOrdinal(pods))
   728  	for i, pod := range pods {
   729  		expectedOrdinal := 2 + i
   730  		actualPodOrdinal := getOrdinal(pod)
   731  		if actualPodOrdinal != expectedOrdinal {
   732  			t.Errorf("Expected pod ordinal %d. Got %d", expectedOrdinal, actualPodOrdinal)
   733  		}
   734  	}
   735  }
   736  
   737  func RecreatesPVCForPendingPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
   738  	client := fake.NewSimpleClientset()
   739  	om, _, ssc := setupController(client)
   740  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
   741  	if err != nil {
   742  		t.Error(err)
   743  	}
   744  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
   745  	if err != nil {
   746  		t.Error(err)
   747  	}
   748  	if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
   749  		t.Errorf("Error updating StatefulSet %s", err)
   750  	}
   751  	if err := invariants(set, om); err != nil {
   752  		t.Error(err)
   753  	}
   754  	pods, err = om.podsLister.Pods(set.Namespace).List(selector)
   755  	if err != nil {
   756  		t.Error(err)
   757  	}
   758  	for _, claim := range getPersistentVolumeClaims(set, pods[0]) {
   759  		om.claimsIndexer.Delete(&claim)
   760  	}
   761  	pods[0].Status.Phase = v1.PodPending
   762  	om.podsIndexer.Update(pods[0])
   763  	if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
   764  		t.Errorf("Error updating StatefulSet %s", err)
   765  	}
   766  	// invariants check if there any missing PVCs for the Pods
   767  	if err := invariants(set, om); err != nil {
   768  		t.Error(err)
   769  	}
   770  	_, err = om.podsLister.Pods(set.Namespace).List(selector)
   771  	if err != nil {
   772  		t.Error(err)
   773  	}
   774  }
   775  
   776  func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
   777  	runTestOverPVCRetentionPolicies(
   778  		t, "", func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
   779  			set := newStatefulSet(3)
   780  			set.Spec.PersistentVolumeClaimRetentionPolicy = policy
   781  			invariants := assertMonotonicInvariants
   782  			client := fake.NewSimpleClientset(set)
   783  			om, _, ssc := setupController(client)
   784  
   785  			if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
   786  				t.Errorf("Failed to turn up StatefulSet : %s", err)
   787  			}
   788  			var err error
   789  			set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   790  			if err != nil {
   791  				t.Fatalf("Error getting updated StatefulSet: %v", err)
   792  			}
   793  			*set.Spec.Replicas = 0
   794  			om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 2)
   795  			if err := scaleDownStatefulSetControl(set, ssc, om, invariants); err != nil && isOrHasInternalError(err) {
   796  				t.Errorf("StatefulSetControl failed to throw error on delete %s", err)
   797  			}
   798  			set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   799  			if err != nil {
   800  				t.Fatalf("Error getting updated StatefulSet: %v", err)
   801  			}
   802  			if err := scaleDownStatefulSetControl(set, ssc, om, invariants); err != nil {
   803  				t.Errorf("Failed to turn down StatefulSet %s", err)
   804  			}
   805  			set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   806  			if err != nil {
   807  				t.Fatalf("Error getting updated StatefulSet: %v", err)
   808  			}
   809  			if set.Status.Replicas != 0 {
   810  				t.Error("Failed to scale statefulset to 0 replicas")
   811  			}
   812  			if set.Status.ReadyReplicas != 0 {
   813  				t.Error("Failed to set readyReplicas to 0")
   814  			}
   815  			if set.Status.UpdatedReplicas != 0 {
   816  				t.Error("Failed to set updatedReplicas to 0")
   817  			}
   818  		})
   819  }
   820  
   821  func TestStatefulSetControl_getSetRevisions(t *testing.T) {
   822  	type testcase struct {
   823  		name            string
   824  		existing        []*apps.ControllerRevision
   825  		set             *apps.StatefulSet
   826  		expectedCount   int
   827  		expectedCurrent *apps.ControllerRevision
   828  		expectedUpdate  *apps.ControllerRevision
   829  		err             bool
   830  	}
   831  
   832  	testFn := func(test *testcase, t *testing.T) {
   833  		client := fake.NewSimpleClientset()
   834  		informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
   835  		spc := NewStatefulPodControlFromManager(newFakeObjectManager(informerFactory), &noopRecorder{})
   836  		ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
   837  		recorder := &noopRecorder{}
   838  		ssc := defaultStatefulSetControl{spc, ssu, history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions()), recorder}
   839  
   840  		stop := make(chan struct{})
   841  		defer close(stop)
   842  		informerFactory.Start(stop)
   843  		cache.WaitForCacheSync(
   844  			stop,
   845  			informerFactory.Apps().V1().StatefulSets().Informer().HasSynced,
   846  			informerFactory.Core().V1().Pods().Informer().HasSynced,
   847  			informerFactory.Apps().V1().ControllerRevisions().Informer().HasSynced,
   848  		)
   849  		test.set.Status.CollisionCount = new(int32)
   850  		for i := range test.existing {
   851  			ssc.controllerHistory.CreateControllerRevision(test.set, test.existing[i], test.set.Status.CollisionCount)
   852  		}
   853  		revisions, err := ssc.ListRevisions(test.set)
   854  		if err != nil {
   855  			t.Fatal(err)
   856  		}
   857  		current, update, _, err := ssc.getStatefulSetRevisions(test.set, revisions)
   858  		if err != nil {
   859  			t.Fatalf("error getting statefulset revisions:%v", err)
   860  		}
   861  		revisions, err = ssc.ListRevisions(test.set)
   862  		if err != nil {
   863  			t.Fatal(err)
   864  		}
   865  		if len(revisions) != test.expectedCount {
   866  			t.Errorf("%s: want %d revisions got %d", test.name, test.expectedCount, len(revisions))
   867  		}
   868  		if test.err && err == nil {
   869  			t.Errorf("%s: expected error", test.name)
   870  		}
   871  		if !test.err && !history.EqualRevision(current, test.expectedCurrent) {
   872  			t.Errorf("%s: for current want %v got %v", test.name, test.expectedCurrent, current)
   873  		}
   874  		if !test.err && !history.EqualRevision(update, test.expectedUpdate) {
   875  			t.Errorf("%s: for update want %v got %v", test.name, test.expectedUpdate, update)
   876  		}
   877  		if !test.err && test.expectedCurrent != nil && current != nil && test.expectedCurrent.Revision != current.Revision {
   878  			t.Errorf("%s: for current revision want %d got %d", test.name, test.expectedCurrent.Revision, current.Revision)
   879  		}
   880  		if !test.err && test.expectedUpdate != nil && update != nil && test.expectedUpdate.Revision != update.Revision {
   881  			t.Errorf("%s: for update revision want %d got %d", test.name, test.expectedUpdate.Revision, update.Revision)
   882  		}
   883  	}
   884  
   885  	updateRevision := func(cr *apps.ControllerRevision, revision int64) *apps.ControllerRevision {
   886  		clone := cr.DeepCopy()
   887  		clone.Revision = revision
   888  		return clone
   889  	}
   890  
   891  	runTestOverPVCRetentionPolicies(
   892  		t, "", func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
   893  			set := newStatefulSet(3)
   894  			set.Spec.PersistentVolumeClaimRetentionPolicy = policy
   895  			set.Status.CollisionCount = new(int32)
   896  			rev0 := newRevisionOrDie(set, 1)
   897  			set1 := set.DeepCopy()
   898  			set1.Spec.Template.Spec.Containers[0].Image = "foo"
   899  			set1.Status.CurrentRevision = rev0.Name
   900  			set1.Status.CollisionCount = new(int32)
   901  			rev1 := newRevisionOrDie(set1, 2)
   902  			set2 := set1.DeepCopy()
   903  			set2.Spec.Template.Labels["new"] = "label"
   904  			set2.Status.CurrentRevision = rev0.Name
   905  			set2.Status.CollisionCount = new(int32)
   906  			rev2 := newRevisionOrDie(set2, 3)
   907  			tests := []testcase{
   908  				{
   909  					name:            "creates initial revision",
   910  					existing:        nil,
   911  					set:             set,
   912  					expectedCount:   1,
   913  					expectedCurrent: rev0,
   914  					expectedUpdate:  rev0,
   915  					err:             false,
   916  				},
   917  				{
   918  					name:            "creates revision on update",
   919  					existing:        []*apps.ControllerRevision{rev0},
   920  					set:             set1,
   921  					expectedCount:   2,
   922  					expectedCurrent: rev0,
   923  					expectedUpdate:  rev1,
   924  					err:             false,
   925  				},
   926  				{
   927  					name:            "must not recreate a new revision of same set",
   928  					existing:        []*apps.ControllerRevision{rev0, rev1},
   929  					set:             set1,
   930  					expectedCount:   2,
   931  					expectedCurrent: rev0,
   932  					expectedUpdate:  rev1,
   933  					err:             false,
   934  				},
   935  				{
   936  					name:            "must rollback to a previous revision",
   937  					existing:        []*apps.ControllerRevision{rev0, rev1, rev2},
   938  					set:             set1,
   939  					expectedCount:   3,
   940  					expectedCurrent: rev0,
   941  					expectedUpdate:  updateRevision(rev1, 4),
   942  					err:             false,
   943  				},
   944  			}
   945  			for i := range tests {
   946  				testFn(&tests[i], t)
   947  			}
   948  		})
   949  }
   950  
   951  func setupPodManagementPolicy(podManagementPolicy apps.PodManagementPolicyType, set *apps.StatefulSet) *apps.StatefulSet {
   952  	set.Spec.PodManagementPolicy = podManagementPolicy
   953  	return set
   954  }
   955  
   956  func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) {
   957  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MaxUnavailableStatefulSet, true)()
   958  
   959  	simpleParallelVerificationFn := func(
   960  		set *apps.StatefulSet,
   961  		spc *fakeObjectManager,
   962  		ssc StatefulSetControlInterface,
   963  		pods []*v1.Pod,
   964  		totalPods int,
   965  		selector labels.Selector,
   966  	) []*v1.Pod {
   967  		// in burst mode, 2 pods got deleted, so 2 new pods will be created at the same time
   968  		if len(pods) != totalPods {
   969  			t.Fatalf("Expected create pods 4/5, got pods %v", len(pods))
   970  		}
   971  
   972  		// if pod 4 ready, start to update pod 3, even though 5 is not ready
   973  		spc.setPodRunning(set, 4)
   974  		spc.setPodRunning(set, 5)
   975  		originalPods, _ := spc.setPodReady(set, 4)
   976  		sort.Sort(ascendingOrdinal(originalPods))
   977  		if _, err := ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
   978  			t.Fatal(err)
   979  		}
   980  		pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
   981  		if err != nil {
   982  			t.Fatal(err)
   983  		}
   984  		sort.Sort(ascendingOrdinal(pods))
   985  		// pods 0, 1,2, 4,5 should be present(note 3 is missing)
   986  		if !reflect.DeepEqual(pods, append(originalPods[:3], originalPods[4:]...)) {
   987  			t.Fatalf("Expected pods %v, got pods %v", append(originalPods[:3], originalPods[4:]...), pods)
   988  		}
   989  
   990  		// create new pod 3
   991  		if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
   992  			t.Fatal(err)
   993  		}
   994  		pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
   995  		if err != nil {
   996  			t.Fatal(err)
   997  		}
   998  		if len(pods) != totalPods {
   999  			t.Fatalf("Expected create pods 2/3, got pods %v", pods)
  1000  		}
  1001  
  1002  		return pods
  1003  	}
  1004  	simpleOrderedVerificationFn := func(
  1005  		set *apps.StatefulSet,
  1006  		spc *fakeObjectManager,
  1007  		ssc StatefulSetControlInterface,
  1008  		pods []*v1.Pod,
  1009  		totalPods int,
  1010  		selector labels.Selector,
  1011  	) []*v1.Pod {
  1012  		// only one pod gets created at a time due to OrderedReady
  1013  		if len(pods) != 5 {
  1014  			t.Fatalf("Expected create pods 5, got pods %v", len(pods))
  1015  		}
  1016  		spc.setPodRunning(set, 4)
  1017  		pods, _ = spc.setPodReady(set, 4)
  1018  
  1019  		// create new pods 4(only one pod gets created at a time due to OrderedReady)
  1020  		if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
  1021  			t.Fatal(err)
  1022  		}
  1023  		pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1024  		if err != nil {
  1025  			t.Fatal(err)
  1026  		}
  1027  
  1028  		if len(pods) != totalPods {
  1029  			t.Fatalf("Expected create pods 4, got pods %v", len(pods))
  1030  		}
  1031  		// if pod 4 ready, start to update pod 3
  1032  		spc.setPodRunning(set, 5)
  1033  		originalPods, _ := spc.setPodReady(set, 5)
  1034  		sort.Sort(ascendingOrdinal(originalPods))
  1035  		if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
  1036  			t.Fatal(err)
  1037  		}
  1038  		pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  1039  		if err != nil {
  1040  			t.Fatal(err)
  1041  		}
  1042  		sort.Sort(ascendingOrdinal(pods))
  1043  
  1044  		// verify the remaining pods are 0,1,2,4,5 (3 got deleted)
  1045  		if !reflect.DeepEqual(pods, append(originalPods[:3], originalPods[4:]...)) {
  1046  			t.Fatalf("Expected pods %v, got pods %v", append(originalPods[:3], originalPods[4:]...), pods)
  1047  		}
  1048  
  1049  		// create new pod 3
  1050  		if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
  1051  			t.Fatal(err)
  1052  		}
  1053  		pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  1054  		if err != nil {
  1055  			t.Fatal(err)
  1056  		}
  1057  		if len(pods) != totalPods {
  1058  			t.Fatalf("Expected create pods 2/3, got pods %v", pods)
  1059  		}
  1060  
  1061  		return pods
  1062  	}
  1063  	testCases := []struct {
  1064  		policyType apps.PodManagementPolicyType
  1065  		verifyFn   func(
  1066  			set *apps.StatefulSet,
  1067  			spc *fakeObjectManager,
  1068  			ssc StatefulSetControlInterface,
  1069  			pods []*v1.Pod,
  1070  			totalPods int,
  1071  			selector labels.Selector,
  1072  		) []*v1.Pod
  1073  	}{
  1074  		{apps.OrderedReadyPodManagement, simpleOrderedVerificationFn},
  1075  		{apps.ParallelPodManagement, simpleParallelVerificationFn},
  1076  	}
  1077  	for _, tc := range testCases {
  1078  		// Setup the statefulSet controller
  1079  		var totalPods int32 = 6
  1080  		var partition int32 = 3
  1081  		var maxUnavailable = intstr.FromInt32(2)
  1082  		set := setupPodManagementPolicy(tc.policyType, newStatefulSet(totalPods))
  1083  		set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
  1084  			Type: apps.RollingUpdateStatefulSetStrategyType,
  1085  			RollingUpdate: func() *apps.RollingUpdateStatefulSetStrategy {
  1086  				return &apps.RollingUpdateStatefulSetStrategy{
  1087  					Partition:      &partition,
  1088  					MaxUnavailable: &maxUnavailable,
  1089  				}
  1090  			}(),
  1091  		}
  1092  
  1093  		client := fake.NewSimpleClientset()
  1094  		spc, _, ssc := setupController(client)
  1095  		if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil {
  1096  			t.Fatal(err)
  1097  		}
  1098  		set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1099  		if err != nil {
  1100  			t.Fatal(err)
  1101  		}
  1102  
  1103  		// Change the image to trigger an update
  1104  		set.Spec.Template.Spec.Containers[0].Image = "foo"
  1105  
  1106  		selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1107  		if err != nil {
  1108  			t.Fatal(err)
  1109  		}
  1110  		originalPods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1111  		if err != nil {
  1112  			t.Fatal(err)
  1113  		}
  1114  		sort.Sort(ascendingOrdinal(originalPods))
  1115  
  1116  		// since maxUnavailable is 2, update pods 4 and 5, this will delete the pod 4 and 5,
  1117  		if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
  1118  			t.Fatal(err)
  1119  		}
  1120  		pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1121  		if err != nil {
  1122  			t.Fatal(err)
  1123  		}
  1124  
  1125  		sort.Sort(ascendingOrdinal(pods))
  1126  
  1127  		// expected number of pod is 0,1,2,3
  1128  		if !reflect.DeepEqual(pods, originalPods[:4]) {
  1129  			t.Fatalf("Expected pods %v, got pods %v", originalPods[:4], pods)
  1130  		}
  1131  
  1132  		// create new pods
  1133  		if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
  1134  			t.Fatal(err)
  1135  		}
  1136  		pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  1137  		if err != nil {
  1138  			t.Fatal(err)
  1139  		}
  1140  
  1141  		tc.verifyFn(set, spc, ssc, pods, int(totalPods), selector)
  1142  
  1143  		// pods 3/4/5 ready, should not update other pods
  1144  		spc.setPodRunning(set, 3)
  1145  		spc.setPodRunning(set, 5)
  1146  		spc.setPodReady(set, 5)
  1147  		originalPods, _ = spc.setPodReady(set, 3)
  1148  		sort.Sort(ascendingOrdinal(originalPods))
  1149  		if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
  1150  			t.Fatal(err)
  1151  		}
  1152  		pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
  1153  		if err != nil {
  1154  			t.Fatal(err)
  1155  		}
  1156  		sort.Sort(ascendingOrdinal(pods))
  1157  		if !reflect.DeepEqual(pods, originalPods) {
  1158  			t.Fatalf("Expected pods %v, got pods %v", originalPods, pods)
  1159  		}
  1160  	}
  1161  
  1162  }
  1163  
  1164  func setupForInvariant(t *testing.T) (*apps.StatefulSet, *fakeObjectManager, StatefulSetControlInterface, intstr.IntOrString, int32) {
  1165  	var totalPods int32 = 6
  1166  	set := newStatefulSet(totalPods)
  1167  	// update all pods >=3(3,4,5)
  1168  	var partition int32 = 3
  1169  	var maxUnavailable = intstr.FromInt32(2)
  1170  	set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
  1171  		Type: apps.RollingUpdateStatefulSetStrategyType,
  1172  		RollingUpdate: func() *apps.RollingUpdateStatefulSetStrategy {
  1173  			return &apps.RollingUpdateStatefulSetStrategy{
  1174  				Partition:      &partition,
  1175  				MaxUnavailable: &maxUnavailable,
  1176  			}
  1177  		}(),
  1178  	}
  1179  
  1180  	client := fake.NewSimpleClientset()
  1181  	spc, _, ssc := setupController(client)
  1182  	if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil {
  1183  		t.Fatal(err)
  1184  	}
  1185  	set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1186  	if err != nil {
  1187  		t.Fatal(err)
  1188  	}
  1189  
  1190  	return set, spc, ssc, maxUnavailable, totalPods
  1191  }
  1192  
  1193  func TestStatefulSetControlRollingUpdateWithMaxUnavailableInOrderedModeVerifyInvariant(t *testing.T) {
  1194  	// Make all pods in statefulset unavailable one by one
  1195  	// and verify that RollingUpdate doesnt proceed with maxUnavailable set
  1196  	// this could have been a simple loop, keeping it like this to be able
  1197  	// to add more params here.
  1198  	testCases := []struct {
  1199  		ordinalOfPodToTerminate []int
  1200  	}{
  1201  
  1202  		{[]int{}},
  1203  		{[]int{5}},
  1204  		{[]int{3}},
  1205  		{[]int{4}},
  1206  		{[]int{5, 4}},
  1207  		{[]int{5, 3}},
  1208  		{[]int{4, 3}},
  1209  		{[]int{5, 4, 3}},
  1210  		{[]int{2}}, // note this is an ordinal greater than partition(3)
  1211  		{[]int{1}}, // note this is an ordinal greater than partition(3)
  1212  	}
  1213  	for _, tc := range testCases {
  1214  		defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MaxUnavailableStatefulSet, true)()
  1215  		set, spc, ssc, maxUnavailable, totalPods := setupForInvariant(t)
  1216  		t.Run(fmt.Sprintf("terminating pod at ordinal %d", tc.ordinalOfPodToTerminate), func(t *testing.T) {
  1217  			status := apps.StatefulSetStatus{Replicas: int32(totalPods)}
  1218  			updateRevision := &apps.ControllerRevision{}
  1219  
  1220  			for i := 0; i < len(tc.ordinalOfPodToTerminate); i++ {
  1221  				// Ensure at least one pod is unavailable before trying to update
  1222  				_, err := spc.addTerminatingPod(set, tc.ordinalOfPodToTerminate[i])
  1223  				if err != nil {
  1224  					t.Fatal(err)
  1225  				}
  1226  			}
  1227  
  1228  			selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1229  			if err != nil {
  1230  				t.Fatal(err)
  1231  			}
  1232  
  1233  			originalPods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1234  			if err != nil {
  1235  				t.Fatal(err)
  1236  			}
  1237  
  1238  			sort.Sort(ascendingOrdinal(originalPods))
  1239  
  1240  			// start to update
  1241  			set.Spec.Template.Spec.Containers[0].Image = "foo"
  1242  
  1243  			// try to update the statefulset
  1244  			// this function is only called in main code when feature gate is enabled
  1245  			if _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, originalPods, updateRevision, status); err != nil {
  1246  				t.Fatal(err)
  1247  			}
  1248  			pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
  1249  			if err != nil {
  1250  				t.Fatal(err)
  1251  			}
  1252  
  1253  			sort.Sort(ascendingOrdinal(pods))
  1254  
  1255  			expecteddPodsToBeDeleted := maxUnavailable.IntValue() - len(tc.ordinalOfPodToTerminate)
  1256  			if expecteddPodsToBeDeleted < 0 {
  1257  				expecteddPodsToBeDeleted = 0
  1258  			}
  1259  
  1260  			expectedPodsAfterUpdate := int(totalPods) - expecteddPodsToBeDeleted
  1261  
  1262  			if len(pods) != expectedPodsAfterUpdate {
  1263  				t.Errorf("Expected pods %v, got pods %v", expectedPodsAfterUpdate, len(pods))
  1264  			}
  1265  
  1266  		})
  1267  	}
  1268  }
  1269  
  1270  func TestStatefulSetControlRollingUpdate(t *testing.T) {
  1271  	type testcase struct {
  1272  		name       string
  1273  		invariants func(set *apps.StatefulSet, om *fakeObjectManager) error
  1274  		initial    func() *apps.StatefulSet
  1275  		update     func(set *apps.StatefulSet) *apps.StatefulSet
  1276  		validate   func(set *apps.StatefulSet, pods []*v1.Pod) error
  1277  	}
  1278  
  1279  	testFn := func(test *testcase, t *testing.T) {
  1280  		set := test.initial()
  1281  		client := fake.NewSimpleClientset(set)
  1282  		om, _, ssc := setupController(client)
  1283  		if err := scaleUpStatefulSetControl(set, ssc, om, test.invariants); err != nil {
  1284  			t.Fatalf("%s: %s", test.name, err)
  1285  		}
  1286  		set, err := om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1287  		if err != nil {
  1288  			t.Fatalf("%s: %s", test.name, err)
  1289  		}
  1290  		set = test.update(set)
  1291  		if err := updateStatefulSetControl(set, ssc, om, assertUpdateInvariants); err != nil {
  1292  			t.Fatalf("%s: %s", test.name, err)
  1293  		}
  1294  		selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1295  		if err != nil {
  1296  			t.Fatalf("%s: %s", test.name, err)
  1297  		}
  1298  		pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  1299  		if err != nil {
  1300  			t.Fatalf("%s: %s", test.name, err)
  1301  		}
  1302  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1303  		if err != nil {
  1304  			t.Fatalf("%s: %s", test.name, err)
  1305  		}
  1306  		if err := test.validate(set, pods); err != nil {
  1307  			t.Fatalf("%s: %s", test.name, err)
  1308  		}
  1309  	}
  1310  
  1311  	tests := []testcase{
  1312  		{
  1313  			name:       "monotonic image update",
  1314  			invariants: assertMonotonicInvariants,
  1315  			initial: func() *apps.StatefulSet {
  1316  				return newStatefulSet(3)
  1317  			},
  1318  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1319  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  1320  				return set
  1321  			},
  1322  			validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1323  				sort.Sort(ascendingOrdinal(pods))
  1324  				for i := range pods {
  1325  					if pods[i].Spec.Containers[0].Image != "foo" {
  1326  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1327  					}
  1328  				}
  1329  				return nil
  1330  			},
  1331  		},
  1332  		{
  1333  			name:       "monotonic image update and scale up",
  1334  			invariants: assertMonotonicInvariants,
  1335  			initial: func() *apps.StatefulSet {
  1336  				return newStatefulSet(3)
  1337  			},
  1338  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1339  				*set.Spec.Replicas = 5
  1340  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  1341  				return set
  1342  			},
  1343  			validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1344  				sort.Sort(ascendingOrdinal(pods))
  1345  				for i := range pods {
  1346  					if pods[i].Spec.Containers[0].Image != "foo" {
  1347  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1348  					}
  1349  				}
  1350  				return nil
  1351  			},
  1352  		},
  1353  		{
  1354  			name:       "monotonic image update and scale down",
  1355  			invariants: assertMonotonicInvariants,
  1356  			initial: func() *apps.StatefulSet {
  1357  				return newStatefulSet(5)
  1358  			},
  1359  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1360  				*set.Spec.Replicas = 3
  1361  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  1362  				return set
  1363  			},
  1364  			validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1365  				sort.Sort(ascendingOrdinal(pods))
  1366  				for i := range pods {
  1367  					if pods[i].Spec.Containers[0].Image != "foo" {
  1368  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1369  					}
  1370  				}
  1371  				return nil
  1372  			},
  1373  		},
  1374  		{
  1375  			name:       "burst image update",
  1376  			invariants: assertBurstInvariants,
  1377  			initial: func() *apps.StatefulSet {
  1378  				return burst(newStatefulSet(3))
  1379  			},
  1380  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1381  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  1382  				return set
  1383  			},
  1384  			validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1385  				sort.Sort(ascendingOrdinal(pods))
  1386  				for i := range pods {
  1387  					if pods[i].Spec.Containers[0].Image != "foo" {
  1388  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1389  					}
  1390  				}
  1391  				return nil
  1392  			},
  1393  		},
  1394  		{
  1395  			name:       "burst image update and scale up",
  1396  			invariants: assertBurstInvariants,
  1397  			initial: func() *apps.StatefulSet {
  1398  				return burst(newStatefulSet(3))
  1399  			},
  1400  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1401  				*set.Spec.Replicas = 5
  1402  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  1403  				return set
  1404  			},
  1405  			validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1406  				sort.Sort(ascendingOrdinal(pods))
  1407  				for i := range pods {
  1408  					if pods[i].Spec.Containers[0].Image != "foo" {
  1409  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1410  					}
  1411  				}
  1412  				return nil
  1413  			},
  1414  		},
  1415  		{
  1416  			name:       "burst image update and scale down",
  1417  			invariants: assertBurstInvariants,
  1418  			initial: func() *apps.StatefulSet {
  1419  				return burst(newStatefulSet(5))
  1420  			},
  1421  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1422  				*set.Spec.Replicas = 3
  1423  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  1424  				return set
  1425  			},
  1426  			validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1427  				sort.Sort(ascendingOrdinal(pods))
  1428  				for i := range pods {
  1429  					if pods[i].Spec.Containers[0].Image != "foo" {
  1430  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1431  					}
  1432  				}
  1433  				return nil
  1434  			},
  1435  		},
  1436  	}
  1437  	for i := range tests {
  1438  		testFn(&tests[i], t)
  1439  	}
  1440  }
  1441  
  1442  func TestStatefulSetControlOnDeleteUpdate(t *testing.T) {
  1443  	type testcase struct {
  1444  		name            string
  1445  		invariants      func(set *apps.StatefulSet, om *fakeObjectManager) error
  1446  		initial         func() *apps.StatefulSet
  1447  		update          func(set *apps.StatefulSet) *apps.StatefulSet
  1448  		validateUpdate  func(set *apps.StatefulSet, pods []*v1.Pod) error
  1449  		validateRestart func(set *apps.StatefulSet, pods []*v1.Pod) error
  1450  	}
  1451  
  1452  	originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image
  1453  
  1454  	testFn := func(t *testing.T, test *testcase, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
  1455  		set := test.initial()
  1456  		set.Spec.PersistentVolumeClaimRetentionPolicy = policy
  1457  		set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{Type: apps.OnDeleteStatefulSetStrategyType}
  1458  		client := fake.NewSimpleClientset(set)
  1459  		om, _, ssc := setupController(client)
  1460  		if err := scaleUpStatefulSetControl(set, ssc, om, test.invariants); err != nil {
  1461  			t.Fatalf("%s: %s", test.name, err)
  1462  		}
  1463  		set, err := om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1464  		if err != nil {
  1465  			t.Fatalf("%s: %s", test.name, err)
  1466  		}
  1467  		set = test.update(set)
  1468  		if err := updateStatefulSetControl(set, ssc, om, assertUpdateInvariants); err != nil {
  1469  			t.Fatalf("%s: %s", test.name, err)
  1470  		}
  1471  
  1472  		// Pods may have been deleted in the update. Delete any claims with a pod ownerRef.
  1473  		selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1474  		if err != nil {
  1475  			t.Fatalf("%s: %s", test.name, err)
  1476  		}
  1477  		claims, err := om.claimsLister.PersistentVolumeClaims(set.Namespace).List(selector)
  1478  		if err != nil {
  1479  			t.Fatalf("%s: %s", test.name, err)
  1480  		}
  1481  		for _, claim := range claims {
  1482  			for _, ref := range claim.GetOwnerReferences() {
  1483  				if strings.HasPrefix(ref.Name, "foo-") {
  1484  					om.claimsIndexer.Delete(claim)
  1485  					break
  1486  				}
  1487  			}
  1488  		}
  1489  
  1490  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1491  		if err != nil {
  1492  			t.Fatalf("%s: %s", test.name, err)
  1493  		}
  1494  		pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  1495  		if err != nil {
  1496  			t.Fatalf("%s: %s", test.name, err)
  1497  		}
  1498  		if err := test.validateUpdate(set, pods); err != nil {
  1499  			for i := range pods {
  1500  				t.Log(pods[i].Name)
  1501  			}
  1502  			t.Fatalf("%s: %s", test.name, err)
  1503  
  1504  		}
  1505  		claims, err = om.claimsLister.PersistentVolumeClaims(set.Namespace).List(selector)
  1506  		if err != nil {
  1507  			t.Fatalf("%s: %s", test.name, err)
  1508  		}
  1509  		for _, claim := range claims {
  1510  			for _, ref := range claim.GetOwnerReferences() {
  1511  				if strings.HasPrefix(ref.Name, "foo-") {
  1512  					t.Fatalf("Unexpected pod reference on %s: %v", claim.Name, claim.GetOwnerReferences())
  1513  				}
  1514  			}
  1515  		}
  1516  
  1517  		replicas := *set.Spec.Replicas
  1518  		*set.Spec.Replicas = 0
  1519  		if err := scaleDownStatefulSetControl(set, ssc, om, test.invariants); err != nil {
  1520  			t.Fatalf("%s: %s", test.name, err)
  1521  		}
  1522  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1523  		if err != nil {
  1524  			t.Fatalf("%s: %s", test.name, err)
  1525  		}
  1526  		*set.Spec.Replicas = replicas
  1527  
  1528  		claims, err = om.claimsLister.PersistentVolumeClaims(set.Namespace).List(selector)
  1529  		if err != nil {
  1530  			t.Fatalf("%s: %s", test.name, err)
  1531  		}
  1532  		for _, claim := range claims {
  1533  			for _, ref := range claim.GetOwnerReferences() {
  1534  				if strings.HasPrefix(ref.Name, "foo-") {
  1535  					t.Fatalf("Unexpected pod reference on %s: %v", claim.Name, claim.GetOwnerReferences())
  1536  				}
  1537  			}
  1538  		}
  1539  
  1540  		if err := scaleUpStatefulSetControl(set, ssc, om, test.invariants); err != nil {
  1541  			t.Fatalf("%s: %s", test.name, err)
  1542  		}
  1543  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1544  		if err != nil {
  1545  			t.Fatalf("%s: %s", test.name, err)
  1546  		}
  1547  		pods, err = om.podsLister.Pods(set.Namespace).List(selector)
  1548  		if err != nil {
  1549  			t.Fatalf("%s: %s", test.name, err)
  1550  		}
  1551  		if err := test.validateRestart(set, pods); err != nil {
  1552  			t.Fatalf("%s: %s", test.name, err)
  1553  		}
  1554  	}
  1555  
  1556  	tests := []testcase{
  1557  		{
  1558  			name:       "monotonic image update",
  1559  			invariants: assertMonotonicInvariants,
  1560  			initial: func() *apps.StatefulSet {
  1561  				return newStatefulSet(3)
  1562  			},
  1563  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1564  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  1565  				return set
  1566  			},
  1567  			validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1568  				sort.Sort(ascendingOrdinal(pods))
  1569  				for i := range pods {
  1570  					if pods[i].Spec.Containers[0].Image != originalImage {
  1571  						return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1572  					}
  1573  				}
  1574  				return nil
  1575  			},
  1576  			validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1577  				sort.Sort(ascendingOrdinal(pods))
  1578  				for i := range pods {
  1579  					if pods[i].Spec.Containers[0].Image != "foo" {
  1580  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1581  					}
  1582  				}
  1583  				return nil
  1584  			},
  1585  		},
  1586  		{
  1587  			name:       "monotonic image update and scale up",
  1588  			invariants: assertMonotonicInvariants,
  1589  			initial: func() *apps.StatefulSet {
  1590  				return newStatefulSet(3)
  1591  			},
  1592  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1593  				*set.Spec.Replicas = 5
  1594  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  1595  				return set
  1596  			},
  1597  			validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1598  				sort.Sort(ascendingOrdinal(pods))
  1599  				for i := range pods {
  1600  					if i < 3 && pods[i].Spec.Containers[0].Image != originalImage {
  1601  						return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1602  					}
  1603  					if i >= 3 && pods[i].Spec.Containers[0].Image != "foo" {
  1604  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1605  					}
  1606  				}
  1607  				return nil
  1608  			},
  1609  			validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1610  				sort.Sort(ascendingOrdinal(pods))
  1611  				for i := range pods {
  1612  					if pods[i].Spec.Containers[0].Image != "foo" {
  1613  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1614  					}
  1615  				}
  1616  				return nil
  1617  			},
  1618  		},
  1619  		{
  1620  			name:       "monotonic image update and scale down",
  1621  			invariants: assertMonotonicInvariants,
  1622  			initial: func() *apps.StatefulSet {
  1623  				return newStatefulSet(5)
  1624  			},
  1625  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1626  				*set.Spec.Replicas = 3
  1627  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  1628  				return set
  1629  			},
  1630  			validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1631  				sort.Sort(ascendingOrdinal(pods))
  1632  				for i := range pods {
  1633  					if pods[i].Spec.Containers[0].Image != originalImage {
  1634  						return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1635  					}
  1636  				}
  1637  				return nil
  1638  			},
  1639  			validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1640  				sort.Sort(ascendingOrdinal(pods))
  1641  				for i := range pods {
  1642  					if pods[i].Spec.Containers[0].Image != "foo" {
  1643  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1644  					}
  1645  				}
  1646  				return nil
  1647  			},
  1648  		},
  1649  		{
  1650  			name:       "burst image update",
  1651  			invariants: assertBurstInvariants,
  1652  			initial: func() *apps.StatefulSet {
  1653  				return burst(newStatefulSet(3))
  1654  			},
  1655  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1656  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  1657  				return set
  1658  			},
  1659  			validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1660  				sort.Sort(ascendingOrdinal(pods))
  1661  				for i := range pods {
  1662  					if pods[i].Spec.Containers[0].Image != originalImage {
  1663  						return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1664  					}
  1665  				}
  1666  				return nil
  1667  			},
  1668  			validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1669  				sort.Sort(ascendingOrdinal(pods))
  1670  				for i := range pods {
  1671  					if pods[i].Spec.Containers[0].Image != "foo" {
  1672  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1673  					}
  1674  				}
  1675  				return nil
  1676  			},
  1677  		},
  1678  		{
  1679  			name:       "burst image update and scale up",
  1680  			invariants: assertBurstInvariants,
  1681  			initial: func() *apps.StatefulSet {
  1682  				return burst(newStatefulSet(3))
  1683  			},
  1684  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1685  				*set.Spec.Replicas = 5
  1686  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  1687  				return set
  1688  			},
  1689  			validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1690  				sort.Sort(ascendingOrdinal(pods))
  1691  				for i := range pods {
  1692  					if i < 3 && pods[i].Spec.Containers[0].Image != originalImage {
  1693  						return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1694  					}
  1695  					if i >= 3 && pods[i].Spec.Containers[0].Image != "foo" {
  1696  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1697  					}
  1698  				}
  1699  				return nil
  1700  			},
  1701  			validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1702  				sort.Sort(ascendingOrdinal(pods))
  1703  				for i := range pods {
  1704  					if pods[i].Spec.Containers[0].Image != "foo" {
  1705  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1706  					}
  1707  				}
  1708  				return nil
  1709  			},
  1710  		},
  1711  		{
  1712  			name:       "burst image update and scale down",
  1713  			invariants: assertBurstInvariants,
  1714  			initial: func() *apps.StatefulSet {
  1715  				return burst(newStatefulSet(5))
  1716  			},
  1717  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1718  				*set.Spec.Replicas = 3
  1719  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  1720  				return set
  1721  			},
  1722  			validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1723  				sort.Sort(ascendingOrdinal(pods))
  1724  				for i := range pods {
  1725  					if pods[i].Spec.Containers[0].Image != originalImage {
  1726  						return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1727  					}
  1728  				}
  1729  				return nil
  1730  			},
  1731  			validateRestart: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1732  				sort.Sort(ascendingOrdinal(pods))
  1733  				for i := range pods {
  1734  					if pods[i].Spec.Containers[0].Image != "foo" {
  1735  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1736  					}
  1737  				}
  1738  				return nil
  1739  			},
  1740  		},
  1741  	}
  1742  	runTestOverPVCRetentionPolicies(t, "", func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
  1743  		for i := range tests {
  1744  			testFn(t, &tests[i], policy)
  1745  		}
  1746  	})
  1747  }
  1748  
  1749  func TestStatefulSetControlRollingUpdateWithPartition(t *testing.T) {
  1750  	type testcase struct {
  1751  		name       string
  1752  		partition  int32
  1753  		invariants func(set *apps.StatefulSet, om *fakeObjectManager) error
  1754  		initial    func() *apps.StatefulSet
  1755  		update     func(set *apps.StatefulSet) *apps.StatefulSet
  1756  		validate   func(set *apps.StatefulSet, pods []*v1.Pod) error
  1757  	}
  1758  
  1759  	testFn := func(t *testing.T, test *testcase, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
  1760  		set := test.initial()
  1761  		set.Spec.PersistentVolumeClaimRetentionPolicy = policy
  1762  		set.Spec.UpdateStrategy = apps.StatefulSetUpdateStrategy{
  1763  			Type: apps.RollingUpdateStatefulSetStrategyType,
  1764  			RollingUpdate: func() *apps.RollingUpdateStatefulSetStrategy {
  1765  				return &apps.RollingUpdateStatefulSetStrategy{Partition: &test.partition}
  1766  			}(),
  1767  		}
  1768  		client := fake.NewSimpleClientset(set)
  1769  		om, _, ssc := setupController(client)
  1770  		if err := scaleUpStatefulSetControl(set, ssc, om, test.invariants); err != nil {
  1771  			t.Fatalf("%s: %s", test.name, err)
  1772  		}
  1773  		set, err := om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1774  		if err != nil {
  1775  			t.Fatalf("%s: %s", test.name, err)
  1776  		}
  1777  		set = test.update(set)
  1778  		if err := updateStatefulSetControl(set, ssc, om, assertUpdateInvariants); err != nil {
  1779  			t.Fatalf("%s: %s", test.name, err)
  1780  		}
  1781  		selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1782  		if err != nil {
  1783  			t.Fatalf("%s: %s", test.name, err)
  1784  		}
  1785  		pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  1786  		if err != nil {
  1787  			t.Fatalf("%s: %s", test.name, err)
  1788  		}
  1789  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1790  		if err != nil {
  1791  			t.Fatalf("%s: %s", test.name, err)
  1792  		}
  1793  		if err := test.validate(set, pods); err != nil {
  1794  			t.Fatalf("%s: %s", test.name, err)
  1795  		}
  1796  	}
  1797  
  1798  	originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image
  1799  
  1800  	tests := []testcase{
  1801  		{
  1802  			name:       "monotonic image update",
  1803  			invariants: assertMonotonicInvariants,
  1804  			partition:  2,
  1805  			initial: func() *apps.StatefulSet {
  1806  				return newStatefulSet(3)
  1807  			},
  1808  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1809  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  1810  				return set
  1811  			},
  1812  			validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1813  				sort.Sort(ascendingOrdinal(pods))
  1814  				for i := range pods {
  1815  					if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
  1816  						return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1817  					}
  1818  					if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
  1819  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1820  					}
  1821  				}
  1822  				return nil
  1823  			},
  1824  		},
  1825  		{
  1826  			name:       "monotonic image update and scale up",
  1827  			partition:  2,
  1828  			invariants: assertMonotonicInvariants,
  1829  			initial: func() *apps.StatefulSet {
  1830  				return newStatefulSet(3)
  1831  			},
  1832  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1833  				*set.Spec.Replicas = 5
  1834  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  1835  				return set
  1836  			},
  1837  			validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1838  				sort.Sort(ascendingOrdinal(pods))
  1839  				for i := range pods {
  1840  					if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
  1841  						return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1842  					}
  1843  					if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
  1844  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1845  					}
  1846  				}
  1847  				return nil
  1848  			},
  1849  		},
  1850  		{
  1851  			name:       "burst image update",
  1852  			partition:  2,
  1853  			invariants: assertBurstInvariants,
  1854  			initial: func() *apps.StatefulSet {
  1855  				return burst(newStatefulSet(3))
  1856  			},
  1857  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1858  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  1859  				return set
  1860  			},
  1861  			validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1862  				sort.Sort(ascendingOrdinal(pods))
  1863  				for i := range pods {
  1864  					if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
  1865  						return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1866  					}
  1867  					if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
  1868  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1869  					}
  1870  				}
  1871  				return nil
  1872  			},
  1873  		},
  1874  		{
  1875  			name:       "burst image update and scale up",
  1876  			invariants: assertBurstInvariants,
  1877  			partition:  2,
  1878  			initial: func() *apps.StatefulSet {
  1879  				return burst(newStatefulSet(3))
  1880  			},
  1881  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  1882  				*set.Spec.Replicas = 5
  1883  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  1884  				return set
  1885  			},
  1886  			validate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  1887  				sort.Sort(ascendingOrdinal(pods))
  1888  				for i := range pods {
  1889  					if i < 2 && pods[i].Spec.Containers[0].Image != originalImage {
  1890  						return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  1891  					}
  1892  					if i >= 2 && pods[i].Spec.Containers[0].Image != "foo" {
  1893  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  1894  					}
  1895  				}
  1896  				return nil
  1897  			},
  1898  		},
  1899  	}
  1900  	runTestOverPVCRetentionPolicies(t, "", func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
  1901  		for i := range tests {
  1902  			testFn(t, &tests[i], policy)
  1903  		}
  1904  	})
  1905  }
  1906  
  1907  func TestStatefulSetHonorRevisionHistoryLimit(t *testing.T) {
  1908  	runTestOverPVCRetentionPolicies(t, "", func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) {
  1909  		invariants := assertMonotonicInvariants
  1910  		set := newStatefulSet(3)
  1911  		set.Spec.PersistentVolumeClaimRetentionPolicy = policy
  1912  		client := fake.NewSimpleClientset(set)
  1913  		om, ssu, ssc := setupController(client)
  1914  
  1915  		if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
  1916  			t.Errorf("Failed to turn up StatefulSet : %s", err)
  1917  		}
  1918  		var err error
  1919  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1920  		if err != nil {
  1921  			t.Fatalf("Error getting updated StatefulSet: %v", err)
  1922  		}
  1923  
  1924  		for i := 0; i < int(*set.Spec.RevisionHistoryLimit)+5; i++ {
  1925  			set.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("foo-%d", i)
  1926  			ssu.SetUpdateStatefulSetStatusError(apierrors.NewInternalError(errors.New("API server failed")), 2)
  1927  			updateStatefulSetControl(set, ssc, om, assertUpdateInvariants)
  1928  			set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1929  			if err != nil {
  1930  				t.Fatalf("Error getting updated StatefulSet: %v", err)
  1931  			}
  1932  			revisions, err := ssc.ListRevisions(set)
  1933  			if err != nil {
  1934  				t.Fatalf("Error listing revisions: %v", err)
  1935  			}
  1936  			// the extra 2 revisions are `currentRevision` and `updateRevision`
  1937  			// They're considered as `live`, and truncateHistory only cleans up non-live revisions
  1938  			if len(revisions) > int(*set.Spec.RevisionHistoryLimit)+2 {
  1939  				t.Fatalf("%s: %d greater than limit %d", "", len(revisions), *set.Spec.RevisionHistoryLimit)
  1940  			}
  1941  		}
  1942  	})
  1943  }
  1944  
  1945  func TestStatefulSetControlLimitsHistory(t *testing.T) {
  1946  	type testcase struct {
  1947  		name       string
  1948  		invariants func(set *apps.StatefulSet, om *fakeObjectManager) error
  1949  		initial    func() *apps.StatefulSet
  1950  	}
  1951  
  1952  	testFn := func(t *testing.T, test *testcase) {
  1953  		set := test.initial()
  1954  		client := fake.NewSimpleClientset(set)
  1955  		om, _, ssc := setupController(client)
  1956  		if err := scaleUpStatefulSetControl(set, ssc, om, test.invariants); err != nil {
  1957  			t.Fatalf("%s: %s", test.name, err)
  1958  		}
  1959  		set, err := om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1960  		if err != nil {
  1961  			t.Fatalf("%s: %s", test.name, err)
  1962  		}
  1963  		for i := 0; i < 10; i++ {
  1964  			set.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("foo-%d", i)
  1965  			if err := updateStatefulSetControl(set, ssc, om, assertUpdateInvariants); err != nil {
  1966  				t.Fatalf("%s: %s", test.name, err)
  1967  			}
  1968  			selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1969  			if err != nil {
  1970  				t.Fatalf("%s: %s", test.name, err)
  1971  			}
  1972  			pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  1973  			if err != nil {
  1974  				t.Fatalf("%s: %s", test.name, err)
  1975  			}
  1976  			set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  1977  			if err != nil {
  1978  				t.Fatalf("%s: %s", test.name, err)
  1979  			}
  1980  			_, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
  1981  			if err != nil {
  1982  				t.Fatalf("%s: %s", test.name, err)
  1983  			}
  1984  			revisions, err := ssc.ListRevisions(set)
  1985  			if err != nil {
  1986  				t.Fatalf("%s: %s", test.name, err)
  1987  			}
  1988  			if len(revisions) > int(*set.Spec.RevisionHistoryLimit)+2 {
  1989  				t.Fatalf("%s: %d greater than limit %d", test.name, len(revisions), *set.Spec.RevisionHistoryLimit)
  1990  			}
  1991  		}
  1992  	}
  1993  
  1994  	tests := []testcase{
  1995  		{
  1996  			name:       "monotonic update",
  1997  			invariants: assertMonotonicInvariants,
  1998  			initial: func() *apps.StatefulSet {
  1999  				return newStatefulSet(3)
  2000  			},
  2001  		},
  2002  		{
  2003  			name:       "burst update",
  2004  			invariants: assertBurstInvariants,
  2005  			initial: func() *apps.StatefulSet {
  2006  				return burst(newStatefulSet(3))
  2007  			},
  2008  		},
  2009  	}
  2010  	for i := range tests {
  2011  		testFn(t, &tests[i])
  2012  	}
  2013  }
  2014  
  2015  func TestStatefulSetControlRollback(t *testing.T) {
  2016  	type testcase struct {
  2017  		name             string
  2018  		invariants       func(set *apps.StatefulSet, om *fakeObjectManager) error
  2019  		initial          func() *apps.StatefulSet
  2020  		update           func(set *apps.StatefulSet) *apps.StatefulSet
  2021  		validateUpdate   func(set *apps.StatefulSet, pods []*v1.Pod) error
  2022  		validateRollback func(set *apps.StatefulSet, pods []*v1.Pod) error
  2023  	}
  2024  
  2025  	originalImage := newStatefulSet(3).Spec.Template.Spec.Containers[0].Image
  2026  
  2027  	testFn := func(t *testing.T, test *testcase) {
  2028  		set := test.initial()
  2029  		client := fake.NewSimpleClientset(set)
  2030  		om, _, ssc := setupController(client)
  2031  		if err := scaleUpStatefulSetControl(set, ssc, om, test.invariants); err != nil {
  2032  			t.Fatalf("%s: %s", test.name, err)
  2033  		}
  2034  		set, err := om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  2035  		if err != nil {
  2036  			t.Fatalf("%s: %s", test.name, err)
  2037  		}
  2038  		set = test.update(set)
  2039  		if err := updateStatefulSetControl(set, ssc, om, assertUpdateInvariants); err != nil {
  2040  			t.Fatalf("%s: %s", test.name, err)
  2041  		}
  2042  		selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  2043  		if err != nil {
  2044  			t.Fatalf("%s: %s", test.name, err)
  2045  		}
  2046  		pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  2047  		if err != nil {
  2048  			t.Fatalf("%s: %s", test.name, err)
  2049  		}
  2050  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  2051  		if err != nil {
  2052  			t.Fatalf("%s: %s", test.name, err)
  2053  		}
  2054  		if err := test.validateUpdate(set, pods); err != nil {
  2055  			t.Fatalf("%s: %s", test.name, err)
  2056  		}
  2057  		revisions, err := ssc.ListRevisions(set)
  2058  		if err != nil {
  2059  			t.Fatalf("%s: %s", test.name, err)
  2060  		}
  2061  		history.SortControllerRevisions(revisions)
  2062  		set, err = ApplyRevision(set, revisions[0])
  2063  		if err != nil {
  2064  			t.Fatalf("%s: %s", test.name, err)
  2065  		}
  2066  		if err := updateStatefulSetControl(set, ssc, om, assertUpdateInvariants); err != nil {
  2067  			t.Fatalf("%s: %s", test.name, err)
  2068  		}
  2069  		if err != nil {
  2070  			t.Fatalf("%s: %s", test.name, err)
  2071  		}
  2072  		pods, err = om.podsLister.Pods(set.Namespace).List(selector)
  2073  		if err != nil {
  2074  			t.Fatalf("%s: %s", test.name, err)
  2075  		}
  2076  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  2077  		if err != nil {
  2078  			t.Fatalf("%s: %s", test.name, err)
  2079  		}
  2080  		if err := test.validateRollback(set, pods); err != nil {
  2081  			t.Fatalf("%s: %s", test.name, err)
  2082  		}
  2083  	}
  2084  
  2085  	tests := []testcase{
  2086  		{
  2087  			name:       "monotonic image update",
  2088  			invariants: assertMonotonicInvariants,
  2089  			initial: func() *apps.StatefulSet {
  2090  				return newStatefulSet(3)
  2091  			},
  2092  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  2093  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  2094  				return set
  2095  			},
  2096  			validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  2097  				sort.Sort(ascendingOrdinal(pods))
  2098  				for i := range pods {
  2099  					if pods[i].Spec.Containers[0].Image != "foo" {
  2100  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  2101  					}
  2102  				}
  2103  				return nil
  2104  			},
  2105  			validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  2106  				sort.Sort(ascendingOrdinal(pods))
  2107  				for i := range pods {
  2108  					if pods[i].Spec.Containers[0].Image != originalImage {
  2109  						return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  2110  					}
  2111  				}
  2112  				return nil
  2113  			},
  2114  		},
  2115  		{
  2116  			name:       "monotonic image update and scale up",
  2117  			invariants: assertMonotonicInvariants,
  2118  			initial: func() *apps.StatefulSet {
  2119  				return newStatefulSet(3)
  2120  			},
  2121  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  2122  				*set.Spec.Replicas = 5
  2123  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  2124  				return set
  2125  			},
  2126  			validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  2127  				sort.Sort(ascendingOrdinal(pods))
  2128  				for i := range pods {
  2129  					if pods[i].Spec.Containers[0].Image != "foo" {
  2130  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  2131  					}
  2132  				}
  2133  				return nil
  2134  			},
  2135  			validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  2136  				sort.Sort(ascendingOrdinal(pods))
  2137  				for i := range pods {
  2138  					if pods[i].Spec.Containers[0].Image != originalImage {
  2139  						return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  2140  					}
  2141  				}
  2142  				return nil
  2143  			},
  2144  		},
  2145  		{
  2146  			name:       "monotonic image update and scale down",
  2147  			invariants: assertMonotonicInvariants,
  2148  			initial: func() *apps.StatefulSet {
  2149  				return newStatefulSet(5)
  2150  			},
  2151  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  2152  				*set.Spec.Replicas = 3
  2153  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  2154  				return set
  2155  			},
  2156  			validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  2157  				sort.Sort(ascendingOrdinal(pods))
  2158  				for i := range pods {
  2159  					if pods[i].Spec.Containers[0].Image != "foo" {
  2160  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  2161  					}
  2162  				}
  2163  				return nil
  2164  			},
  2165  			validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  2166  				sort.Sort(ascendingOrdinal(pods))
  2167  				for i := range pods {
  2168  					if pods[i].Spec.Containers[0].Image != originalImage {
  2169  						return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  2170  					}
  2171  				}
  2172  				return nil
  2173  			},
  2174  		},
  2175  		{
  2176  			name:       "burst image update",
  2177  			invariants: assertBurstInvariants,
  2178  			initial: func() *apps.StatefulSet {
  2179  				return burst(newStatefulSet(3))
  2180  			},
  2181  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  2182  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  2183  				return set
  2184  			},
  2185  			validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  2186  				sort.Sort(ascendingOrdinal(pods))
  2187  				for i := range pods {
  2188  					if pods[i].Spec.Containers[0].Image != "foo" {
  2189  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  2190  					}
  2191  				}
  2192  				return nil
  2193  			},
  2194  			validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  2195  				sort.Sort(ascendingOrdinal(pods))
  2196  				for i := range pods {
  2197  					if pods[i].Spec.Containers[0].Image != originalImage {
  2198  						return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  2199  					}
  2200  				}
  2201  				return nil
  2202  			},
  2203  		},
  2204  		{
  2205  			name:       "burst image update and scale up",
  2206  			invariants: assertBurstInvariants,
  2207  			initial: func() *apps.StatefulSet {
  2208  				return burst(newStatefulSet(3))
  2209  			},
  2210  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  2211  				*set.Spec.Replicas = 5
  2212  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  2213  				return set
  2214  			},
  2215  			validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  2216  				sort.Sort(ascendingOrdinal(pods))
  2217  				for i := range pods {
  2218  					if pods[i].Spec.Containers[0].Image != "foo" {
  2219  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  2220  					}
  2221  				}
  2222  				return nil
  2223  			},
  2224  			validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  2225  				sort.Sort(ascendingOrdinal(pods))
  2226  				for i := range pods {
  2227  					if pods[i].Spec.Containers[0].Image != originalImage {
  2228  						return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  2229  					}
  2230  				}
  2231  				return nil
  2232  			},
  2233  		},
  2234  		{
  2235  			name:       "burst image update and scale down",
  2236  			invariants: assertBurstInvariants,
  2237  			initial: func() *apps.StatefulSet {
  2238  				return burst(newStatefulSet(5))
  2239  			},
  2240  			update: func(set *apps.StatefulSet) *apps.StatefulSet {
  2241  				*set.Spec.Replicas = 3
  2242  				set.Spec.Template.Spec.Containers[0].Image = "foo"
  2243  				return set
  2244  			},
  2245  			validateUpdate: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  2246  				sort.Sort(ascendingOrdinal(pods))
  2247  				for i := range pods {
  2248  					if pods[i].Spec.Containers[0].Image != "foo" {
  2249  						return fmt.Errorf("want pod %s image foo found %s", pods[i].Name, pods[i].Spec.Containers[0].Image)
  2250  					}
  2251  				}
  2252  				return nil
  2253  			},
  2254  			validateRollback: func(set *apps.StatefulSet, pods []*v1.Pod) error {
  2255  				sort.Sort(ascendingOrdinal(pods))
  2256  				for i := range pods {
  2257  					if pods[i].Spec.Containers[0].Image != originalImage {
  2258  						return fmt.Errorf("want pod %s image %s found %s", pods[i].Name, originalImage, pods[i].Spec.Containers[0].Image)
  2259  					}
  2260  				}
  2261  				return nil
  2262  			},
  2263  		},
  2264  	}
  2265  	for i := range tests {
  2266  		testFn(t, &tests[i])
  2267  	}
  2268  }
  2269  
  2270  func TestStatefulSetAvailability(t *testing.T) {
  2271  	tests := []struct {
  2272  		name                   string
  2273  		inputSTS               *apps.StatefulSet
  2274  		expectedActiveReplicas int32
  2275  		readyDuration          time.Duration
  2276  	}{
  2277  		{
  2278  			name:                   "replicas running for required time, when minReadySeconds is enabled",
  2279  			inputSTS:               setMinReadySeconds(newStatefulSet(1), int32(3600)),
  2280  			readyDuration:          -120 * time.Minute,
  2281  			expectedActiveReplicas: int32(1),
  2282  		},
  2283  		{
  2284  			name:                   "replicas not running for required time, when minReadySeconds is enabled",
  2285  			inputSTS:               setMinReadySeconds(newStatefulSet(1), int32(3600)),
  2286  			readyDuration:          -30 * time.Minute,
  2287  			expectedActiveReplicas: int32(0),
  2288  		},
  2289  	}
  2290  	for _, test := range tests {
  2291  		set := test.inputSTS
  2292  		client := fake.NewSimpleClientset(set)
  2293  		spc, _, ssc := setupController(client)
  2294  		if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil {
  2295  			t.Fatalf("%s: %s", test.name, err)
  2296  		}
  2297  		selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  2298  		if err != nil {
  2299  			t.Fatalf("%s: %s", test.name, err)
  2300  		}
  2301  		_, err = spc.podsLister.Pods(set.Namespace).List(selector)
  2302  		if err != nil {
  2303  			t.Fatalf("%s: %s", test.name, err)
  2304  		}
  2305  		set, err = spc.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  2306  		if err != nil {
  2307  			t.Fatalf("%s: %s", test.name, err)
  2308  		}
  2309  		pods, err := spc.setPodAvailable(set, 0, time.Now().Add(test.readyDuration))
  2310  		if err != nil {
  2311  			t.Fatalf("%s: %s", test.name, err)
  2312  		}
  2313  		status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
  2314  		if err != nil {
  2315  			t.Fatalf("%s: %s", test.name, err)
  2316  		}
  2317  		if status.AvailableReplicas != test.expectedActiveReplicas {
  2318  			t.Fatalf("expected %d active replicas got %d", test.expectedActiveReplicas, status.AvailableReplicas)
  2319  		}
  2320  	}
  2321  }
  2322  
  2323  func TestStatefulSetStatusUpdate(t *testing.T) {
  2324  	var (
  2325  		syncErr   = fmt.Errorf("sync error")
  2326  		statusErr = fmt.Errorf("status error")
  2327  	)
  2328  
  2329  	testCases := []struct {
  2330  		desc string
  2331  
  2332  		hasSyncErr   bool
  2333  		hasStatusErr bool
  2334  
  2335  		expectedErr error
  2336  	}{
  2337  		{
  2338  			desc:         "no error",
  2339  			hasSyncErr:   false,
  2340  			hasStatusErr: false,
  2341  			expectedErr:  nil,
  2342  		},
  2343  		{
  2344  			desc:         "sync error",
  2345  			hasSyncErr:   true,
  2346  			hasStatusErr: false,
  2347  			expectedErr:  syncErr,
  2348  		},
  2349  		{
  2350  			desc:         "status error",
  2351  			hasSyncErr:   false,
  2352  			hasStatusErr: true,
  2353  			expectedErr:  statusErr,
  2354  		},
  2355  		{
  2356  			desc:         "sync and status error",
  2357  			hasSyncErr:   true,
  2358  			hasStatusErr: true,
  2359  			expectedErr:  syncErr,
  2360  		},
  2361  	}
  2362  
  2363  	for _, tc := range testCases {
  2364  		t.Run(tc.desc, func(t *testing.T) {
  2365  			set := newStatefulSet(3)
  2366  			client := fake.NewSimpleClientset(set)
  2367  			om, ssu, ssc := setupController(client)
  2368  
  2369  			if tc.hasSyncErr {
  2370  				om.SetCreateStatefulPodError(syncErr, 0)
  2371  			}
  2372  			if tc.hasStatusErr {
  2373  				ssu.SetUpdateStatefulSetStatusError(statusErr, 0)
  2374  			}
  2375  
  2376  			selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  2377  			if err != nil {
  2378  				t.Error(err)
  2379  			}
  2380  			pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  2381  			if err != nil {
  2382  				t.Error(err)
  2383  			}
  2384  			_, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
  2385  			if ssu.updateStatusTracker.requests != 1 {
  2386  				t.Errorf("Did not update status")
  2387  			}
  2388  			if !errors.Is(err, tc.expectedErr) {
  2389  				t.Errorf("Expected error: %v, got: %v", tc.expectedErr, err)
  2390  			}
  2391  		})
  2392  	}
  2393  }
  2394  
  2395  type requestTracker struct {
  2396  	sync.Mutex
  2397  	requests int
  2398  	err      error
  2399  	after    int
  2400  
  2401  	parallelLock sync.Mutex
  2402  	parallel     int
  2403  	maxParallel  int
  2404  
  2405  	delay time.Duration
  2406  }
  2407  
  2408  func (rt *requestTracker) errorReady() bool {
  2409  	rt.Lock()
  2410  	defer rt.Unlock()
  2411  	return rt.err != nil && rt.requests >= rt.after
  2412  }
  2413  
  2414  func (rt *requestTracker) inc() {
  2415  	rt.parallelLock.Lock()
  2416  	rt.parallel++
  2417  	if rt.maxParallel < rt.parallel {
  2418  		rt.maxParallel = rt.parallel
  2419  	}
  2420  	rt.parallelLock.Unlock()
  2421  
  2422  	rt.Lock()
  2423  	defer rt.Unlock()
  2424  	rt.requests++
  2425  	if rt.delay != 0 {
  2426  		time.Sleep(rt.delay)
  2427  	}
  2428  }
  2429  
  2430  func (rt *requestTracker) reset() {
  2431  	rt.parallelLock.Lock()
  2432  	rt.parallel = 0
  2433  	rt.parallelLock.Unlock()
  2434  
  2435  	rt.Lock()
  2436  	defer rt.Unlock()
  2437  	rt.err = nil
  2438  	rt.after = 0
  2439  	rt.delay = 0
  2440  }
  2441  
  2442  func (rt *requestTracker) getErr() error {
  2443  	rt.Lock()
  2444  	defer rt.Unlock()
  2445  	return rt.err
  2446  }
  2447  
  2448  func newRequestTracker(requests int, err error, after int) requestTracker {
  2449  	return requestTracker{
  2450  		requests: requests,
  2451  		err:      err,
  2452  		after:    after,
  2453  	}
  2454  }
  2455  
  2456  type fakeObjectManager struct {
  2457  	podsLister       corelisters.PodLister
  2458  	claimsLister     corelisters.PersistentVolumeClaimLister
  2459  	setsLister       appslisters.StatefulSetLister
  2460  	podsIndexer      cache.Indexer
  2461  	claimsIndexer    cache.Indexer
  2462  	setsIndexer      cache.Indexer
  2463  	revisionsIndexer cache.Indexer
  2464  	createPodTracker requestTracker
  2465  	updatePodTracker requestTracker
  2466  	deletePodTracker requestTracker
  2467  }
  2468  
  2469  func newFakeObjectManager(informerFactory informers.SharedInformerFactory) *fakeObjectManager {
  2470  	podInformer := informerFactory.Core().V1().Pods()
  2471  	claimInformer := informerFactory.Core().V1().PersistentVolumeClaims()
  2472  	setInformer := informerFactory.Apps().V1().StatefulSets()
  2473  	revisionInformer := informerFactory.Apps().V1().ControllerRevisions()
  2474  
  2475  	return &fakeObjectManager{
  2476  		podInformer.Lister(),
  2477  		claimInformer.Lister(),
  2478  		setInformer.Lister(),
  2479  		podInformer.Informer().GetIndexer(),
  2480  		claimInformer.Informer().GetIndexer(),
  2481  		setInformer.Informer().GetIndexer(),
  2482  		revisionInformer.Informer().GetIndexer(),
  2483  		newRequestTracker(0, nil, 0),
  2484  		newRequestTracker(0, nil, 0),
  2485  		newRequestTracker(0, nil, 0),
  2486  	}
  2487  }
  2488  
  2489  func (om *fakeObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error {
  2490  	defer om.createPodTracker.inc()
  2491  	if om.createPodTracker.errorReady() {
  2492  		defer om.createPodTracker.reset()
  2493  		return om.createPodTracker.getErr()
  2494  	}
  2495  	pod.SetUID(types.UID(pod.Name + "-uid"))
  2496  	return om.podsIndexer.Update(pod)
  2497  }
  2498  
  2499  func (om *fakeObjectManager) GetPod(namespace, podName string) (*v1.Pod, error) {
  2500  	return om.podsLister.Pods(namespace).Get(podName)
  2501  }
  2502  
  2503  func (om *fakeObjectManager) UpdatePod(pod *v1.Pod) error {
  2504  	return om.podsIndexer.Update(pod)
  2505  }
  2506  
  2507  func (om *fakeObjectManager) DeletePod(pod *v1.Pod) error {
  2508  	defer om.deletePodTracker.inc()
  2509  	if om.deletePodTracker.errorReady() {
  2510  		defer om.deletePodTracker.reset()
  2511  		return om.deletePodTracker.getErr()
  2512  	}
  2513  	if key, err := controller.KeyFunc(pod); err != nil {
  2514  		return err
  2515  	} else if obj, found, err := om.podsIndexer.GetByKey(key); err != nil {
  2516  		return err
  2517  	} else if found {
  2518  		return om.podsIndexer.Delete(obj)
  2519  	}
  2520  	return nil // Not found, no error in deleting.
  2521  }
  2522  
  2523  func (om *fakeObjectManager) CreateClaim(claim *v1.PersistentVolumeClaim) error {
  2524  	om.claimsIndexer.Update(claim)
  2525  	return nil
  2526  }
  2527  
  2528  func (om *fakeObjectManager) GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error) {
  2529  	return om.claimsLister.PersistentVolumeClaims(namespace).Get(claimName)
  2530  }
  2531  
  2532  func (om *fakeObjectManager) UpdateClaim(claim *v1.PersistentVolumeClaim) error {
  2533  	// Validate ownerRefs.
  2534  	refs := claim.GetOwnerReferences()
  2535  	for _, ref := range refs {
  2536  		if ref.APIVersion == "" || ref.Kind == "" || ref.Name == "" {
  2537  			return fmt.Errorf("invalid ownerRefs: %s %v", claim.Name, refs)
  2538  		}
  2539  	}
  2540  	om.claimsIndexer.Update(claim)
  2541  	return nil
  2542  }
  2543  
  2544  func (om *fakeObjectManager) SetCreateStatefulPodError(err error, after int) {
  2545  	om.createPodTracker.err = err
  2546  	om.createPodTracker.after = after
  2547  }
  2548  
  2549  func (om *fakeObjectManager) SetUpdateStatefulPodError(err error, after int) {
  2550  	om.updatePodTracker.err = err
  2551  	om.updatePodTracker.after = after
  2552  }
  2553  
  2554  func (om *fakeObjectManager) SetDeleteStatefulPodError(err error, after int) {
  2555  	om.deletePodTracker.err = err
  2556  	om.deletePodTracker.after = after
  2557  }
  2558  
  2559  func findPodByOrdinal(pods []*v1.Pod, ordinal int) *v1.Pod {
  2560  	for _, pod := range pods {
  2561  		if getOrdinal(pod) == ordinal {
  2562  			return pod.DeepCopy()
  2563  		}
  2564  	}
  2565  
  2566  	return nil
  2567  }
  2568  
  2569  func (om *fakeObjectManager) setPodPending(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  2570  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  2571  	if err != nil {
  2572  		return nil, err
  2573  	}
  2574  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  2575  	if err != nil {
  2576  		return nil, err
  2577  	}
  2578  	pod := findPodByOrdinal(pods, ordinal)
  2579  	if pod == nil {
  2580  		return nil, fmt.Errorf("setPodPending: pod ordinal %d not found", ordinal)
  2581  	}
  2582  	pod.Status.Phase = v1.PodPending
  2583  	fakeResourceVersion(pod)
  2584  	om.podsIndexer.Update(pod)
  2585  	return om.podsLister.Pods(set.Namespace).List(selector)
  2586  }
  2587  
  2588  func (om *fakeObjectManager) setPodRunning(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  2589  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  2590  	if err != nil {
  2591  		return nil, err
  2592  	}
  2593  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  2594  	if err != nil {
  2595  		return nil, err
  2596  	}
  2597  	pod := findPodByOrdinal(pods, ordinal)
  2598  	if pod == nil {
  2599  		return nil, fmt.Errorf("setPodRunning: pod ordinal %d not found", ordinal)
  2600  	}
  2601  	pod.Status.Phase = v1.PodRunning
  2602  	fakeResourceVersion(pod)
  2603  	om.podsIndexer.Update(pod)
  2604  	return om.podsLister.Pods(set.Namespace).List(selector)
  2605  }
  2606  
  2607  func (om *fakeObjectManager) setPodReady(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  2608  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  2609  	if err != nil {
  2610  		return nil, err
  2611  	}
  2612  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  2613  	if err != nil {
  2614  		return nil, err
  2615  	}
  2616  	pod := findPodByOrdinal(pods, ordinal)
  2617  	if pod == nil {
  2618  		return nil, fmt.Errorf("setPodReady: pod ordinal %d not found", ordinal)
  2619  	}
  2620  	condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
  2621  	podutil.UpdatePodCondition(&pod.Status, &condition)
  2622  	fakeResourceVersion(pod)
  2623  	om.podsIndexer.Update(pod)
  2624  	return om.podsLister.Pods(set.Namespace).List(selector)
  2625  }
  2626  
  2627  func (om *fakeObjectManager) setPodAvailable(set *apps.StatefulSet, ordinal int, lastTransitionTime time.Time) ([]*v1.Pod, error) {
  2628  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  2629  	if err != nil {
  2630  		return nil, err
  2631  	}
  2632  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  2633  	if err != nil {
  2634  		return nil, err
  2635  	}
  2636  	pod := findPodByOrdinal(pods, ordinal)
  2637  	if pod == nil {
  2638  		return nil, fmt.Errorf("setPodAvailable: pod ordinal %d not found", ordinal)
  2639  	}
  2640  	condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: lastTransitionTime}}
  2641  	_, existingCondition := podutil.GetPodCondition(&pod.Status, condition.Type)
  2642  	if existingCondition != nil {
  2643  		existingCondition.Status = v1.ConditionTrue
  2644  		existingCondition.LastTransitionTime = metav1.Time{Time: lastTransitionTime}
  2645  	} else {
  2646  		existingCondition = &v1.PodCondition{
  2647  			Type:               v1.PodReady,
  2648  			Status:             v1.ConditionTrue,
  2649  			LastTransitionTime: metav1.Time{Time: lastTransitionTime},
  2650  		}
  2651  		pod.Status.Conditions = append(pod.Status.Conditions, *existingCondition)
  2652  	}
  2653  	podutil.UpdatePodCondition(&pod.Status, &condition)
  2654  	fakeResourceVersion(pod)
  2655  	om.podsIndexer.Update(pod)
  2656  	return om.podsLister.Pods(set.Namespace).List(selector)
  2657  }
  2658  
  2659  func (om *fakeObjectManager) addTerminatingPod(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  2660  	pod := newStatefulSetPod(set, ordinal)
  2661  	pod.SetUID(types.UID(pod.Name + "-uid")) // To match fakeObjectManager.CreatePod
  2662  	pod.Status.Phase = v1.PodRunning
  2663  	deleted := metav1.NewTime(time.Now())
  2664  	pod.DeletionTimestamp = &deleted
  2665  	condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
  2666  	fakeResourceVersion(pod)
  2667  	podutil.UpdatePodCondition(&pod.Status, &condition)
  2668  	om.podsIndexer.Update(pod)
  2669  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  2670  	if err != nil {
  2671  		return nil, err
  2672  	}
  2673  	return om.podsLister.Pods(set.Namespace).List(selector)
  2674  }
  2675  
  2676  func (om *fakeObjectManager) setPodTerminated(set *apps.StatefulSet, ordinal int) ([]*v1.Pod, error) {
  2677  	pod := newStatefulSetPod(set, ordinal)
  2678  	deleted := metav1.NewTime(time.Now())
  2679  	pod.DeletionTimestamp = &deleted
  2680  	fakeResourceVersion(pod)
  2681  	om.podsIndexer.Update(pod)
  2682  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  2683  	if err != nil {
  2684  		return nil, err
  2685  	}
  2686  	return om.podsLister.Pods(set.Namespace).List(selector)
  2687  }
  2688  
  2689  var _ StatefulPodControlObjectManager = &fakeObjectManager{}
  2690  
// fakeStatefulSetStatusUpdater is a test double for the StatefulSet status
// updater. It writes status changes straight into the StatefulSet indexer and
// can inject failures through its request tracker.
type fakeStatefulSetStatusUpdater struct {
	// setsLister and setsIndexer come from the same StatefulSet informer
	// (see newFakeStatefulSetStatusUpdater).
	setsLister          appslisters.StatefulSetLister
	setsIndexer         cache.Indexer
	// updateStatusTracker counts UpdateStatefulSetStatus calls and holds the
	// error to inject, if any.
	updateStatusTracker requestTracker
}
  2696  
  2697  func newFakeStatefulSetStatusUpdater(setInformer appsinformers.StatefulSetInformer) *fakeStatefulSetStatusUpdater {
  2698  	return &fakeStatefulSetStatusUpdater{
  2699  		setInformer.Lister(),
  2700  		setInformer.Informer().GetIndexer(),
  2701  		newRequestTracker(0, nil, 0),
  2702  	}
  2703  }
  2704  
  2705  func (ssu *fakeStatefulSetStatusUpdater) UpdateStatefulSetStatus(ctx context.Context, set *apps.StatefulSet, status *apps.StatefulSetStatus) error {
  2706  	defer ssu.updateStatusTracker.inc()
  2707  	if ssu.updateStatusTracker.errorReady() {
  2708  		defer ssu.updateStatusTracker.reset()
  2709  		return ssu.updateStatusTracker.err
  2710  	}
  2711  	set.Status = *status
  2712  	ssu.setsIndexer.Update(set)
  2713  	return nil
  2714  }
  2715  
  2716  func (ssu *fakeStatefulSetStatusUpdater) SetUpdateStatefulSetStatusError(err error, after int) {
  2717  	ssu.updateStatusTracker.err = err
  2718  	ssu.updateStatusTracker.after = after
  2719  }
  2720  
  2721  var _ StatefulSetStatusUpdaterInterface = &fakeStatefulSetStatusUpdater{}
  2722  
  2723  func assertMonotonicInvariants(set *apps.StatefulSet, om *fakeObjectManager) error {
  2724  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  2725  	if err != nil {
  2726  		return err
  2727  	}
  2728  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  2729  	if err != nil {
  2730  		return err
  2731  	}
  2732  	sort.Sort(ascendingOrdinal(pods))
  2733  	for idx := 0; idx < len(pods); idx++ {
  2734  		if idx > 0 && isRunningAndReady(pods[idx]) && !isRunningAndReady(pods[idx-1]) {
  2735  			return fmt.Errorf("successor %s is Running and Ready while %s is not", pods[idx].Name, pods[idx-1].Name)
  2736  		}
  2737  
  2738  		if ord := idx + getStartOrdinal(set); getOrdinal(pods[idx]) != ord {
  2739  			return fmt.Errorf("pods %s deployed in the wrong order %d", pods[idx].Name, ord)
  2740  		}
  2741  
  2742  		if !storageMatches(set, pods[idx]) {
  2743  			return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pods[idx].Name, set.Name)
  2744  		}
  2745  
  2746  		for _, claim := range getPersistentVolumeClaims(set, pods[idx]) {
  2747  			claim, _ := om.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
  2748  			if err := checkClaimInvarients(set, pods[idx], claim); err != nil {
  2749  				return err
  2750  			}
  2751  		}
  2752  
  2753  		if !identityMatches(set, pods[idx]) {
  2754  			return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ", pods[idx].Name, set.Name)
  2755  		}
  2756  	}
  2757  	return nil
  2758  }
  2759  
  2760  func assertBurstInvariants(set *apps.StatefulSet, om *fakeObjectManager) error {
  2761  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  2762  	if err != nil {
  2763  		return err
  2764  	}
  2765  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  2766  	if err != nil {
  2767  		return err
  2768  	}
  2769  	sort.Sort(ascendingOrdinal(pods))
  2770  	for _, pod := range pods {
  2771  		if !storageMatches(set, pod) {
  2772  			return fmt.Errorf("pods %s does not match the storage specification of StatefulSet %s ", pod.Name, set.Name)
  2773  		}
  2774  
  2775  		for _, claim := range getPersistentVolumeClaims(set, pod) {
  2776  			claim, err := om.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
  2777  			if err != nil {
  2778  				return err
  2779  			}
  2780  			if err := checkClaimInvarients(set, pod, claim); err != nil {
  2781  				return err
  2782  			}
  2783  		}
  2784  
  2785  		if !identityMatches(set, pod) {
  2786  			return fmt.Errorf("pods %s does not match the identity specification of StatefulSet %s ",
  2787  				pod.Name,
  2788  				set.Name)
  2789  		}
  2790  	}
  2791  	return nil
  2792  }
  2793  
  2794  func assertUpdateInvariants(set *apps.StatefulSet, om *fakeObjectManager) error {
  2795  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  2796  	if err != nil {
  2797  		return err
  2798  	}
  2799  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  2800  	if err != nil {
  2801  		return err
  2802  	}
  2803  	sort.Sort(ascendingOrdinal(pods))
  2804  	for _, pod := range pods {
  2805  
  2806  		if !storageMatches(set, pod) {
  2807  			return fmt.Errorf("pod %s does not match the storage specification of StatefulSet %s ", pod.Name, set.Name)
  2808  		}
  2809  
  2810  		for _, claim := range getPersistentVolumeClaims(set, pod) {
  2811  			claim, err := om.claimsLister.PersistentVolumeClaims(set.Namespace).Get(claim.Name)
  2812  			if err != nil {
  2813  				return err
  2814  			}
  2815  			if err := checkClaimInvarients(set, pod, claim); err != nil {
  2816  				return err
  2817  			}
  2818  		}
  2819  
  2820  		if !identityMatches(set, pod) {
  2821  			return fmt.Errorf("pod %s does not match the identity specification of StatefulSet %s ", pod.Name, set.Name)
  2822  		}
  2823  	}
  2824  	if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
  2825  		return nil
  2826  	}
  2827  	if set.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType {
  2828  		for i := 0; i < int(set.Status.CurrentReplicas) && i < len(pods); i++ {
  2829  			if want, got := set.Status.CurrentRevision, getPodRevision(pods[i]); want != got {
  2830  				return fmt.Errorf("pod %s want current revision %s got %s", pods[i].Name, want, got)
  2831  			}
  2832  		}
  2833  		for i, j := len(pods)-1, 0; j < int(set.Status.UpdatedReplicas); i, j = i-1, j+1 {
  2834  			if want, got := set.Status.UpdateRevision, getPodRevision(pods[i]); want != got {
  2835  				return fmt.Errorf("pod %s want update revision %s got %s", pods[i].Name, want, got)
  2836  			}
  2837  		}
  2838  	}
  2839  	return nil
  2840  }
  2841  
  2842  func checkClaimInvarients(set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim) error {
  2843  	policy := apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
  2844  		WhenScaled:  apps.RetainPersistentVolumeClaimRetentionPolicyType,
  2845  		WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
  2846  	}
  2847  	if set.Spec.PersistentVolumeClaimRetentionPolicy != nil && utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
  2848  		policy = *set.Spec.PersistentVolumeClaimRetentionPolicy
  2849  	}
  2850  	claimShouldBeRetained := policy.WhenScaled == apps.RetainPersistentVolumeClaimRetentionPolicyType
  2851  	if claim == nil {
  2852  		if claimShouldBeRetained {
  2853  			return fmt.Errorf("claim for Pod %s was not created", pod.Name)
  2854  		}
  2855  		return nil // A non-retained claim has no invariants to satisfy.
  2856  	}
  2857  
  2858  	if pod.Status.Phase != v1.PodRunning || !podutil.IsPodReady(pod) {
  2859  		// The pod has spun up yet, we do not expect the owner refs on the claim to have been set.
  2860  		return nil
  2861  	}
  2862  
  2863  	const retain = apps.RetainPersistentVolumeClaimRetentionPolicyType
  2864  	const delete = apps.DeletePersistentVolumeClaimRetentionPolicyType
  2865  	switch {
  2866  	case policy.WhenScaled == retain && policy.WhenDeleted == retain:
  2867  		if hasOwnerRef(claim, set) {
  2868  			return fmt.Errorf("claim %s has unexpected owner ref on %s for StatefulSet retain", claim.Name, set.Name)
  2869  		}
  2870  		if hasOwnerRef(claim, pod) {
  2871  			return fmt.Errorf("claim %s has unexpected owner ref on pod %s for StatefulSet retain", claim.Name, pod.Name)
  2872  		}
  2873  	case policy.WhenScaled == retain && policy.WhenDeleted == delete:
  2874  		if !hasOwnerRef(claim, set) {
  2875  			return fmt.Errorf("claim %s does not have owner ref on %s for StatefulSet deletion", claim.Name, set.Name)
  2876  		}
  2877  		if hasOwnerRef(claim, pod) {
  2878  			return fmt.Errorf("claim %s has unexpected owner ref on pod %s for StatefulSet deletion", claim.Name, pod.Name)
  2879  		}
  2880  	case policy.WhenScaled == delete && policy.WhenDeleted == retain:
  2881  		if hasOwnerRef(claim, set) {
  2882  			return fmt.Errorf("claim %s has unexpected owner ref on %s for scaledown only", claim.Name, set.Name)
  2883  		}
  2884  		if !podInOrdinalRange(pod, set) && !hasOwnerRef(claim, pod) {
  2885  			return fmt.Errorf("claim %s does not have owner ref on condemned pod %s for scaledown delete", claim.Name, pod.Name)
  2886  		}
  2887  		if podInOrdinalRange(pod, set) && hasOwnerRef(claim, pod) {
  2888  			return fmt.Errorf("claim %s has unexpected owner ref on condemned pod %s for scaledown delete", claim.Name, pod.Name)
  2889  		}
  2890  	case policy.WhenScaled == delete && policy.WhenDeleted == delete:
  2891  		if !podInOrdinalRange(pod, set) {
  2892  			if !hasOwnerRef(claim, pod) || hasOwnerRef(claim, set) {
  2893  				return fmt.Errorf("condemned claim %s has bad owner refs: %v", claim.Name, claim.GetOwnerReferences())
  2894  			}
  2895  		} else {
  2896  			if hasOwnerRef(claim, pod) || !hasOwnerRef(claim, set) {
  2897  				return fmt.Errorf("live claim %s has bad owner refs: %v", claim.Name, claim.GetOwnerReferences())
  2898  			}
  2899  		}
  2900  	}
  2901  	return nil
  2902  }
  2903  
  2904  func fakeResourceVersion(object interface{}) {
  2905  	obj, isObj := object.(metav1.Object)
  2906  	if !isObj {
  2907  		return
  2908  	}
  2909  	if version := obj.GetResourceVersion(); version == "" {
  2910  		obj.SetResourceVersion("1")
  2911  	} else if intValue, err := strconv.ParseInt(version, 10, 32); err == nil {
  2912  		obj.SetResourceVersion(strconv.FormatInt(intValue+1, 10))
  2913  	}
  2914  }
  2915  
  2916  func TestParallelScale(t *testing.T) {
  2917  	for _, tc := range []struct {
  2918  		desc            string
  2919  		replicas        int32
  2920  		desiredReplicas int32
  2921  	}{
  2922  		{
  2923  			desc:            "scale up from 3 to 30",
  2924  			replicas:        3,
  2925  			desiredReplicas: 30,
  2926  		},
  2927  		{
  2928  			desc:            "scale down from 10 to 1",
  2929  			replicas:        10,
  2930  			desiredReplicas: 1,
  2931  		},
  2932  
  2933  		{
  2934  			desc:            "scale down to 0",
  2935  			replicas:        501,
  2936  			desiredReplicas: 0,
  2937  		},
  2938  		{
  2939  			desc:            "scale up from 0",
  2940  			replicas:        0,
  2941  			desiredReplicas: 1000,
  2942  		},
  2943  	} {
  2944  		t.Run(tc.desc, func(t *testing.T) {
  2945  			set := burst(newStatefulSet(0))
  2946  			set.Spec.VolumeClaimTemplates[0].ObjectMeta.Labels = map[string]string{"test": "test"}
  2947  			parallelScale(t, set, tc.replicas, tc.desiredReplicas, assertBurstInvariants)
  2948  		})
  2949  	}
  2950  
  2951  }
  2952  
  2953  func parallelScale(t *testing.T, set *apps.StatefulSet, replicas, desiredReplicas int32, invariants invariantFunc) {
  2954  	var err error
  2955  	diff := desiredReplicas - replicas
  2956  	client := fake.NewSimpleClientset(set)
  2957  	om, _, ssc := setupController(client)
  2958  	om.createPodTracker.delay = time.Millisecond
  2959  
  2960  	*set.Spec.Replicas = replicas
  2961  	if err := parallelScaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
  2962  		t.Errorf("Failed to turn up StatefulSet : %s", err)
  2963  	}
  2964  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  2965  	if err != nil {
  2966  		t.Fatalf("Error getting updated StatefulSet: %v", err)
  2967  	}
  2968  	if set.Status.Replicas != replicas {
  2969  		t.Errorf("want %v, got %v replicas", replicas, set.Status.Replicas)
  2970  	}
  2971  
  2972  	fn := parallelScaleUpStatefulSetControl
  2973  	if diff < 0 {
  2974  		fn = parallelScaleDownStatefulSetControl
  2975  	}
  2976  	*set.Spec.Replicas = desiredReplicas
  2977  	if err := fn(set, ssc, om, invariants); err != nil {
  2978  		t.Errorf("Failed to scale StatefulSet : %s", err)
  2979  	}
  2980  
  2981  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  2982  	if err != nil {
  2983  		t.Fatalf("Error getting updated StatefulSet: %v", err)
  2984  	}
  2985  
  2986  	if set.Status.Replicas != desiredReplicas {
  2987  		t.Errorf("Failed to scale statefulset to %v replicas, got %v replicas", desiredReplicas, set.Status.Replicas)
  2988  	}
  2989  
  2990  	if (diff < -1 || diff > 1) && om.createPodTracker.maxParallel <= 1 {
  2991  		t.Errorf("want max parallel requests > 1, got %v", om.createPodTracker.maxParallel)
  2992  	}
  2993  }
  2994  
  2995  func parallelScaleUpStatefulSetControl(set *apps.StatefulSet,
  2996  	ssc StatefulSetControlInterface,
  2997  	om *fakeObjectManager,
  2998  	invariants invariantFunc) error {
  2999  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  3000  	if err != nil {
  3001  		return err
  3002  	}
  3003  
  3004  	// Give up after 2 loops.
  3005  	// 2 * 500 pods per loop = 1000 max pods <- this should be enough for all test cases.
  3006  	// Anything slower than that (requiring more iterations) indicates a problem and should fail the test.
  3007  	maxLoops := 2
  3008  	loops := maxLoops
  3009  	for set.Status.Replicas < *set.Spec.Replicas {
  3010  		if loops < 1 {
  3011  			return fmt.Errorf("after %v loops: want %v, got replicas %v", maxLoops, *set.Spec.Replicas, set.Status.Replicas)
  3012  		}
  3013  		loops--
  3014  		pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  3015  		if err != nil {
  3016  			return err
  3017  		}
  3018  		sort.Sort(ascendingOrdinal(pods))
  3019  
  3020  		ordinals := []int{}
  3021  		for _, pod := range pods {
  3022  			if pod.Status.Phase == "" {
  3023  				ordinals = append(ordinals, getOrdinal(pod))
  3024  			}
  3025  		}
  3026  		// ensure all pods are valid (have a phase)
  3027  		for _, ord := range ordinals {
  3028  			if pods, err = om.setPodPending(set, ord); err != nil {
  3029  				return err
  3030  			}
  3031  		}
  3032  
  3033  		// run the controller once and check invariants
  3034  		_, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
  3035  		if err != nil {
  3036  			return err
  3037  		}
  3038  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  3039  		if err != nil {
  3040  			return err
  3041  		}
  3042  		if err := invariants(set, om); err != nil {
  3043  			return err
  3044  		}
  3045  	}
  3046  	return invariants(set, om)
  3047  }
  3048  
  3049  func parallelScaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, om *fakeObjectManager, invariants invariantFunc) error {
  3050  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  3051  	if err != nil {
  3052  		return err
  3053  	}
  3054  
  3055  	// Give up after 2 loops.
  3056  	// 2 * 500 pods per loop = 1000 max pods <- this should be enough for all test cases.
  3057  	// Anything slower than that (requiring more iterations) indicates a problem and should fail the test.
  3058  	maxLoops := 2
  3059  	loops := maxLoops
  3060  	for set.Status.Replicas > *set.Spec.Replicas {
  3061  		if loops < 1 {
  3062  			return fmt.Errorf("after %v loops: want %v replicas, got %v", maxLoops, *set.Spec.Replicas, set.Status.Replicas)
  3063  		}
  3064  		loops--
  3065  		pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  3066  		if err != nil {
  3067  			return err
  3068  		}
  3069  		sort.Sort(ascendingOrdinal(pods))
  3070  		if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
  3071  			return err
  3072  		}
  3073  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  3074  		if err != nil {
  3075  			return err
  3076  		}
  3077  		if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
  3078  			return err
  3079  		}
  3080  	}
  3081  
  3082  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  3083  	if err != nil {
  3084  		return err
  3085  	}
  3086  	if err := invariants(set, om); err != nil {
  3087  		return err
  3088  	}
  3089  
  3090  	return nil
  3091  }
  3092  
  3093  func scaleUpStatefulSetControl(set *apps.StatefulSet,
  3094  	ssc StatefulSetControlInterface,
  3095  	om *fakeObjectManager,
  3096  	invariants invariantFunc) error {
  3097  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  3098  	if err != nil {
  3099  		return err
  3100  	}
  3101  	for set.Status.ReadyReplicas < *set.Spec.Replicas {
  3102  		pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  3103  		if err != nil {
  3104  			return err
  3105  		}
  3106  		sort.Sort(ascendingOrdinal(pods))
  3107  
  3108  		// ensure all pods are valid (have a phase)
  3109  		for _, pod := range pods {
  3110  			if pod.Status.Phase == "" {
  3111  				if pods, err = om.setPodPending(set, getOrdinal(pod)); err != nil {
  3112  					return err
  3113  				}
  3114  				break
  3115  			}
  3116  		}
  3117  
  3118  		// select one of the pods and move it forward in status
  3119  		if len(pods) > 0 {
  3120  			idx := int(rand.Int63n(int64(len(pods))))
  3121  			pod := pods[idx]
  3122  			switch pod.Status.Phase {
  3123  			case v1.PodPending:
  3124  				if pods, err = om.setPodRunning(set, getOrdinal(pod)); err != nil {
  3125  					return err
  3126  				}
  3127  			case v1.PodRunning:
  3128  				if pods, err = om.setPodReady(set, getOrdinal(pod)); err != nil {
  3129  					return err
  3130  				}
  3131  			default:
  3132  				continue
  3133  			}
  3134  		}
  3135  		// run the controller once and check invariants
  3136  		_, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
  3137  		if err != nil {
  3138  			return err
  3139  		}
  3140  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  3141  		if err != nil {
  3142  			return err
  3143  		}
  3144  		if err := invariants(set, om); err != nil {
  3145  			return err
  3146  		}
  3147  	}
  3148  	return invariants(set, om)
  3149  }
  3150  
  3151  func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlInterface, om *fakeObjectManager, invariants invariantFunc) error {
  3152  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  3153  	if err != nil {
  3154  		return err
  3155  	}
  3156  
  3157  	for set.Status.Replicas > *set.Spec.Replicas {
  3158  		pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  3159  		if err != nil {
  3160  			return err
  3161  		}
  3162  		sort.Sort(ascendingOrdinal(pods))
  3163  		if idx := len(pods) - 1; idx >= 0 {
  3164  			if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
  3165  				return err
  3166  			}
  3167  			set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  3168  			if err != nil {
  3169  				return err
  3170  			}
  3171  			if pods, err = om.addTerminatingPod(set, getOrdinal(pods[idx])); err != nil {
  3172  				return err
  3173  			}
  3174  			if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
  3175  				return err
  3176  			}
  3177  			set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  3178  			if err != nil {
  3179  				return err
  3180  			}
  3181  			pods, err = om.podsLister.Pods(set.Namespace).List(selector)
  3182  			if err != nil {
  3183  				return err
  3184  			}
  3185  			sort.Sort(ascendingOrdinal(pods))
  3186  
  3187  			if len(pods) > 0 {
  3188  				om.podsIndexer.Delete(pods[len(pods)-1])
  3189  			}
  3190  		}
  3191  		if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
  3192  			return err
  3193  		}
  3194  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  3195  		if err != nil {
  3196  			return err
  3197  		}
  3198  
  3199  		if err := invariants(set, om); err != nil {
  3200  			return err
  3201  		}
  3202  	}
  3203  	// If there are claims with ownerRefs on pods that have been deleted, delete them.
  3204  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  3205  	if err != nil {
  3206  		return err
  3207  	}
  3208  	currentPods := map[string]bool{}
  3209  	for _, pod := range pods {
  3210  		currentPods[pod.Name] = true
  3211  	}
  3212  	claims, err := om.claimsLister.PersistentVolumeClaims(set.Namespace).List(selector)
  3213  	if err != nil {
  3214  		return err
  3215  	}
  3216  	for _, claim := range claims {
  3217  		claimPodName := getClaimPodName(set, claim)
  3218  		if claimPodName == "" {
  3219  			continue // Skip claims not related to a stateful set pod.
  3220  		}
  3221  		if _, found := currentPods[claimPodName]; found {
  3222  			continue // Skip claims which still have a current pod.
  3223  		}
  3224  		for _, refs := range claim.GetOwnerReferences() {
  3225  			if refs.Name == claimPodName {
  3226  				om.claimsIndexer.Delete(claim)
  3227  				break
  3228  			}
  3229  		}
  3230  	}
  3231  
  3232  	return invariants(set, om)
  3233  }
  3234  
  3235  func updateComplete(set *apps.StatefulSet, pods []*v1.Pod) bool {
  3236  	sort.Sort(ascendingOrdinal(pods))
  3237  	if len(pods) != int(*set.Spec.Replicas) {
  3238  		return false
  3239  	}
  3240  	if set.Status.ReadyReplicas != *set.Spec.Replicas {
  3241  		return false
  3242  	}
  3243  
  3244  	switch set.Spec.UpdateStrategy.Type {
  3245  	case apps.OnDeleteStatefulSetStrategyType:
  3246  		return true
  3247  	case apps.RollingUpdateStatefulSetStrategyType:
  3248  		if set.Spec.UpdateStrategy.RollingUpdate == nil || *set.Spec.UpdateStrategy.RollingUpdate.Partition <= 0 {
  3249  			if set.Status.CurrentReplicas < *set.Spec.Replicas {
  3250  				return false
  3251  			}
  3252  			for i := range pods {
  3253  				if getPodRevision(pods[i]) != set.Status.CurrentRevision {
  3254  					return false
  3255  				}
  3256  			}
  3257  		} else {
  3258  			partition := int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
  3259  			if len(pods) < partition {
  3260  				return false
  3261  			}
  3262  			for i := partition; i < len(pods); i++ {
  3263  				if getPodRevision(pods[i]) != set.Status.UpdateRevision {
  3264  					return false
  3265  				}
  3266  			}
  3267  		}
  3268  	}
  3269  	return true
  3270  }
  3271  
  3272  func updateStatefulSetControl(set *apps.StatefulSet,
  3273  	ssc StatefulSetControlInterface,
  3274  	om *fakeObjectManager,
  3275  	invariants invariantFunc) error {
  3276  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  3277  	if err != nil {
  3278  		return err
  3279  	}
  3280  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  3281  	if err != nil {
  3282  		return err
  3283  	}
  3284  	if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
  3285  		return err
  3286  	}
  3287  
  3288  	set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  3289  	if err != nil {
  3290  		return err
  3291  	}
  3292  	pods, err = om.podsLister.Pods(set.Namespace).List(selector)
  3293  	if err != nil {
  3294  		return err
  3295  	}
  3296  	for !updateComplete(set, pods) {
  3297  		pods, err = om.podsLister.Pods(set.Namespace).List(selector)
  3298  		if err != nil {
  3299  			return err
  3300  		}
  3301  		sort.Sort(ascendingOrdinal(pods))
  3302  		initialized := false
  3303  		for _, pod := range pods {
  3304  			if pod.Status.Phase == "" {
  3305  				if pods, err = om.setPodPending(set, getOrdinal(pod)); err != nil {
  3306  					return err
  3307  				}
  3308  				break
  3309  			}
  3310  		}
  3311  		if initialized {
  3312  			continue
  3313  		}
  3314  
  3315  		if len(pods) > 0 {
  3316  			idx := int(rand.Int63n(int64(len(pods))))
  3317  			pod := pods[idx]
  3318  			switch pod.Status.Phase {
  3319  			case v1.PodPending:
  3320  				if pods, err = om.setPodRunning(set, getOrdinal(pod)); err != nil {
  3321  					return err
  3322  				}
  3323  			case v1.PodRunning:
  3324  				if pods, err = om.setPodReady(set, getOrdinal(pod)); err != nil {
  3325  					return err
  3326  				}
  3327  			default:
  3328  				continue
  3329  			}
  3330  		}
  3331  
  3332  		if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
  3333  			return err
  3334  		}
  3335  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
  3336  		if err != nil {
  3337  			return err
  3338  		}
  3339  		if err := invariants(set, om); err != nil {
  3340  			return err
  3341  		}
  3342  		pods, err = om.podsLister.Pods(set.Namespace).List(selector)
  3343  		if err != nil {
  3344  			return err
  3345  		}
  3346  	}
  3347  	return invariants(set, om)
  3348  }
  3349  
  3350  func newRevisionOrDie(set *apps.StatefulSet, revision int64) *apps.ControllerRevision {
  3351  	rev, err := newRevision(set, revision, set.Status.CollisionCount)
  3352  	if err != nil {
  3353  		panic(err)
  3354  	}
  3355  	return rev
  3356  }
  3357  
  3358  func isOrHasInternalError(err error) bool {
  3359  	agg, ok := err.(utilerrors.Aggregate)
  3360  	return !ok && !apierrors.IsInternalError(err) || ok && len(agg.Errors()) > 0 && !apierrors.IsInternalError(agg.Errors()[0])
  3361  }
  3362  

View as plain text