1
16
17 package storage
18
19 import (
20 "context"
21 "fmt"
22 "sync"
23 "testing"
24
25 "github.com/google/go-cmp/cmp"
26 apiequality "k8s.io/apimachinery/pkg/api/equality"
27 apierrors "k8s.io/apimachinery/pkg/api/errors"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/fields"
30 "k8s.io/apimachinery/pkg/labels"
31 "k8s.io/apimachinery/pkg/runtime"
32 genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
33 "k8s.io/apiserver/pkg/registry/generic"
34 genericregistrytest "k8s.io/apiserver/pkg/registry/generic/testing"
35 "k8s.io/apiserver/pkg/registry/rest"
36 etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
37 "k8s.io/kubernetes/pkg/apis/apps"
38 "k8s.io/kubernetes/pkg/apis/autoscaling"
39 api "k8s.io/kubernetes/pkg/apis/core"
40 "k8s.io/kubernetes/pkg/registry/registrytest"
41 )
42
43
44 func newStorage(t *testing.T) (StatefulSetStorage, *etcd3testing.EtcdTestServer) {
45 etcdStorage, server := registrytest.NewEtcdStorage(t, apps.GroupName)
46 restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "statefulsets"}
47 storage, err := NewStorage(restOptions)
48 if err != nil {
49 t.Fatalf("unexpected error from REST storage: %v", err)
50 }
51 return storage, server
52 }
53
54 func validNewStatefulSet() *apps.StatefulSet {
55 return &apps.StatefulSet{
56 ObjectMeta: metav1.ObjectMeta{
57 Name: "foo",
58 Namespace: metav1.NamespaceDefault,
59 Labels: map[string]string{"a": "b"},
60 },
61 Spec: apps.StatefulSetSpec{
62 PodManagementPolicy: apps.OrderedReadyPodManagement,
63 Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"a": "b"}},
64 Template: api.PodTemplateSpec{
65 ObjectMeta: metav1.ObjectMeta{
66 Labels: map[string]string{"a": "b"},
67 },
68 Spec: api.PodSpec{
69 Containers: []api.Container{
70 {
71 Name: "test",
72 Image: "test_image",
73 ImagePullPolicy: api.PullIfNotPresent,
74 },
75 },
76 RestartPolicy: api.RestartPolicyAlways,
77 DNSPolicy: api.DNSClusterFirst,
78 },
79 },
80 Replicas: 7,
81 UpdateStrategy: apps.StatefulSetUpdateStrategy{Type: apps.RollingUpdateStatefulSetStrategyType},
82 },
83 Status: apps.StatefulSetStatus{},
84 }
85 }
86
87 var validStatefulSet = *validNewStatefulSet()
88
89 func TestCreate(t *testing.T) {
90 storage, server := newStorage(t)
91 defer server.Terminate(t)
92 defer storage.StatefulSet.Store.DestroyFunc()
93 test := genericregistrytest.New(t, storage.StatefulSet.Store)
94 ps := validNewStatefulSet()
95 ps.ObjectMeta = metav1.ObjectMeta{}
96 test.TestCreate(
97
98 ps,
99
100 )
101 }
102
103
104
105 func TestStatusUpdate(t *testing.T) {
106 storage, server := newStorage(t)
107 defer server.Terminate(t)
108 defer storage.StatefulSet.Store.DestroyFunc()
109 ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), metav1.NamespaceDefault)
110 key := "/statefulsets/" + metav1.NamespaceDefault + "/foo"
111 validStatefulSet := validNewStatefulSet()
112 if err := storage.StatefulSet.Storage.Create(ctx, key, validStatefulSet, nil, 0, false); err != nil {
113 t.Fatalf("unexpected error: %v", err)
114 }
115 update := apps.StatefulSet{
116 ObjectMeta: validStatefulSet.ObjectMeta,
117 Spec: apps.StatefulSetSpec{
118 Replicas: 7,
119 },
120 Status: apps.StatefulSetStatus{
121 Replicas: 7,
122 },
123 }
124
125 if _, _, err := storage.Status.Update(ctx, update.Name, rest.DefaultUpdatedObjectInfo(&update), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil {
126 t.Fatalf("unexpected error: %v", err)
127 }
128 obj, err := storage.StatefulSet.Get(ctx, "foo", &metav1.GetOptions{})
129 if err != nil {
130 t.Fatalf("unexpected error: %v", err)
131 }
132
133 ps := obj.(*apps.StatefulSet)
134 if ps.Spec.Replicas != 7 {
135 t.Errorf("we expected .spec.replicas to not be updated but it was updated to %v", ps.Spec.Replicas)
136 }
137 if ps.Status.Replicas != 7 {
138 t.Errorf("we expected .status.replicas to be updated to %d but it was %v", 7, ps.Status.Replicas)
139 }
140 }
141
142 func TestGet(t *testing.T) {
143 storage, server := newStorage(t)
144 defer server.Terminate(t)
145 defer storage.StatefulSet.Store.DestroyFunc()
146 test := genericregistrytest.New(t, storage.StatefulSet.Store)
147 test.TestGet(validNewStatefulSet())
148 }
149
150 func TestList(t *testing.T) {
151 storage, server := newStorage(t)
152 defer server.Terminate(t)
153 defer storage.StatefulSet.Store.DestroyFunc()
154 test := genericregistrytest.New(t, storage.StatefulSet.Store)
155 test.TestList(validNewStatefulSet())
156 }
157
158 func TestDelete(t *testing.T) {
159 storage, server := newStorage(t)
160 defer server.Terminate(t)
161 defer storage.StatefulSet.Store.DestroyFunc()
162 test := genericregistrytest.New(t, storage.StatefulSet.Store)
163 test.TestDelete(validNewStatefulSet())
164 }
165
166 func TestWatch(t *testing.T) {
167 storage, server := newStorage(t)
168 defer server.Terminate(t)
169 defer storage.StatefulSet.Store.DestroyFunc()
170 test := genericregistrytest.New(t, storage.StatefulSet.Store)
171 test.TestWatch(
172 validNewStatefulSet(),
173
174 []labels.Set{
175 {"a": "b"},
176 },
177
178 []labels.Set{
179 {"a": "c"},
180 {"foo": "bar"},
181 },
182
183
184 []fields.Set{
185 {"metadata.name": "foo"},
186 },
187
188 []fields.Set{
189 {"metadata.name": "bar"},
190 },
191 )
192 }
193
194 func TestCategories(t *testing.T) {
195 storage, server := newStorage(t)
196 defer server.Terminate(t)
197 defer storage.StatefulSet.Store.DestroyFunc()
198 expected := []string{"all"}
199 registrytest.AssertCategories(t, storage.StatefulSet, expected)
200 }
201
202 func TestShortNames(t *testing.T) {
203 storage, server := newStorage(t)
204 defer server.Terminate(t)
205 defer storage.StatefulSet.Store.DestroyFunc()
206 expected := []string{"sts"}
207 registrytest.AssertShortNames(t, storage.StatefulSet, expected)
208 }
209
210 func TestScaleGet(t *testing.T) {
211 storage, server := newStorage(t)
212 defer server.Terminate(t)
213 defer storage.StatefulSet.Store.DestroyFunc()
214
215 name := "foo"
216
217 var sts apps.StatefulSet
218 ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), metav1.NamespaceDefault)
219 key := "/statefulsets/" + metav1.NamespaceDefault + "/" + name
220 if err := storage.StatefulSet.Storage.Create(ctx, key, &validStatefulSet, &sts, 0, false); err != nil {
221 t.Fatalf("error setting new statefulset (key: %s) %v: %v", key, validStatefulSet, err)
222 }
223
224 selector, err := metav1.LabelSelectorAsSelector(validStatefulSet.Spec.Selector)
225 if err != nil {
226 t.Fatal(err)
227 }
228 want := &autoscaling.Scale{
229 ObjectMeta: metav1.ObjectMeta{
230 Name: name,
231 Namespace: metav1.NamespaceDefault,
232 UID: sts.UID,
233 ResourceVersion: sts.ResourceVersion,
234 CreationTimestamp: sts.CreationTimestamp,
235 },
236 Spec: autoscaling.ScaleSpec{
237 Replicas: validStatefulSet.Spec.Replicas,
238 },
239 Status: autoscaling.ScaleStatus{
240 Replicas: validStatefulSet.Status.Replicas,
241 Selector: selector.String(),
242 },
243 }
244 obj, err := storage.Scale.Get(ctx, name, &metav1.GetOptions{})
245 got := obj.(*autoscaling.Scale)
246 if err != nil {
247 t.Fatalf("error fetching scale for %s: %v", name, err)
248 }
249 if !apiequality.Semantic.DeepEqual(got, want) {
250 t.Errorf("unexpected scale: %s", cmp.Diff(got, want))
251 }
252 }
253
254 func TestScaleUpdate(t *testing.T) {
255 storage, server := newStorage(t)
256 defer server.Terminate(t)
257 defer storage.StatefulSet.Store.DestroyFunc()
258
259 name := "foo"
260
261 var sts apps.StatefulSet
262 ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), metav1.NamespaceDefault)
263 key := "/statefulsets/" + metav1.NamespaceDefault + "/" + name
264 if err := storage.StatefulSet.Storage.Create(ctx, key, &validStatefulSet, &sts, 0, false); err != nil {
265 t.Fatalf("error setting new statefulset (key: %s) %v: %v", key, validStatefulSet, err)
266 }
267 replicas := 12
268 update := autoscaling.Scale{
269 ObjectMeta: metav1.ObjectMeta{
270 Name: name,
271 Namespace: metav1.NamespaceDefault,
272 },
273 Spec: autoscaling.ScaleSpec{
274 Replicas: int32(replicas),
275 },
276 }
277
278 if _, _, err := storage.Scale.Update(ctx, update.Name, rest.DefaultUpdatedObjectInfo(&update), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil {
279 t.Fatalf("error updating scale %v: %v", update, err)
280 }
281
282 obj, err := storage.Scale.Get(ctx, name, &metav1.GetOptions{})
283 if err != nil {
284 t.Fatalf("error fetching scale for %s: %v", name, err)
285 }
286 scale := obj.(*autoscaling.Scale)
287 if scale.Spec.Replicas != int32(replicas) {
288 t.Errorf("wrong replicas count expected: %d got: %d", replicas, scale.Spec.Replicas)
289 }
290
291 update.ResourceVersion = sts.ResourceVersion
292 update.Spec.Replicas = 15
293
294 if _, _, err = storage.Scale.Update(ctx, update.Name, rest.DefaultUpdatedObjectInfo(&update), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) {
295 t.Fatalf("unexpected error, expecting an update conflict but got %v", err)
296 }
297 }
298
299
300
301 func TestScalePatchErrors(t *testing.T) {
302 storage, server := newStorage(t)
303 defer server.Terminate(t)
304 validObj := &validStatefulSet
305 namespace := validObj.Namespace
306 name := validObj.Name
307 resourceStore := storage.StatefulSet.Store
308 scaleStore := storage.Scale
309
310 defer resourceStore.DestroyFunc()
311 ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace)
312
313 {
314 applyNotFoundPatch := func() rest.TransformFunc {
315 return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
316 t.Errorf("notfound patch called")
317 return currentObject, nil
318 }
319 }
320 _, _, err := scaleStore.Update(ctx, "bad-name", rest.DefaultUpdatedObjectInfo(nil, applyNotFoundPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
321 if !apierrors.IsNotFound(err) {
322 t.Errorf("expected notfound, got %v", err)
323 }
324 }
325
326 if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
327 t.Errorf("Unexpected error: %v", err)
328 }
329
330 {
331 applyBadUIDPatch := func() rest.TransformFunc {
332 return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
333 currentObject.(*autoscaling.Scale).UID = "123"
334 return currentObject, nil
335 }
336 }
337 _, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadUIDPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
338 if !apierrors.IsConflict(err) {
339 t.Errorf("expected conflict, got %v", err)
340 }
341 }
342
343 {
344 applyBadResourceVersionPatch := func() rest.TransformFunc {
345 return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
346 currentObject.(*autoscaling.Scale).ResourceVersion = "123"
347 return currentObject, nil
348 }
349 }
350 _, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadResourceVersionPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
351 if !apierrors.IsConflict(err) {
352 t.Errorf("expected conflict, got %v", err)
353 }
354 }
355 }
356
357 func TestScalePatchConflicts(t *testing.T) {
358 storage, server := newStorage(t)
359 defer server.Terminate(t)
360 validObj := &validStatefulSet
361 namespace := validObj.Namespace
362 name := validObj.Name
363 resourceStore := storage.StatefulSet.Store
364 scaleStore := storage.Scale
365
366 defer resourceStore.DestroyFunc()
367 ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace)
368 if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
369 t.Fatalf("Unexpected error: %v", err)
370 }
371 applyLabelPatch := func(labelName, labelValue string) rest.TransformFunc {
372 return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
373 currentObject.(metav1.Object).SetLabels(map[string]string{labelName: labelValue})
374 return currentObject, nil
375 }
376 }
377 stopCh := make(chan struct{})
378 wg := &sync.WaitGroup{}
379 wg.Add(1)
380 go func() {
381 defer wg.Done()
382
383 labelName := "timestamp"
384 for i := 0; ; i++ {
385 select {
386 case <-stopCh:
387 return
388 default:
389 expectedLabelValue := fmt.Sprint(i)
390 updated, _, err := resourceStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyLabelPatch(labelName, fmt.Sprint(i))), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
391 if err != nil {
392 t.Errorf("error patching main resource: %v", err)
393 return
394 }
395 gotLabelValue := updated.(metav1.Object).GetLabels()[labelName]
396 if gotLabelValue != expectedLabelValue {
397 t.Errorf("wrong label value: expected: %s, got: %s", expectedLabelValue, gotLabelValue)
398 return
399 }
400 }
401 }
402 }()
403
404
405 applyReplicaPatch := func(replicas int) rest.TransformFunc {
406 return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
407 currentObject.(*autoscaling.Scale).Spec.Replicas = int32(replicas)
408 return currentObject, nil
409 }
410 }
411 for i := 0; i < 100; i++ {
412 result, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyReplicaPatch(i)), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
413 if err != nil {
414 t.Fatalf("error patching scale: %v", err)
415 }
416 scale := result.(*autoscaling.Scale)
417 if scale.Spec.Replicas != int32(i) {
418 t.Errorf("wrong replicas count: expected: %d got: %d", i, scale.Spec.Replicas)
419 }
420 }
421 close(stopCh)
422 wg.Wait()
423 }
424
View as plain text