...

Source file src/k8s.io/kubernetes/test/integration/statefulset/statefulset_test.go

Documentation: k8s.io/kubernetes/test/integration/statefulset

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package statefulset
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"testing"
    23  	"time"
    24  
    25  	"github.com/google/go-cmp/cmp"
    26  	"github.com/google/go-cmp/cmp/cmpopts"
    27  	appsv1 "k8s.io/api/apps/v1"
    28  	v1 "k8s.io/api/core/v1"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    31  	"k8s.io/apimachinery/pkg/types"
    32  	"k8s.io/apimachinery/pkg/util/json"
    33  	"k8s.io/apimachinery/pkg/util/wait"
    34  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    35  	"k8s.io/client-go/dynamic"
    36  	"k8s.io/client-go/informers"
    37  	clientset "k8s.io/client-go/kubernetes"
    38  	restclient "k8s.io/client-go/rest"
    39  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    40  	apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    41  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    42  	"k8s.io/kubernetes/pkg/controller/statefulset"
    43  	"k8s.io/kubernetes/pkg/controlplane"
    44  	"k8s.io/kubernetes/pkg/features"
    45  	"k8s.io/kubernetes/test/integration/framework"
    46  	"k8s.io/kubernetes/test/utils/ktesting"
    47  	"k8s.io/utils/ptr"
    48  )
    49  
    50  const (
    51  	interval = 100 * time.Millisecond
    52  	timeout  = 60 * time.Second
    53  )
    54  
    55  // TestVolumeTemplateNoopUpdate ensures embedded StatefulSet objects with embedded PersistentVolumes can be updated
    56  func TestVolumeTemplateNoopUpdate(t *testing.T) {
    57  	// Start the server with default storage setup
    58  	server := apiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
    59  	defer server.TearDownFn()
    60  
    61  	c, err := dynamic.NewForConfig(server.ClientConfig)
    62  	if err != nil {
    63  		t.Fatal(err)
    64  	}
    65  
    66  	// Use an unstructured client to ensure we send exactly the bytes we expect for the embedded PVC template
    67  	sts := &unstructured.Unstructured{}
    68  	err = json.Unmarshal([]byte(`{
    69  		"apiVersion": "apps/v1",
    70  		"kind": "StatefulSet",
    71  		"metadata": {"name": "web"},
    72  		"spec": {
    73  		  "selector": {"matchLabels": {"app": "nginx"}},
    74  		  "serviceName": "nginx",
    75  		  "replicas": 3,
    76  		  "template": {
    77  			"metadata": {"labels": {"app": "nginx"}},
    78  			"spec": {
    79  			  "terminationGracePeriodSeconds": 10,
    80  			  "containers": [{
    81  				  "name": "nginx",
    82  				  "image": "registry.k8s.io/nginx-slim:0.8",
    83  				  "ports": [{"containerPort": 80,"name": "web"}],
    84  				  "volumeMounts": [{"name": "www","mountPath": "/usr/share/nginx/html"}]
    85  			  }]
    86  			}
    87  		  },
    88  		  "volumeClaimTemplates": [{
    89  			  "apiVersion": "v1",
    90  			  "kind": "PersistentVolumeClaim",
    91  			  "metadata": {"name": "www"},
    92  			  "spec": {
    93  				"accessModes": ["ReadWriteOnce"],
    94  				"storageClassName": "my-storage-class",
    95  				"resources": {"requests": {"storage": "1Gi"}}
    96  			  }
    97  			}
    98  		  ]
    99  		}
   100  	  }`), &sts.Object)
   101  	if err != nil {
   102  		t.Fatal(err)
   103  	}
   104  
   105  	stsClient := c.Resource(appsv1.SchemeGroupVersion.WithResource("statefulsets")).Namespace("default")
   106  
   107  	// Create the statefulset
   108  	persistedSTS, err := stsClient.Create(context.TODO(), sts, metav1.CreateOptions{})
   109  	if err != nil {
   110  		t.Fatal(err)
   111  	}
   112  
   113  	// Update with the original spec (all the same defaulting should apply, should be a no-op and pass validation
   114  	originalSpec, ok, err := unstructured.NestedFieldCopy(sts.Object, "spec")
   115  	if err != nil || !ok {
   116  		t.Fatal(err, ok)
   117  	}
   118  	err = unstructured.SetNestedField(persistedSTS.Object, originalSpec, "spec")
   119  	if err != nil {
   120  		t.Fatal(err)
   121  	}
   122  	_, err = stsClient.Update(context.TODO(), persistedSTS, metav1.UpdateOptions{})
   123  	if err != nil {
   124  		t.Fatal(err)
   125  	}
   126  }
   127  
   128  func TestSpecReplicasChange(t *testing.T) {
   129  	tCtx, closeFn, rm, informers, c := scSetup(t)
   130  	defer closeFn()
   131  	ns := framework.CreateNamespaceOrDie(c, "test-spec-replicas-change", t)
   132  	defer framework.DeleteNamespaceOrDie(c, ns, t)
   133  	cancel := runControllerAndInformers(tCtx, rm, informers)
   134  	defer cancel()
   135  
   136  	createHeadlessService(t, c, newHeadlessService(ns.Name))
   137  	sts := newSTS("sts", ns.Name, 2)
   138  	stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{})
   139  	sts = stss[0]
   140  	waitSTSStable(t, c, sts)
   141  
   142  	// Update .Spec.Replicas and verify .Status.Replicas is changed accordingly
   143  	scaleSTS(t, c, sts, 3)
   144  	scaleSTS(t, c, sts, 0)
   145  	scaleSTS(t, c, sts, 2)
   146  
   147  	// Add a template annotation change to test STS's status does update
   148  	// without .Spec.Replicas change
   149  	stsClient := c.AppsV1().StatefulSets(ns.Name)
   150  	var oldGeneration int64
   151  	newSTS := updateSTS(t, stsClient, sts.Name, func(sts *appsv1.StatefulSet) {
   152  		oldGeneration = sts.Generation
   153  		sts.Spec.Template.Annotations = map[string]string{"test": "annotation"}
   154  	})
   155  	savedGeneration := newSTS.Generation
   156  	if savedGeneration == oldGeneration {
   157  		t.Fatalf("failed to verify .Generation has incremented for sts %s", sts.Name)
   158  	}
   159  
   160  	if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
   161  		newSTS, err := stsClient.Get(context.TODO(), sts.Name, metav1.GetOptions{})
   162  		if err != nil {
   163  			return false, err
   164  		}
   165  		return newSTS.Status.ObservedGeneration >= savedGeneration, nil
   166  	}); err != nil {
   167  		t.Fatalf("failed to verify .Status.ObservedGeneration has incremented for sts %s: %v", sts.Name, err)
   168  	}
   169  }
   170  
   171  func TestDeletingAndTerminatingPods(t *testing.T) {
   172  	tCtx, closeFn, rm, informers, c := scSetup(t)
   173  	defer closeFn()
   174  	ns := framework.CreateNamespaceOrDie(c, "test-deleting-and-failed-pods", t)
   175  	defer framework.DeleteNamespaceOrDie(c, ns, t)
   176  	cancel := runControllerAndInformers(tCtx, rm, informers)
   177  	defer cancel()
   178  
   179  	podCount := 3
   180  
   181  	labelMap := labelMap()
   182  	sts := newSTS("sts", ns.Name, podCount)
   183  	stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{})
   184  	sts = stss[0]
   185  	waitSTSStable(t, c, sts)
   186  
   187  	// Verify STS creates 3 pods
   188  	podClient := c.CoreV1().Pods(ns.Name)
   189  	pods := getPods(t, podClient, labelMap)
   190  	if len(pods.Items) != podCount {
   191  		t.Fatalf("len(pods) = %d, want %d", len(pods.Items), podCount)
   192  	}
   193  
   194  	// Set first pod as deleting pod
   195  	// Set finalizers for the pod to simulate pending deletion status
   196  	deletingPod := &pods.Items[0]
   197  	updatePod(t, podClient, deletingPod.Name, func(pod *v1.Pod) {
   198  		pod.Finalizers = []string{"fake.example.com/blockDeletion"}
   199  	})
   200  	if err := c.CoreV1().Pods(ns.Name).Delete(context.TODO(), deletingPod.Name, metav1.DeleteOptions{}); err != nil {
   201  		t.Fatalf("error deleting pod %s: %v", deletingPod.Name, err)
   202  	}
   203  
   204  	// Set second pod as failed pod
   205  	failedPod := &pods.Items[1]
   206  	updatePodStatus(t, podClient, failedPod.Name, func(pod *v1.Pod) {
   207  		pod.Status.Phase = v1.PodFailed
   208  	})
   209  
   210  	// Set third pod as succeeded pod
   211  	succeededPod := &pods.Items[2]
   212  	updatePodStatus(t, podClient, succeededPod.Name, func(pod *v1.Pod) {
   213  		pod.Status.Phase = v1.PodSucceeded
   214  	})
   215  
   216  	exists := func(pods []v1.Pod, uid types.UID) bool {
   217  		for _, pod := range pods {
   218  			if pod.UID == uid {
   219  				return true
   220  			}
   221  		}
   222  		return false
   223  	}
   224  
   225  	if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
   226  		// Verify only 3 pods exist: deleting pod and new pod replacing failed pod
   227  		pods = getPods(t, podClient, labelMap)
   228  		if len(pods.Items) != podCount {
   229  			return false, nil
   230  		}
   231  
   232  		// Verify deleting pod still exists
   233  		// Immediately return false with an error if it does not exist
   234  		if !exists(pods.Items, deletingPod.UID) {
   235  			return false, fmt.Errorf("expected deleting pod %s still exists, but it is not found", deletingPod.Name)
   236  		}
   237  		// Verify failed pod does not exist anymore
   238  		if exists(pods.Items, failedPod.UID) {
   239  			return false, nil
   240  		}
   241  		// Verify succeeded pod does not exist anymore
   242  		if exists(pods.Items, succeededPod.UID) {
   243  			return false, nil
   244  		}
   245  		// Verify all pods have non-terminated status
   246  		for _, pod := range pods.Items {
   247  			if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded {
   248  				return false, nil
   249  			}
   250  		}
   251  		return true, nil
   252  	}); err != nil {
   253  		t.Fatalf("failed to verify failed pod %s has been replaced with a new non-failed pod, and deleting pod %s survives: %v", failedPod.Name, deletingPod.Name, err)
   254  	}
   255  
   256  	// Remove finalizers of deleting pod to simulate successful deletion
   257  	updatePod(t, podClient, deletingPod.Name, func(pod *v1.Pod) {
   258  		pod.Finalizers = []string{}
   259  	})
   260  
   261  	if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
   262  		// Verify only 2 pods exist: new non-deleting pod replacing deleting pod and the non-failed pod
   263  		pods = getPods(t, podClient, labelMap)
   264  		if len(pods.Items) != podCount {
   265  			return false, nil
   266  		}
   267  		// Verify deleting pod does not exist anymore
   268  		return !exists(pods.Items, deletingPod.UID), nil
   269  	}); err != nil {
   270  		t.Fatalf("failed to verify deleting pod %s has been replaced with a new non-deleting pod: %v", deletingPod.Name, err)
   271  	}
   272  }
   273  
   274  func TestStatefulSetAvailable(t *testing.T) {
   275  	tests := []struct {
   276  		name           string
   277  		totalReplicas  int32
   278  		readyReplicas  int32
   279  		activeReplicas int32
   280  	}{
   281  		{
   282  			name:           "only certain replicas would become active",
   283  			totalReplicas:  4,
   284  			readyReplicas:  3,
   285  			activeReplicas: 2,
   286  		},
   287  	}
   288  	for _, test := range tests {
   289  		t.Run(test.name, func(t *testing.T) {
   290  			tCtx, closeFn, rm, informers, c := scSetup(t)
   291  			defer closeFn()
   292  			ns := framework.CreateNamespaceOrDie(c, "test-available-pods", t)
   293  			defer framework.DeleteNamespaceOrDie(c, ns, t)
   294  			cancel := runControllerAndInformers(tCtx, rm, informers)
   295  			defer cancel()
   296  
   297  			labelMap := labelMap()
   298  			sts := newSTS("sts", ns.Name, 4)
   299  			sts.Spec.MinReadySeconds = int32(3600)
   300  			stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{})
   301  			sts = stss[0]
   302  			waitSTSStable(t, c, sts)
   303  
   304  			// Verify STS creates 4 pods
   305  			podClient := c.CoreV1().Pods(ns.Name)
   306  			pods := getPods(t, podClient, labelMap)
   307  			if len(pods.Items) != 4 {
   308  				t.Fatalf("len(pods) = %d, want 4", len(pods.Items))
   309  			}
   310  
   311  			// Separate 3 pods into their own list
   312  			firstPodList := &v1.PodList{Items: pods.Items[:1]}
   313  			secondPodList := &v1.PodList{Items: pods.Items[1:2]}
   314  			thirdPodList := &v1.PodList{Items: pods.Items[2:]}
   315  			// First pod: Running, but not Ready
   316  			// by setting the Ready condition to false with LastTransitionTime to be now
   317  			setPodsReadyCondition(t, c, firstPodList, v1.ConditionFalse, time.Now())
   318  			// Second pod: Running and Ready, but not Available
   319  			// by setting LastTransitionTime to now
   320  			setPodsReadyCondition(t, c, secondPodList, v1.ConditionTrue, time.Now())
   321  			// Third pod: Running, Ready, and Available
   322  			// by setting LastTransitionTime to more than 3600 seconds ago
   323  			setPodsReadyCondition(t, c, thirdPodList, v1.ConditionTrue, time.Now().Add(-120*time.Minute))
   324  
   325  			stsClient := c.AppsV1().StatefulSets(ns.Name)
   326  			if err := wait.PollImmediate(interval, timeout, func() (bool, error) {
   327  				newSts, err := stsClient.Get(context.TODO(), sts.Name, metav1.GetOptions{})
   328  				if err != nil {
   329  					return false, err
   330  				}
   331  				// Verify 4 pods exist, 3 pods are Ready, and 2 pods are Available
   332  				return newSts.Status.Replicas == test.totalReplicas && newSts.Status.ReadyReplicas == test.readyReplicas && newSts.Status.AvailableReplicas == test.activeReplicas, nil
   333  			}); err != nil {
   334  				t.Fatalf("Failed to verify number of Replicas, ReadyReplicas and AvailableReplicas of rs %s to be as expected: %v", sts.Name, err)
   335  			}
   336  		})
   337  	}
   338  }
   339  
   340  func setPodsReadyCondition(t *testing.T, clientSet clientset.Interface, pods *v1.PodList, conditionStatus v1.ConditionStatus, lastTransitionTime time.Time) {
   341  	replicas := int32(len(pods.Items))
   342  	var readyPods int32
   343  	err := wait.PollImmediate(interval, timeout, func() (bool, error) {
   344  		readyPods = 0
   345  		for i := range pods.Items {
   346  			pod := &pods.Items[i]
   347  			if podutil.IsPodReady(pod) {
   348  				readyPods++
   349  				continue
   350  			}
   351  			pod.Status.Phase = v1.PodRunning
   352  			_, condition := podutil.GetPodCondition(&pod.Status, v1.PodReady)
   353  			if condition != nil {
   354  				condition.Status = conditionStatus
   355  				condition.LastTransitionTime = metav1.Time{Time: lastTransitionTime}
   356  			} else {
   357  				condition = &v1.PodCondition{
   358  					Type:               v1.PodReady,
   359  					Status:             conditionStatus,
   360  					LastTransitionTime: metav1.Time{Time: lastTransitionTime},
   361  				}
   362  				pod.Status.Conditions = append(pod.Status.Conditions, *condition)
   363  			}
   364  			_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{})
   365  			if err != nil {
   366  				// When status fails to be updated, we continue to next pod
   367  				continue
   368  			}
   369  			readyPods++
   370  		}
   371  		return readyPods >= replicas, nil
   372  	})
   373  	if err != nil {
   374  		t.Fatalf("failed to mark all StatefulSet pods to ready: %v", err)
   375  	}
   376  }
   377  
   378  // add for issue: https://github.com/kubernetes/kubernetes/issues/108837
   379  func TestStatefulSetStatusWithPodFail(t *testing.T) {
   380  	tCtx := ktesting.Init(t)
   381  	limitedPodNumber := 2
   382  	c, config, closeFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
   383  		ModifyServerConfig: func(config *controlplane.Config) {
   384  			config.GenericConfig.AdmissionControl = &fakePodFailAdmission{
   385  				limitedPodNumber: limitedPodNumber,
   386  			}
   387  		},
   388  	})
   389  	defer closeFn()
   390  	defer tCtx.Cancel("test has completed")
   391  	resyncPeriod := 12 * time.Hour
   392  	informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-informers")), resyncPeriod)
   393  	ssc := statefulset.NewStatefulSetController(
   394  		tCtx,
   395  		informers.Core().V1().Pods(),
   396  		informers.Apps().V1().StatefulSets(),
   397  		informers.Core().V1().PersistentVolumeClaims(),
   398  		informers.Apps().V1().ControllerRevisions(),
   399  		clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-controller")),
   400  	)
   401  
   402  	ns := framework.CreateNamespaceOrDie(c, "test-pod-fail", t)
   403  	defer framework.DeleteNamespaceOrDie(c, ns, t)
   404  
   405  	informers.Start(tCtx.Done())
   406  	go ssc.Run(tCtx, 5)
   407  
   408  	sts := newSTS("sts", ns.Name, 4)
   409  	_, err := c.AppsV1().StatefulSets(sts.Namespace).Create(tCtx, sts, metav1.CreateOptions{})
   410  	if err != nil {
   411  		t.Fatalf("Could not create statefulSet %s: %v", sts.Name, err)
   412  	}
   413  
   414  	wantReplicas := limitedPodNumber
   415  	var gotReplicas int32
   416  	if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
   417  		newSTS, err := c.AppsV1().StatefulSets(sts.Namespace).Get(tCtx, sts.Name, metav1.GetOptions{})
   418  		if err != nil {
   419  			return false, err
   420  		}
   421  		gotReplicas = newSTS.Status.Replicas
   422  		return gotReplicas == int32(wantReplicas), nil
   423  	}); err != nil {
   424  		t.Fatalf("StatefulSet %s status has %d replicas, want replicas %d: %v", sts.Name, gotReplicas, wantReplicas, err)
   425  	}
   426  }
   427  
   428  func TestAutodeleteOwnerRefs(t *testing.T) {
   429  	tests := []struct {
   430  		namespace         string
   431  		policy            appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy
   432  		expectPodOwnerRef bool
   433  		expectSetOwnerRef bool
   434  	}{
   435  		{
   436  			namespace: "always-retain",
   437  			policy: appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
   438  				WhenDeleted: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
   439  				WhenScaled:  appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
   440  			},
   441  			expectPodOwnerRef: false,
   442  			expectSetOwnerRef: false,
   443  		},
   444  		{
   445  			namespace: "delete-on-scaledown-only",
   446  			policy: appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
   447  				WhenDeleted: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
   448  				WhenScaled:  appsv1.DeletePersistentVolumeClaimRetentionPolicyType,
   449  			},
   450  			expectPodOwnerRef: true,
   451  			expectSetOwnerRef: false,
   452  		},
   453  		{
   454  			namespace: "delete-with-set-only",
   455  			policy: appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
   456  				WhenDeleted: appsv1.DeletePersistentVolumeClaimRetentionPolicyType,
   457  				WhenScaled:  appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
   458  			},
   459  			expectPodOwnerRef: false,
   460  			expectSetOwnerRef: true,
   461  		},
   462  		{
   463  			namespace: "always-delete",
   464  			policy: appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
   465  				WhenDeleted: appsv1.DeletePersistentVolumeClaimRetentionPolicyType,
   466  				WhenScaled:  appsv1.DeletePersistentVolumeClaimRetentionPolicyType,
   467  			},
   468  			expectPodOwnerRef: true,
   469  			expectSetOwnerRef: true,
   470  		},
   471  	}
   472  
   473  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
   474  
   475  	tCtx, closeFn, rm, informers, c := scSetup(t)
   476  	defer closeFn()
   477  	cancel := runControllerAndInformers(tCtx, rm, informers)
   478  	defer cancel()
   479  
   480  	for _, test := range tests {
   481  		t.Run(test.namespace, func(t *testing.T) {
   482  			ns := framework.CreateNamespaceOrDie(c, test.namespace, t)
   483  			defer framework.DeleteNamespaceOrDie(c, ns, t)
   484  
   485  			sts := newSTS("sts", ns.Name, 3)
   486  			sts.Spec.PersistentVolumeClaimRetentionPolicy = &test.policy
   487  			stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{})
   488  			sts = stss[0]
   489  			waitSTSStable(t, c, sts)
   490  
   491  			// Verify StatefulSet ownerref has been added as appropriate.
   492  			pvcClient := c.CoreV1().PersistentVolumeClaims(ns.Name)
   493  			pvcs := getStatefulSetPVCs(t, pvcClient, sts)
   494  			for _, pvc := range pvcs {
   495  				verifyOwnerRef(t, pvc, "StatefulSet", test.expectSetOwnerRef)
   496  				verifyOwnerRef(t, pvc, "Pod", false)
   497  			}
   498  
   499  			// Scale down to 1 pod and verify Pod ownerrefs as appropriate.
   500  			one := int32(1)
   501  			sts.Spec.Replicas = &one
   502  			waitSTSStable(t, c, sts)
   503  
   504  			pvcs = getStatefulSetPVCs(t, pvcClient, sts)
   505  			for i, pvc := range pvcs {
   506  				verifyOwnerRef(t, pvc, "StatefulSet", test.expectSetOwnerRef)
   507  				if i == 0 {
   508  					verifyOwnerRef(t, pvc, "Pod", false)
   509  				} else {
   510  					verifyOwnerRef(t, pvc, "Pod", test.expectPodOwnerRef)
   511  				}
   512  			}
   513  		})
   514  	}
   515  }
   516  
   517  func TestDeletingPodForRollingUpdatePartition(t *testing.T) {
   518  	tCtx, closeFn, rm, informers, c := scSetup(t)
   519  	defer closeFn()
   520  	ns := framework.CreateNamespaceOrDie(c, "test-deleting-pod-for-rolling-update-partition", t)
   521  	defer framework.DeleteNamespaceOrDie(c, ns, t)
   522  	cancel := runControllerAndInformers(tCtx, rm, informers)
   523  	defer cancel()
   524  
   525  	labelMap := labelMap()
   526  	sts := newSTS("sts", ns.Name, 2)
   527  	sts.Spec.UpdateStrategy = appsv1.StatefulSetUpdateStrategy{
   528  		Type: appsv1.RollingUpdateStatefulSetStrategyType,
   529  		RollingUpdate: func() *appsv1.RollingUpdateStatefulSetStrategy {
   530  			return &appsv1.RollingUpdateStatefulSetStrategy{
   531  				Partition: ptr.To[int32](1),
   532  			}
   533  		}(),
   534  	}
   535  	stss, _ := createSTSsPods(t, c, []*appsv1.StatefulSet{sts}, []*v1.Pod{})
   536  	sts = stss[0]
   537  	waitSTSStable(t, c, sts)
   538  
   539  	// Verify STS creates 2 pods
   540  	podClient := c.CoreV1().Pods(ns.Name)
   541  	pods := getPods(t, podClient, labelMap)
   542  	if len(pods.Items) != 2 {
   543  		t.Fatalf("len(pods) = %d, want 2", len(pods.Items))
   544  	}
   545  	// Setting all pods in Running, Ready, and Available
   546  	setPodsReadyCondition(t, c, &v1.PodList{Items: pods.Items}, v1.ConditionTrue, time.Now())
   547  
   548  	// 1. Roll out a new image.
   549  	oldImage := sts.Spec.Template.Spec.Containers[0].Image
   550  	newImage := "new-image"
   551  	if oldImage == newImage {
   552  		t.Fatalf("bad test setup, statefulSet %s roll out with the same image", sts.Name)
   553  	}
   554  	// Set finalizers for the pod-0 to trigger pod recreation failure while the status UpdateRevision is bumped
   555  	pod0 := &pods.Items[0]
   556  	updatePod(t, podClient, pod0.Name, func(pod *v1.Pod) {
   557  		pod.Finalizers = []string{"fake.example.com/blockDeletion"}
   558  	})
   559  
   560  	stsClient := c.AppsV1().StatefulSets(ns.Name)
   561  	_ = updateSTS(t, stsClient, sts.Name, func(sts *appsv1.StatefulSet) {
   562  		sts.Spec.Template.Spec.Containers[0].Image = newImage
   563  	})
   564  
   565  	// Await for the pod-1 to be recreated, while pod-0 remains running
   566  	if err := wait.PollUntilContextTimeout(tCtx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) {
   567  		ss, err := stsClient.Get(ctx, sts.Name, metav1.GetOptions{})
   568  		if err != nil {
   569  			return false, err
   570  		}
   571  		pods := getPods(t, podClient, labelMap)
   572  		recreatedPods := v1.PodList{}
   573  		for _, pod := range pods.Items {
   574  			if pod.Status.Phase == v1.PodPending {
   575  				recreatedPods.Items = append(recreatedPods.Items, pod)
   576  			}
   577  		}
   578  		setPodsReadyCondition(t, c, &v1.PodList{Items: recreatedPods.Items}, v1.ConditionTrue, time.Now())
   579  		return ss.Status.UpdatedReplicas == *ss.Spec.Replicas-*sts.Spec.UpdateStrategy.RollingUpdate.Partition && ss.Status.Replicas == *ss.Spec.Replicas && ss.Status.ReadyReplicas == *ss.Spec.Replicas, nil
   580  	}); err != nil {
   581  		t.Fatalf("failed to await for pod-1 to be recreated by sts %s: %v", sts.Name, err)
   582  	}
   583  
   584  	// Mark pod-0 as terminal and not ready
   585  	updatePodStatus(t, podClient, pod0.Name, func(pod *v1.Pod) {
   586  		pod.Status.Phase = v1.PodFailed
   587  	})
   588  
   589  	// Make sure pod-0 gets deletion timestamp so that it is recreated
   590  	if err := c.CoreV1().Pods(ns.Name).Delete(context.TODO(), pod0.Name, metav1.DeleteOptions{}); err != nil {
   591  		t.Fatalf("error deleting pod %s: %v", pod0.Name, err)
   592  	}
   593  
   594  	// Await for pod-0 to be not ready
   595  	if err := wait.PollUntilContextTimeout(tCtx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) {
   596  		ss, err := stsClient.Get(ctx, sts.Name, metav1.GetOptions{})
   597  		if err != nil {
   598  			return false, err
   599  		}
   600  		return ss.Status.ReadyReplicas == *ss.Spec.Replicas-1, nil
   601  	}); err != nil {
   602  		t.Fatalf("failed to await for pod-0 to be not counted as ready in status of sts %s: %v", sts.Name, err)
   603  	}
   604  
   605  	// Remove the finalizer to allow recreation
   606  	updatePod(t, podClient, pod0.Name, func(pod *v1.Pod) {
   607  		pod.Finalizers = []string{}
   608  	})
   609  
   610  	// Await for pod-0 to be recreated and make it running
   611  	if err := wait.PollUntilContextTimeout(tCtx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) {
   612  		pods := getPods(t, podClient, labelMap)
   613  		recreatedPods := v1.PodList{}
   614  		for _, pod := range pods.Items {
   615  			if pod.Status.Phase == v1.PodPending {
   616  				recreatedPods.Items = append(recreatedPods.Items, pod)
   617  			}
   618  		}
   619  		setPodsReadyCondition(t, c, &v1.PodList{Items: recreatedPods.Items}, v1.ConditionTrue, time.Now().Add(-120*time.Minute))
   620  		return len(recreatedPods.Items) > 0, nil
   621  	}); err != nil {
   622  		t.Fatalf("failed to await for pod-0 to be recreated by sts %s: %v", sts.Name, err)
   623  	}
   624  
   625  	// Await for all stateful set status to record all replicas as ready
   626  	if err := wait.PollUntilContextTimeout(tCtx, pollInterval, pollTimeout, false, func(ctx context.Context) (bool, error) {
   627  		ss, err := stsClient.Get(ctx, sts.Name, metav1.GetOptions{})
   628  		if err != nil {
   629  			return false, err
   630  		}
   631  		return ss.Status.ReadyReplicas == *ss.Spec.Replicas, nil
   632  	}); err != nil {
   633  		t.Fatalf("failed to verify .Spec.Template.Spec.Containers[0].Image is updated for sts %s: %v", sts.Name, err)
   634  	}
   635  
   636  	// Verify 3 pods exist
   637  	pods = getPods(t, podClient, labelMap)
   638  	if len(pods.Items) != int(*sts.Spec.Replicas) {
   639  		t.Fatalf("Unexpected number of pods")
   640  	}
   641  
   642  	// Verify pod images
   643  	for i := range pods.Items {
   644  		if i < int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition) {
   645  			if pods.Items[i].Spec.Containers[0].Image != oldImage {
   646  				t.Fatalf("Pod %s has image %s not equal to old image %s", pods.Items[i].Name, pods.Items[i].Spec.Containers[0].Image, oldImage)
   647  			}
   648  		} else {
   649  			if pods.Items[i].Spec.Containers[0].Image != newImage {
   650  				t.Fatalf("Pod %s has image %s not equal to new image %s", pods.Items[i].Name, pods.Items[i].Spec.Containers[0].Image, newImage)
   651  			}
   652  		}
   653  	}
   654  }
   655  
   656  func TestStatefulSetStartOrdinal(t *testing.T) {
   657  	tests := []struct {
   658  		ordinals         *appsv1.StatefulSetOrdinals
   659  		name             string
   660  		namespace        string
   661  		replicas         int
   662  		expectedPodNames []string
   663  	}{
   664  		{
   665  			name:             "default start ordinal, no ordinals set",
   666  			namespace:        "no-ordinals",
   667  			replicas:         3,
   668  			expectedPodNames: []string{"sts-0", "sts-1", "sts-2"},
   669  		},
   670  		{
   671  			name:             "default start ordinal",
   672  			namespace:        "no-start-ordinals",
   673  			ordinals:         &appsv1.StatefulSetOrdinals{},
   674  			replicas:         3,
   675  			expectedPodNames: []string{"sts-0", "sts-1", "sts-2"},
   676  		},
   677  		{
   678  			name:      "start ordinal 4",
   679  			namespace: "start-ordinal-4",
   680  			ordinals: &appsv1.StatefulSetOrdinals{
   681  				Start: 4,
   682  			},
   683  			replicas:         4,
   684  			expectedPodNames: []string{"sts-4", "sts-5", "sts-6", "sts-7"},
   685  		},
   686  		{
   687  			name:      "start ordinal 5",
   688  			namespace: "start-ordinal-5",
   689  			ordinals: &appsv1.StatefulSetOrdinals{
   690  				Start: 2,
   691  			},
   692  			replicas:         7,
   693  			expectedPodNames: []string{"sts-2", "sts-3", "sts-4", "sts-5", "sts-6", "sts-7", "sts-8"},
   694  		},
   695  	}
   696  
   697  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetStartOrdinal, true)()
   698  	tCtx, closeFn, rm, informers, c := scSetup(t)
   699  	defer closeFn()
   700  	cancel := runControllerAndInformers(tCtx, rm, informers)
   701  	defer cancel()
   702  
   703  	for _, test := range tests {
   704  		t.Run(test.name, func(t *testing.T) {
   705  			ns := framework.CreateNamespaceOrDie(c, test.namespace, t)
   706  			defer framework.DeleteNamespaceOrDie(c, ns, t)
   707  
   708  			// Label map is the map of pod labels used in newSTS()
   709  			labelMap := labelMap()
   710  			sts := newSTS("sts", ns.Name, test.replicas)
   711  			sts.Spec.Ordinals = test.ordinals
   712  			stss := createSTSs(t, c, []*appsv1.StatefulSet{sts})
   713  			sts = stss[0]
   714  			waitSTSStable(t, c, sts)
   715  
   716  			podClient := c.CoreV1().Pods(ns.Name)
   717  			pods := getPods(t, podClient, labelMap)
   718  			if len(pods.Items) != test.replicas {
   719  				t.Errorf("len(pods) = %v, want %v", len(pods.Items), test.replicas)
   720  			}
   721  
   722  			var podNames []string
   723  			for _, pod := range pods.Items {
   724  				podNames = append(podNames, pod.Name)
   725  			}
   726  			ignoreOrder := cmpopts.SortSlices(func(a, b string) bool {
   727  				return a < b
   728  			})
   729  
   730  			// Validate all the expected pods were created.
   731  			if diff := cmp.Diff(test.expectedPodNames, podNames, ignoreOrder); diff != "" {
   732  				t.Errorf("Unexpected pod names: (-want +got): %v", diff)
   733  			}
   734  
   735  			// Scale down to 1 pod and verify it matches the first pod.
   736  			scaleSTS(t, c, sts, 1)
   737  			waitSTSStable(t, c, sts)
   738  
   739  			pods = getPods(t, podClient, labelMap)
   740  			if len(pods.Items) != 1 {
   741  				t.Errorf("len(pods) = %v, want %v", len(pods.Items), 1)
   742  			}
   743  			if pods.Items[0].Name != test.expectedPodNames[0] {
   744  				t.Errorf("Unexpected singleton pod name: got = %v, want %v", pods.Items[0].Name, test.expectedPodNames[0])
   745  			}
   746  		})
   747  	}
   748  }
   749  

View as plain text