1
16
17 package storage
18
19 import (
20 "context"
21 "fmt"
22 "sync"
23 "testing"
24
25 "github.com/google/go-cmp/cmp"
26 apiequality "k8s.io/apimachinery/pkg/api/equality"
27 apierrors "k8s.io/apimachinery/pkg/api/errors"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/fields"
30 "k8s.io/apimachinery/pkg/labels"
31 "k8s.io/apimachinery/pkg/runtime"
32 genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
33 "k8s.io/apiserver/pkg/registry/generic"
34 genericregistrytest "k8s.io/apiserver/pkg/registry/generic/testing"
35 "k8s.io/apiserver/pkg/registry/rest"
36 etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
37 "k8s.io/kubernetes/pkg/apis/apps"
38 "k8s.io/kubernetes/pkg/apis/autoscaling"
39 api "k8s.io/kubernetes/pkg/apis/core"
40 "k8s.io/kubernetes/pkg/registry/registrytest"
41 )
42
43 const defaultReplicas = 100
44
45 func newStorage(t *testing.T) (*ReplicaSetStorage, *etcd3testing.EtcdTestServer) {
46 etcdStorage, server := registrytest.NewEtcdStorage(t, "extensions")
47 restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "replicasets"}
48 replicaSetStorage, err := NewStorage(restOptions)
49 if err != nil {
50 t.Fatalf("unexpected error from REST storage: %v", err)
51 }
52 return &replicaSetStorage, server
53 }
54
55
56 func createReplicaSet(storage *REST, rs apps.ReplicaSet, t *testing.T) (apps.ReplicaSet, error) {
57 ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), rs.Namespace)
58 obj, err := storage.Create(ctx, &rs, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
59 if err != nil {
60 t.Errorf("Failed to create ReplicaSet, %v", err)
61 }
62 newRS := obj.(*apps.ReplicaSet)
63 return *newRS, nil
64 }
65
66 func validNewReplicaSet() *apps.ReplicaSet {
67 return &apps.ReplicaSet{
68 ObjectMeta: metav1.ObjectMeta{
69 Name: "foo",
70 Namespace: metav1.NamespaceDefault,
71 },
72 Spec: apps.ReplicaSetSpec{
73 Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"a": "b"}},
74 Template: api.PodTemplateSpec{
75 ObjectMeta: metav1.ObjectMeta{
76 Labels: map[string]string{"a": "b"},
77 },
78 Spec: api.PodSpec{
79 Containers: []api.Container{
80 {
81 Name: "test",
82 Image: "test_image",
83 ImagePullPolicy: api.PullIfNotPresent,
84 TerminationMessagePolicy: api.TerminationMessageReadFile,
85 },
86 },
87 RestartPolicy: api.RestartPolicyAlways,
88 DNSPolicy: api.DNSClusterFirst,
89 },
90 },
91 Replicas: 7,
92 },
93 Status: apps.ReplicaSetStatus{
94 Replicas: 5,
95 },
96 }
97 }
98
99 var validReplicaSet = *validNewReplicaSet()
100
101 func TestCreate(t *testing.T) {
102 storage, server := newStorage(t)
103 defer server.Terminate(t)
104 defer storage.ReplicaSet.Store.DestroyFunc()
105 test := genericregistrytest.New(t, storage.ReplicaSet.Store)
106 rs := validNewReplicaSet()
107 rs.ObjectMeta = metav1.ObjectMeta{}
108 test.TestCreate(
109
110 rs,
111
112 &apps.ReplicaSet{
113 Spec: apps.ReplicaSetSpec{
114 Replicas: 2,
115 Selector: &metav1.LabelSelector{MatchLabels: map[string]string{}},
116 Template: validReplicaSet.Spec.Template,
117 },
118 },
119 )
120 }
121
122 func TestUpdate(t *testing.T) {
123 storage, server := newStorage(t)
124 defer server.Terminate(t)
125 defer storage.ReplicaSet.Store.DestroyFunc()
126 test := genericregistrytest.New(t, storage.ReplicaSet.Store)
127 test.TestUpdate(
128
129 validNewReplicaSet(),
130
131 func(obj runtime.Object) runtime.Object {
132 object := obj.(*apps.ReplicaSet)
133 object.Spec.Replicas = object.Spec.Replicas + 1
134 return object
135 },
136
137 func(obj runtime.Object) runtime.Object {
138 object := obj.(*apps.ReplicaSet)
139 object.Name = ""
140 return object
141 },
142 func(obj runtime.Object) runtime.Object {
143 object := obj.(*apps.ReplicaSet)
144 object.Spec.Selector = &metav1.LabelSelector{MatchLabels: map[string]string{}}
145 return object
146 },
147 )
148 }
149
150 func TestDelete(t *testing.T) {
151 storage, server := newStorage(t)
152 defer server.Terminate(t)
153 defer storage.ReplicaSet.Store.DestroyFunc()
154 test := genericregistrytest.New(t, storage.ReplicaSet.Store)
155 test.TestDelete(validNewReplicaSet())
156 }
157
158 func TestGenerationNumber(t *testing.T) {
159 storage, server := newStorage(t)
160 defer server.Terminate(t)
161 defer storage.ReplicaSet.Store.DestroyFunc()
162 modifiedSno := *validNewReplicaSet()
163 modifiedSno.Generation = 100
164 modifiedSno.Status.ObservedGeneration = 10
165 ctx := genericapirequest.NewDefaultContext()
166 rs, err := createReplicaSet(storage.ReplicaSet, modifiedSno, t)
167 if err != nil {
168 t.Errorf("unexpected error: %v", err)
169 }
170 etcdRS, err := storage.ReplicaSet.Get(ctx, rs.Name, &metav1.GetOptions{})
171 if err != nil {
172 t.Errorf("unexpected error: %v", err)
173 }
174 storedRS, _ := etcdRS.(*apps.ReplicaSet)
175
176
177 if storedRS.Generation != 1 || storedRS.Status.ObservedGeneration != 0 {
178 t.Fatalf("Unexpected generation number %v, status generation %v", storedRS.Generation, storedRS.Status.ObservedGeneration)
179 }
180
181
182 storedRS.Spec.Replicas++
183 if _, _, err := storage.ReplicaSet.Update(ctx, storedRS.Name, rest.DefaultUpdatedObjectInfo(storedRS), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil {
184 t.Errorf("unexpected error: %v", err)
185 }
186 etcdRS, err = storage.ReplicaSet.Get(ctx, rs.Name, &metav1.GetOptions{})
187 if err != nil {
188 t.Errorf("unexpected error: %v", err)
189 }
190 storedRS, _ = etcdRS.(*apps.ReplicaSet)
191 if storedRS.Generation != 2 || storedRS.Status.ObservedGeneration != 0 {
192 t.Fatalf("Unexpected generation, spec: %v, status: %v", storedRS.Generation, storedRS.Status.ObservedGeneration)
193 }
194
195
196 storedRS.Status.Replicas++
197 if _, _, err := storage.ReplicaSet.Update(ctx, storedRS.Name, rest.DefaultUpdatedObjectInfo(storedRS), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil {
198 t.Errorf("unexpected error: %v", err)
199 }
200 etcdRS, err = storage.ReplicaSet.Get(ctx, rs.Name, &metav1.GetOptions{})
201 if err != nil {
202 t.Errorf("unexpected error: %v", err)
203 }
204 storedRS, _ = etcdRS.(*apps.ReplicaSet)
205 if storedRS.Generation != 2 || storedRS.Status.ObservedGeneration != 0 {
206 t.Fatalf("Unexpected generation number, spec: %v, status: %v", storedRS.Generation, storedRS.Status.ObservedGeneration)
207 }
208 }
209
210 func TestGet(t *testing.T) {
211 storage, server := newStorage(t)
212 defer server.Terminate(t)
213 defer storage.ReplicaSet.Store.DestroyFunc()
214 test := genericregistrytest.New(t, storage.ReplicaSet.Store)
215 test.TestGet(validNewReplicaSet())
216 }
217
218 func TestList(t *testing.T) {
219 storage, server := newStorage(t)
220 defer server.Terminate(t)
221 defer storage.ReplicaSet.Store.DestroyFunc()
222 test := genericregistrytest.New(t, storage.ReplicaSet.Store)
223 test.TestList(validNewReplicaSet())
224 }
225
226 func TestWatch(t *testing.T) {
227 storage, server := newStorage(t)
228 defer server.Terminate(t)
229 defer storage.ReplicaSet.Store.DestroyFunc()
230 test := genericregistrytest.New(t, storage.ReplicaSet.Store)
231 test.TestWatch(
232 validNewReplicaSet(),
233
234 []labels.Set{
235 {"a": "b"},
236 },
237
238 []labels.Set{
239 {"a": "c"},
240 {"foo": "bar"},
241 },
242
243 []fields.Set{
244 {"status.replicas": "5"},
245 {"metadata.name": "foo"},
246 {"status.replicas": "5", "metadata.name": "foo"},
247 },
248
249 []fields.Set{
250 {"status.replicas": "10"},
251 {"metadata.name": "bar"},
252 {"name": "foo"},
253 {"status.replicas": "10", "metadata.name": "foo"},
254 {"status.replicas": "0", "metadata.name": "bar"},
255 },
256 )
257 }
258
259 func TestScaleGet(t *testing.T) {
260 storage, server := newStorage(t)
261 defer server.Terminate(t)
262 defer storage.ReplicaSet.Store.DestroyFunc()
263
264 name := "foo"
265
266 var rs apps.ReplicaSet
267 ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), metav1.NamespaceDefault)
268 key := "/replicasets/" + metav1.NamespaceDefault + "/" + name
269 if err := storage.ReplicaSet.Storage.Create(ctx, key, &validReplicaSet, &rs, 0, false); err != nil {
270 t.Fatalf("error setting new replica set (key: %s) %v: %v", key, validReplicaSet, err)
271 }
272
273 selector, err := metav1.LabelSelectorAsSelector(validReplicaSet.Spec.Selector)
274 if err != nil {
275 t.Fatal(err)
276 }
277
278 want := &autoscaling.Scale{
279 ObjectMeta: metav1.ObjectMeta{
280 Name: name,
281 Namespace: metav1.NamespaceDefault,
282 UID: rs.UID,
283 ResourceVersion: rs.ResourceVersion,
284 CreationTimestamp: rs.CreationTimestamp,
285 },
286 Spec: autoscaling.ScaleSpec{
287 Replicas: validReplicaSet.Spec.Replicas,
288 },
289 Status: autoscaling.ScaleStatus{
290 Replicas: validReplicaSet.Status.Replicas,
291 Selector: selector.String(),
292 },
293 }
294 obj, err := storage.Scale.Get(ctx, name, &metav1.GetOptions{})
295 got := obj.(*autoscaling.Scale)
296 if err != nil {
297 t.Fatalf("error fetching scale for %s: %v", name, err)
298 }
299 if !apiequality.Semantic.DeepEqual(got, want) {
300 t.Errorf("unexpected scale: %s", cmp.Diff(got, want))
301 }
302 }
303
304 func TestScaleUpdate(t *testing.T) {
305 storage, server := newStorage(t)
306 defer server.Terminate(t)
307 defer storage.ReplicaSet.Store.DestroyFunc()
308
309 name := "foo"
310
311 var rs apps.ReplicaSet
312 ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), metav1.NamespaceDefault)
313 key := "/replicasets/" + metav1.NamespaceDefault + "/" + name
314 if err := storage.ReplicaSet.Storage.Create(ctx, key, &validReplicaSet, &rs, 0, false); err != nil {
315 t.Fatalf("error setting new replica set (key: %s) %v: %v", key, validReplicaSet, err)
316 }
317 replicas := 12
318 update := autoscaling.Scale{
319 ObjectMeta: metav1.ObjectMeta{
320 Name: name,
321 Namespace: metav1.NamespaceDefault,
322 },
323 Spec: autoscaling.ScaleSpec{
324 Replicas: int32(replicas),
325 },
326 }
327
328 if _, _, err := storage.Scale.Update(ctx, update.Name, rest.DefaultUpdatedObjectInfo(&update), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil {
329 t.Fatalf("error updating scale %v: %v", update, err)
330 }
331
332 obj, err := storage.Scale.Get(ctx, name, &metav1.GetOptions{})
333 if err != nil {
334 t.Fatalf("error fetching scale for %s: %v", name, err)
335 }
336 scale := obj.(*autoscaling.Scale)
337 if scale.Spec.Replicas != int32(replicas) {
338 t.Errorf("wrong replicas count expected: %d got: %d", replicas, scale.Spec.Replicas)
339 }
340
341 update.ResourceVersion = rs.ResourceVersion
342 update.Spec.Replicas = 15
343
344 if _, _, err = storage.Scale.Update(ctx, update.Name, rest.DefaultUpdatedObjectInfo(&update), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) {
345 t.Fatalf("unexpected error, expecting an update conflict but got %v", err)
346 }
347 }
348
349 func TestStatusUpdate(t *testing.T) {
350 storage, server := newStorage(t)
351 defer server.Terminate(t)
352 defer storage.ReplicaSet.Store.DestroyFunc()
353
354 ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), metav1.NamespaceDefault)
355 key := "/replicasets/" + metav1.NamespaceDefault + "/foo"
356 if err := storage.ReplicaSet.Storage.Create(ctx, key, &validReplicaSet, nil, 0, false); err != nil {
357 t.Fatalf("unexpected error: %v", err)
358 }
359 update := apps.ReplicaSet{
360 ObjectMeta: validReplicaSet.ObjectMeta,
361 Spec: apps.ReplicaSetSpec{
362 Replicas: defaultReplicas,
363 },
364 Status: apps.ReplicaSetStatus{
365 Replicas: defaultReplicas,
366 },
367 }
368
369 if _, _, err := storage.Status.Update(ctx, update.Name, rest.DefaultUpdatedObjectInfo(&update), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil {
370 t.Fatalf("unexpected error: %v", err)
371 }
372 obj, err := storage.ReplicaSet.Get(ctx, "foo", &metav1.GetOptions{})
373 if err != nil {
374 t.Fatalf("unexpected error: %v", err)
375 }
376
377 rs := obj.(*apps.ReplicaSet)
378 if rs.Spec.Replicas != 7 {
379 t.Errorf("we expected .spec.replicas to not be updated but it was updated to %v", rs.Spec.Replicas)
380 }
381 if rs.Status.Replicas != defaultReplicas {
382 t.Errorf("we expected .status.replicas to be updated to %d but it was %v", defaultReplicas, rs.Status.Replicas)
383 }
384 }
385
386 func TestShortNames(t *testing.T) {
387 storage, server := newStorage(t)
388 defer server.Terminate(t)
389 defer storage.ReplicaSet.DestroyFunc()
390 expected := []string{"rs"}
391 registrytest.AssertShortNames(t, storage.ReplicaSet, expected)
392 }
393
394 func TestCategories(t *testing.T) {
395 storage, server := newStorage(t)
396 defer server.Terminate(t)
397 defer storage.ReplicaSet.Store.DestroyFunc()
398 expected := []string{"all"}
399 registrytest.AssertCategories(t, storage.ReplicaSet, expected)
400 }
401
402 func TestScalePatchErrors(t *testing.T) {
403 storage, server := newStorage(t)
404 defer server.Terminate(t)
405 validObj := &validReplicaSet
406 namespace := validObj.Namespace
407 name := validObj.Name
408 resourceStore := storage.ReplicaSet.Store
409 scaleStore := storage.Scale
410
411 defer resourceStore.DestroyFunc()
412 ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace)
413
414 {
415 applyNotFoundPatch := func() rest.TransformFunc {
416 return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
417 t.Errorf("notfound patch called")
418 return currentObject, nil
419 }
420 }
421 _, _, err := scaleStore.Update(ctx, "bad-name", rest.DefaultUpdatedObjectInfo(nil, applyNotFoundPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
422 if !apierrors.IsNotFound(err) {
423 t.Errorf("expected notfound, got %v", err)
424 }
425 }
426
427 if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
428 t.Errorf("Unexpected error: %v", err)
429 }
430
431 {
432 applyBadUIDPatch := func() rest.TransformFunc {
433 return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
434 currentObject.(*autoscaling.Scale).UID = "123"
435 return currentObject, nil
436 }
437 }
438 _, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadUIDPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
439 if !apierrors.IsConflict(err) {
440 t.Errorf("expected conflict, got %v", err)
441 }
442 }
443
444 {
445 applyBadResourceVersionPatch := func() rest.TransformFunc {
446 return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
447 currentObject.(*autoscaling.Scale).ResourceVersion = "123"
448 return currentObject, nil
449 }
450 }
451 _, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyBadResourceVersionPatch()), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
452 if !apierrors.IsConflict(err) {
453 t.Errorf("expected conflict, got %v", err)
454 }
455 }
456 }
457
458 func TestScalePatchConflicts(t *testing.T) {
459 storage, server := newStorage(t)
460 defer server.Terminate(t)
461 validObj := &validReplicaSet
462 namespace := validObj.Namespace
463 name := validObj.Name
464 resourceStore := storage.ReplicaSet.Store
465 scaleStore := storage.Scale
466
467 defer resourceStore.DestroyFunc()
468 ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), namespace)
469 if _, err := resourceStore.Create(ctx, validObj, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}); err != nil {
470 t.Fatalf("Unexpected error: %v", err)
471 }
472 applyLabelPatch := func(labelName, labelValue string) rest.TransformFunc {
473 return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
474 currentObject.(metav1.Object).SetLabels(map[string]string{labelName: labelValue})
475 return currentObject, nil
476 }
477 }
478 stopCh := make(chan struct{})
479 wg := &sync.WaitGroup{}
480 wg.Add(1)
481 go func() {
482 defer wg.Done()
483
484 labelName := "timestamp"
485 for i := 0; ; i++ {
486 select {
487 case <-stopCh:
488 return
489 default:
490 expectedLabelValue := fmt.Sprint(i)
491 updated, _, err := resourceStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyLabelPatch(labelName, fmt.Sprint(i))), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
492 if err != nil {
493 t.Errorf("error patching main resource: %v", err)
494 return
495 }
496 gotLabelValue := updated.(metav1.Object).GetLabels()[labelName]
497 if gotLabelValue != expectedLabelValue {
498 t.Errorf("wrong label value: expected: %s, got: %s", expectedLabelValue, gotLabelValue)
499 return
500 }
501 }
502 }
503 }()
504
505
506 applyReplicaPatch := func(replicas int) rest.TransformFunc {
507 return func(_ context.Context, _, currentObject runtime.Object) (objToUpdate runtime.Object, patchErr error) {
508 currentObject.(*autoscaling.Scale).Spec.Replicas = int32(replicas)
509 return currentObject, nil
510 }
511 }
512 for i := 0; i < 100; i++ {
513 result, _, err := scaleStore.Update(ctx, name, rest.DefaultUpdatedObjectInfo(nil, applyReplicaPatch(i)), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{})
514 if err != nil {
515 t.Fatalf("error patching scale: %v", err)
516 }
517 scale := result.(*autoscaling.Scale)
518 if scale.Spec.Replicas != int32(i) {
519 t.Errorf("wrong replicas count: expected: %d got: %d", i, scale.Spec.Replicas)
520 }
521 }
522 close(stopCh)
523 wg.Wait()
524 }
525
View as plain text