...

Source file src/k8s.io/kubernetes/pkg/registry/apps/statefulset/storage/storage_test.go

Documentation: k8s.io/kubernetes/pkg/registry/apps/statefulset/storage

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package storage
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sync"
    23  	"testing"
    24  
    25  	"github.com/google/go-cmp/cmp"
    26  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    27  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/fields"
    30  	"k8s.io/apimachinery/pkg/labels"
    31  	"k8s.io/apimachinery/pkg/runtime"
    32  	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
    33  	"k8s.io/apiserver/pkg/registry/generic"
    34  	genericregistrytest "k8s.io/apiserver/pkg/registry/generic/testing"
    35  	"k8s.io/apiserver/pkg/registry/rest"
    36  	etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
    37  	"k8s.io/kubernetes/pkg/apis/apps"
    38  	"k8s.io/kubernetes/pkg/apis/autoscaling"
    39  	api "k8s.io/kubernetes/pkg/apis/core"
    40  	"k8s.io/kubernetes/pkg/registry/registrytest"
    41  )
    42  
    43  // TODO: allow for global factory override
    44  func newStorage(t *testing.T) (StatefulSetStorage, *etcd3testing.EtcdTestServer) {
    45  	etcdStorage, server := registrytest.NewEtcdStorage(t, apps.GroupName)
    46  	restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "statefulsets"}
    47  	storage, err := NewStorage(restOptions)
    48  	if err != nil {
    49  		t.Fatalf("unexpected error from REST storage: %v", err)
    50  	}
    51  	return storage, server
    52  }
    53  
    54  func validNewStatefulSet() *apps.StatefulSet {
    55  	return &apps.StatefulSet{
    56  		ObjectMeta: metav1.ObjectMeta{
    57  			Name:      "foo",
    58  			Namespace: metav1.NamespaceDefault,
    59  			Labels:    map[string]string{"a": "b"},
    60  		},
    61  		Spec: apps.StatefulSetSpec{
    62  			PodManagementPolicy: apps.OrderedReadyPodManagement,
    63  			Selector:            &metav1.LabelSelector{MatchLabels: map[string]string{"a": "b"}},
    64  			Template: api.PodTemplateSpec{
    65  				ObjectMeta: metav1.ObjectMeta{
    66  					Labels: map[string]string{"a": "b"},
    67  				},
    68  				Spec: api.PodSpec{
    69  					Containers: []api.Container{
    70  						{
    71  							Name:            "test",
    72  							Image:           "test_image",
    73  							ImagePullPolicy: api.PullIfNotPresent,
    74  						},
    75  					},
    76  					RestartPolicy: api.RestartPolicyAlways,
    77  					DNSPolicy:     api.DNSClusterFirst,
    78  				},
    79  			},
    80  			Replicas:       7,
    81  			UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
    82  		},
    83  		Status: apps.StatefulSetStatus{},
    84  	}
    85  }
    86  
// validStatefulSet is a package-level value copied from the
// validNewStatefulSet fixture at package initialization. Scale tests in this
// file pass its address to storage calls, so it is shared between tests.
// NOTE(review): those storage calls may mutate the object they are given —
// prefer validNewStatefulSet() where test isolation matters; confirm.
var validStatefulSet = *validNewStatefulSet()
    88  
    89  func TestCreate(t *testing.T) {
    90  	storage, server := newStorage(t)
    91  	defer server.Terminate(t)
    92  	defer storage.StatefulSet.Store.DestroyFunc()
    93  	test := genericregistrytest.New(t, storage.StatefulSet.Store)
    94  	ps := validNewStatefulSet()
    95  	ps.ObjectMeta = metav1.ObjectMeta{}
    96  	test.TestCreate(
    97  		// valid
    98  		ps,
    99  		// TODO: Add an invalid case when we have validation.
   100  	)
   101  }
   102  
   103  // TODO: Test updates to spec when we allow them.
   104  
   105  func TestStatusUpdate(t *testing.T) {
   106  	storage, server := newStorage(t)
   107  	defer server.Terminate(t)
   108  	defer storage.StatefulSet.Store.DestroyFunc()
   109  	ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), metav1.NamespaceDefault)
   110  	key := "/statefulsets/" + metav1.NamespaceDefault + "/foo"
   111  	validStatefulSet := validNewStatefulSet()
   112  	if err := storage.StatefulSet.Storage.Create(ctx, key, validStatefulSet, nil, 0, false); err != nil {
   113  		t.Fatalf("unexpected error: %v", err)
   114  	}
   115  	update := apps.StatefulSet{
   116  		ObjectMeta: validStatefulSet.ObjectMeta,
   117  		Spec: apps.StatefulSetSpec{
   118  			Replicas: 7,
   119  		},
   120  		Status: apps.StatefulSetStatus{
   121  			Replicas: 7,
   122  		},
   123  	}
   124  
   125  	if _, _, err := storage.Status.Update(ctx, update.Name, rest.DefaultUpdatedObjectInfo(&update), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil {
   126  		t.Fatalf("unexpected error: %v", err)
   127  	}
   128  	obj, err := storage.StatefulSet.Get(ctx, "foo", &metav1.GetOptions{})
   129  	if err != nil {
   130  		t.Fatalf("unexpected error: %v", err)
   131  	}
   132  
   133  	ps := obj.(*apps.StatefulSet)
   134  	if ps.Spec.Replicas != 7 {
   135  		t.Errorf("we expected .spec.replicas to not be updated but it was updated to %v", ps.Spec.Replicas)
   136  	}
   137  	if ps.Status.Replicas != 7 {
   138  		t.Errorf("we expected .status.replicas to be updated to %d but it was %v", 7, ps.Status.Replicas)
   139  	}
   140  }
   141  
   142  func TestGet(t *testing.T) {
   143  	storage, server := newStorage(t)
   144  	defer server.Terminate(t)
   145  	defer storage.StatefulSet.Store.DestroyFunc()
   146  	test := genericregistrytest.New(t, storage.StatefulSet.Store)
   147  	test.TestGet(validNewStatefulSet())
   148  }
   149  
   150  func TestList(t *testing.T) {
   151  	storage, server := newStorage(t)
   152  	defer server.Terminate(t)
   153  	defer storage.StatefulSet.Store.DestroyFunc()
   154  	test := genericregistrytest.New(t, storage.StatefulSet.Store)
   155  	test.TestList(validNewStatefulSet())
   156  }
   157  
   158  func TestDelete(t *testing.T) {
   159  	storage, server := newStorage(t)
   160  	defer server.Terminate(t)
   161  	defer storage.StatefulSet.Store.DestroyFunc()
   162  	test := genericregistrytest.New(t, storage.StatefulSet.Store)
   163  	test.TestDelete(validNewStatefulSet())
   164  }
   165  
   166  func TestWatch(t *testing.T) {
   167  	storage, server := newStorage(t)
   168  	defer server.Terminate(t)
   169  	defer storage.StatefulSet.Store.DestroyFunc()
   170  	test := genericregistrytest.New(t, storage.StatefulSet.Store)
   171  	test.TestWatch(
   172  		validNewStatefulSet(),
   173  		// matching labels
   174  		[]labels.Set{
   175  			{"a": "b"},
   176  		},
   177  		// not matching labels
   178  		[]labels.Set{
   179  			{"a": "c"},
   180  			{"foo": "bar"},
   181  		},
   182  
   183  		// matching fields
   184  		[]fields.Set{
   185  			{"metadata.name": "foo"},
   186  		},
   187  		// not matching fields
   188  		[]fields.Set{
   189  			{"metadata.name": "bar"},
   190  		},
   191  	)
   192  }
   193  
   194  func TestCategories(t *testing.T) {
   195  	storage, server := newStorage(t)
   196  	defer server.Terminate(t)
   197  	defer storage.StatefulSet.Store.DestroyFunc()
   198  	expected := []string{"all"}
   199  	registrytest.AssertCategories(t, storage.StatefulSet, expected)
   200  }
   201  
   202  func TestShortNames(t *testing.T) {
   203  	storage, server := newStorage(t)
   204  	defer server.Terminate(t)
   205  	defer storage.StatefulSet.Store.DestroyFunc()
   206  	expected := []string{"sts"}
   207  	registrytest.AssertShortNames(t, storage.StatefulSet, expected)
   208  }
   209  
   210  func TestScaleGet(t *testing.T) {
   211  	storage, server := newStorage(t)
   212  	defer server.Terminate(t)
   213  	defer storage.StatefulSet.Store.DestroyFunc()
   214  
   215  	name := "foo"
   216  
   217  	var sts apps.StatefulSet
   218  	ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), metav1.NamespaceDefault)
   219  	key := "/statefulsets/" + metav1.NamespaceDefault + "/" + name
   220  	if err := storage.StatefulSet.Storage.Create(ctx, key, &validStatefulSet, &sts, 0, false); err != nil {
   221  		t.Fatalf("error setting new statefulset (key: %s) %v: %v", key, validStatefulSet, err)
   222  	}
   223  
   224  	selector, err := metav1.LabelSelectorAsSelector(validStatefulSet.Spec.Selector)
   225  	if err != nil {
   226  		t.Fatal(err)
   227  	}
   228  	want := &autoscaling.Scale{
   229  		ObjectMeta: metav1.ObjectMeta{
   230  			Name:              name,
   231  			Namespace:         metav1.NamespaceDefault,
   232  			UID:               sts.UID,
   233  			ResourceVersion:   sts.ResourceVersion,
   234  			CreationTimestamp: sts.CreationTimestamp,
   235  		},
   236  		Spec: autoscaling.ScaleSpec{
   237  			Replicas: validStatefulSet.Spec.Replicas,
   238  		},
   239  		Status: autoscaling.ScaleStatus{
   240  			Replicas: validStatefulSet.Status.Replicas,
   241  			Selector: selector.String(),
   242  		},
   243  	}
   244  	obj, err := storage.Scale.Get(ctx, name, &metav1.GetOptions{})
   245  	got := obj.(*autoscaling.Scale)
   246  	if err != nil {
   247  		t.Fatalf("error fetching scale for %s: %v", name, err)
   248  	}
   249  	if !apiequality.Semantic.DeepEqual(got, want) {
   250  		t.Errorf("unexpected scale: %s", cmp.Diff(got, want))
   251  	}
   252  }
   253  
   254  func TestScaleUpdate(t *testing.T) {
   255  	storage, server := newStorage(t)
   256  	defer server.Terminate(t)
   257  	defer storage.StatefulSet.Store.DestroyFunc()
   258  
   259  	name := "foo"
   260  
   261  	var sts apps.StatefulSet
   262  	ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), metav1.NamespaceDefault)
   263  	key := "/statefulsets/" + metav1.NamespaceDefault + "/" + name
   264  	if err := storage.StatefulSet.Storage.Create(ctx, key, &validStatefulSet, &sts, 0, false); err != nil {
   265  		t.Fatalf("error setting new statefulset (key: %s) %v: %v", key, validStatefulSet, err)
   266  	}
   267  	replicas := 12
   268  	update := autoscaling.Scale{
   269  		ObjectMeta: metav1.ObjectMeta{
   270  			Name:      name,
   271  			Namespace: metav1.NamespaceDefault,
   272  		},
   273  		Spec: autoscaling.ScaleSpec{
   274  			Replicas: int32(replicas),
   275  		},
   276  	}
   277  
   278  	if _, _, err := storage.Scale.Update(ctx, update.Name, rest.DefaultUpdatedObjectInfo(&update), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil {
   279  		t.Fatalf("error updating scale %v: %v", update, err)
   280  	}
   281  
   282  	obj, err := storage.Scale.Get(ctx, name, &metav1.GetOptions{})
   283  	if err != nil {
   284  		t.Fatalf("error fetching scale for %s: %v", name, err)
   285  	}
   286  	scale := obj.(*autoscaling.Scale)
   287  	if scale.Spec.Replicas != int32(replicas) {
   288  		t.Errorf("wrong replicas count expected: %d got: %d", replicas, scale.Spec.Replicas)
   289  	}
   290  
   291  	update.ResourceVersion = sts.ResourceVersion
   292  	update.Spec.Replicas = 15
   293  
   294  	if _, _, err = storage.Scale.Update(ctx, update.Name, rest.DefaultUpdatedObjectInfo(&update), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) {
   295  		t.Fatalf("unexpected error, expecting an update conflict but got %v", err)
   296  	}
   297  }
   298  
   299  // TODO: Test generation number.
   300  
   301  func TestScalePatchErrors(t *testing.T) {
   302  	storage, server := newStorage(t)
   303  	defer server.Terminate(t)
   304  	validObj := &validStatefulSet
   305  	namespace := validObj.Namespace
   306  	name := validObj.Name
   307  	resourceStore := storage.StatefulSet.Store
   308  	scaleStore := storage.Scale
   309  
   310  	defer resourceStore.DestroyFunc()
   311  	ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace)
   312  
   313  	{
   314  		applyNotFoundPatch := func() rest.TransformFunc {
   315  			return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
   316  				t.Errorf("notfound patch called")
   317  				return currentObject, nil
   318  			}
   319  		}
   320  		_, _, err := scaleStore.Update(ctx, "bad-name", rest.DefaultUpdatedObjectInfo(nil, applyNotFoundPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
   321  		if !apierrors.IsNotFound(err) {
   322  			t.Errorf("expected notfound, got %v", err)
   323  		}
   324  	}
   325  
   326  	if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
   327  		t.Errorf("Unexpected error: %v", err)
   328  	}
   329  
   330  	{
   331  		applyBadUIDPatch := func() rest.TransformFunc {
   332  			return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
   333  				currentObject.(*autoscaling.Scale).UID = "123"
   334  				return currentObject, nil
   335  			}
   336  		}
   337  		_, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadUIDPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
   338  		if !apierrors.IsConflict(err) {
   339  			t.Errorf("expected conflict, got %v", err)
   340  		}
   341  	}
   342  
   343  	{
   344  		applyBadResourceVersionPatch := func() rest.TransformFunc {
   345  			return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
   346  				currentObject.(*autoscaling.Scale).ResourceVersion = "123"
   347  				return currentObject, nil
   348  			}
   349  		}
   350  		_, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadResourceVersionPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
   351  		if !apierrors.IsConflict(err) {
   352  			t.Errorf("expected conflict, got %v", err)
   353  		}
   354  	}
   355  }
   356  
   357  func TestScalePatchConflicts(t *testing.T) {
   358  	storage, server := newStorage(t)
   359  	defer server.Terminate(t)
   360  	validObj := &validStatefulSet
   361  	namespace := validObj.Namespace
   362  	name := validObj.Name
   363  	resourceStore := storage.StatefulSet.Store
   364  	scaleStore := storage.Scale
   365  
   366  	defer resourceStore.DestroyFunc()
   367  	ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace)
   368  	if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
   369  		t.Fatalf("Unexpected error: %v", err)
   370  	}
   371  	applyLabelPatch := func(labelName, labelValue string) rest.TransformFunc {
   372  		return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
   373  			currentObject.(metav1.Object).SetLabels(map[string]string{labelName: labelValue})
   374  			return currentObject, nil
   375  		}
   376  	}
   377  	stopCh := make(chan struct{})
   378  	wg := &sync.WaitGroup{}
   379  	wg.Add(1)
   380  	go func() {
   381  		defer wg.Done()
   382  		// continuously submits a patch that updates a label and verifies the label update was effective
   383  		labelName := "timestamp"
   384  		for i := 0; ; i++ {
   385  			select {
   386  			case <-stopCh:
   387  				return
   388  			default:
   389  				expectedLabelValue := fmt.Sprint(i)
   390  				updated, _, err := resourceStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyLabelPatch(labelName, fmt.Sprint(i))), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
   391  				if err != nil {
   392  					t.Errorf("error patching main resource: %v", err)
   393  					return
   394  				}
   395  				gotLabelValue := updated.(metav1.Object).GetLabels()[labelName]
   396  				if gotLabelValue != expectedLabelValue {
   397  					t.Errorf("wrong label value: expected: %s, got: %s", expectedLabelValue, gotLabelValue)
   398  					return
   399  				}
   400  			}
   401  		}
   402  	}()
   403  
   404  	// continuously submits a scale patch of replicas for a monotonically increasing replica value
   405  	applyReplicaPatch := func(replicas int) rest.TransformFunc {
   406  		return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
   407  			currentObject.(*autoscaling.Scale).Spec.Replicas = int32(replicas)
   408  			return currentObject, nil
   409  		}
   410  	}
   411  	for i := 0; i < 100; i++ {
   412  		result, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyReplicaPatch(i)), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
   413  		if err != nil {
   414  			t.Fatalf("error patching scale: %v", err)
   415  		}
   416  		scale := result.(*autoscaling.Scale)
   417  		if scale.Spec.Replicas != int32(i) {
   418  			t.Errorf("wrong replicas count: expected: %d got: %d", i, scale.Spec.Replicas)
   419  		}
   420  	}
   421  	close(stopCh)
   422  	wg.Wait()
   423  }
   424  

View as plain text