...

Source file src/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set_test.go

Documentation: k8s.io/kubernetes/pkg/controller/statefulset

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package statefulset
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"encoding/json"
    23  	"fmt"
    24  	"sort"
    25  	"testing"
    26  
    27  	apps "k8s.io/api/apps/v1"
    28  	v1 "k8s.io/api/core/v1"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/labels"
    31  	"k8s.io/apimachinery/pkg/runtime"
    32  	"k8s.io/apimachinery/pkg/types"
    33  	"k8s.io/apimachinery/pkg/util/sets"
    34  	"k8s.io/apimachinery/pkg/util/strategicpatch"
    35  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    36  	"k8s.io/client-go/informers"
    37  	"k8s.io/client-go/kubernetes/fake"
    38  	core "k8s.io/client-go/testing"
    39  	"k8s.io/client-go/tools/cache"
    40  	"k8s.io/client-go/tools/record"
    41  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    42  	"k8s.io/klog/v2"
    43  	"k8s.io/klog/v2/ktesting"
    44  	"k8s.io/kubernetes/pkg/controller"
    45  	"k8s.io/kubernetes/pkg/controller/history"
    46  	"k8s.io/kubernetes/pkg/features"
    47  )
    48  
    // parentKind pairs the apps API group/version with the kind "StatefulSet".
    // TestAdoptOrphanRevisions passes it to history.NewControllerRevision when
    // it builds controller revisions by hand.
    49  var parentKind = apps.SchemeGroupVersion.WithKind("StatefulSet")
    50  
    51  func alwaysReady() bool { return true }
    52  
    53  func TestStatefulSetControllerCreates(t *testing.T) {
    54  	set := newStatefulSet(3)
    55  	logger, ctx := ktesting.NewTestContext(t)
    56  	ssc, spc, om, _ := newFakeStatefulSetController(ctx, set)
    57  	if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
    58  		t.Errorf("Failed to turn up StatefulSet : %s", err)
    59  	}
    60  	if obj, _, err := om.setsIndexer.Get(set); err != nil {
    61  		t.Error(err)
    62  	} else {
    63  		set = obj.(*apps.StatefulSet)
    64  	}
    65  	if set.Status.Replicas != 3 {
    66  		t.Errorf("set.Status.Replicas = %v; want 3", set.Status.Replicas)
    67  	}
    68  }
    69  
    70  func TestStatefulSetControllerDeletes(t *testing.T) {
    71  	set := newStatefulSet(3)
    72  	logger, ctx := ktesting.NewTestContext(t)
    73  	ssc, spc, om, _ := newFakeStatefulSetController(ctx, set)
    74  	if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
    75  		t.Errorf("Failed to turn up StatefulSet : %s", err)
    76  	}
    77  	if obj, _, err := om.setsIndexer.Get(set); err != nil {
    78  		t.Error(err)
    79  	} else {
    80  		set = obj.(*apps.StatefulSet)
    81  	}
    82  	if set.Status.Replicas != 3 {
    83  		t.Errorf("set.Status.Replicas = %v; want 3", set.Status.Replicas)
    84  	}
    85  	*set.Spec.Replicas = 0
    86  	if err := scaleDownStatefulSetController(logger, set, ssc, spc, om); err != nil {
    87  		t.Errorf("Failed to turn down StatefulSet : %s", err)
    88  	}
    89  	if obj, _, err := om.setsIndexer.Get(set); err != nil {
    90  		t.Error(err)
    91  	} else {
    92  		set = obj.(*apps.StatefulSet)
    93  	}
    94  	if set.Status.Replicas != 0 {
    95  		t.Errorf("set.Status.Replicas = %v; want 0", set.Status.Replicas)
    96  	}
    97  }
    98  
    99  func TestStatefulSetControllerRespectsTermination(t *testing.T) {
   100  	set := newStatefulSet(3)
   101  	logger, ctx := ktesting.NewTestContext(t)
   102  	ssc, spc, om, _ := newFakeStatefulSetController(ctx, set)
   103  	if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
   104  		t.Errorf("Failed to turn up StatefulSet : %s", err)
   105  	}
   106  	if obj, _, err := om.setsIndexer.Get(set); err != nil {
   107  		t.Error(err)
   108  	} else {
   109  		set = obj.(*apps.StatefulSet)
   110  	}
   111  	if set.Status.Replicas != 3 {
   112  		t.Errorf("set.Status.Replicas = %v; want 3", set.Status.Replicas)
   113  	}
   114  	_, err := om.addTerminatingPod(set, 3)
   115  	if err != nil {
   116  		t.Error(err)
   117  	}
   118  	pods, err := om.addTerminatingPod(set, 4)
   119  	if err != nil {
   120  		t.Error(err)
   121  	}
   122  	ssc.syncStatefulSet(ctx, set, pods)
   123  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
   124  	if err != nil {
   125  		t.Error(err)
   126  	}
   127  	pods, err = om.podsLister.Pods(set.Namespace).List(selector)
   128  	if err != nil {
   129  		t.Error(err)
   130  	}
   131  	if len(pods) != 5 {
   132  		t.Error("StatefulSet does not respect termination")
   133  	}
   134  	sort.Sort(ascendingOrdinal(pods))
   135  	spc.DeleteStatefulPod(set, pods[3])
   136  	spc.DeleteStatefulPod(set, pods[4])
   137  	*set.Spec.Replicas = 0
   138  	if err := scaleDownStatefulSetController(logger, set, ssc, spc, om); err != nil {
   139  		t.Errorf("Failed to turn down StatefulSet : %s", err)
   140  	}
   141  	if obj, _, err := om.setsIndexer.Get(set); err != nil {
   142  		t.Error(err)
   143  	} else {
   144  		set = obj.(*apps.StatefulSet)
   145  	}
   146  	if set.Status.Replicas != 0 {
   147  		t.Errorf("set.Status.Replicas = %v; want 0", set.Status.Replicas)
   148  	}
   149  }
   150  
   151  func TestStatefulSetControllerBlocksScaling(t *testing.T) {
   152  	logger, ctx := ktesting.NewTestContext(t)
   153  	set := newStatefulSet(3)
   154  	ssc, spc, om, _ := newFakeStatefulSetController(ctx, set)
   155  	if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
   156  		t.Errorf("Failed to turn up StatefulSet : %s", err)
   157  	}
   158  	if obj, _, err := om.setsIndexer.Get(set); err != nil {
   159  		t.Error(err)
   160  	} else {
   161  		set = obj.(*apps.StatefulSet)
   162  	}
   163  	if set.Status.Replicas != 3 {
   164  		t.Errorf("set.Status.Replicas = %v; want 3", set.Status.Replicas)
   165  	}
   166  	*set.Spec.Replicas = 5
   167  	fakeResourceVersion(set)
   168  	om.setsIndexer.Update(set)
   169  	_, err := om.setPodTerminated(set, 0)
   170  	if err != nil {
   171  		t.Error("Failed to set pod terminated at ordinal 0")
   172  	}
   173  	ssc.enqueueStatefulSet(set)
   174  	fakeWorker(ssc)
   175  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
   176  	if err != nil {
   177  		t.Error(err)
   178  	}
   179  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
   180  	if err != nil {
   181  		t.Error(err)
   182  	}
   183  	if len(pods) != 3 {
   184  		t.Error("StatefulSet does not block scaling")
   185  	}
   186  	sort.Sort(ascendingOrdinal(pods))
   187  	spc.DeleteStatefulPod(set, pods[0])
   188  	ssc.enqueueStatefulSet(set)
   189  	fakeWorker(ssc)
   190  	pods, err = om.podsLister.Pods(set.Namespace).List(selector)
   191  	if err != nil {
   192  		t.Error(err)
   193  	}
   194  	if len(pods) != 3 {
   195  		t.Error("StatefulSet does not resume when terminated Pod is removed")
   196  	}
   197  }
   198  
   199  func TestStatefulSetControllerDeletionTimestamp(t *testing.T) {
   200  	_, ctx := ktesting.NewTestContext(t)
   201  	set := newStatefulSet(3)
   202  	set.DeletionTimestamp = new(metav1.Time)
   203  	ssc, _, om, _ := newFakeStatefulSetController(ctx, set)
   204  
   205  	om.setsIndexer.Add(set)
   206  
   207  	// Force a sync. It should not try to create any Pods.
   208  	ssc.enqueueStatefulSet(set)
   209  	fakeWorker(ssc)
   210  
   211  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
   212  	if err != nil {
   213  		t.Fatal(err)
   214  	}
   215  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
   216  	if err != nil {
   217  		t.Fatal(err)
   218  	}
   219  	if got, want := len(pods), 0; got != want {
   220  		t.Errorf("len(pods) = %v, want %v", got, want)
   221  	}
   222  }
   223  
   224  func TestStatefulSetControllerDeletionTimestampRace(t *testing.T) {
   225  	_, ctx := ktesting.NewTestContext(t)
   226  	set := newStatefulSet(3)
   227  	// The bare client says it IS deleted.
   228  	set.DeletionTimestamp = new(metav1.Time)
   229  	ssc, _, om, ssh := newFakeStatefulSetController(ctx, set)
   230  
   231  	// The lister (cache) says it's NOT deleted.
   232  	set2 := *set
   233  	set2.DeletionTimestamp = nil
   234  	om.setsIndexer.Add(&set2)
   235  
   236  	// The recheck occurs in the presence of a matching orphan.
   237  	pod := newStatefulSetPod(set, 1)
   238  	pod.OwnerReferences = nil
   239  	om.podsIndexer.Add(pod)
   240  	set.Status.CollisionCount = new(int32)
   241  	revision, err := newRevision(set, 1, set.Status.CollisionCount)
   242  	if err != nil {
   243  		t.Fatal(err)
   244  	}
   245  	revision.OwnerReferences = nil
   246  	_, err = ssh.CreateControllerRevision(set, revision, set.Status.CollisionCount)
   247  	if err != nil {
   248  		t.Fatal(err)
   249  	}
   250  
   251  	// Force a sync. It should not try to create any Pods.
   252  	ssc.enqueueStatefulSet(set)
   253  	fakeWorker(ssc)
   254  
   255  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
   256  	if err != nil {
   257  		t.Fatal(err)
   258  	}
   259  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
   260  	if err != nil {
   261  		t.Fatal(err)
   262  	}
   263  	if got, want := len(pods), 1; got != want {
   264  		t.Errorf("len(pods) = %v, want %v", got, want)
   265  	}
   266  
   267  	// It should not adopt pods.
   268  	for _, pod := range pods {
   269  		if len(pod.OwnerReferences) > 0 {
   270  			t.Errorf("unexpected pod owner references: %v", pod.OwnerReferences)
   271  		}
   272  	}
   273  
   274  	// It should not adopt revisions.
   275  	revisions, err := ssh.ListControllerRevisions(set, selector)
   276  	if err != nil {
   277  		t.Fatal(err)
   278  	}
   279  	if got, want := len(revisions), 1; got != want {
   280  		t.Errorf("len(revisions) = %v, want %v", got, want)
   281  	}
   282  	for _, revision := range revisions {
   283  		if len(revision.OwnerReferences) > 0 {
   284  			t.Errorf("unexpected revision owner references: %v", revision.OwnerReferences)
   285  		}
   286  	}
   287  }
   288  
   289  func TestStatefulSetControllerAddPod(t *testing.T) {
   290  	logger, ctx := ktesting.NewTestContext(t)
   291  	ssc, _, om, _ := newFakeStatefulSetController(ctx)
   292  	set1 := newStatefulSet(3)
   293  	set2 := newStatefulSet(3)
   294  	pod1 := newStatefulSetPod(set1, 0)
   295  	pod2 := newStatefulSetPod(set2, 0)
   296  	om.setsIndexer.Add(set1)
   297  	om.setsIndexer.Add(set2)
   298  
   299  	ssc.addPod(logger, pod1)
   300  	key, done := ssc.queue.Get()
   301  	if key == nil || done {
   302  		t.Error("failed to enqueue StatefulSet")
   303  	} else if key, ok := key.(string); !ok {
   304  		t.Error("key is not a string")
   305  	} else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key {
   306  		t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
   307  	}
   308  	ssc.queue.Done(key)
   309  
   310  	ssc.addPod(logger, pod2)
   311  	key, done = ssc.queue.Get()
   312  	if key == nil || done {
   313  		t.Error("failed to enqueue StatefulSet")
   314  	} else if key, ok := key.(string); !ok {
   315  		t.Error("key is not a string")
   316  	} else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key {
   317  		t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
   318  	}
   319  	ssc.queue.Done(key)
   320  }
   321  
   322  func TestStatefulSetControllerAddPodOrphan(t *testing.T) {
   323  	logger, ctx := ktesting.NewTestContext(t)
   324  	ssc, _, om, _ := newFakeStatefulSetController(ctx)
   325  	set1 := newStatefulSet(3)
   326  	set2 := newStatefulSet(3)
   327  	set2.Name = "foo2"
   328  	set3 := newStatefulSet(3)
   329  	set3.Name = "foo3"
   330  	set3.Spec.Selector.MatchLabels = map[string]string{"foo3": "bar"}
   331  	pod := newStatefulSetPod(set1, 0)
   332  	om.setsIndexer.Add(set1)
   333  	om.setsIndexer.Add(set2)
   334  	om.setsIndexer.Add(set3)
   335  
   336  	// Make pod an orphan. Expect matching sets to be queued.
   337  	pod.OwnerReferences = nil
   338  	ssc.addPod(logger, pod)
   339  	if got, want := ssc.queue.Len(), 2; got != want {
   340  		t.Errorf("queue.Len() = %v, want %v", got, want)
   341  	}
   342  }
   343  
   344  func TestStatefulSetControllerAddPodNoSet(t *testing.T) {
   345  	logger, ctx := ktesting.NewTestContext(t)
   346  	ssc, _, _, _ := newFakeStatefulSetController(ctx)
   347  	set := newStatefulSet(3)
   348  	pod := newStatefulSetPod(set, 0)
   349  	ssc.addPod(logger, pod)
   350  	ssc.queue.ShutDown()
   351  	key, _ := ssc.queue.Get()
   352  	if key != nil {
   353  		t.Errorf("StatefulSet enqueued key for Pod with no Set %s", key)
   354  	}
   355  }
   356  
   357  func TestStatefulSetControllerUpdatePod(t *testing.T) {
   358  	logger, ctx := ktesting.NewTestContext(t)
   359  	ssc, _, om, _ := newFakeStatefulSetController(ctx)
   360  	set1 := newStatefulSet(3)
   361  	set2 := newStatefulSet(3)
   362  	set2.Name = "foo2"
   363  	pod1 := newStatefulSetPod(set1, 0)
   364  	pod2 := newStatefulSetPod(set2, 0)
   365  	om.setsIndexer.Add(set1)
   366  	om.setsIndexer.Add(set2)
   367  
   368  	prev := *pod1
   369  	fakeResourceVersion(pod1)
   370  	ssc.updatePod(logger, &prev, pod1)
   371  	key, done := ssc.queue.Get()
   372  	if key == nil || done {
   373  		t.Error("failed to enqueue StatefulSet")
   374  	} else if key, ok := key.(string); !ok {
   375  		t.Error("key is not a string")
   376  	} else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key {
   377  		t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
   378  	}
   379  
   380  	prev = *pod2
   381  	fakeResourceVersion(pod2)
   382  	ssc.updatePod(logger, &prev, pod2)
   383  	key, done = ssc.queue.Get()
   384  	if key == nil || done {
   385  		t.Error("failed to enqueue StatefulSet")
   386  	} else if key, ok := key.(string); !ok {
   387  		t.Error("key is not a string")
   388  	} else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key {
   389  		t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
   390  	}
   391  }
   392  
   393  func TestStatefulSetControllerUpdatePodWithNoSet(t *testing.T) {
   394  	logger, ctx := ktesting.NewTestContext(t)
   395  	ssc, _, _, _ := newFakeStatefulSetController(ctx)
   396  	set := newStatefulSet(3)
   397  	pod := newStatefulSetPod(set, 0)
   398  	prev := *pod
   399  	fakeResourceVersion(pod)
   400  	ssc.updatePod(logger, &prev, pod)
   401  	ssc.queue.ShutDown()
   402  	key, _ := ssc.queue.Get()
   403  	if key != nil {
   404  		t.Errorf("StatefulSet enqueued key for Pod with no Set %s", key)
   405  	}
   406  }
   407  
   408  func TestStatefulSetControllerUpdatePodWithSameVersion(t *testing.T) {
   409  	logger, ctx := ktesting.NewTestContext(t)
   410  	ssc, _, om, _ := newFakeStatefulSetController(ctx)
   411  	set := newStatefulSet(3)
   412  	pod := newStatefulSetPod(set, 0)
   413  	om.setsIndexer.Add(set)
   414  	ssc.updatePod(logger, pod, pod)
   415  	ssc.queue.ShutDown()
   416  	key, _ := ssc.queue.Get()
   417  	if key != nil {
   418  		t.Errorf("StatefulSet enqueued key for Pod with no Set %s", key)
   419  	}
   420  }
   421  
   422  func TestStatefulSetControllerUpdatePodOrphanWithNewLabels(t *testing.T) {
   423  	logger, ctx := ktesting.NewTestContext(t)
   424  	ssc, _, om, _ := newFakeStatefulSetController(ctx)
   425  	set := newStatefulSet(3)
   426  	pod := newStatefulSetPod(set, 0)
   427  	pod.OwnerReferences = nil
   428  	set2 := newStatefulSet(3)
   429  	set2.Name = "foo2"
   430  	om.setsIndexer.Add(set)
   431  	om.setsIndexer.Add(set2)
   432  	clone := *pod
   433  	clone.Labels = map[string]string{"foo2": "bar2"}
   434  	fakeResourceVersion(&clone)
   435  	ssc.updatePod(logger, &clone, pod)
   436  	if got, want := ssc.queue.Len(), 2; got != want {
   437  		t.Errorf("queue.Len() = %v, want %v", got, want)
   438  	}
   439  }
   440  
   441  func TestStatefulSetControllerUpdatePodChangeControllerRef(t *testing.T) {
   442  	logger, ctx := ktesting.NewTestContext(t)
   443  	ssc, _, om, _ := newFakeStatefulSetController(ctx)
   444  	set := newStatefulSet(3)
   445  	set2 := newStatefulSet(3)
   446  	set2.Name = "foo2"
   447  	pod := newStatefulSetPod(set, 0)
   448  	pod2 := newStatefulSetPod(set2, 0)
   449  	om.setsIndexer.Add(set)
   450  	om.setsIndexer.Add(set2)
   451  	clone := *pod
   452  	clone.OwnerReferences = pod2.OwnerReferences
   453  	fakeResourceVersion(&clone)
   454  	ssc.updatePod(logger, &clone, pod)
   455  	if got, want := ssc.queue.Len(), 2; got != want {
   456  		t.Errorf("queue.Len() = %v, want %v", got, want)
   457  	}
   458  }
   459  
   460  func TestStatefulSetControllerUpdatePodRelease(t *testing.T) {
   461  	logger, ctx := ktesting.NewTestContext(t)
   462  	ssc, _, om, _ := newFakeStatefulSetController(ctx)
   463  	set := newStatefulSet(3)
   464  	set2 := newStatefulSet(3)
   465  	set2.Name = "foo2"
   466  	pod := newStatefulSetPod(set, 0)
   467  	om.setsIndexer.Add(set)
   468  	om.setsIndexer.Add(set2)
   469  	clone := *pod
   470  	clone.OwnerReferences = nil
   471  	fakeResourceVersion(&clone)
   472  	ssc.updatePod(logger, pod, &clone)
   473  	if got, want := ssc.queue.Len(), 2; got != want {
   474  		t.Errorf("queue.Len() = %v, want %v", got, want)
   475  	}
   476  }
   477  
   478  func TestStatefulSetControllerDeletePod(t *testing.T) {
   479  	logger, ctx := ktesting.NewTestContext(t)
   480  	ssc, _, om, _ := newFakeStatefulSetController(ctx)
   481  	set1 := newStatefulSet(3)
   482  	set2 := newStatefulSet(3)
   483  	set2.Name = "foo2"
   484  	pod1 := newStatefulSetPod(set1, 0)
   485  	pod2 := newStatefulSetPod(set2, 0)
   486  	om.setsIndexer.Add(set1)
   487  	om.setsIndexer.Add(set2)
   488  
   489  	ssc.deletePod(logger, pod1)
   490  	key, done := ssc.queue.Get()
   491  	if key == nil || done {
   492  		t.Error("failed to enqueue StatefulSet")
   493  	} else if key, ok := key.(string); !ok {
   494  		t.Error("key is not a string")
   495  	} else if expectedKey, _ := controller.KeyFunc(set1); expectedKey != key {
   496  		t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
   497  	}
   498  
   499  	ssc.deletePod(logger, pod2)
   500  	key, done = ssc.queue.Get()
   501  	if key == nil || done {
   502  		t.Error("failed to enqueue StatefulSet")
   503  	} else if key, ok := key.(string); !ok {
   504  		t.Error("key is not a string")
   505  	} else if expectedKey, _ := controller.KeyFunc(set2); expectedKey != key {
   506  		t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
   507  	}
   508  }
   509  
   510  func TestStatefulSetControllerDeletePodOrphan(t *testing.T) {
   511  	logger, ctx := ktesting.NewTestContext(t)
   512  	ssc, _, om, _ := newFakeStatefulSetController(ctx)
   513  	set1 := newStatefulSet(3)
   514  	set2 := newStatefulSet(3)
   515  	set2.Name = "foo2"
   516  	pod1 := newStatefulSetPod(set1, 0)
   517  	om.setsIndexer.Add(set1)
   518  	om.setsIndexer.Add(set2)
   519  
   520  	pod1.OwnerReferences = nil
   521  	ssc.deletePod(logger, pod1)
   522  	if got, want := ssc.queue.Len(), 0; got != want {
   523  		t.Errorf("queue.Len() = %v, want %v", got, want)
   524  	}
   525  }
   526  
   527  func TestStatefulSetControllerDeletePodTombstone(t *testing.T) {
   528  	logger, ctx := ktesting.NewTestContext(t)
   529  	ssc, _, om, _ := newFakeStatefulSetController(ctx)
   530  	set := newStatefulSet(3)
   531  	pod := newStatefulSetPod(set, 0)
   532  	om.setsIndexer.Add(set)
   533  	tombstoneKey, _ := controller.KeyFunc(pod)
   534  	tombstone := cache.DeletedFinalStateUnknown{Key: tombstoneKey, Obj: pod}
   535  	ssc.deletePod(logger, tombstone)
   536  	key, done := ssc.queue.Get()
   537  	if key == nil || done {
   538  		t.Error("failed to enqueue StatefulSet")
   539  	} else if key, ok := key.(string); !ok {
   540  		t.Error("key is not a string")
   541  	} else if expectedKey, _ := controller.KeyFunc(set); expectedKey != key {
   542  		t.Errorf("expected StatefulSet key %s found %s", expectedKey, key)
   543  	}
   544  }
   545  
   546  func TestStatefulSetControllerGetStatefulSetsForPod(t *testing.T) {
   547  	_, ctx := ktesting.NewTestContext(t)
   548  	ssc, _, om, _ := newFakeStatefulSetController(ctx)
   549  	set1 := newStatefulSet(3)
   550  	set2 := newStatefulSet(3)
   551  	set2.Name = "foo2"
   552  	pod := newStatefulSetPod(set1, 0)
   553  	om.setsIndexer.Add(set1)
   554  	om.setsIndexer.Add(set2)
   555  	om.podsIndexer.Add(pod)
   556  	sets := ssc.getStatefulSetsForPod(pod)
   557  	if got, want := len(sets), 2; got != want {
   558  		t.Errorf("len(sets) = %v, want %v", got, want)
   559  	}
   560  }
   561  
   562  func TestGetPodsForStatefulSetAdopt(t *testing.T) {
   563  	set := newStatefulSet(5)
   564  	pod1 := newStatefulSetPod(set, 1)
   565  	// pod2 is an orphan with matching labels and name.
   566  	pod2 := newStatefulSetPod(set, 2)
   567  	pod2.OwnerReferences = nil
   568  	// pod3 has wrong labels.
   569  	pod3 := newStatefulSetPod(set, 3)
   570  	pod3.OwnerReferences = nil
   571  	pod3.Labels = nil
   572  	// pod4 has wrong name.
   573  	pod4 := newStatefulSetPod(set, 4)
   574  	pod4.OwnerReferences = nil
   575  	pod4.Name = "x" + pod4.Name
   576  
   577  	_, ctx := ktesting.NewTestContext(t)
   578  	ssc, _, om, _ := newFakeStatefulSetController(ctx, set, pod1, pod2, pod3, pod4)
   579  
   580  	om.podsIndexer.Add(pod1)
   581  	om.podsIndexer.Add(pod2)
   582  	om.podsIndexer.Add(pod3)
   583  	om.podsIndexer.Add(pod4)
   584  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
   585  	if err != nil {
   586  		t.Fatal(err)
   587  	}
   588  	pods, err := ssc.getPodsForStatefulSet(context.TODO(), set, selector)
   589  	if err != nil {
   590  		t.Fatalf("getPodsForStatefulSet() error: %v", err)
   591  	}
   592  	got := sets.NewString()
   593  	for _, pod := range pods {
   594  		got.Insert(pod.Name)
   595  	}
   596  	// pod2 should be claimed, pod3 and pod4 ignored
   597  	want := sets.NewString(pod1.Name, pod2.Name)
   598  	if !got.Equal(want) {
   599  		t.Errorf("getPodsForStatefulSet() = %v, want %v", got, want)
   600  	}
   601  }
   602  
   603  func TestAdoptOrphanRevisions(t *testing.T) {
   604  	ss1 := newStatefulSetWithLabels(3, "ss1", types.UID("ss1"), map[string]string{"foo": "bar"})
   605  	ss1.Status.CollisionCount = new(int32)
   606  	ss1Rev1, err := history.NewControllerRevision(ss1, parentKind, ss1.Spec.Template.Labels, rawTemplate(&ss1.Spec.Template), 1, ss1.Status.CollisionCount)
   607  	if err != nil {
   608  		t.Fatal(err)
   609  	}
   610  	ss1Rev1.Namespace = ss1.Namespace
   611  	ss1.Spec.Template.Annotations = make(map[string]string)
   612  	ss1.Spec.Template.Annotations["ss1"] = "ss1"
   613  	ss1Rev2, err := history.NewControllerRevision(ss1, parentKind, ss1.Spec.Template.Labels, rawTemplate(&ss1.Spec.Template), 2, ss1.Status.CollisionCount)
   614  	if err != nil {
   615  		t.Fatal(err)
   616  	}
   617  	ss1Rev2.Namespace = ss1.Namespace
   618  	ss1Rev2.OwnerReferences = []metav1.OwnerReference{}
   619  
   620  	_, ctx := ktesting.NewTestContext(t)
   621  	ssc, _, om, _ := newFakeStatefulSetController(ctx, ss1, ss1Rev1, ss1Rev2)
   622  
   623  	om.revisionsIndexer.Add(ss1Rev1)
   624  	om.revisionsIndexer.Add(ss1Rev2)
   625  
   626  	err = ssc.adoptOrphanRevisions(context.TODO(), ss1)
   627  	if err != nil {
   628  		t.Errorf("adoptOrphanRevisions() error: %v", err)
   629  	}
   630  
   631  	if revisions, err := ssc.control.ListRevisions(ss1); err != nil {
   632  		t.Errorf("ListRevisions() error: %v", err)
   633  	} else {
   634  		var adopted bool
   635  		for i := range revisions {
   636  			if revisions[i].Name == ss1Rev2.Name && metav1.GetControllerOf(revisions[i]) != nil {
   637  				adopted = true
   638  			}
   639  		}
   640  		if !adopted {
   641  			t.Error("adoptOrphanRevisions() not adopt orphan revisions")
   642  		}
   643  	}
   644  }
   645  
   646  func TestGetPodsForStatefulSetRelease(t *testing.T) {
   647  	_, ctx := ktesting.NewTestContext(t)
   648  	set := newStatefulSet(3)
   649  	ssc, _, om, _ := newFakeStatefulSetController(ctx, set)
   650  	pod1 := newStatefulSetPod(set, 1)
   651  	// pod2 is owned but has wrong name.
   652  	pod2 := newStatefulSetPod(set, 2)
   653  	pod2.Name = "x" + pod2.Name
   654  	// pod3 is owned but has wrong labels.
   655  	pod3 := newStatefulSetPod(set, 3)
   656  	pod3.Labels = nil
   657  	// pod4 is an orphan that doesn't match.
   658  	pod4 := newStatefulSetPod(set, 4)
   659  	pod4.OwnerReferences = nil
   660  	pod4.Labels = nil
   661  
   662  	om.podsIndexer.Add(pod1)
   663  	om.podsIndexer.Add(pod2)
   664  	om.podsIndexer.Add(pod3)
   665  	om.podsIndexer.Add(pod4)
   666  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
   667  	if err != nil {
   668  		t.Fatal(err)
   669  	}
   670  	pods, err := ssc.getPodsForStatefulSet(context.TODO(), set, selector)
   671  	if err != nil {
   672  		t.Fatalf("getPodsForStatefulSet() error: %v", err)
   673  	}
   674  	got := sets.NewString()
   675  	for _, pod := range pods {
   676  		got.Insert(pod.Name)
   677  	}
   678  
   679  	// Expect only pod1 (pod2 and pod3 should be released, pod4 ignored).
   680  	want := sets.NewString(pod1.Name)
   681  	if !got.Equal(want) {
   682  		t.Errorf("getPodsForStatefulSet() = %v, want %v", got, want)
   683  	}
   684  }
   685  
   686  func TestOrphanedPodsWithPVCDeletePolicy(t *testing.T) {
   687  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
   688  
   689  	testFn := func(t *testing.T, scaledownPolicy, deletionPolicy apps.PersistentVolumeClaimRetentionPolicyType) {
   690  		set := newStatefulSet(4)
   691  		*set.Spec.Replicas = 2
   692  		set.Spec.PersistentVolumeClaimRetentionPolicy.WhenScaled = scaledownPolicy
   693  		set.Spec.PersistentVolumeClaimRetentionPolicy.WhenDeleted = deletionPolicy
   694  		_, ctx := ktesting.NewTestContext(t)
   695  		ssc, _, om, _ := newFakeStatefulSetController(ctx, set)
   696  		om.setsIndexer.Add(set)
   697  
   698  		pods := []*v1.Pod{}
   699  		pods = append(pods, newStatefulSetPod(set, 0))
   700  		// pod1 is orphaned
   701  		pods = append(pods, newStatefulSetPod(set, 1))
   702  		pods[1].OwnerReferences = nil
   703  		// pod2 is owned but has wrong name.
   704  		pods = append(pods, newStatefulSetPod(set, 2))
   705  		pods[2].Name = "x" + pods[2].Name
   706  
   707  		ssc.kubeClient.(*fake.Clientset).PrependReactor("patch", "pods", func(action core.Action) (bool, runtime.Object, error) {
   708  			patch := action.(core.PatchAction).GetPatch()
   709  			target := action.(core.PatchAction).GetName()
   710  			var pod *v1.Pod
   711  			for _, p := range pods {
   712  				if p.Name == target {
   713  					pod = p
   714  					break
   715  				}
   716  			}
   717  			if pod == nil {
   718  				t.Fatalf("Can't find patch target %s", target)
   719  			}
   720  			original, err := json.Marshal(pod)
   721  			if err != nil {
   722  				t.Fatalf("failed to marshal original pod %s: %v", pod.Name, err)
   723  			}
   724  			updated, err := strategicpatch.StrategicMergePatch(original, patch, v1.Pod{})
   725  			if err != nil {
   726  				t.Fatalf("failed to apply strategic merge patch %q on node %s: %v", patch, pod.Name, err)
   727  			}
   728  			if err := json.Unmarshal(updated, pod); err != nil {
   729  				t.Fatalf("failed to unmarshal updated pod %s: %v", pod.Name, err)
   730  			}
   731  
   732  			return true, pod, nil
   733  		})
   734  
   735  		for _, pod := range pods {
   736  			om.podsIndexer.Add(pod)
   737  			claims := getPersistentVolumeClaims(set, pod)
   738  			for _, claim := range claims {
   739  				om.CreateClaim(&claim)
   740  			}
   741  		}
   742  
   743  		for i := range pods {
   744  			if _, err := om.setPodReady(set, i); err != nil {
   745  				t.Errorf("%d: %v", i, err)
   746  			}
   747  			if _, err := om.setPodRunning(set, i); err != nil {
   748  				t.Errorf("%d: %v", i, err)
   749  			}
   750  		}
   751  
   752  		// First sync to manage orphaned pod, then set replicas.
   753  		ssc.enqueueStatefulSet(set)
   754  		fakeWorker(ssc)
   755  		*set.Spec.Replicas = 0 // Put an ownerRef for all scale-down deleted PVCs.
   756  		ssc.enqueueStatefulSet(set)
   757  		fakeWorker(ssc)
   758  
   759  		hasNamedOwnerRef := func(claim *v1.PersistentVolumeClaim, name string) bool {
   760  			for _, ownerRef := range claim.GetOwnerReferences() {
   761  				if ownerRef.Name == name {
   762  					return true
   763  				}
   764  			}
   765  			return false
   766  		}
   767  		verifyOwnerRefs := func(claim *v1.PersistentVolumeClaim, condemned bool) {
   768  			podName := getClaimPodName(set, claim)
   769  			const retain = apps.RetainPersistentVolumeClaimRetentionPolicyType
   770  			const delete = apps.DeletePersistentVolumeClaimRetentionPolicyType
   771  			switch {
   772  			case scaledownPolicy == retain && deletionPolicy == retain:
   773  				if hasNamedOwnerRef(claim, podName) || hasNamedOwnerRef(claim, set.Name) {
   774  					t.Errorf("bad claim ownerRefs: %s: %v", claim.Name, claim.GetOwnerReferences())
   775  				}
   776  			case scaledownPolicy == retain && deletionPolicy == delete:
   777  				if hasNamedOwnerRef(claim, podName) || !hasNamedOwnerRef(claim, set.Name) {
   778  					t.Errorf("bad claim ownerRefs: %s: %v", claim.Name, claim.GetOwnerReferences())
   779  				}
   780  			case scaledownPolicy == delete && deletionPolicy == retain:
   781  				if hasNamedOwnerRef(claim, podName) != condemned || hasNamedOwnerRef(claim, set.Name) {
   782  					t.Errorf("bad claim ownerRefs: %s: %v", claim.Name, claim.GetOwnerReferences())
   783  				}
   784  			case scaledownPolicy == delete && deletionPolicy == delete:
   785  				if hasNamedOwnerRef(claim, podName) != condemned || !hasNamedOwnerRef(claim, set.Name) {
   786  					t.Errorf("bad claim ownerRefs: %s: %v", claim.Name, claim.GetOwnerReferences())
   787  				}
   788  			}
   789  		}
   790  
   791  		claims, _ := om.claimsLister.PersistentVolumeClaims(set.Namespace).List(labels.Everything())
   792  		if len(claims) != len(pods) {
   793  			t.Errorf("Unexpected number of claims: %d", len(claims))
   794  		}
   795  		for _, claim := range claims {
   796  			// Only the first pod and the reclaimed orphan pod should have owner refs.
   797  			switch claim.Name {
   798  			case "datadir-foo-0", "datadir-foo-1":
   799  				verifyOwnerRefs(claim, false)
   800  			case "datadir-foo-2":
   801  				if hasNamedOwnerRef(claim, getClaimPodName(set, claim)) || hasNamedOwnerRef(claim, set.Name) {
   802  					t.Errorf("unexpected ownerRefs for %s: %v", claim.Name, claim.GetOwnerReferences())
   803  				}
   804  			default:
   805  				t.Errorf("Unexpected claim %s", claim.Name)
   806  			}
   807  		}
   808  	}
   809  	policies := []apps.PersistentVolumeClaimRetentionPolicyType{
   810  		apps.RetainPersistentVolumeClaimRetentionPolicyType,
   811  		apps.DeletePersistentVolumeClaimRetentionPolicyType,
   812  	}
   813  	for _, scaledownPolicy := range policies {
   814  		for _, deletionPolicy := range policies {
   815  			testName := fmt.Sprintf("ScaleDown:%s/SetDeletion:%s", scaledownPolicy, deletionPolicy)
   816  			t.Run(testName, func(t *testing.T) { testFn(t, scaledownPolicy, deletionPolicy) })
   817  		}
   818  	}
   819  }
   820  
   821  func TestStaleOwnerRefOnScaleup(t *testing.T) {
   822  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
   823  
   824  	for _, policy := range []*apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
   825  		{
   826  			WhenScaled:  apps.DeletePersistentVolumeClaimRetentionPolicyType,
   827  			WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
   828  		},
   829  		{
   830  			WhenScaled:  apps.DeletePersistentVolumeClaimRetentionPolicyType,
   831  			WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
   832  		},
   833  	} {
   834  		onPolicy := func(msg string, args ...interface{}) string {
   835  			return fmt.Sprintf(fmt.Sprintf("(%s) %s", policy, msg), args...)
   836  		}
   837  		set := newStatefulSet(3)
   838  		set.Spec.PersistentVolumeClaimRetentionPolicy = policy
   839  		logger, ctx := ktesting.NewTestContext(t)
   840  		ssc, spc, om, _ := newFakeStatefulSetController(ctx, set)
   841  		if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
   842  			t.Errorf(onPolicy("Failed to turn up StatefulSet : %s", err))
   843  		}
   844  		var err error
   845  		if set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name); err != nil {
   846  			t.Errorf(onPolicy("Could not get scaled up set: %v", err))
   847  		}
   848  		if set.Status.Replicas != 3 {
   849  			t.Errorf(onPolicy("set.Status.Replicas = %v; want 3", set.Status.Replicas))
   850  		}
   851  		*set.Spec.Replicas = 2
   852  		if err := scaleDownStatefulSetController(logger, set, ssc, spc, om); err != nil {
   853  			t.Errorf(onPolicy("Failed to scale down StatefulSet : msg, %s", err))
   854  		}
   855  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   856  		if err != nil {
   857  			t.Errorf(onPolicy("Could not get scaled down StatefulSet: %v", err))
   858  		}
   859  		if set.Status.Replicas != 2 {
   860  			t.Errorf(onPolicy("Failed to scale statefulset to 2 replicas"))
   861  		}
   862  
   863  		var claim *v1.PersistentVolumeClaim
   864  		claim, err = om.claimsLister.PersistentVolumeClaims(set.Namespace).Get("datadir-foo-2")
   865  		if err != nil {
   866  			t.Errorf(onPolicy("Could not find expected pvc datadir-foo-2"))
   867  		}
   868  		refs := claim.GetOwnerReferences()
   869  		if len(refs) != 1 {
   870  			t.Errorf(onPolicy("Expected only one refs: %v", refs))
   871  		}
   872  		// Make the pod ref stale.
   873  		for i := range refs {
   874  			if refs[i].Name == "foo-2" {
   875  				refs[i].UID = "stale"
   876  				break
   877  			}
   878  		}
   879  		claim.SetOwnerReferences(refs)
   880  		if err = om.claimsIndexer.Update(claim); err != nil {
   881  			t.Errorf(onPolicy("Could not update claim with new owner ref: %v", err))
   882  		}
   883  
   884  		*set.Spec.Replicas = 3
   885  		// Until the stale PVC goes away, the scale up should never finish. Run 10 iterations, then delete the PVC.
   886  		if err := scaleUpStatefulSetControllerBounded(logger, set, ssc, spc, om, 10); err != nil {
   887  			t.Errorf(onPolicy("Failed attempt to scale StatefulSet back up: %v", err))
   888  		}
   889  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   890  		if err != nil {
   891  			t.Errorf(onPolicy("Could not get scaled down StatefulSet: %v", err))
   892  		}
   893  		if set.Status.Replicas != 2 {
   894  			t.Errorf(onPolicy("Expected set to stay at two replicas"))
   895  		}
   896  
   897  		claim, err = om.claimsLister.PersistentVolumeClaims(set.Namespace).Get("datadir-foo-2")
   898  		if err != nil {
   899  			t.Errorf(onPolicy("Could not find expected pvc datadir-foo-2"))
   900  		}
   901  		refs = claim.GetOwnerReferences()
   902  		if len(refs) != 1 {
   903  			t.Errorf(onPolicy("Unexpected change to condemned pvc ownerRefs: %v", refs))
   904  		}
   905  		foundPodRef := false
   906  		for i := range refs {
   907  			if refs[i].UID == "stale" {
   908  				foundPodRef = true
   909  				break
   910  			}
   911  		}
   912  		if !foundPodRef {
   913  			t.Errorf(onPolicy("Claim ref unexpectedly changed: %v", refs))
   914  		}
   915  		if err = om.claimsIndexer.Delete(claim); err != nil {
   916  			t.Errorf(onPolicy("Could not delete stale pvc: %v", err))
   917  		}
   918  
   919  		if err := scaleUpStatefulSetController(logger, set, ssc, spc, om); err != nil {
   920  			t.Errorf(onPolicy("Failed to scale StatefulSet back up: %v", err))
   921  		}
   922  		set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
   923  		if err != nil {
   924  			t.Errorf(onPolicy("Could not get scaled down StatefulSet: %v", err))
   925  		}
   926  		if set.Status.Replicas != 3 {
   927  			t.Errorf(onPolicy("Failed to scale set back up once PVC was deleted"))
   928  		}
   929  	}
   930  }
   931  
   932  func newFakeStatefulSetController(ctx context.Context, initialObjects ...runtime.Object) (*StatefulSetController, *StatefulPodControl, *fakeObjectManager, history.Interface) {
   933  	client := fake.NewSimpleClientset(initialObjects...)
   934  	informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
   935  	om := newFakeObjectManager(informerFactory)
   936  	spc := NewStatefulPodControlFromManager(om, &noopRecorder{})
   937  	ssu := newFakeStatefulSetStatusUpdater(informerFactory.Apps().V1().StatefulSets())
   938  	ssc := NewStatefulSetController(
   939  		ctx,
   940  		informerFactory.Core().V1().Pods(),
   941  		informerFactory.Apps().V1().StatefulSets(),
   942  		informerFactory.Core().V1().PersistentVolumeClaims(),
   943  		informerFactory.Apps().V1().ControllerRevisions(),
   944  		client,
   945  	)
   946  	ssh := history.NewFakeHistory(informerFactory.Apps().V1().ControllerRevisions())
   947  	ssc.podListerSynced = alwaysReady
   948  	ssc.setListerSynced = alwaysReady
   949  	recorder := record.NewFakeRecorder(10)
   950  	ssc.control = NewDefaultStatefulSetControl(spc, ssu, ssh, recorder)
   951  
   952  	return ssc, spc, om, ssh
   953  }
   954  
   955  func fakeWorker(ssc *StatefulSetController) {
   956  	if obj, done := ssc.queue.Get(); !done {
   957  		ssc.sync(context.TODO(), obj.(string))
   958  		ssc.queue.Done(obj)
   959  	}
   960  }
   961  
   962  func getPodAtOrdinal(pods []*v1.Pod, ordinal int) *v1.Pod {
   963  	if 0 > ordinal || ordinal >= len(pods) {
   964  		return nil
   965  	}
   966  	sort.Sort(ascendingOrdinal(pods))
   967  	return pods[ordinal]
   968  }
   969  
   970  func scaleUpStatefulSetController(logger klog.Logger, set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager) error {
   971  	return scaleUpStatefulSetControllerBounded(logger, set, ssc, spc, om, -1)
   972  }
   973  
   974  func scaleUpStatefulSetControllerBounded(logger klog.Logger, set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager, maxIterations int) error {
   975  	om.setsIndexer.Add(set)
   976  	ssc.enqueueStatefulSet(set)
   977  	fakeWorker(ssc)
   978  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
   979  	if err != nil {
   980  		return err
   981  	}
   982  	iterations := 0
   983  	for (maxIterations < 0 || iterations < maxIterations) && set.Status.ReadyReplicas < *set.Spec.Replicas {
   984  		iterations++
   985  		pods, err := om.podsLister.Pods(set.Namespace).List(selector)
   986  		if err != nil {
   987  			return err
   988  		}
   989  		ord := len(pods) - 1
   990  		if pods, err = om.setPodPending(set, ord); err != nil {
   991  			return err
   992  		}
   993  		pod := getPodAtOrdinal(pods, ord)
   994  		ssc.addPod(logger, pod)
   995  		fakeWorker(ssc)
   996  		pod = getPodAtOrdinal(pods, ord)
   997  		prev := *pod
   998  		if pods, err = om.setPodRunning(set, ord); err != nil {
   999  			return err
  1000  		}
  1001  		pod = getPodAtOrdinal(pods, ord)
  1002  		ssc.updatePod(logger, &prev, pod)
  1003  		fakeWorker(ssc)
  1004  		pod = getPodAtOrdinal(pods, ord)
  1005  		prev = *pod
  1006  		if pods, err = om.setPodReady(set, ord); err != nil {
  1007  			return err
  1008  		}
  1009  		pod = getPodAtOrdinal(pods, ord)
  1010  		ssc.updatePod(logger, &prev, pod)
  1011  		fakeWorker(ssc)
  1012  		if err := assertMonotonicInvariants(set, om); err != nil {
  1013  			return err
  1014  		}
  1015  		obj, _, err := om.setsIndexer.Get(set)
  1016  		if err != nil {
  1017  			return err
  1018  		}
  1019  		set = obj.(*apps.StatefulSet)
  1020  
  1021  	}
  1022  	return assertMonotonicInvariants(set, om)
  1023  }
  1024  
  1025  func scaleDownStatefulSetController(logger klog.Logger, set *apps.StatefulSet, ssc *StatefulSetController, spc *StatefulPodControl, om *fakeObjectManager) error {
  1026  	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
  1027  	if err != nil {
  1028  		return err
  1029  	}
  1030  	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
  1031  	if err != nil {
  1032  		return err
  1033  	}
  1034  	ord := len(pods) - 1
  1035  	pod := getPodAtOrdinal(pods, ord)
  1036  	prev := *pod
  1037  	fakeResourceVersion(set)
  1038  	om.setsIndexer.Add(set)
  1039  	ssc.enqueueStatefulSet(set)
  1040  	fakeWorker(ssc)
  1041  	pods, err = om.addTerminatingPod(set, ord)
  1042  	if err != nil {
  1043  		return err
  1044  	}
  1045  	pod = getPodAtOrdinal(pods, ord)
  1046  	ssc.updatePod(logger, &prev, pod)
  1047  	fakeWorker(ssc)
  1048  	spc.DeleteStatefulPod(set, pod)
  1049  	ssc.deletePod(logger, pod)
  1050  	fakeWorker(ssc)
  1051  	for set.Status.Replicas > *set.Spec.Replicas {
  1052  		pods, err = om.podsLister.Pods(set.Namespace).List(selector)
  1053  		if err != nil {
  1054  			return err
  1055  		}
  1056  
  1057  		ord := len(pods)
  1058  		pods, err = om.addTerminatingPod(set, ord)
  1059  		if err != nil {
  1060  			return err
  1061  		}
  1062  		pod = getPodAtOrdinal(pods, ord)
  1063  		ssc.updatePod(logger, &prev, pod)
  1064  		fakeWorker(ssc)
  1065  		spc.DeleteStatefulPod(set, pod)
  1066  		ssc.deletePod(logger, pod)
  1067  		fakeWorker(ssc)
  1068  		obj, _, err := om.setsIndexer.Get(set)
  1069  		if err != nil {
  1070  			return err
  1071  		}
  1072  		set = obj.(*apps.StatefulSet)
  1073  
  1074  	}
  1075  	return assertMonotonicInvariants(set, om)
  1076  }
  1077  
  1078  func rawTemplate(template *v1.PodTemplateSpec) runtime.RawExtension {
  1079  	buf := new(bytes.Buffer)
  1080  	enc := json.NewEncoder(buf)
  1081  	if err := enc.Encode(template); err != nil {
  1082  		panic(err)
  1083  	}
  1084  	return runtime.RawExtension{Raw: buf.Bytes()}
  1085  }
  1086  

View as plain text