...

Source file src/k8s.io/kubernetes/pkg/controller/statefulset/stateful_pod_control_test.go

Documentation: k8s.io/kubernetes/pkg/controller/statefulset

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package statefulset
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"strings"
    24  	"testing"
    25  	"time"
    26  
    27  	apps "k8s.io/api/apps/v1"
    28  	v1 "k8s.io/api/core/v1"
    29  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/runtime"
    32  	"k8s.io/apimachinery/pkg/types"
    33  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    34  	"k8s.io/client-go/kubernetes/fake"
    35  	corelisters "k8s.io/client-go/listers/core/v1"
    36  	core "k8s.io/client-go/testing"
    37  	"k8s.io/client-go/tools/cache"
    38  	"k8s.io/client-go/tools/record"
    39  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    40  	"k8s.io/klog/v2/ktesting"
    41  	_ "k8s.io/kubernetes/pkg/apis/apps/install"
    42  	_ "k8s.io/kubernetes/pkg/apis/core/install"
    43  	"k8s.io/kubernetes/pkg/features"
    44  )
    45  
    46  func TestStatefulPodControlCreatesPods(t *testing.T) {
    47  	recorder := record.NewFakeRecorder(10)
    48  	set := newStatefulSet(3)
    49  	pod := newStatefulSetPod(set, 0)
    50  	fakeClient := &fake.Clientset{}
    51  	claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
    52  	claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
    53  	control := NewStatefulPodControl(fakeClient, nil, claimLister, recorder)
    54  	fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
    55  		return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource)
    56  	})
    57  	fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
    58  		create := action.(core.CreateAction)
    59  		claimIndexer.Add(create.GetObject())
    60  		return true, create.GetObject(), nil
    61  	})
    62  	fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
    63  		create := action.(core.CreateAction)
    64  		return true, create.GetObject(), nil
    65  	})
    66  	if err := control.CreateStatefulPod(context.TODO(), set, pod); err != nil {
    67  		t.Errorf("StatefulPodControl failed to create Pod error: %s", err)
    68  	}
    69  	events := collectEvents(recorder.Events)
    70  	if eventCount := len(events); eventCount != 2 {
    71  		t.Errorf("Expected 2 events for successful create found %d", eventCount)
    72  	}
    73  	for i := range events {
    74  		if !strings.Contains(events[i], v1.EventTypeNormal) {
    75  			t.Errorf("Found unexpected non-normal event %s", events[i])
    76  		}
    77  	}
    78  }
    79  
    80  func TestStatefulPodControlCreatePodExists(t *testing.T) {
    81  	recorder := record.NewFakeRecorder(10)
    82  	set := newStatefulSet(3)
    83  	pod := newStatefulSetPod(set, 0)
    84  	fakeClient := &fake.Clientset{}
    85  	pvcs := getPersistentVolumeClaims(set, pod)
    86  	pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
    87  	for k := range pvcs {
    88  		pvc := pvcs[k]
    89  		pvcIndexer.Add(&pvc)
    90  	}
    91  	pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
    92  	control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
    93  	fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
    94  		create := action.(core.CreateAction)
    95  		return true, create.GetObject(), nil
    96  	})
    97  	fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
    98  		return true, pod, apierrors.NewAlreadyExists(action.GetResource().GroupResource(), pod.Name)
    99  	})
   100  	if err := control.CreateStatefulPod(context.TODO(), set, pod); !apierrors.IsAlreadyExists(err) {
   101  		t.Errorf("Failed to create Pod error: %s", err)
   102  	}
   103  	events := collectEvents(recorder.Events)
   104  	if eventCount := len(events); eventCount != 0 {
   105  		t.Errorf("Pod and PVC exist: got %d events, but want 0", eventCount)
   106  		for i := range events {
   107  			t.Log(events[i])
   108  		}
   109  	}
   110  }
   111  
   112  func TestStatefulPodControlCreatePodPvcCreateFailure(t *testing.T) {
   113  	recorder := record.NewFakeRecorder(10)
   114  	set := newStatefulSet(3)
   115  	pod := newStatefulSetPod(set, 0)
   116  	fakeClient := &fake.Clientset{}
   117  	pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   118  	pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
   119  	control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
   120  	fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
   121  		return true, nil, apierrors.NewInternalError(errors.New("API server down"))
   122  	})
   123  	fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
   124  		create := action.(core.CreateAction)
   125  		return true, create.GetObject(), nil
   126  	})
   127  	if err := control.CreateStatefulPod(context.TODO(), set, pod); err == nil {
   128  		t.Error("Failed to produce error on PVC creation failure")
   129  	}
   130  	events := collectEvents(recorder.Events)
   131  	if eventCount := len(events); eventCount != 2 {
   132  		t.Errorf("PVC create failure: got %d events, but want 2", eventCount)
   133  	}
   134  	for i := range events {
   135  		if !strings.Contains(events[i], v1.EventTypeWarning) {
   136  			t.Errorf("Found unexpected non-warning event %s", events[i])
   137  		}
   138  	}
   139  }
   140  func TestStatefulPodControlCreatePodPVCDeleting(t *testing.T) {
   141  	recorder := record.NewFakeRecorder(10)
   142  	set := newStatefulSet(3)
   143  	pod := newStatefulSetPod(set, 0)
   144  	fakeClient := &fake.Clientset{}
   145  	pvcs := getPersistentVolumeClaims(set, pod)
   146  	pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   147  	deleteTime := time.Date(2019, time.January, 1, 0, 0, 0, 0, time.UTC)
   148  	for k := range pvcs {
   149  		pvc := pvcs[k]
   150  		pvc.DeletionTimestamp = &metav1.Time{Time: deleteTime}
   151  		pvcIndexer.Add(&pvc)
   152  	}
   153  	pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
   154  	control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
   155  	fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
   156  		create := action.(core.CreateAction)
   157  		return true, create.GetObject(), nil
   158  	})
   159  	fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
   160  		create := action.(core.CreateAction)
   161  		return true, create.GetObject(), nil
   162  	})
   163  	if err := control.CreateStatefulPod(context.TODO(), set, pod); err == nil {
   164  		t.Error("Failed to produce error on deleting PVC")
   165  	}
   166  	events := collectEvents(recorder.Events)
   167  	if eventCount := len(events); eventCount != 1 {
   168  		t.Errorf("Deleting PVC: got %d events, but want 1", eventCount)
   169  	}
   170  	for i := range events {
   171  		if !strings.Contains(events[i], v1.EventTypeWarning) {
   172  			t.Errorf("Found unexpected non-warning event %s", events[i])
   173  		}
   174  	}
   175  }
   176  
   177  type fakeIndexer struct {
   178  	cache.Indexer
   179  	getError error
   180  }
   181  
   182  func (f *fakeIndexer) GetByKey(key string) (interface{}, bool, error) {
   183  	return nil, false, f.getError
   184  }
   185  
   186  func TestStatefulPodControlCreatePodPvcGetFailure(t *testing.T) {
   187  	recorder := record.NewFakeRecorder(10)
   188  	set := newStatefulSet(3)
   189  	pod := newStatefulSetPod(set, 0)
   190  	fakeClient := &fake.Clientset{}
   191  	pvcIndexer := &fakeIndexer{getError: errors.New("API server down")}
   192  	pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
   193  	control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
   194  	fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
   195  		return true, nil, apierrors.NewInternalError(errors.New("API server down"))
   196  	})
   197  	fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
   198  		create := action.(core.CreateAction)
   199  		return true, create.GetObject(), nil
   200  	})
   201  	if err := control.CreateStatefulPod(context.TODO(), set, pod); err == nil {
   202  		t.Error("Failed to produce error on PVC creation failure")
   203  	}
   204  	events := collectEvents(recorder.Events)
   205  	if eventCount := len(events); eventCount != 2 {
   206  		t.Errorf("PVC create failure: got %d events, but want 2", eventCount)
   207  	}
   208  	for i := range events {
   209  		if !strings.Contains(events[i], v1.EventTypeWarning) {
   210  			t.Errorf("Found unexpected non-warning event: %s", events[i])
   211  		}
   212  	}
   213  }
   214  
   215  func TestStatefulPodControlCreatePodFailed(t *testing.T) {
   216  	recorder := record.NewFakeRecorder(10)
   217  	set := newStatefulSet(3)
   218  	pod := newStatefulSetPod(set, 0)
   219  	fakeClient := &fake.Clientset{}
   220  	pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   221  	pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
   222  	control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
   223  	fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
   224  		create := action.(core.CreateAction)
   225  		return true, create.GetObject(), nil
   226  	})
   227  	fakeClient.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
   228  		return true, nil, apierrors.NewInternalError(errors.New("API server down"))
   229  	})
   230  	if err := control.CreateStatefulPod(context.TODO(), set, pod); err == nil {
   231  		t.Error("Failed to produce error on Pod creation failure")
   232  	}
   233  	events := collectEvents(recorder.Events)
   234  	if eventCount := len(events); eventCount != 2 {
   235  		t.Errorf("Pod create failed: got %d events, but want 2", eventCount)
   236  	} else if !strings.Contains(events[0], v1.EventTypeNormal) {
   237  		t.Errorf("Found unexpected non-normal event %s", events[0])
   238  
   239  	} else if !strings.Contains(events[1], v1.EventTypeWarning) {
   240  		t.Errorf("Found unexpected non-warning event %s", events[1])
   241  	}
   242  }
   243  
   244  func TestStatefulPodControlNoOpUpdate(t *testing.T) {
   245  	_, ctx := ktesting.NewTestContext(t)
   246  	recorder := record.NewFakeRecorder(10)
   247  	set := newStatefulSet(3)
   248  	pod := newStatefulSetPod(set, 0)
   249  	fakeClient := &fake.Clientset{}
   250  	claims := getPersistentVolumeClaims(set, pod)
   251  	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   252  	for k := range claims {
   253  		claim := claims[k]
   254  		indexer.Add(&claim)
   255  	}
   256  	claimLister := corelisters.NewPersistentVolumeClaimLister(indexer)
   257  	control := NewStatefulPodControl(fakeClient, nil, claimLister, recorder)
   258  	fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
   259  		t.Error("no-op update should not make any client invocation")
   260  		return true, nil, apierrors.NewInternalError(errors.New("if we are here we have a problem"))
   261  	})
   262  	if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
   263  		t.Errorf("Error returned on no-op update error: %s", err)
   264  	}
   265  	events := collectEvents(recorder.Events)
   266  	if eventCount := len(events); eventCount != 0 {
   267  		t.Errorf("no-op update: got %d events, but want 0", eventCount)
   268  	}
   269  }
   270  
   271  func TestStatefulPodControlUpdatesIdentity(t *testing.T) {
   272  	_, ctx := ktesting.NewTestContext(t)
   273  	recorder := record.NewFakeRecorder(10)
   274  	set := newStatefulSet(3)
   275  	pod := newStatefulSetPod(set, 0)
   276  	fakeClient := fake.NewSimpleClientset(set, pod)
   277  	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   278  	claimLister := corelisters.NewPersistentVolumeClaimLister(indexer)
   279  	control := NewStatefulPodControl(fakeClient, nil, claimLister, recorder)
   280  	var updated *v1.Pod
   281  	fakeClient.PrependReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
   282  		update := action.(core.UpdateAction)
   283  		updated = update.GetObject().(*v1.Pod)
   284  		return true, update.GetObject(), nil
   285  	})
   286  	pod.Name = "goo-0"
   287  	if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
   288  		t.Errorf("Successful update returned an error: %s", err)
   289  	}
   290  	events := collectEvents(recorder.Events)
   291  	if eventCount := len(events); eventCount != 1 {
   292  		t.Errorf("Pod update successful:got %d events,but want 1", eventCount)
   293  	} else if !strings.Contains(events[0], v1.EventTypeNormal) {
   294  		t.Errorf("Found unexpected non-normal event %s", events[0])
   295  	}
   296  	if !identityMatches(set, updated) {
   297  		t.Error("Name update failed identity does not match")
   298  	}
   299  }
   300  
   301  func TestStatefulPodControlUpdateIdentityFailure(t *testing.T) {
   302  	_, ctx := ktesting.NewTestContext(t)
   303  	recorder := record.NewFakeRecorder(10)
   304  	set := newStatefulSet(3)
   305  	pod := newStatefulSetPod(set, 0)
   306  	fakeClient := &fake.Clientset{}
   307  	podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   308  	gooPod := newStatefulSetPod(set, 0)
   309  	gooPod.Name = "goo-0"
   310  	podIndexer.Add(gooPod)
   311  	podLister := corelisters.NewPodLister(podIndexer)
   312  	claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   313  	claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
   314  	control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
   315  	fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
   316  		pod.Name = "goo-0"
   317  		return true, nil, apierrors.NewInternalError(errors.New("API server down"))
   318  	})
   319  	pod.Name = "goo-0"
   320  	if err := control.UpdateStatefulPod(ctx, set, pod); err == nil {
   321  		t.Error("Failed update does not generate an error")
   322  	}
   323  	events := collectEvents(recorder.Events)
   324  	if eventCount := len(events); eventCount != 1 {
   325  		t.Errorf("Pod update failed: got %d events, but want 1", eventCount)
   326  	} else if !strings.Contains(events[0], v1.EventTypeWarning) {
   327  		t.Errorf("Found unexpected non-warning event %s", events[0])
   328  	}
   329  	if identityMatches(set, pod) {
   330  		t.Error("Failed update mutated Pod identity")
   331  	}
   332  }
   333  
   334  func TestStatefulPodControlUpdatesPodStorage(t *testing.T) {
   335  	_, ctx := ktesting.NewTestContext(t)
   336  	recorder := record.NewFakeRecorder(10)
   337  	set := newStatefulSet(3)
   338  	pod := newStatefulSetPod(set, 0)
   339  	fakeClient := &fake.Clientset{}
   340  	pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   341  	pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
   342  	control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
   343  	pvcs := getPersistentVolumeClaims(set, pod)
   344  	volumes := make([]v1.Volume, 0, len(pod.Spec.Volumes))
   345  	for i := range pod.Spec.Volumes {
   346  		if _, contains := pvcs[pod.Spec.Volumes[i].Name]; !contains {
   347  			volumes = append(volumes, pod.Spec.Volumes[i])
   348  		}
   349  	}
   350  	pod.Spec.Volumes = volumes
   351  	fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
   352  		update := action.(core.UpdateAction)
   353  		return true, update.GetObject(), nil
   354  	})
   355  	fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
   356  		update := action.(core.UpdateAction)
   357  		return true, update.GetObject(), nil
   358  	})
   359  	var updated *v1.Pod
   360  	fakeClient.PrependReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
   361  		update := action.(core.UpdateAction)
   362  		updated = update.GetObject().(*v1.Pod)
   363  		return true, update.GetObject(), nil
   364  	})
   365  	if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
   366  		t.Errorf("Successful update returned an error: %s", err)
   367  	}
   368  	events := collectEvents(recorder.Events)
   369  	if eventCount := len(events); eventCount != 2 {
   370  		t.Errorf("Pod storage update successful: got %d events, but want 2", eventCount)
   371  	}
   372  	for i := range events {
   373  		if !strings.Contains(events[i], v1.EventTypeNormal) {
   374  			t.Errorf("Found unexpected non-normal event %s", events[i])
   375  		}
   376  	}
   377  	if !storageMatches(set, updated) {
   378  		t.Error("Name update failed identity does not match")
   379  	}
   380  }
   381  
   382  func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) {
   383  	_, ctx := ktesting.NewTestContext(t)
   384  	recorder := record.NewFakeRecorder(10)
   385  	set := newStatefulSet(3)
   386  	pod := newStatefulSetPod(set, 0)
   387  	fakeClient := &fake.Clientset{}
   388  	pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   389  	pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
   390  	control := NewStatefulPodControl(fakeClient, nil, pvcLister, recorder)
   391  	pvcs := getPersistentVolumeClaims(set, pod)
   392  	volumes := make([]v1.Volume, 0, len(pod.Spec.Volumes))
   393  	for i := range pod.Spec.Volumes {
   394  		if _, contains := pvcs[pod.Spec.Volumes[i].Name]; !contains {
   395  			volumes = append(volumes, pod.Spec.Volumes[i])
   396  		}
   397  	}
   398  	pod.Spec.Volumes = volumes
   399  	fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
   400  		update := action.(core.UpdateAction)
   401  		return true, update.GetObject(), nil
   402  	})
   403  	fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
   404  		return true, nil, apierrors.NewInternalError(errors.New("API server down"))
   405  	})
   406  	if err := control.UpdateStatefulPod(ctx, set, pod); err == nil {
   407  		t.Error("Failed Pod storage update did not return an error")
   408  	}
   409  	events := collectEvents(recorder.Events)
   410  	if eventCount := len(events); eventCount != 2 {
   411  		t.Errorf("Pod storage update failed: got %d events, but want 2", eventCount)
   412  	}
   413  	for i := range events {
   414  		if !strings.Contains(events[i], v1.EventTypeWarning) {
   415  			t.Errorf("Found unexpected non-normal event %s", events[i])
   416  		}
   417  	}
   418  }
   419  
   420  func TestStatefulPodControlUpdatePodConflictSuccess(t *testing.T) {
   421  	_, ctx := ktesting.NewTestContext(t)
   422  	recorder := record.NewFakeRecorder(10)
   423  	set := newStatefulSet(3)
   424  	pod := newStatefulSetPod(set, 0)
   425  	fakeClient := &fake.Clientset{}
   426  	podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   427  	podLister := corelisters.NewPodLister(podIndexer)
   428  	claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   429  	claimLister := corelisters.NewPersistentVolumeClaimLister(podIndexer)
   430  	gooPod := newStatefulSetPod(set, 0)
   431  	gooPod.Labels[apps.StatefulSetPodNameLabel] = "goo-starts"
   432  	podIndexer.Add(gooPod)
   433  	claims := getPersistentVolumeClaims(set, gooPod)
   434  	for k := range claims {
   435  		claim := claims[k]
   436  		claimIndexer.Add(&claim)
   437  	}
   438  	control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
   439  	conflict := false
   440  	fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
   441  		update := action.(core.UpdateAction)
   442  		if !conflict {
   443  			conflict = true
   444  			return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), pod.Name, errors.New("conflict"))
   445  		}
   446  		return true, update.GetObject(), nil
   447  
   448  	})
   449  	pod.Labels[apps.StatefulSetPodNameLabel] = "goo-0"
   450  	if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
   451  		t.Errorf("Successful update returned an error: %s", err)
   452  	}
   453  	events := collectEvents(recorder.Events)
   454  	if eventCount := len(events); eventCount != 1 {
   455  		t.Errorf("Pod update successful: got %d, but want 1", eventCount)
   456  	} else if !strings.Contains(events[0], v1.EventTypeNormal) {
   457  		t.Errorf("Found unexpected non-normal event %s", events[0])
   458  	}
   459  	if !identityMatches(set, pod) {
   460  		t.Error("Name update failed identity does not match")
   461  	}
   462  }
   463  
   464  func TestStatefulPodControlDeletesStatefulPod(t *testing.T) {
   465  	recorder := record.NewFakeRecorder(10)
   466  	set := newStatefulSet(3)
   467  	pod := newStatefulSetPod(set, 0)
   468  	fakeClient := &fake.Clientset{}
   469  	control := NewStatefulPodControl(fakeClient, nil, nil, recorder)
   470  	fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) {
   471  		return true, nil, nil
   472  	})
   473  	if err := control.DeleteStatefulPod(set, pod); err != nil {
   474  		t.Errorf("Error returned on successful delete: %s", err)
   475  	}
   476  	events := collectEvents(recorder.Events)
   477  	if eventCount := len(events); eventCount != 1 {
   478  		t.Errorf("delete successful: got %d events, but want 1", eventCount)
   479  	} else if !strings.Contains(events[0], v1.EventTypeNormal) {
   480  		t.Errorf("Found unexpected non-normal event %s", events[0])
   481  	}
   482  }
   483  
   484  func TestStatefulPodControlDeleteFailure(t *testing.T) {
   485  	recorder := record.NewFakeRecorder(10)
   486  	set := newStatefulSet(3)
   487  	pod := newStatefulSetPod(set, 0)
   488  	fakeClient := &fake.Clientset{}
   489  	control := NewStatefulPodControl(fakeClient, nil, nil, recorder)
   490  	fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) {
   491  		return true, nil, apierrors.NewInternalError(errors.New("API server down"))
   492  	})
   493  	if err := control.DeleteStatefulPod(set, pod); err == nil {
   494  		t.Error("Failed to return error on failed delete")
   495  	}
   496  	events := collectEvents(recorder.Events)
   497  	if eventCount := len(events); eventCount != 1 {
   498  		t.Errorf("delete failed: got %d events, but want 1", eventCount)
   499  	} else if !strings.Contains(events[0], v1.EventTypeWarning) {
   500  		t.Errorf("Found unexpected non-warning event %s", events[0])
   501  	}
   502  }
   503  
   504  func TestStatefulPodControlClaimsMatchDeletionPolcy(t *testing.T) {
   505  	// The claimOwnerMatchesSetAndPod is tested exhaustively in stateful_set_utils_test; this
   506  	// test is for the wiring to the method tested there.
   507  	_, ctx := ktesting.NewTestContext(t)
   508  	fakeClient := &fake.Clientset{}
   509  	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   510  	claimLister := corelisters.NewPersistentVolumeClaimLister(indexer)
   511  	set := newStatefulSet(3)
   512  	pod := newStatefulSetPod(set, 0)
   513  	claims := getPersistentVolumeClaims(set, pod)
   514  	for k := range claims {
   515  		claim := claims[k]
   516  		indexer.Add(&claim)
   517  	}
   518  	control := NewStatefulPodControl(fakeClient, nil, claimLister, &noopRecorder{})
   519  	set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
   520  		WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
   521  		WhenScaled:  apps.RetainPersistentVolumeClaimRetentionPolicyType,
   522  	}
   523  	if matches, err := control.ClaimsMatchRetentionPolicy(ctx, set, pod); err != nil {
   524  		t.Errorf("Unexpected error for ClaimsMatchRetentionPolicy (retain): %v", err)
   525  	} else if !matches {
   526  		t.Error("Unexpected non-match for ClaimsMatchRetentionPolicy (retain)")
   527  	}
   528  	set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
   529  		WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
   530  		WhenScaled:  apps.RetainPersistentVolumeClaimRetentionPolicyType,
   531  	}
   532  	if matches, err := control.ClaimsMatchRetentionPolicy(ctx, set, pod); err != nil {
   533  		t.Errorf("Unexpected error for ClaimsMatchRetentionPolicy (set deletion): %v", err)
   534  	} else if matches {
   535  		t.Error("Unexpected match for ClaimsMatchRetentionPolicy (set deletion)")
   536  	}
   537  }
   538  
   539  func TestStatefulPodControlUpdatePodClaimForRetentionPolicy(t *testing.T) {
   540  	// All the update conditions are tested exhaustively in stateful_set_utils_test. This
   541  	// tests the wiring from the pod control to that method.
   542  	testFn := func(t *testing.T) {
   543  		_, ctx := ktesting.NewTestContext(t)
   544  		defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
   545  		fakeClient := &fake.Clientset{}
   546  		indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   547  		claimLister := corelisters.NewPersistentVolumeClaimLister(indexer)
   548  		fakeClient.AddReactor("update", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
   549  			update := action.(core.UpdateAction)
   550  			indexer.Update(update.GetObject())
   551  			return true, update.GetObject(), nil
   552  		})
   553  		set := newStatefulSet(3)
   554  		set.GetObjectMeta().SetUID("set-123")
   555  		pod := newStatefulSetPod(set, 0)
   556  		claims := getPersistentVolumeClaims(set, pod)
   557  		for k := range claims {
   558  			claim := claims[k]
   559  			indexer.Add(&claim)
   560  		}
   561  		control := NewStatefulPodControl(fakeClient, nil, claimLister, &noopRecorder{})
   562  		set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
   563  			WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
   564  			WhenScaled:  apps.RetainPersistentVolumeClaimRetentionPolicyType,
   565  		}
   566  		if err := control.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil {
   567  			t.Errorf("Unexpected error for UpdatePodClaimForRetentionPolicy (retain): %v", err)
   568  		}
   569  		expectRef := utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC)
   570  		for k := range claims {
   571  			claim, err := claimLister.PersistentVolumeClaims(claims[k].Namespace).Get(claims[k].Name)
   572  			if err != nil {
   573  				t.Errorf("Unexpected error getting Claim %s/%s: %v", claim.Namespace, claim.Name, err)
   574  			}
   575  			if hasOwnerRef(claim, set) != expectRef {
   576  				t.Errorf("Claim %s/%s bad set owner ref", claim.Namespace, claim.Name)
   577  			}
   578  		}
   579  	}
   580  	t.Run("StatefulSetAutoDeletePVCEnabled", func(t *testing.T) {
   581  		defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
   582  		testFn(t)
   583  	})
   584  	t.Run("StatefulSetAutoDeletePVCDisabled", func(t *testing.T) {
   585  		defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, false)()
   586  		testFn(t)
   587  	})
   588  }
   589  
   590  func TestPodClaimIsStale(t *testing.T) {
   591  	const missing = "missing"
   592  	const exists = "exists"
   593  	const stale = "stale"
   594  	const withRef = "with-ref"
   595  	testCases := []struct {
   596  		name        string
   597  		claimStates []string
   598  		expected    bool
   599  		skipPodUID  bool
   600  	}{
   601  		{
   602  			name:        "all missing",
   603  			claimStates: []string{missing, missing},
   604  			expected:    false,
   605  		},
   606  		{
   607  			name:        "no claims",
   608  			claimStates: []string{},
   609  			expected:    false,
   610  		},
   611  		{
   612  			name:        "exists",
   613  			claimStates: []string{missing, exists},
   614  			expected:    false,
   615  		},
   616  		{
   617  			name:        "all refs",
   618  			claimStates: []string{withRef, withRef},
   619  			expected:    false,
   620  		},
   621  		{
   622  			name:        "stale & exists",
   623  			claimStates: []string{stale, exists},
   624  			expected:    true,
   625  		},
   626  		{
   627  			name:        "stale & missing",
   628  			claimStates: []string{stale, missing},
   629  			expected:    true,
   630  		},
   631  		{
   632  			name:        "withRef & stale",
   633  			claimStates: []string{withRef, stale},
   634  			expected:    true,
   635  		},
   636  		{
   637  			name:        "withRef, no UID",
   638  			claimStates: []string{withRef},
   639  			skipPodUID:  true,
   640  			expected:    true,
   641  		},
   642  	}
   643  	for _, tc := range testCases {
   644  		set := apps.StatefulSet{}
   645  		set.Name = "set"
   646  		set.Namespace = "default"
   647  		set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
   648  			WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
   649  			WhenScaled:  apps.DeletePersistentVolumeClaimRetentionPolicyType,
   650  		}
   651  		set.Spec.Selector = &metav1.LabelSelector{MatchLabels: map[string]string{"key": "value"}}
   652  		claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   653  		for i, claimState := range tc.claimStates {
   654  			claim := v1.PersistentVolumeClaim{}
   655  			claim.Name = fmt.Sprintf("claim-%d", i)
   656  			set.Spec.VolumeClaimTemplates = append(set.Spec.VolumeClaimTemplates, claim)
   657  			claim.Name = fmt.Sprintf("%s-set-3", claim.Name)
   658  			claim.Namespace = set.Namespace
   659  			switch claimState {
   660  			case missing:
   661  			// Do nothing, the claim shouldn't exist.
   662  			case exists:
   663  				claimIndexer.Add(&claim)
   664  			case stale:
   665  				claim.SetOwnerReferences([]metav1.OwnerReference{
   666  					{Name: "set-3", UID: types.UID("stale")},
   667  				})
   668  				claimIndexer.Add(&claim)
   669  			case withRef:
   670  				claim.SetOwnerReferences([]metav1.OwnerReference{
   671  					{Name: "set-3", UID: types.UID("123")},
   672  				})
   673  				claimIndexer.Add(&claim)
   674  			}
   675  		}
   676  		pod := v1.Pod{}
   677  		pod.Name = "set-3"
   678  		if !tc.skipPodUID {
   679  			pod.SetUID("123")
   680  		}
   681  		claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
   682  		control := NewStatefulPodControl(&fake.Clientset{}, nil, claimLister, &noopRecorder{})
   683  		expected := tc.expected
   684  		// Note that the error isn't / can't be tested.
   685  		if stale, _ := control.PodClaimIsStale(&set, &pod); stale != expected {
   686  			t.Errorf("unexpected stale for %s", tc.name)
   687  		}
   688  	}
   689  }
   690  
   691  func TestStatefulPodControlRetainDeletionPolicyUpdate(t *testing.T) {
   692  	testFn := func(t *testing.T) {
   693  		_, ctx := ktesting.NewTestContext(t)
   694  		recorder := record.NewFakeRecorder(10)
   695  		set := newStatefulSet(1)
   696  		set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
   697  			WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
   698  			WhenScaled:  apps.RetainPersistentVolumeClaimRetentionPolicyType,
   699  		}
   700  		pod := newStatefulSetPod(set, 0)
   701  		fakeClient := &fake.Clientset{}
   702  		podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   703  		podLister := corelisters.NewPodLister(podIndexer)
   704  		claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   705  		claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
   706  		podIndexer.Add(pod)
   707  		claims := getPersistentVolumeClaims(set, pod)
   708  		if len(claims) < 1 {
   709  			t.Errorf("Unexpected missing PVCs")
   710  		}
   711  		for k := range claims {
   712  			claim := claims[k]
   713  			setOwnerRef(&claim, set, &set.TypeMeta) // This ownerRef should be removed in the update.
   714  			claimIndexer.Add(&claim)
   715  		}
   716  		control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
   717  		if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
   718  			t.Errorf("Successful update returned an error: %s", err)
   719  		}
   720  		for k := range claims {
   721  			claim := claims[k]
   722  			if hasOwnerRef(&claim, set) {
   723  				t.Errorf("ownerRef not removed: %s/%s", claim.Namespace, claim.Name)
   724  			}
   725  		}
   726  		events := collectEvents(recorder.Events)
   727  		if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
   728  			if eventCount := len(events); eventCount != 1 {
   729  				t.Errorf("delete failed: got %d events, but want 1", eventCount)
   730  			}
   731  		} else {
   732  			if len(events) != 0 {
   733  				t.Errorf("delete failed: expected no events, but got %v", events)
   734  			}
   735  		}
   736  	}
   737  	t.Run("StatefulSetAutoDeletePVCEnabled", func(t *testing.T) {
   738  		defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
   739  		testFn(t)
   740  	})
   741  	t.Run("StatefulSetAutoDeletePVCDisabled", func(t *testing.T) {
   742  		defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, false)()
   743  		testFn(t)
   744  	})
   745  }
   746  
   747  func TestStatefulPodControlRetentionPolicyUpdate(t *testing.T) {
   748  	_, ctx := ktesting.NewTestContext(t)
   749  	// Only applicable when the feature gate is on; the off case is tested in TestStatefulPodControlRetainRetentionPolicyUpdate.
   750  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
   751  
   752  	recorder := record.NewFakeRecorder(10)
   753  	set := newStatefulSet(1)
   754  	set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
   755  		WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
   756  		WhenScaled:  apps.RetainPersistentVolumeClaimRetentionPolicyType,
   757  	}
   758  	pod := newStatefulSetPod(set, 0)
   759  	fakeClient := &fake.Clientset{}
   760  	podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   761  	claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   762  	podIndexer.Add(pod)
   763  	claims := getPersistentVolumeClaims(set, pod)
   764  	if len(claims) != 1 {
   765  		t.Errorf("Unexpected or missing PVCs")
   766  	}
   767  	var claim v1.PersistentVolumeClaim
   768  	for k := range claims {
   769  		claim = claims[k]
   770  		claimIndexer.Add(&claim)
   771  	}
   772  	fakeClient.AddReactor("update", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
   773  		update := action.(core.UpdateAction)
   774  		claimIndexer.Update(update.GetObject())
   775  		return true, update.GetObject(), nil
   776  	})
   777  	podLister := corelisters.NewPodLister(podIndexer)
   778  	claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
   779  	control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
   780  	if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
   781  		t.Errorf("Successful update returned an error: %s", err)
   782  	}
   783  	updatedClaim, err := claimLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
   784  	if err != nil {
   785  		t.Errorf("Error retrieving claim %s/%s: %v", claim.Namespace, claim.Name, err)
   786  	}
   787  	if !hasOwnerRef(updatedClaim, set) {
   788  		t.Errorf("ownerRef not added: %s/%s", claim.Namespace, claim.Name)
   789  	}
   790  	events := collectEvents(recorder.Events)
   791  	if eventCount := len(events); eventCount != 1 {
   792  		t.Errorf("update failed: got %d events, but want 1", eventCount)
   793  	}
   794  }
   795  
   796  func TestStatefulPodControlRetentionPolicyUpdateMissingClaims(t *testing.T) {
   797  	_, ctx := ktesting.NewTestContext(t)
   798  	// Only applicable when the feature gate is on.
   799  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StatefulSetAutoDeletePVC, true)()
   800  
   801  	recorder := record.NewFakeRecorder(10)
   802  	set := newStatefulSet(1)
   803  	set.Spec.PersistentVolumeClaimRetentionPolicy = &apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
   804  		WhenDeleted: apps.DeletePersistentVolumeClaimRetentionPolicyType,
   805  		WhenScaled:  apps.RetainPersistentVolumeClaimRetentionPolicyType,
   806  	}
   807  	pod := newStatefulSetPod(set, 0)
   808  	fakeClient := &fake.Clientset{}
   809  	podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   810  	podLister := corelisters.NewPodLister(podIndexer)
   811  	claimIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
   812  	claimLister := corelisters.NewPersistentVolumeClaimLister(claimIndexer)
   813  	podIndexer.Add(pod)
   814  	fakeClient.AddReactor("update", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
   815  		update := action.(core.UpdateAction)
   816  		claimIndexer.Update(update.GetObject())
   817  		return true, update.GetObject(), nil
   818  	})
   819  	control := NewStatefulPodControl(fakeClient, podLister, claimLister, recorder)
   820  	if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
   821  		t.Error("Unexpected error on pod update when PVCs are missing")
   822  	}
   823  	claims := getPersistentVolumeClaims(set, pod)
   824  	if len(claims) != 1 {
   825  		t.Errorf("Unexpected or missing PVCs")
   826  	}
   827  	var claim v1.PersistentVolumeClaim
   828  	for k := range claims {
   829  		claim = claims[k]
   830  		claimIndexer.Add(&claim)
   831  	}
   832  
   833  	if err := control.UpdateStatefulPod(ctx, set, pod); err != nil {
   834  		t.Errorf("Expected update to succeed, saw error %v", err)
   835  	}
   836  	updatedClaim, err := claimLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
   837  	if err != nil {
   838  		t.Errorf("Error retrieving claim %s/%s: %v", claim.Namespace, claim.Name, err)
   839  	}
   840  	if !hasOwnerRef(updatedClaim, set) {
   841  		t.Errorf("ownerRef not added: %s/%s", claim.Namespace, claim.Name)
   842  	}
   843  	events := collectEvents(recorder.Events)
   844  	if eventCount := len(events); eventCount != 1 {
   845  		t.Errorf("update failed: got %d events, but want 2", eventCount)
   846  	}
   847  	if !strings.Contains(events[0], "SuccessfulUpdate") {
   848  		t.Errorf("expected first event to be a successful update: %s", events[1])
   849  	}
   850  }
   851  
   852  func collectEvents(source <-chan string) []string {
   853  	done := false
   854  	events := make([]string, 0)
   855  	for !done {
   856  		select {
   857  		case event := <-source:
   858  			events = append(events, event)
   859  		default:
   860  			done = true
   861  		}
   862  	}
   863  	return events
   864  }
   865  

View as plain text