1
16
17 package manager
18
19 import (
20 "context"
21 "fmt"
22 "strings"
23 "testing"
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 apiequality "k8s.io/apimachinery/pkg/api/equality"
28 apierrors "k8s.io/apimachinery/pkg/api/errors"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/runtime"
31 "k8s.io/apimachinery/pkg/types"
32 "k8s.io/apimachinery/pkg/util/wait"
33 "k8s.io/apimachinery/pkg/watch"
34
35 clientset "k8s.io/client-go/kubernetes"
36 "k8s.io/client-go/kubernetes/fake"
37 core "k8s.io/client-go/testing"
38
39 corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
40
41 "k8s.io/utils/clock"
42 testingclock "k8s.io/utils/clock/testing"
43
44 "github.com/stretchr/testify/assert"
45 )
46
47 func listSecret(fakeClient clientset.Interface) listObjectFunc {
48 return func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
49 return fakeClient.CoreV1().Secrets(namespace).List(context.TODO(), opts)
50 }
51 }
52
53 func watchSecret(fakeClient clientset.Interface) watchObjectFunc {
54 return func(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
55 return fakeClient.CoreV1().Secrets(namespace).Watch(context.TODO(), opts)
56 }
57 }
58
59 func isSecretImmutable(object runtime.Object) bool {
60 if secret, ok := object.(*v1.Secret); ok {
61 return secret.Immutable != nil && *secret.Immutable
62 }
63 return false
64 }
65
66 func newSecretCache(fakeClient clientset.Interface, fakeClock clock.Clock, maxIdleTime time.Duration) *objectCache {
67 return &objectCache{
68 listObject: listSecret(fakeClient),
69 watchObject: watchSecret(fakeClient),
70 newObject: func() runtime.Object { return &v1.Secret{} },
71 isImmutable: isSecretImmutable,
72 groupResource: corev1.Resource("secret"),
73 clock: fakeClock,
74 maxIdleTime: maxIdleTime,
75 items: make(map[objectKey]*objectCacheItem),
76 }
77 }
78
79 func TestSecretCache(t *testing.T) {
80 fakeClient := &fake.Clientset{}
81
82 listReactor := func(a core.Action) (bool, runtime.Object, error) {
83 result := &v1.SecretList{
84 ListMeta: metav1.ListMeta{
85 ResourceVersion: "123",
86 },
87 }
88 return true, result, nil
89 }
90 fakeClient.AddReactor("list", "secrets", listReactor)
91 fakeWatch := watch.NewFake()
92 fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
93
94 fakeClock := testingclock.NewFakeClock(time.Now())
95 store := newSecretCache(fakeClient, fakeClock, time.Minute)
96
97 store.AddReference("ns", "name", "pod")
98 _, err := store.Get("ns", "name")
99 if !apierrors.IsNotFound(err) {
100 t.Errorf("Expected NotFound error, got: %v", err)
101 }
102
103
104 secret := &v1.Secret{
105 ObjectMeta: metav1.ObjectMeta{Name: "name", Namespace: "ns", ResourceVersion: "125"},
106 }
107 fakeWatch.Add(secret)
108 getFn := func() (bool, error) {
109 object, err := store.Get("ns", "name")
110 if err != nil {
111 if apierrors.IsNotFound(err) {
112 return false, nil
113 }
114 return false, err
115 }
116 secret := object.(*v1.Secret)
117 if secret == nil || secret.Name != "name" || secret.Namespace != "ns" {
118 return false, fmt.Errorf("unexpected secret: %v", secret)
119 }
120 return true, nil
121 }
122 if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil {
123 t.Errorf("unexpected error: %v", err)
124 }
125
126
127 fakeWatch.Delete(secret)
128 getFn = func() (bool, error) {
129 _, err := store.Get("ns", "name")
130 if err != nil {
131 if apierrors.IsNotFound(err) {
132 return true, nil
133 }
134 return false, err
135 }
136 return false, nil
137 }
138 if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil {
139 t.Errorf("unexpected error: %v", err)
140 }
141
142 store.DeleteReference("ns", "name", "pod")
143 _, err = store.Get("ns", "name")
144 if err == nil || !strings.Contains(err.Error(), "not registered") {
145 t.Errorf("unexpected error: %v", err)
146 }
147 }
148
149 func TestSecretCacheMultipleRegistrations(t *testing.T) {
150 fakeClient := &fake.Clientset{}
151
152 listReactor := func(a core.Action) (bool, runtime.Object, error) {
153 result := &v1.SecretList{
154 ListMeta: metav1.ListMeta{
155 ResourceVersion: "123",
156 },
157 }
158 return true, result, nil
159 }
160 fakeClient.AddReactor("list", "secrets", listReactor)
161 fakeWatch := watch.NewFake()
162 fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
163
164 fakeClock := testingclock.NewFakeClock(time.Now())
165 store := newSecretCache(fakeClient, fakeClock, time.Minute)
166
167 store.AddReference("ns", "name", "pod")
168
169 actionsFn := func() (bool, error) {
170 actions := fakeClient.Actions()
171 if len(actions) > 2 {
172 return false, fmt.Errorf("too many actions: %v", actions)
173 }
174 if len(actions) < 2 {
175 return false, nil
176 }
177 if actions[0].GetVerb() != "list" || actions[1].GetVerb() != "watch" {
178 return false, fmt.Errorf("unexpected actions: %v", actions)
179 }
180 return true, nil
181 }
182 if err := wait.PollImmediate(10*time.Millisecond, time.Second, actionsFn); err != nil {
183 t.Errorf("unexpected error: %v", err)
184 }
185
186
187 for i := 0; i < 20; i++ {
188 store.AddReference("ns", "name", types.UID(fmt.Sprintf("pod-%d", i)))
189 store.DeleteReference("ns", "name", types.UID(fmt.Sprintf("pod-%d", i)))
190 }
191 actions := fakeClient.Actions()
192 assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
193
194
195 store.DeleteReference("ns", "name", "pod")
196 _, err := store.Get("ns", "name")
197 if err == nil || !strings.Contains(err.Error(), "not registered") {
198 t.Errorf("unexpected error: %v", err)
199 }
200 actions = fakeClient.Actions()
201 assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
202 }
203
204 func TestImmutableSecretStopsTheReflector(t *testing.T) {
205 secret := func(rv string, immutable bool) *v1.Secret {
206 result := &v1.Secret{
207 ObjectMeta: metav1.ObjectMeta{
208 Name: "name",
209 Namespace: "ns",
210 ResourceVersion: rv,
211 },
212 }
213 if immutable {
214 trueVal := true
215 result.Immutable = &trueVal
216 }
217 return result
218 }
219
220 tests := []struct {
221 desc string
222 initial *v1.Secret
223 eventual *v1.Secret
224 }{
225 {
226 desc: "secret doesn't exist, created as mutable",
227 initial: nil,
228 eventual: secret("200", false),
229 },
230 {
231 desc: "secret doesn't exist, created as immutable",
232 initial: nil,
233 eventual: secret("200", true),
234 },
235 {
236 desc: "mutable secret modified to mutable",
237 initial: secret("100", false),
238 eventual: secret("200", false),
239 },
240 {
241 desc: "mutable secret modified to immutable",
242 initial: secret("100", false),
243 eventual: secret("200", true),
244 },
245 {
246 desc: "immutable secret",
247 initial: secret("100", true),
248 eventual: nil,
249 },
250 }
251
252 for _, tc := range tests {
253 t.Run(tc.desc, func(t *testing.T) {
254 fakeClient := &fake.Clientset{}
255 listReactor := func(a core.Action) (bool, runtime.Object, error) {
256 result := &v1.SecretList{
257 ListMeta: metav1.ListMeta{
258 ResourceVersion: "100",
259 },
260 }
261 if tc.initial != nil {
262 result.Items = []v1.Secret{*tc.initial}
263 }
264 return true, result, nil
265 }
266 fakeClient.AddReactor("list", "secrets", listReactor)
267 fakeWatch := watch.NewFake()
268 fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
269
270 fakeClock := testingclock.NewFakeClock(time.Now())
271 store := newSecretCache(fakeClient, fakeClock, time.Minute)
272
273 key := objectKey{namespace: "ns", name: "name"}
274 itemExists := func() (bool, error) {
275 store.lock.Lock()
276 defer store.lock.Unlock()
277 _, ok := store.items[key]
278 return ok, nil
279 }
280 reflectorRunning := func() bool {
281 store.lock.Lock()
282 defer store.lock.Unlock()
283 item := store.items[key]
284
285 item.lock.Lock()
286 defer item.lock.Unlock()
287 return !item.stopped
288 }
289
290
291 store.AddReference("ns", "name", "pod")
292 if err := wait.Poll(10*time.Millisecond, time.Second, itemExists); err != nil {
293 t.Errorf("item wasn't added to cache")
294 }
295
296 obj, err := store.Get("ns", "name")
297 if tc.initial != nil {
298 assert.True(t, apiequality.Semantic.DeepEqual(tc.initial, obj))
299 } else {
300 assert.True(t, apierrors.IsNotFound(err))
301 }
302
303
304 assert.Equal(t, tc.initial == nil || !isSecretImmutable(tc.initial), reflectorRunning())
305
306 if tc.eventual == nil {
307 return
308 }
309 fakeWatch.Add(tc.eventual)
310
311
312 getFn := func() (bool, error) {
313 object, err := store.Get("ns", "name")
314 if err != nil {
315 if apierrors.IsNotFound(err) {
316 return false, nil
317 }
318 return false, err
319 }
320 secret := object.(*v1.Secret)
321 return apiequality.Semantic.DeepEqual(tc.eventual, secret), nil
322 }
323 if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil {
324 t.Errorf("unexpected error: %v", err)
325 }
326
327
328 assert.Equal(t, tc.eventual == nil || !isSecretImmutable(tc.eventual), reflectorRunning())
329 })
330 }
331 }
332
333 func TestMaxIdleTimeStopsTheReflector(t *testing.T) {
334 secret := &v1.Secret{
335 ObjectMeta: metav1.ObjectMeta{
336 Name: "name",
337 Namespace: "ns",
338 ResourceVersion: "200",
339 },
340 }
341
342 fakeClient := &fake.Clientset{}
343 listReactor := func(a core.Action) (bool, runtime.Object, error) {
344 result := &v1.SecretList{
345 ListMeta: metav1.ListMeta{
346 ResourceVersion: "200",
347 },
348 Items: []v1.Secret{*secret},
349 }
350
351 return true, result, nil
352 }
353
354 fakeClient.AddReactor("list", "secrets", listReactor)
355 fakeWatch := watch.NewFake()
356 fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
357 fakeClock := testingclock.NewFakeClock(time.Now())
358 store := newSecretCache(fakeClient, fakeClock, time.Minute)
359
360 key := objectKey{namespace: "ns", name: "name"}
361 itemExists := func() (bool, error) {
362 store.lock.Lock()
363 defer store.lock.Unlock()
364 _, ok := store.items[key]
365 return ok, nil
366 }
367
368 reflectorRunning := func() bool {
369 store.lock.Lock()
370 defer store.lock.Unlock()
371 item := store.items[key]
372
373 item.lock.Lock()
374 defer item.lock.Unlock()
375 return !item.stopped
376 }
377
378
379 store.AddReference("ns", "name", "pod")
380 if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil {
381 t.Errorf("item wasn't added to cache")
382 }
383
384 obj, _ := store.Get("ns", "name")
385 assert.True(t, apiequality.Semantic.DeepEqual(secret, obj))
386
387 assert.True(t, reflectorRunning())
388
389 fakeClock.Step(90 * time.Second)
390 store.startRecycleIdleWatch()
391
392
393 assert.False(t, reflectorRunning())
394
395 obj, _ = store.Get("ns", "name")
396 assert.True(t, apiequality.Semantic.DeepEqual(secret, obj))
397
398 assert.True(t, reflectorRunning())
399
400 fakeClock.Step(20 * time.Second)
401 _, _ = store.Get("ns", "name")
402 fakeClock.Step(20 * time.Second)
403 _, _ = store.Get("ns", "name")
404 fakeClock.Step(20 * time.Second)
405 _, _ = store.Get("ns", "name")
406 store.startRecycleIdleWatch()
407
408
409 assert.True(t, reflectorRunning())
410 }
411
412 func TestReflectorNotStoppedOnSlowInitialization(t *testing.T) {
413 secret := &v1.Secret{
414 ObjectMeta: metav1.ObjectMeta{
415 Name: "name",
416 Namespace: "ns",
417 ResourceVersion: "200",
418 },
419 }
420
421 fakeClock := testingclock.NewFakeClock(time.Now())
422
423 fakeClient := &fake.Clientset{}
424 listReactor := func(a core.Action) (bool, runtime.Object, error) {
425 <-fakeClock.After(120 * time.Second)
426
427 result := &v1.SecretList{
428 ListMeta: metav1.ListMeta{
429 ResourceVersion: "200",
430 },
431 Items: []v1.Secret{*secret},
432 }
433
434 return true, result, nil
435 }
436
437 fakeClient.AddReactor("list", "secrets", listReactor)
438 fakeWatch := watch.NewFake()
439 fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
440 store := newSecretCache(fakeClient, fakeClock, time.Minute)
441
442 key := objectKey{namespace: "ns", name: "name"}
443 itemExists := func() (bool, error) {
444 store.lock.Lock()
445 defer store.lock.Unlock()
446 _, ok := store.items[key]
447 return ok, nil
448 }
449
450 reflectorRunning := func() bool {
451 store.lock.Lock()
452 defer store.lock.Unlock()
453 item := store.items[key]
454
455 item.lock.Lock()
456 defer item.lock.Unlock()
457 return !item.stopped
458 }
459
460 reflectorInitialized := func() (bool, error) {
461 store.lock.Lock()
462 defer store.lock.Unlock()
463 item := store.items[key]
464
465 item.lock.Lock()
466 defer item.lock.Unlock()
467 return item.store.hasSynced(), nil
468 }
469
470
471 store.AddReference("ns", "name", "pod")
472 if err := wait.Poll(10*time.Millisecond, 10*time.Second, itemExists); err != nil {
473 t.Errorf("item wasn't added to cache")
474 }
475
476 fakeClock.Step(90 * time.Second)
477 store.startRecycleIdleWatch()
478
479
480
481 assert.True(t, reflectorRunning())
482 initialized, _ := reflectorInitialized()
483 assert.False(t, initialized)
484 _, err := store.Get("ns", "name")
485 if err == nil || !strings.Contains(err.Error(), "failed to sync") {
486 t.Errorf("Expected failed to sync error, got: %v", err)
487 }
488
489
490 fakeClock.Step(30 * time.Second)
491 if err := wait.Poll(10*time.Millisecond, time.Second, reflectorInitialized); err != nil {
492 t.Errorf("reflector didn't iniailize correctly")
493 }
494
495
496 store.startRecycleIdleWatch()
497 assert.True(t, reflectorRunning())
498
499 obj, _ := store.Get("ns", "name")
500 assert.True(t, apiequality.Semantic.DeepEqual(secret, obj))
501 }
502
503 func TestRefMapHandlesReferencesCorrectly(t *testing.T) {
504 secret1 := &v1.Secret{
505 ObjectMeta: metav1.ObjectMeta{
506 Name: "secret1",
507 Namespace: "ns1",
508 },
509 }
510 type step struct {
511 action string
512 ns string
513 name string
514 referencedFrom types.UID
515 }
516 type expect struct {
517 ns string
518 name string
519 referencedFrom types.UID
520 expectCount int
521 }
522 tests := []struct {
523 desc string
524 steps []step
525 expects []expect
526 }{
527 {
528 desc: "adding and deleting should works",
529 steps: []step{
530 {"add", "ns1", "secret1", "pod1"},
531 {"add", "ns1", "secret1", "pod1"},
532 {"delete", "ns1", "secret1", "pod1"},
533 {"delete", "ns1", "secret1", "pod1"},
534 },
535 expects: []expect{
536 {"ns1", "secret1", "pod1", 1},
537 {"ns1", "secret1", "pod1", 2},
538 {"ns1", "secret1", "pod1", 1},
539 {"ns1", "secret1", "pod1", 0},
540 },
541 },
542 {
543 desc: "deleting a non-existent reference should have no effect",
544 steps: []step{
545 {"delete", "ns1", "secret1", "pod1"},
546 },
547 expects: []expect{
548 {"ns1", "secret1", "pod1", 0},
549 },
550 },
551 {
552 desc: "deleting more than adding should not lead to negative refcount",
553 steps: []step{
554 {"add", "ns1", "secret1", "pod1"},
555 {"delete", "ns1", "secret1", "pod1"},
556 {"delete", "ns1", "secret1", "pod1"},
557 },
558 expects: []expect{
559 {"ns1", "secret1", "pod1", 1},
560 {"ns1", "secret1", "pod1", 0},
561 {"ns1", "secret1", "pod1", 0},
562 },
563 },
564 {
565 desc: "deleting should not affect refcount of other objects or referencedFrom",
566 steps: []step{
567 {"add", "ns1", "secret1", "pod1"},
568 {"delete", "ns1", "secret1", "pod2"},
569 {"delete", "ns1", "secret2", "pod1"},
570 {"delete", "ns2", "secret1", "pod1"},
571 },
572 expects: []expect{
573 {"ns1", "secret1", "pod1", 1},
574 {"ns1", "secret1", "pod1", 1},
575 {"ns1", "secret1", "pod1", 1},
576 {"ns1", "secret1", "pod1", 1},
577 },
578 },
579 }
580 for _, tc := range tests {
581 t.Run(tc.desc, func(t *testing.T) {
582 fakeClient := &fake.Clientset{}
583 listReactor := func(a core.Action) (bool, runtime.Object, error) {
584 result := &v1.SecretList{
585 ListMeta: metav1.ListMeta{
586 ResourceVersion: "200",
587 },
588 Items: []v1.Secret{*secret1},
589 }
590 return true, result, nil
591 }
592 fakeClient.AddReactor("list", "secrets", listReactor)
593 fakeWatch := watch.NewFake()
594 fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
595 fakeClock := testingclock.NewFakeClock(time.Now())
596 store := newSecretCache(fakeClient, fakeClock, time.Minute)
597
598 for i, step := range tc.steps {
599 expect := tc.expects[i]
600 switch step.action {
601 case "add":
602 store.AddReference(step.ns, step.name, step.referencedFrom)
603 case "delete":
604 store.DeleteReference(step.ns, step.name, step.referencedFrom)
605 default:
606 t.Errorf("unrecognized action of testcase %v", tc.desc)
607 }
608
609 key := objectKey{namespace: expect.ns, name: expect.name}
610 item, exists := store.items[key]
611 if !exists {
612 if tc.expects[i].expectCount != 0 {
613 t.Errorf("reference to %v/%v from %v should exists", expect.ns, expect.name, expect.referencedFrom)
614 }
615 } else if item.refMap[expect.referencedFrom] != expect.expectCount {
616 t.Errorf("expects %v but got %v", expect.expectCount, item.refMap[expect.referencedFrom])
617 }
618 }
619 })
620 }
621 }
622
View as plain text