/* Copyright 2018 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package statefulset import ( "context" "fmt" "testing" "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/dynamic" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" featuregatetesting "k8s.io/component-base/featuregate/testing" apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller/statefulset" "k8s.io/kubernetes/pkg/controlplane" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/utils/ptr" ) const ( interval = 100 * time.Millisecond timeout = 60 * time.Second ) // TestVolumeTemplateNoopUpdate ensures embedded StatefulSet objects with embedded PersistentVolumes can be updated func TestVolumeTemplateNoopUpdate(t *testing.T) { // Start the server with default storage setup server := apiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) defer server.TearDownFn() c, err := dynamic.NewForConfig(server.ClientConfig) if err != nil { t.Fatal(err) } // Use an unstructured client to ensure we send exactly the bytes we expect for the embedded PVC template sts := &unstructured.Unstructured{} err = json.Unmarshal([]byte(`{ "apiVersion": "apps/v1", "kind": "StatefulSet", "metadata": {"name": "web"}, "spec": { "selector": {"matchLabels": {"app": "nginx"}}, "serviceName": "nginx", "replicas": 3, "template": { "metadata": {"labels": {"app": "nginx"}}, "spec": { "terminationGracePeriodSeconds": 10, "containers": [{ "name": "nginx", "image": "registry.k8s.io/nginx-slim:0.8", "ports": [{"containerPort": 80,"name": "web"}], "volumeMounts": [{"name": "www","mountPath": "/usr/share/nginx/html"}] }] } }, "volumeClaimTemplates": [{ "apiVersion": "v1", "kind": "PersistentVolumeClaim", "metadata": {"name": "www"}, "spec": { "accessModes": ["ReadWriteOnce"], "storageClassName": "my-storage-class", "resources": {"requests": {"storage": "1Gi"}} } } ] } }`), &sts.Object) if err != nil { t.Fatal(err) } stsClient := c.Resource(appsv1.SchemeGroupVersion.WithResource("statefulsets")).Namespace("default") // Create the statefulset persistedSTS, err := stsClient.Create(context.TODO(), sts, metav1.CreateOptions{}) if err != nil { t.Fatal(err) } // Update with the original spec (all the same defaulting should apply, should be a no-op and pass validation originalSpec, ok, err := unstructured.NestedFieldCopy(sts.Object, "spec") if err != nil || !ok { t.Fatal(err, ok) } err = unstructured.SetNestedField(persistedSTS.Object, originalSpec, "spec") if err != nil { t.Fatal(err) } _, err = stsClient.Update(context.TODO(), persistedSTS, metav1.UpdateOptions{}) if err != nil { t.Fatal(err) } } func TestSpecReplicasChange(t *testing.T) { tCtx, closeFn, rm, informers, c := scSetup(t) defer closeFn() ns := framework.CreateNamespaceOrDie(c, "test-spec-replicas-change", t) defer framework.DeleteNamespaceOrDie(c, ns, t) cancel := runControllerAndInformers(tCtx, rm, informers) defer cancel() createHeadlessService(t, c, newHeadlessService(ns.Name)) sts := newSTS("sts", ns.Name, 2) stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{}) sts = stss[0] waitSTSStable(t, c, sts) // Update .Spec.Replicas and verify .Status.Replicas is changed accordingly scaleSTS(t, c, sts, 3) scaleSTS(t, c, sts, 0) scaleSTS(t, c, sts, 2) // Add a template annotation change to test STS's status does update // without .Spec.Replicas change stsClient := c.AppsV1().StatefulSets(ns.Name) var oldGeneration int64 newSTS := updateSTS(t, stsClient, sts.Name, func(sts *appsv1.StatefulSet) { oldGeneration = sts.Generation sts.Spec.Template.Annotations = map[string]string{"test": "annotation"} }) savedGeneration := newSTS.Generation if savedGeneration == oldGeneration { t.Fatalf("failed to verify .Generation has incremented for sts %s", sts.Name) } if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) { newSTS, err := stsClient.Get(context.TODO(), sts.Name, metav1.GetOptions{}) if err != nil { return false, err } return newSTS.Status.ObservedGeneration >= savedGeneration, nil }); err != nil { t.Fatalf("failed to verify .Status.ObservedGeneration has incremented for sts %s: %v", sts.Name, err) } } func TestDeletingAndTerminatingPods(t *testing.T) { tCtx, closeFn, rm, informers, c := scSetup(t) defer closeFn() ns := framework.CreateNamespaceOrDie(c, "test-deleting-and-failed-pods", t) defer framework.DeleteNamespaceOrDie(c, ns, t) cancel := runControllerAndInformers(tCtx, rm, informers) defer cancel() podCount := 3 labelMap := labelMap() sts := newSTS("sts", ns.Name, podCount) stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{}) sts = stss[0] waitSTSStable(t, c, sts) // Verify STS creates 3 pods podClient := c.CoreV1().Pods(ns.Name) pods := getPods(t, podClient, labelMap) if len(pods.Items) != podCount { t.Fatalf("len(pods) = %d, want %d", len(pods.Items), podCount) } // Set first pod as deleting pod // Set finalizers for the pod to simulate pending deletion status deletingPod := &pods.Items[0] updatePod(t, podClient, deletingPod.Name, func(pod *v1.Pod) { pod.Finalizers = []string{"fake.example.com/blockDeletion"} }) if err := c.CoreV1().Pods(ns.Name).Delete(context.TODO(), deletingPod.Name, metav1.DeleteOptions{}); err != nil { t.Fatalf("error deleting pod %s: %v", deletingPod.Name, err) } // Set second pod as failed pod failedPod := &pods.Items[1] updatePodStatus(t, podClient, failedPod.Name, func(pod *v1.Pod) { pod.Status.Phase = v1.PodFailed }) // Set third pod as succeeded pod succeededPod := &pods.Items[2] updatePodStatus(t, podClient, succeededPod.Name, func(pod *v1.Pod) { pod.Status.Phase = v1.PodSucceeded }) exists := func(pods []v1.Pod, uid types.UID) bool { for _, pod := range pods { if pod.UID == uid { return true } } return false } if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) { // Verify only 3 pods exist: deleting pod and new pod replacing failed pod pods = getPods(t, podClient, labelMap) if len(pods.Items) != podCount { return false, nil } // Verify deleting pod still exists // Immediately return false with an error if it does not exist if !exists(pods.Items, deletingPod.UID) { return false, fmt.Errorf("expected deleting pod %s still exists, but it is not found", deletingPod.Name) } // Verify failed pod does not exist anymore if exists(pods.Items, failedPod.UID) { return false, nil } // Verify succeeded pod does not exist anymore if exists(pods.Items, succeededPod.UID) { return false, nil } // Verify all pods have non-terminated status for _, pod := range pods.Items { if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded { return false, nil } } return true, nil }); err != nil { t.Fatalf("failed to verify failed pod %s has been replaced with a new non-failed pod, and deleting pod %s survives: %v", failedPod.Name, deletingPod.Name, err) } // Remove finalizers of deleting pod to simulate successful deletion updatePod(t, podClient, deletingPod.Name, func(pod *v1.Pod) { pod.Finalizers = []string{} }) if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) { // Verify only 2 pods exist: new non-deleting pod replacing deleting pod and the non-failed pod pods = getPods(t, podClient, labelMap) if len(pods.Items) != podCount { return false, nil } // Verify deleting pod does not exist anymore return !exists(pods.Items, deletingPod.UID), nil }); err != nil { t.Fatalf("failed to verify deleting pod %s has been replaced with a new non-deleting pod: %v", deletingPod.Name, err) } } func TestStatefulSetAvailable(t *testing.T) { tests := []struct { name string totalReplicas int32 readyReplicas int32 activeReplicas int32 }{ { name: "only certain replicas would become active", totalReplicas: 4, readyReplicas: 3, activeReplicas: 2, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { tCtx, closeFn, rm, informers, c := scSetup(t) defer closeFn() ns := framework.CreateNamespaceOrDie(c, "test-available-pods", t) defer framework.DeleteNamespaceOrDie(c, ns, t) cancel := runControllerAndInformers(tCtx, rm, informers) defer cancel() labelMap := labelMap() sts := newSTS("sts", ns.Name, 4) sts.Spec.MinReadySeconds = int32(3600) stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{}) sts = stss[0] waitSTSStable(t, c, sts) // Verify STS creates 4 pods podClient := c.CoreV1().Pods(ns.Name) pods := getPods(t, podClient, labelMap) if len(pods.Items) != 4 { t.Fatalf("len(pods) = %d, want 4", len(pods.Items)) } // Separate 3 pods into their own list firstPodList := &v1.PodList{Items: pods.Items[:1]} secondPodList := &v1.PodList{Items: pods.Items[1:2]} thirdPodList := &v1.PodList{Items: pods.Items[2:]} // First pod: Running, but not Ready // by setting the Ready condition to false with LastTransitionTime to be now setPodsReadyCondition(t, c, firstPodList, v1.ConditionFalse, time.Now()) // Second pod: Running and Ready, but not Available // by setting LastTransitionTime to now setPodsReadyCondition(t, c, secondPodList, v1.ConditionTrue, time.Now()) // Third pod: Running, Ready, and Available // by setting LastTransitionTime to more than 3600 seconds ago setPodsReadyCondition(t, c, thirdPodList, v1.ConditionTrue, time.Now().Add(-120*time.Minute)) stsClient := c.AppsV1().StatefulSets(ns.Name) if err := wait.PollImmediate(interval, timeout, func() (bool, error) { newSts, err := stsClient.Get(context.TODO(), sts.Name, metav1.GetOptions{}) if err != nil { return false, err } // Verify 4 pods exist, 3 pods are Ready, and 2 pods are Available return newSts.Status.Replicas == test.totalReplicas && newSts.Status.ReadyReplicas == test.readyReplicas && newSts.Status.AvailableReplicas == test.activeReplicas, nil }); err != nil { t.Fatalf("Failed to verify number of Replicas, ReadyReplicas and AvailableReplicas of rs %s to be as expected: %v", sts.Name, err) } }) } } func setPodsReadyCondition(t *testing.T, clientSet clientset.Interface, pods *v1.PodList, conditionStatus v1.ConditionStatus, lastTransitionTime time.Time) { replicas := int32(len(pods.Items)) var readyPods int32 err := wait.PollImmediate(interval, timeout, func() (bool, error) { readyPods = 0 for i := range pods.Items { pod := &pods.Items[i] if podutil.IsPodReady(pod) { readyPods++ continue } pod.Status.Phase = v1.PodRunning _, condition := podutil.GetPodCondition(&pod.Status, v1.PodReady) if condition != nil { condition.Status = conditionStatus condition.LastTransitionTime = metav1.Time{Time: lastTransitionTime} } else { condition = &v1.PodCondition{ Type: v1.PodReady, Status: conditionStatus, LastTransitionTime: metav1.Time{Time: lastTransitionTime}, } pod.Status.Conditions = append(pod.Status.Conditions, *condition) } _, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}) if err != nil { // When status fails to be updated, we continue to next pod continue } readyPods++ } return readyPods >= replicas, nil }) if err != nil { t.Fatalf("failed to mark all StatefulSet pods to ready: %v", err) } } // add for issue: https://github.com/kubernetes/kubernetes/issues/108837 func TestStatefulSetStatusWithPodFail(t *testing.T) { tCtx := ktesting.Init(t) limitedPodNumber := 2 c, config, closeFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{ ModifyServerConfig: func(config *controlplane.Config) { config.GenericConfig.AdmissionControl = &fakePodFailAdmission{ limitedPodNumber: limitedPodNumber, } }, }) defer closeFn() defer tCtx.Cancel("test has completed") resyncPeriod := 12 * time.Hour informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-informers")), resyncPeriod) ssc := statefulset.NewStatefulSetController( tCtx, informers.Core().V1().Pods(), informers.Apps().V1().StatefulSets(), informers.Core().V1().PersistentVolumeClaims(), informers.Apps().V1().ControllerRevisions(), clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-controller")), ) ns := framework.CreateNamespaceOrDie(c, "test-pod-fail", t) defer framework.DeleteNamespaceOrDie(c, ns, t) informers.Start(tCtx.Done()) go ssc.Run(tCtx, 5) sts := newSTS("sts", ns.Name, 4) _, err := c.AppsV1().StatefulSets(sts.Namespace).Create(tCtx, sts, metav1.CreateOptions{}) if err != nil { t.Fatalf("Could not create statefulSet %s: %v", sts.Name, err) } wantReplicas := limitedPodNumber var gotReplicas int32 if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) { newSTS, err := c.AppsV1().StatefulSets(sts.Namespace).Get(tCtx, sts.Name, metav1.GetOptions{}) if err != nil { return false, err } gotReplicas = newSTS.Status.Replicas return gotReplicas == int32(wantReplicas), nil }); err != nil { t.Fatalf("StatefulSet %s status has %d replicas, want replicas %d: %v", sts.Name, gotReplicas, wantReplicas, err) } } func TestAutodeleteOwnerRefs(t *testing.T) { tests := []struct { namespace string policy appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy expectPodOwnerRef bool expectSetOwnerRef bool }{ { namespace: "always-retain", policy: appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{ WhenDeleted: appsv1.RetainPersistentVolumeClaimRetentionPolicyType, WhenScaled: appsv1.RetainPersistentVolumeClaimRetentionPolicyType, }, expectPodOwnerRef: false, expectSetOwnerRef: false, }, { namespace: "delete-on-scaledown-only", policy: appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{ WhenDeleted: appsv1.RetainPersistentVolumeClaimRetentionPolicyType, WhenScaled: appsv1.DeletePersistentVolumeClaimRetentionPolicyType, }, expectPodOwnerRef: true, expectSetOwnerRef: false, }, { namespace: "delete-with-set-only", policy: appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{ WhenDeleted: appsv1.DeletePersistentVolumeClaimRetentionPolicyType, WhenScaled: appsv1.RetainPersistentVolumeClaimRetentionPolicyType, }, expectPodOwnerRef: false, expectSetOwnerRef: true, }, { namespace: "always-delete", policy: appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{ WhenDeleted: appsv1.DeletePersistentVolumeClaimRetentionPolicyType, WhenScaled: appsv1.DeletePersistentVolumeClaimRetentionPolicyType, }, expectPodOwnerRef: true, expectSetOwnerRef: true, }, } defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)() tCtx, closeFn, rm, informers, c := scSetup(t) defer closeFn() cancel := runControllerAndInformers(tCtx, rm, informers) defer cancel() for _, test := range tests { t.Run(test.namespace, func(t *testing.T) { ns := framework.CreateNamespaceOrDie(c, test.namespace, t) defer framework.DeleteNamespaceOrDie(c, ns, t) sts := newSTS("sts", ns.Name, 3) sts.Spec.PersistentVolumeClaimRetentionPolicy = &test.policy stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{}) sts = stss[0] waitSTSStable(t, c, sts) // Verify StatefulSet ownerref has been added as appropriate. pvcClient := c.CoreV1().PersistentVolumeClaims(ns.Name) pvcs := getStatefulSetPVCs(t, pvcClient, sts) for _, pvc := range pvcs { verifyOwnerRef(t, pvc, "StatefulSet", test.expectSetOwnerRef) verifyOwnerRef(t, pvc, "Pod", false) } // Scale down to 1 pod and verify Pod ownerrefs as appropriate. one := int32(1) sts.Spec.Replicas = &one waitSTSStable(t, c, sts) pvcs = getStatefulSetPVCs(t, pvcClient, sts) for i, pvc := range pvcs { verifyOwnerRef(t, pvc, "StatefulSet", test.expectSetOwnerRef) if i == 0 { verifyOwnerRef(t, pvc, "Pod", false) } else { verifyOwnerRef(t, pvc, "Pod", test.expectPodOwnerRef) } } }) } } func TestDeletingPodForRollingUpdatePartition(t *testing.T) { tCtx, closeFn, rm, informers, c := scSetup(t) defer closeFn() ns := framework.CreateNamespaceOrDie(c, "test-deleting-pod-for-rolling-update-partition", t) defer framework.DeleteNamespaceOrDie(c, ns, t) cancel := runControllerAndInformers(tCtx, rm, informers) defer cancel() labelMap := labelMap() sts := newSTS("sts", ns.Name, 2) sts.Spec.UpdateStrategy = appsv1.StatefulSetUpdateStrategy{ Type: appsv1.RollingUpdateStatefulSetStrategyType, RollingUpdate: func() *appsv1.RollingUpdateStatefulSetStrategy { return &appsv1.RollingUpdateStatefulSetStrategy{ Partition: ptr.To[int32](1), } }(), } stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{}) sts = stss[0] waitSTSStable(t, c, sts) // Verify STS creates 2 pods podClient := c.CoreV1().Pods(ns.Name) pods := getPods(t, podClient, labelMap) if len(pods.Items) != 2 { t.Fatalf("len(pods) = %d, want 2", len(pods.Items)) } // Setting all pods in Running, Ready, and Available setPodsReadyCondition(t, c, &v1.PodList{Items: pods.Items}, v1.ConditionTrue, time.Now()) // 1. Roll out a new image. oldImage := sts.Spec.Template.Spec.Containers[0].Image newImage := "new-image" if oldImage == newImage { t.Fatalf("bad test setup, statefulSet %s roll out with the same image", sts.Name) } // Set finalizers for the pod-0 to trigger pod recreation failure while the status UpdateRevision is bumped pod0 := &pods.Items[0] updatePod(t, podClient, pod0.Name, func(pod *v1.Pod) { pod.Finalizers = []string{"fake.example.com/blockDeletion"} }) stsClient := c.AppsV1().StatefulSets(ns.Name) _ = updateSTS(t, stsClient, sts.Name, func(sts *appsv1.StatefulSet) { sts.Spec.Template.Spec.Containers[0].Image = newImage }) // Await for the pod-1 to be recreated, while pod-0 remains running if err := wait.PollUntilContextTimeout(tCtx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) { ss, err := stsClient.Get(ctx, sts.Name, metav1.GetOptions{}) if err != nil { return false, err } pods := getPods(t, podClient, labelMap) recreatedPods := v1.PodList{} for _, pod := range pods.Items { if pod.Status.Phase == v1.PodPending { recreatedPods.Items = append(recreatedPods.Items, pod) } } setPodsReadyCondition(t, c, &v1.PodList{Items: recreatedPods.Items}, v1.ConditionTrue, time.Now()) return ss.Status.UpdatedReplicas == *ss.Spec.Replicas-*sts.Spec.UpdateStrategy.RollingUpdate.Partition && ss.Status.Replicas == *ss.Spec.Replicas && ss.Status.ReadyReplicas == *ss.Spec.Replicas, nil }); err != nil { t.Fatalf("failed to await for pod-1 to be recreated by sts %s: %v", sts.Name, err) } // Mark pod-0 as terminal and not ready updatePodStatus(t, podClient, pod0.Name, func(pod *v1.Pod) { pod.Status.Phase = v1.PodFailed }) // Make sure pod-0 gets deletion timestamp so that it is recreated if err := c.CoreV1().Pods(ns.Name).Delete(context.TODO(), pod0.Name, metav1.DeleteOptions{}); err != nil { t.Fatalf("error deleting pod %s: %v", pod0.Name, err) } // Await for pod-0 to be not ready if err := wait.PollUntilContextTimeout(tCtx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) { ss, err := stsClient.Get(ctx, sts.Name, metav1.GetOptions{}) if err != nil { return false, err } return ss.Status.ReadyReplicas == *ss.Spec.Replicas-1, nil }); err != nil { t.Fatalf("failed to await for pod-0 to be not counted as ready in status of sts %s: %v", sts.Name, err) } // Remove the finalizer to allow recreation updatePod(t, podClient, pod0.Name, func(pod *v1.Pod) { pod.Finalizers = []string{} }) // Await for pod-0 to be recreated and make it running if err := wait.PollUntilContextTimeout(tCtx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) { pods := getPods(t, podClient, labelMap) recreatedPods := v1.PodList{} for _, pod := range pods.Items { if pod.Status.Phase == v1.PodPending { recreatedPods.Items = append(recreatedPods.Items, pod) } } setPodsReadyCondition(t, c, &v1.PodList{Items: recreatedPods.Items}, v1.ConditionTrue, time.Now().Add(-120*time.Minute)) return len(recreatedPods.Items) > 0, nil }); err != nil { t.Fatalf("failed to await for pod-0 to be recreated by sts %s: %v", sts.Name, err) } // Await for all stateful set status to record all replicas as ready if err := wait.PollUntilContextTimeout(tCtx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) { ss, err := stsClient.Get(ctx, sts.Name, metav1.GetOptions{}) if err != nil { return false, err } return ss.Status.ReadyReplicas == *ss.Spec.Replicas, nil }); err != nil { t.Fatalf("failed to verify .Spec.Template.Spec.Containers[0].Image is updated for sts %s: %v", sts.Name, err) } // Verify 3 pods exist pods = getPods(t, podClient, labelMap) if len(pods.Items) != int(*sts.Spec.Replicas) { t.Fatalf("Unexpected number of pods") } // Verify pod images for i := range pods.Items { if i < int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition) { if pods.Items[i].Spec.Containers[0].Image != oldImage { t.Fatalf("Pod %s has image %s not equal to old image %s", pods.Items[i].Name, pods.Items[i].Spec.Containers[0].Image, oldImage) } } else { if pods.Items[i].Spec.Containers[0].Image != newImage { t.Fatalf("Pod %s has image %s not equal to new image %s", pods.Items[i].Name, pods.Items[i].Spec.Containers[0].Image, newImage) } } } } func TestStatefulSetStartOrdinal(t *testing.T) { tests := []struct { ordinals *appsv1.StatefulSetOrdinals name string namespace string replicas int expectedPodNames []string }{ { name: "default start ordinal, no ordinals set", namespace: "no-ordinals", replicas: 3, expectedPodNames: []string{"sts-0", "sts-1", "sts-2"}, }, { name: "default start ordinal", namespace: "no-start-ordinals", ordinals: &appsv1.StatefulSetOrdinals{}, replicas: 3, expectedPodNames: []string{"sts-0", "sts-1", "sts-2"}, }, { name: "start ordinal 4", namespace: "start-ordinal-4", ordinals: &appsv1.StatefulSetOrdinals{ Start: 4, }, replicas: 4, expectedPodNames: []string{"sts-4", "sts-5", "sts-6", "sts-7"}, }, { name: "start ordinal 5", namespace: "start-ordinal-5", ordinals: &appsv1.StatefulSetOrdinals{ Start: 2, }, replicas: 7, expectedPodNames: []string{"sts-2", "sts-3", "sts-4", "sts-5", "sts-6", "sts-7", "sts-8"}, }, } defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetStartOrdinal, true)() tCtx, closeFn, rm, informers, c := scSetup(t) defer closeFn() cancel := runControllerAndInformers(tCtx, rm, informers) defer cancel() for _, test := range tests { t.Run(test.name, func(t *testing.T) { ns := framework.CreateNamespaceOrDie(c, test.namespace, t) defer framework.DeleteNamespaceOrDie(c, ns, t) // Label map is the map of pod labels used in newSTS() labelMap := labelMap() sts := newSTS("sts", ns.Name, test.replicas) sts.Spec.Ordinals = test.ordinals stss := createSTSs(t, c, []*appsv1.StatefulSet{sts}) sts = stss[0] waitSTSStable(t, c, sts) podClient := c.CoreV1().Pods(ns.Name) pods := getPods(t, podClient, labelMap) if len(pods.Items) != test.replicas { t.Errorf("len(pods) = %v, want %v", len(pods.Items), test.replicas) } var podNames []string for _, pod := range pods.Items { podNames = append(podNames, pod.Name) } ignoreOrder := cmpopts.SortSlices(func(a, b string) bool { return a < b }) // Validate all the expected pods were created. if diff := cmp.Diff(test.expectedPodNames, podNames, ignoreOrder); diff != "" { t.Errorf("Unexpected pod names: (-want +got): %v", diff) } // Scale down to 1 pod and verify it matches the first pod. scaleSTS(t, c, sts, 1) waitSTSStable(t, c, sts) pods = getPods(t, podClient, labelMap) if len(pods.Items) != 1 { t.Errorf("len(pods) = %v, want %v", len(pods.Items), 1) } if pods.Items[0].Name != test.expectedPodNames[0] { t.Errorf("Unexpected singleton pod name: got = %v, want %v", pods.Items[0].Name, test.expectedPodNames[0]) } }) } }