1
16
17 package cache
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "math/rand"
24 "reflect"
25 goruntime "runtime"
26 "strconv"
27 "syscall"
28 "testing"
29 "time"
30
31 "github.com/stretchr/testify/require"
32
33 v1 "k8s.io/api/core/v1"
34 apierrors "k8s.io/apimachinery/pkg/api/errors"
35 "k8s.io/apimachinery/pkg/api/meta"
36 "k8s.io/apimachinery/pkg/api/resource"
37 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
39 "k8s.io/apimachinery/pkg/runtime"
40 "k8s.io/apimachinery/pkg/runtime/schema"
41 "k8s.io/apimachinery/pkg/util/sets"
42 "k8s.io/apimachinery/pkg/util/wait"
43 "k8s.io/apimachinery/pkg/watch"
44 "k8s.io/utils/clock"
45 testingclock "k8s.io/utils/clock/testing"
46 )
47
// nevererrc is a nil error channel: it is declared but never initialized, so a
// receive on it blocks forever. The tests in this file pass it to watchHandler
// as the error-channel argument when no asynchronous error should be delivered
// (presumably the resync error channel; confirm against watchHandler).
var nevererrc chan error
49
// testLW is a function-backed lister/watcher test double. Its List and Watch
// methods forward to the func fields below, so each test can script list and
// watch behavior with closures.
type testLW struct {
	// ListFunc is called by List. It must be non-nil before List is invoked,
	// otherwise the call panics on a nil func.
	ListFunc func(options metav1.ListOptions) (runtime.Object, error)
	// WatchFunc is called by Watch. It must be non-nil before Watch is invoked,
	// otherwise the call panics on a nil func.
	WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
}
54
55 func (t *testLW) List(options metav1.ListOptions) (runtime.Object, error) {
56 return t.ListFunc(options)
57 }
58 func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
59 return t.WatchFunc(options)
60 }
61
62 func TestCloseWatchChannelOnError(t *testing.T) {
63 r := NewReflector(&testLW{}, &v1.Pod{}, NewStore(MetaNamespaceKeyFunc), 0)
64 pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
65 fw := watch.NewFake()
66 r.listerWatcher = &testLW{
67 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
68 return fw, nil
69 },
70 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
71 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
72 },
73 }
74 go r.ListAndWatch(wait.NeverStop)
75 fw.Error(pod)
76 select {
77 case _, ok := <-fw.ResultChan():
78 if ok {
79 t.Errorf("Watch channel left open after cancellation")
80 }
81 case <-time.After(wait.ForeverTestTimeout):
82 t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
83 break
84 }
85 }
86
87 func TestRunUntil(t *testing.T) {
88 stopCh := make(chan struct{})
89 store := NewStore(MetaNamespaceKeyFunc)
90 r := NewReflector(&testLW{}, &v1.Pod{}, store, 0)
91 fw := watch.NewFake()
92 r.listerWatcher = &testLW{
93 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
94 return fw, nil
95 },
96 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
97 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
98 },
99 }
100 go r.Run(stopCh)
101
102
103 fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
104 close(stopCh)
105 select {
106 case _, ok := <-fw.ResultChan():
107 if ok {
108 t.Errorf("Watch channel left open after stopping the watch")
109 }
110 case <-time.After(wait.ForeverTestTimeout):
111 t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
112 break
113 }
114 }
115
116 func TestReflectorResyncChan(t *testing.T) {
117 s := NewStore(MetaNamespaceKeyFunc)
118 g := NewReflector(&testLW{}, &v1.Pod{}, s, time.Millisecond)
119 a, _ := g.resyncChan()
120 b := time.After(wait.ForeverTestTimeout)
121 select {
122 case <-a:
123 t.Logf("got timeout as expected")
124 case <-b:
125 t.Errorf("resyncChan() is at least 99 milliseconds late??")
126 }
127 }
128
129
130
131
132 func TestEstablishedWatchStoppedAfterStopCh(t *testing.T) {
133 ctx, ctxCancel := context.WithCancel(context.TODO())
134 ctxCancel()
135 w := watch.NewFake()
136 require.False(t, w.IsStopped())
137
138
139 target := NewReflector(nil, &v1.Pod{}, nil, 0)
140 err := target.watch(w, ctx.Done(), nil)
141 require.NoError(t, err)
142 require.True(t, w.IsStopped())
143
144
145 err = target.watch(nil, ctx.Done(), nil)
146 require.NoError(t, err)
147 }
148
149 func BenchmarkReflectorResyncChanMany(b *testing.B) {
150 s := NewStore(MetaNamespaceKeyFunc)
151 g := NewReflector(&testLW{}, &v1.Pod{}, s, 25*time.Millisecond)
152
153
154 for i := 0; i < b.N; i++ {
155 g.resyncPeriod = time.Duration(rand.Float64() * float64(time.Millisecond) * 25)
156 _, stop := g.resyncChan()
157 stop()
158 }
159 }
160
161 func TestReflectorWatchHandlerError(t *testing.T) {
162 s := NewStore(MetaNamespaceKeyFunc)
163 g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
164 fw := watch.NewFake()
165 go func() {
166 fw.Stop()
167 }()
168 err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
169 if err == nil {
170 t.Errorf("unexpected non-error")
171 }
172 }
173
174 func TestReflectorWatchHandler(t *testing.T) {
175 s := NewStore(MetaNamespaceKeyFunc)
176 g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
177 fw := watch.NewFake()
178 s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
179 s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
180 go func() {
181 fw.Add(&v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "rejected"}})
182 fw.Delete(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
183 fw.Modify(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "55"}})
184 fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
185 fw.Stop()
186 }()
187 err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
188 if err != nil {
189 t.Errorf("unexpected error %v", err)
190 }
191
192 mkPod := func(id string, rv string) *v1.Pod {
193 return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}}
194 }
195
196 table := []struct {
197 Pod *v1.Pod
198 exists bool
199 }{
200 {mkPod("foo", ""), false},
201 {mkPod("rejected", ""), false},
202 {mkPod("bar", "55"), true},
203 {mkPod("baz", "32"), true},
204 }
205 for _, item := range table {
206 obj, exists, _ := s.Get(item.Pod)
207 if e, a := item.exists, exists; e != a {
208 t.Errorf("%v: expected %v, got %v", item.Pod, e, a)
209 }
210 if !exists {
211 continue
212 }
213 if e, a := item.Pod.ResourceVersion, obj.(*v1.Pod).ResourceVersion; e != a {
214 t.Errorf("%v: expected %v, got %v", item.Pod, e, a)
215 }
216 }
217
218
219 if e, a := "32", g.LastSyncResourceVersion(); e != a {
220 t.Errorf("expected %v, got %v", e, a)
221 }
222
223
224 if e, a := "32", g.LastSyncResourceVersion(); e != a {
225 t.Errorf("expected %v, got %v", e, a)
226 }
227 }
228
229 func TestReflectorStopWatch(t *testing.T) {
230 s := NewStore(MetaNamespaceKeyFunc)
231 g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
232 fw := watch.NewFake()
233 stopWatch := make(chan struct{}, 1)
234 stopWatch <- struct{}{}
235 err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopWatch)
236 if err != errorStopRequested {
237 t.Errorf("expected stop error, got %q", err)
238 }
239 }
240
// TestReflectorListAndWatch runs ListAndWatch against a scripted lister/watcher
// and verifies that (a) each watch is started at the expected resource version
// and (b) every pod sent through the fake watchers is popped from the FIFO in
// order with the resource version it was sent with.
func TestReflectorListAndWatch(t *testing.T) {
	// createdFakes hands each fake watcher created by WatchFunc to the test
	// goroutine.
	createdFakes := make(chan *watch.FakeWatcher)

	// ListFunc reports resource version "1", so the first watch is expected at
	// "1". The loop below stops the watcher right after sending the pod with
	// resource version "3", so the follow-up watch is expected at "3". Each
	// WatchFunc call consumes the head of this slice.
	expectedRVs := []string{"1", "3"}
	lw := &testLW{
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			rv := options.ResourceVersion
			fw := watch.NewFake()
			if e, a := expectedRVs[0], rv; e != a {
				t.Errorf("Expected rv %v, but got %v", e, a)
			}
			expectedRVs = expectedRVs[1:]
			// createdFakes is unbuffered, so publish the new watcher from a
			// separate goroutine to avoid blocking WatchFunc on the receiver.
			go func() { createdFakes <- fw }()
			return fw, nil
		},
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
		},
	}
	s := NewFIFO(MetaNamespaceKeyFunc)
	r := NewReflector(lw, &v1.Pod{}, s, 0)
	go r.ListAndWatch(wait.NeverStop)

	ids := []string{"foo", "bar", "baz", "qux", "zoo"}
	var fw *watch.FakeWatcher
	for i, id := range ids {
		// Wait for the next watcher whenever there is no current one.
		if fw == nil {
			fw = <-createdFakes
		}
		// Resource versions start at "2" because the list already reported "1".
		sendingRV := strconv.FormatUint(uint64(i+2), 10)
		fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: sendingRV}})
		if sendingRV == "3" {
			// Stop the current watcher and drop it so the next iteration picks
			// up the watcher created by the following WatchFunc call.
			fw.Stop()
			fw = nil
		}
	}

	// Every pod must come out of the FIFO in the order and with the resource
	// version it was sent.
	for i, id := range ids {
		pod := Pop(s).(*v1.Pod)
		if e, a := id, pod.Name; e != a {
			t.Errorf("%v: Expected %v, got %v", i, e, a)
		}
		if e, a := strconv.FormatUint(uint64(i+2), 10), pod.ResourceVersion; e != a {
			t.Errorf("%v: Expected %v, got %v", i, e, a)
		}
	}

	// Both expected watch starts must have been consumed.
	if len(expectedRVs) != 0 {
		t.Error("called watchStarter an unexpected number of times")
	}
}
299
300 func TestReflectorListAndWatchWithErrors(t *testing.T) {
301 mkPod := func(id string, rv string) *v1.Pod {
302 return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}}
303 }
304 mkList := func(rv string, pods ...*v1.Pod) *v1.PodList {
305 list := &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: rv}}
306 for _, pod := range pods {
307 list.Items = append(list.Items, *pod)
308 }
309 return list
310 }
311 table := []struct {
312 list *v1.PodList
313 listErr error
314 events []watch.Event
315 watchErr error
316 }{
317 {
318 list: mkList("1"),
319 events: []watch.Event{
320 {Type: watch.Added, Object: mkPod("foo", "2")},
321 {Type: watch.Added, Object: mkPod("bar", "3")},
322 },
323 }, {
324 list: mkList("3", mkPod("foo", "2"), mkPod("bar", "3")),
325 events: []watch.Event{
326 {Type: watch.Deleted, Object: mkPod("foo", "4")},
327 {Type: watch.Added, Object: mkPod("qux", "5")},
328 },
329 }, {
330 listErr: fmt.Errorf("a list error"),
331 }, {
332 list: mkList("5", mkPod("bar", "3"), mkPod("qux", "5")),
333 watchErr: fmt.Errorf("a watch error"),
334 }, {
335 list: mkList("5", mkPod("bar", "3"), mkPod("qux", "5")),
336 events: []watch.Event{
337 {Type: watch.Added, Object: mkPod("baz", "6")},
338 },
339 }, {
340 list: mkList("6", mkPod("bar", "3"), mkPod("qux", "5"), mkPod("baz", "6")),
341 },
342 }
343
344 s := NewFIFO(MetaNamespaceKeyFunc)
345 for line, item := range table {
346 if item.list != nil {
347
348 current := s.List()
349 checkMap := map[string]string{}
350 for _, item := range current {
351 pod := item.(*v1.Pod)
352 checkMap[pod.Name] = pod.ResourceVersion
353 }
354 for _, pod := range item.list.Items {
355 if e, a := pod.ResourceVersion, checkMap[pod.Name]; e != a {
356 t.Errorf("%v: expected %v, got %v for pod %v", line, e, a, pod.Name)
357 }
358 }
359 if e, a := len(item.list.Items), len(checkMap); e != a {
360 t.Errorf("%v: expected %v, got %v", line, e, a)
361 }
362 }
363 watchRet, watchErr := item.events, item.watchErr
364 lw := &testLW{
365 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
366 if watchErr != nil {
367 return nil, watchErr
368 }
369 watchErr = fmt.Errorf("second watch")
370 fw := watch.NewFake()
371 go func() {
372 for _, e := range watchRet {
373 fw.Action(e.Type, e.Object)
374 }
375 fw.Stop()
376 }()
377 return fw, nil
378 },
379 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
380 return item.list, item.listErr
381 },
382 }
383 r := NewReflector(lw, &v1.Pod{}, s, 0)
384 r.ListAndWatch(wait.NeverStop)
385 }
386 }
387
// TestReflectorListAndWatchInitConnBackoff makes the watch call fail with
// ECONNREFUSED a fixed number of times and checks, using a fake clock, that
// the total fake time spent inside ListAndWatch falls between the expected
// lower and upper bounds for each table entry.
func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
	maxBackoff := 50 * time.Millisecond
	table := []struct {
		numConnFails  int
		expLowerBound time.Duration
		expUpperBound time.Duration
	}{
		{5, 32 * time.Millisecond, 64 * time.Millisecond},
		// With many failures the bounds are expressed as multiples of
		// 2*maxBackoff, the same cap applied to the clock step below.
		{40, 35 * 2 * maxBackoff, 40 * 2 * maxBackoff},
	}
	for _, test := range table {
		t.Run(fmt.Sprintf("%d connection failures takes at least %d ms", test.numConnFails, 1<<test.numConnFails),
			func(t *testing.T) {
				stopCh := make(chan struct{})
				connFails := test.numConnFails
				fakeClock := testingclock.NewFakeClock(time.Unix(0, 0))
				bm := wait.NewExponentialBackoffManager(time.Millisecond, maxBackoff, 100*time.Millisecond, 2.0, 1.0, fakeClock)
				// done terminates the clock-stepping goroutine when the
				// subtest returns.
				done := make(chan struct{})
				defer close(done)
				go func() {
					i := 0
					for {
						select {
						case <-done:
							return
						default:
						}
						// Advance the fake clock only while something is
						// waiting on it. The step doubles on every advance
						// (2ms, 4ms, ...) and is capped at 2*maxBackoff.
						if fakeClock.HasWaiters() {
							step := (1 << (i + 1)) * time.Millisecond
							if step > maxBackoff*2 {
								step = maxBackoff * 2
							}
							fakeClock.Step(step)
							i++
						}
						// Real-time pause between polls of the fake clock.
						time.Sleep(100 * time.Microsecond)
					}
				}()
				lw := &testLW{
					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
						// Refuse the connection until the failure budget is
						// used up, then stop the reflector on the first
						// successful watch.
						if connFails > 0 {
							connFails--
							return nil, syscall.ECONNREFUSED
						}
						close(stopCh)
						return watch.NewFake(), nil
					},
					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
						return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
					},
				}
				// Build the Reflector directly so the fake clock and the
				// backoff manager created above are the ones it uses.
				r := &Reflector{
					name:              "test-reflector",
					listerWatcher:     lw,
					store:             NewFIFO(MetaNamespaceKeyFunc),
					backoffManager:    bm,
					clock:             fakeClock,
					watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
				}
				// Elapsed time is measured on the fake clock, not wall time.
				start := fakeClock.Now()
				err := r.ListAndWatch(stopCh)
				elapsed := fakeClock.Since(start)
				if err != nil {
					t.Errorf("unexpected error %v", err)
				}
				if elapsed < (test.expLowerBound) {
					t.Errorf("expected lower bound of ListAndWatch: %v, got %v", test.expLowerBound, elapsed)
				}
				if elapsed > (test.expUpperBound) {
					t.Errorf("expected upper bound of ListAndWatch: %v, got %v", test.expUpperBound, elapsed)
				}
			})
	}
}
463
// fakeBackoff is a backoff test double that counts how many times Backoff was
// requested and never actually delays.
type fakeBackoff struct {
	// clock creates the timers returned by Backoff.
	clock clock.Clock
	// calls is the number of Backoff invocations so far.
	calls int
}
468
469 func (f *fakeBackoff) Backoff() clock.Timer {
470 f.calls++
471 return f.clock.NewTimer(time.Duration(0))
472 }
473
474 func TestBackoffOnTooManyRequests(t *testing.T) {
475 err := apierrors.NewTooManyRequests("too many requests", 1)
476 clock := &clock.RealClock{}
477 bm := &fakeBackoff{clock: clock}
478
479 lw := &testLW{
480 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
481 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
482 },
483 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
484 switch bm.calls {
485 case 0:
486 return nil, err
487 case 1:
488 w := watch.NewFakeWithChanSize(1, false)
489 status := err.Status()
490 w.Error(&status)
491 return w, nil
492 default:
493 w := watch.NewFake()
494 w.Stop()
495 return w, nil
496 }
497 },
498 }
499
500 r := &Reflector{
501 name: "test-reflector",
502 listerWatcher: lw,
503 store: NewFIFO(MetaNamespaceKeyFunc),
504 backoffManager: bm,
505 clock: clock,
506 watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
507 }
508
509 stopCh := make(chan struct{})
510 r.ListAndWatch(stopCh)
511 close(stopCh)
512 if bm.calls != 2 {
513 t.Errorf("unexpected watch backoff calls: %d", bm.calls)
514 }
515 }
516
517 func TestRetryInternalError(t *testing.T) {
518 testCases := []struct {
519 name string
520 maxInternalDuration time.Duration
521 rewindTime int
522 wantRetries int
523 }{
524 {
525 name: "retries off",
526 maxInternalDuration: time.Duration(0),
527 wantRetries: 0,
528 },
529 {
530 name: "retries on, all calls fail",
531 maxInternalDuration: time.Second * 30,
532 wantRetries: 31,
533 },
534 {
535 name: "retries on, one call successful",
536 maxInternalDuration: time.Second * 30,
537 rewindTime: 10,
538 wantRetries: 40,
539 },
540 }
541
542 for _, tc := range testCases {
543 err := apierrors.NewInternalError(fmt.Errorf("etcdserver: no leader"))
544 fakeClock := testingclock.NewFakeClock(time.Now())
545 bm := &fakeBackoff{clock: fakeClock}
546
547 counter := 0
548
549 lw := &testLW{
550 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
551 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
552 },
553 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
554 counter = counter + 1
555 t.Logf("Counter: %v", counter)
556 if counter == tc.rewindTime {
557 t.Logf("Rewinding")
558 fakeClock.Step(time.Minute)
559 }
560
561 fakeClock.Step(time.Second)
562 w := watch.NewFakeWithChanSize(1, false)
563 status := err.Status()
564 w.Error(&status)
565 return w, nil
566 },
567 }
568
569 r := &Reflector{
570 name: "test-reflector",
571 listerWatcher: lw,
572 store: NewFIFO(MetaNamespaceKeyFunc),
573 backoffManager: bm,
574 clock: fakeClock,
575 watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
576 }
577
578 r.MaxInternalErrorRetryDuration = tc.maxInternalDuration
579
580 stopCh := make(chan struct{})
581 r.ListAndWatch(stopCh)
582 close(stopCh)
583
584 if counter-1 != tc.wantRetries {
585 t.Errorf("%v unexpected number of retries: %d", tc, counter-1)
586 }
587 }
588 }
589
590 func TestReflectorResync(t *testing.T) {
591 iteration := 0
592 stopCh := make(chan struct{})
593 rerr := errors.New("expected resync reached")
594 s := &FakeCustomStore{
595 ResyncFunc: func() error {
596 iteration++
597 if iteration == 2 {
598 return rerr
599 }
600 return nil
601 },
602 }
603
604 lw := &testLW{
605 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
606 fw := watch.NewFake()
607 return fw, nil
608 },
609 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
610 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "0"}}, nil
611 },
612 }
613 resyncPeriod := 1 * time.Millisecond
614 r := NewReflector(lw, &v1.Pod{}, s, resyncPeriod)
615 if err := r.ListAndWatch(stopCh); err != nil {
616
617 t.Errorf("expected error %v", err)
618 }
619 if iteration != 2 {
620 t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
621 }
622 }
623
624 func TestReflectorWatchListPageSize(t *testing.T) {
625 stopCh := make(chan struct{})
626 s := NewStore(MetaNamespaceKeyFunc)
627
628 lw := &testLW{
629 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
630
631 close(stopCh)
632 fw := watch.NewFake()
633 return fw, nil
634 },
635 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
636 if options.Limit != 4 {
637 t.Fatalf("Expected list Limit of 4 but got %d", options.Limit)
638 }
639 pods := make([]v1.Pod, 10)
640 for i := 0; i < 10; i++ {
641 pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
642 }
643 switch options.Continue {
644 case "":
645 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil
646 case "C1":
647 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil
648 case "C2":
649 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil
650 default:
651 t.Fatalf("Unrecognized continue: %s", options.Continue)
652 }
653 return nil, nil
654 },
655 }
656 r := NewReflector(lw, &v1.Pod{}, s, 0)
657
658 r.setLastSyncResourceVersion("10")
659
660 r.WatchListPageSize = 4
661 r.ListAndWatch(stopCh)
662
663 results := s.List()
664 if len(results) != 10 {
665 t.Errorf("Expected 10 results, got %d", len(results))
666 }
667 }
668
669 func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) {
670 stopCh := make(chan struct{})
671 s := NewStore(MetaNamespaceKeyFunc)
672
673 lw := &testLW{
674 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
675
676 close(stopCh)
677 fw := watch.NewFake()
678 return fw, nil
679 },
680 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
681 if options.ResourceVersion != "10" {
682 t.Fatalf("Expected ResourceVersion: \"10\", got: %s", options.ResourceVersion)
683 }
684 if options.Limit != 0 {
685 t.Fatalf("Expected list Limit of 0 but got %d", options.Limit)
686 }
687 pods := make([]v1.Pod, 10)
688 for i := 0; i < 10; i++ {
689 pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
690 }
691 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods}, nil
692 },
693 }
694 r := NewReflector(lw, &v1.Pod{}, s, 0)
695 r.setLastSyncResourceVersion("10")
696 r.ListAndWatch(stopCh)
697
698 results := s.List()
699 if len(results) != 10 {
700 t.Errorf("Expected 10 results, got %d", len(results))
701 }
702 }
703
704 func TestReflectorPaginatingNonConsistentReadsIfWatchCacheDisabled(t *testing.T) {
705 var stopCh chan struct{}
706 s := NewStore(MetaNamespaceKeyFunc)
707
708 lw := &testLW{
709 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
710
711 close(stopCh)
712 fw := watch.NewFake()
713 return fw, nil
714 },
715 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
716
717 if options.Limit != 500 {
718 t.Fatalf("Expected list Limit of 500 but got %d", options.Limit)
719 }
720 pods := make([]v1.Pod, 10)
721 for i := 0; i < 10; i++ {
722 pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
723 }
724 switch options.Continue {
725 case "":
726 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil
727 case "C1":
728 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil
729 case "C2":
730 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil
731 default:
732 t.Fatalf("Unrecognized continue: %s", options.Continue)
733 }
734 return nil, nil
735 },
736 }
737 r := NewReflector(lw, &v1.Pod{}, s, 0)
738
739
740 stopCh = make(chan struct{})
741 r.ListAndWatch(stopCh)
742 if results := s.List(); len(results) != 10 {
743 t.Errorf("Expected 10 results, got %d", len(results))
744 }
745
746
747
748 stopCh = make(chan struct{})
749 r.ListAndWatch(stopCh)
750 if results := s.List(); len(results) != 10 {
751 t.Errorf("Expected 10 results, got %d", len(results))
752 }
753 }
754
755
756
757
758 func TestReflectorResyncWithResourceVersion(t *testing.T) {
759 stopCh := make(chan struct{})
760 s := NewStore(MetaNamespaceKeyFunc)
761 listCallRVs := []string{}
762
763 lw := &testLW{
764 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
765
766 close(stopCh)
767 fw := watch.NewFake()
768 return fw, nil
769 },
770 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
771 listCallRVs = append(listCallRVs, options.ResourceVersion)
772 pods := make([]v1.Pod, 8)
773 for i := 0; i < 8; i++ {
774 pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
775 }
776 switch options.ResourceVersion {
777 case "0":
778 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
779 case "10":
780 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
781 default:
782 t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
783 }
784 return nil, nil
785 },
786 }
787 r := NewReflector(lw, &v1.Pod{}, s, 0)
788
789
790 r.ListAndWatch(stopCh)
791
792 results := s.List()
793 if len(results) != 4 {
794 t.Errorf("Expected 4 results, got %d", len(results))
795 }
796
797
798 stopCh = make(chan struct{})
799 r.ListAndWatch(stopCh)
800
801 results = s.List()
802 if len(results) != 8 {
803 t.Errorf("Expected 8 results, got %d", len(results))
804 }
805
806 expectedRVs := []string{"0", "10"}
807 if !reflect.DeepEqual(listCallRVs, expectedRVs) {
808 t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
809 }
810 }
811
812
813
814
815
816
817 func TestReflectorExpiredExactResourceVersion(t *testing.T) {
818 stopCh := make(chan struct{})
819 s := NewStore(MetaNamespaceKeyFunc)
820 listCallRVs := []string{}
821
822 lw := &testLW{
823 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
824
825 close(stopCh)
826 fw := watch.NewFake()
827 return fw, nil
828 },
829 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
830 listCallRVs = append(listCallRVs, options.ResourceVersion)
831 pods := make([]v1.Pod, 8)
832 for i := 0; i < 8; i++ {
833 pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
834 }
835 switch options.ResourceVersion {
836 case "0":
837 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
838 case "10":
839
840 return nil, apierrors.NewResourceExpired("The resourceVersion for the provided watch is too old.")
841 case "":
842 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
843 default:
844 t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
845 }
846 return nil, nil
847 },
848 }
849 r := NewReflector(lw, &v1.Pod{}, s, 0)
850
851
852 r.ListAndWatch(stopCh)
853
854 results := s.List()
855 if len(results) != 4 {
856 t.Errorf("Expected 4 results, got %d", len(results))
857 }
858
859
860 stopCh = make(chan struct{})
861 r.ListAndWatch(stopCh)
862
863 results = s.List()
864 if len(results) != 8 {
865 t.Errorf("Expected 8 results, got %d", len(results))
866 }
867
868 expectedRVs := []string{"0", "10", ""}
869 if !reflect.DeepEqual(listCallRVs, expectedRVs) {
870 t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
871 }
872 }
873
874 func TestReflectorFullListIfExpired(t *testing.T) {
875 stopCh := make(chan struct{})
876 s := NewStore(MetaNamespaceKeyFunc)
877 listCallRVs := []string{}
878
879 lw := &testLW{
880 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
881
882 close(stopCh)
883 fw := watch.NewFake()
884 return fw, nil
885 },
886 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
887 listCallRVs = append(listCallRVs, options.ResourceVersion)
888 pods := make([]v1.Pod, 8)
889 for i := 0; i < 8; i++ {
890 pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
891 }
892 rvContinueLimit := func(rv, c string, l int64) metav1.ListOptions {
893 return metav1.ListOptions{ResourceVersion: rv, Continue: c, Limit: l}
894 }
895 switch rvContinueLimit(options.ResourceVersion, options.Continue, options.Limit) {
896
897 case rvContinueLimit("0", "", 4):
898 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
899
900 case rvContinueLimit("10", "", 4):
901 return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil
902
903 case rvContinueLimit("", "C1", 4):
904 return nil, apierrors.NewResourceExpired("The resourceVersion for the provided watch is too old.")
905
906 case rvContinueLimit("10", "", 0):
907 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
908 default:
909 err := fmt.Errorf("unexpected list options: %#v", options)
910 t.Error(err)
911 return nil, err
912 }
913 },
914 }
915 r := NewReflector(lw, &v1.Pod{}, s, 0)
916 r.WatchListPageSize = 4
917
918
919 if err := r.ListAndWatch(stopCh); err != nil {
920 t.Fatal(err)
921 }
922
923 results := s.List()
924 if len(results) != 4 {
925 t.Errorf("Expected 4 results, got %d", len(results))
926 }
927
928
929 stopCh = make(chan struct{})
930 if err := r.ListAndWatch(stopCh); err != nil {
931 t.Fatal(err)
932 }
933
934 results = s.List()
935 if len(results) != 8 {
936 t.Errorf("Expected 8 results, got %d", len(results))
937 }
938
939 expectedRVs := []string{"0", "10", "", "10"}
940 if !reflect.DeepEqual(listCallRVs, expectedRVs) {
941 t.Errorf("Expected series of list calls with resource versiosn of %#v but got: %#v", expectedRVs, listCallRVs)
942 }
943 }
944
945 func TestReflectorFullListIfTooLarge(t *testing.T) {
946 stopCh := make(chan struct{})
947 s := NewStore(MetaNamespaceKeyFunc)
948 listCallRVs := []string{}
949 version := 30
950
951 lw := &testLW{
952 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
953
954 close(stopCh)
955 fw := watch.NewFake()
956 return fw, nil
957 },
958 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
959 listCallRVs = append(listCallRVs, options.ResourceVersion)
960 resourceVersion := strconv.Itoa(version)
961
962 switch options.ResourceVersion {
963
964 case "0":
965 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "20"}}, nil
966
967 case "20":
968 err := apierrors.NewTimeoutError("too large resource version", 1)
969 err.ErrStatus.Details.Causes = []metav1.StatusCause{{Type: metav1.CauseTypeResourceVersionTooLarge}}
970 return nil, err
971
972 case "30":
973 err := apierrors.NewTimeoutError("too large resource version", 1)
974 err.ErrStatus.Details.Causes = []metav1.StatusCause{{Message: "Too large resource version"}}
975 return nil, err
976
977 case "40":
978 err := apierrors.NewTimeoutError("Too large resource version", 1)
979 return nil, err
980
981 case "":
982 version += 10
983 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: resourceVersion}}, nil
984 default:
985 return nil, fmt.Errorf("unexpected List call: %s", options.ResourceVersion)
986 }
987 },
988 }
989 r := NewReflector(lw, &v1.Pod{}, s, 0)
990
991
992 if err := r.ListAndWatch(stopCh); err != nil {
993 t.Fatal(err)
994 }
995
996
997
998
999
1000
1001
1002 for i := 1; i <= 3; i++ {
1003
1004 stopCh = make(chan struct{})
1005 if err := r.ListAndWatch(stopCh); err != nil {
1006 t.Fatal(err)
1007 }
1008 }
1009
1010 expectedRVs := []string{"0", "20", "", "30", "", "40", ""}
1011 if !reflect.DeepEqual(listCallRVs, expectedRVs) {
1012 t.Errorf("Expected series of list calls with resource version of %#v but got: %#v", expectedRVs, listCallRVs)
1013 }
1014 }
1015
1016 func TestGetTypeDescriptionFromObject(t *testing.T) {
1017 obj := &unstructured.Unstructured{}
1018 gvk := schema.GroupVersionKind{
1019 Group: "mygroup",
1020 Version: "v1",
1021 Kind: "MyKind",
1022 }
1023 obj.SetGroupVersionKind(gvk)
1024
1025 testCases := map[string]struct {
1026 inputType interface{}
1027 expectedTypeDescription string
1028 }{
1029 "Nil type": {
1030 expectedTypeDescription: defaultExpectedTypeName,
1031 },
1032 "Normal type": {
1033 inputType: &v1.Pod{},
1034 expectedTypeDescription: "*v1.Pod",
1035 },
1036 "Unstructured type without GVK": {
1037 inputType: &unstructured.Unstructured{},
1038 expectedTypeDescription: "*unstructured.Unstructured",
1039 },
1040 "Unstructured type with GVK": {
1041 inputType: obj,
1042 expectedTypeDescription: gvk.String(),
1043 },
1044 }
1045 for testName, tc := range testCases {
1046 t.Run(testName, func(t *testing.T) {
1047 typeDescription := getTypeDescriptionFromObject(tc.inputType)
1048 if tc.expectedTypeDescription != typeDescription {
1049 t.Fatalf("Expected typeDescription %v, got %v", tc.expectedTypeDescription, typeDescription)
1050 }
1051 })
1052 }
1053 }
1054
1055 func TestGetExpectedGVKFromObject(t *testing.T) {
1056 obj := &unstructured.Unstructured{}
1057 gvk := schema.GroupVersionKind{
1058 Group: "mygroup",
1059 Version: "v1",
1060 Kind: "MyKind",
1061 }
1062 obj.SetGroupVersionKind(gvk)
1063
1064 testCases := map[string]struct {
1065 inputType interface{}
1066 expectedGVK *schema.GroupVersionKind
1067 }{
1068 "Nil type": {},
1069 "Some non Unstructured type": {
1070 inputType: &v1.Pod{},
1071 },
1072 "Unstructured type without GVK": {
1073 inputType: &unstructured.Unstructured{},
1074 },
1075 "Unstructured type with GVK": {
1076 inputType: obj,
1077 expectedGVK: &gvk,
1078 },
1079 }
1080 for testName, tc := range testCases {
1081 t.Run(testName, func(t *testing.T) {
1082 expectedGVK := getExpectedGVKFromObject(tc.inputType)
1083 gvkNotEqual := (tc.expectedGVK == nil) != (expectedGVK == nil)
1084 if tc.expectedGVK != nil && expectedGVK != nil {
1085 gvkNotEqual = *tc.expectedGVK != *expectedGVK
1086 }
1087 if gvkNotEqual {
1088 t.Fatalf("Expected expectedGVK %v, got %v", tc.expectedGVK, expectedGVK)
1089 }
1090 })
1091 }
1092 }
1093
// storeWithRV wraps a Store and additionally records every resource version
// passed to UpdateResourceVersion, so tests can assert on the sequence of
// updates.
type storeWithRV struct {
	Store

	// resourceVersions holds the recorded resource versions, in the order
	// UpdateResourceVersion received them.
	resourceVersions []string
}
1100
1101 func (s *storeWithRV) UpdateResourceVersion(resourceVersion string) {
1102 s.resourceVersions = append(s.resourceVersions, resourceVersion)
1103 }
1104
1105 func newStoreWithRV() *storeWithRV {
1106 return &storeWithRV{
1107 Store: NewStore(MetaNamespaceKeyFunc),
1108 }
1109 }
1110
1111 func TestReflectorResourceVersionUpdate(t *testing.T) {
1112 s := newStoreWithRV()
1113
1114 stopCh := make(chan struct{})
1115 fw := watch.NewFake()
1116
1117 lw := &testLW{
1118 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
1119 return fw, nil
1120 },
1121 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
1122 return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
1123 },
1124 }
1125 r := NewReflector(lw, &v1.Pod{}, s, 0)
1126
1127 makePod := func(rv string) *v1.Pod {
1128 return &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: rv}}
1129 }
1130
1131 go func() {
1132 fw.Action(watch.Added, makePod("10"))
1133 fw.Action(watch.Modified, makePod("20"))
1134 fw.Action(watch.Bookmark, makePod("30"))
1135 fw.Action(watch.Deleted, makePod("40"))
1136 close(stopCh)
1137 }()
1138
1139
1140 if err := r.ListAndWatch(stopCh); err != nil {
1141 t.Fatal(err)
1142 }
1143
1144 expectedRVs := []string{"10", "20", "30", "40"}
1145 if !reflect.DeepEqual(s.resourceVersions, expectedRVs) {
1146 t.Errorf("Expected series of resource version updates of %#v but got: %#v", expectedRVs, s.resourceVersions)
1147 }
1148 }
1149
// Sizing constants shared by the list fixtures and paging tests in this file.
const (
	// fakeItemsNum is the number of items generated per list; the paging
	// tests also use it as the reflector's WatchListPageSize.
	fakeItemsNum = 100
	// exemptObjectIndex is the index, within a generated list, of the item
	// whose name the fixture builders return as their second result.
	exemptObjectIndex = fakeItemsNum / 4
	// pageNum is the page count BenchmarkReflectorList passes to newPageTestLW.
	pageNum = 3
)
1155
1156 func getPodListItems(start int, numItems int) (string, string, *v1.PodList) {
1157 out := &v1.PodList{
1158 Items: make([]v1.Pod, numItems),
1159 }
1160
1161 for i := 0; i < numItems; i++ {
1162
1163 out.Items[i] = v1.Pod{
1164 TypeMeta: metav1.TypeMeta{
1165 APIVersion: "v1",
1166 Kind: "Pod",
1167 },
1168 ObjectMeta: metav1.ObjectMeta{
1169 Name: fmt.Sprintf("pod-%d", i+start),
1170 Namespace: "default",
1171 Labels: map[string]string{
1172 "label-key-1": "label-value-1",
1173 },
1174 Annotations: map[string]string{
1175 "annotations-key-1": "annotations-value-1",
1176 },
1177 },
1178 Spec: v1.PodSpec{
1179 Overhead: v1.ResourceList{
1180 v1.ResourceCPU: resource.MustParse("3"),
1181 v1.ResourceMemory: resource.MustParse("8"),
1182 },
1183 NodeSelector: map[string]string{
1184 "foo": "bar",
1185 "baz": "quux",
1186 },
1187 Affinity: &v1.Affinity{
1188 NodeAffinity: &v1.NodeAffinity{
1189 RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
1190 NodeSelectorTerms: []v1.NodeSelectorTerm{
1191 {MatchExpressions: []v1.NodeSelectorRequirement{{Key: `foo`}}},
1192 },
1193 },
1194 PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{
1195 {Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{{Key: `foo`}}}},
1196 },
1197 },
1198 },
1199 TopologySpreadConstraints: []v1.TopologySpreadConstraint{
1200 {TopologyKey: `foo`},
1201 },
1202 HostAliases: []v1.HostAlias{
1203 {IP: "1.1.1.1"},
1204 {IP: "2.2.2.2"},
1205 },
1206 ImagePullSecrets: []v1.LocalObjectReference{
1207 {Name: "secret1"},
1208 {Name: "secret2"},
1209 },
1210 Containers: []v1.Container{
1211 {
1212 Name: "foobar",
1213 Image: "alpine",
1214 Resources: v1.ResourceRequirements{
1215 Requests: v1.ResourceList{
1216 v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
1217 v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
1218 },
1219 Limits: v1.ResourceList{
1220 v1.ResourceName(v1.ResourceCPU): resource.MustParse("2"),
1221 v1.ResourceName(v1.ResourceMemory): resource.MustParse("10"),
1222 },
1223 },
1224 },
1225 {
1226 Name: "foobar2",
1227 Image: "alpine",
1228 Resources: v1.ResourceRequirements{
1229 Requests: v1.ResourceList{
1230 v1.ResourceName(v1.ResourceCPU): resource.MustParse("4"),
1231 v1.ResourceName(v1.ResourceMemory): resource.MustParse("12"),
1232 },
1233 Limits: v1.ResourceList{
1234 v1.ResourceName(v1.ResourceCPU): resource.MustParse("8"),
1235 v1.ResourceName(v1.ResourceMemory): resource.MustParse("24"),
1236 },
1237 },
1238 },
1239 },
1240 InitContainers: []v1.Container{
1241 {
1242 Name: "small-init",
1243 Image: "alpine",
1244 Resources: v1.ResourceRequirements{
1245 Requests: v1.ResourceList{
1246 v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
1247 v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
1248 },
1249 Limits: v1.ResourceList{
1250 v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
1251 v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
1252 },
1253 },
1254 },
1255 {
1256 Name: "big-init",
1257 Image: "alpine",
1258 Resources: v1.ResourceRequirements{
1259 Requests: v1.ResourceList{
1260 v1.ResourceName(v1.ResourceCPU): resource.MustParse("40"),
1261 v1.ResourceName(v1.ResourceMemory): resource.MustParse("120"),
1262 },
1263 Limits: v1.ResourceList{
1264 v1.ResourceName(v1.ResourceCPU): resource.MustParse("80"),
1265 v1.ResourceName(v1.ResourceMemory): resource.MustParse("240"),
1266 },
1267 },
1268 },
1269 },
1270 Hostname: fmt.Sprintf("node-%d", i),
1271 },
1272 Status: v1.PodStatus{
1273 Phase: v1.PodRunning,
1274 ContainerStatuses: []v1.ContainerStatus{
1275 {
1276 ContainerID: "docker://numbers",
1277 Image: "alpine",
1278 Name: "foobar",
1279 Ready: false,
1280 },
1281 {
1282 ContainerID: "docker://numbers",
1283 Image: "alpine",
1284 Name: "foobar2",
1285 Ready: false,
1286 },
1287 },
1288 InitContainerStatuses: []v1.ContainerStatus{
1289 {
1290 ContainerID: "docker://numbers",
1291 Image: "alpine",
1292 Name: "small-init",
1293 Ready: false,
1294 },
1295 {
1296 ContainerID: "docker://numbers",
1297 Image: "alpine",
1298 Name: "big-init",
1299 Ready: false,
1300 },
1301 },
1302 Conditions: []v1.PodCondition{
1303 {
1304 Type: v1.PodScheduled,
1305 Status: v1.ConditionTrue,
1306 Reason: "successfully",
1307 Message: "sync pod successfully",
1308 LastProbeTime: metav1.Now(),
1309 LastTransitionTime: metav1.Now(),
1310 },
1311 },
1312 },
1313 }
1314 }
1315
1316 return out.Items[0].GetName(), out.Items[exemptObjectIndex].GetName(), out
1317 }
1318
1319 func getConfigmapListItems(start int, numItems int) (string, string, *v1.ConfigMapList) {
1320 out := &v1.ConfigMapList{
1321 Items: make([]v1.ConfigMap, numItems),
1322 }
1323
1324 for i := 0; i < numItems; i++ {
1325 out.Items[i] = v1.ConfigMap{
1326 TypeMeta: metav1.TypeMeta{
1327 APIVersion: "v1",
1328 Kind: "ConfigMap",
1329 },
1330 ObjectMeta: metav1.ObjectMeta{
1331 Name: fmt.Sprintf("cm-%d", i+start),
1332 Namespace: "default",
1333 Labels: map[string]string{
1334 "label-key-1": "label-value-1",
1335 },
1336 Annotations: map[string]string{
1337 "annotations-key-1": "annotations-value-1",
1338 },
1339 },
1340 Data: map[string]string{
1341 "data-1": "value-1",
1342 "data-2": "value-2",
1343 },
1344 }
1345 }
1346
1347 return out.Items[0].GetName(), out.Items[exemptObjectIndex].GetName(), out
1348 }
1349
1350 type TestPagingPodsLW struct {
1351 totalPageCount int
1352 fetchedPageCount int
1353
1354 detectedObjectNameList []string
1355 exemptObjectNameList []string
1356 }
1357
1358 func newPageTestLW(totalPageNum int) *TestPagingPodsLW {
1359 return &TestPagingPodsLW{
1360 totalPageCount: totalPageNum,
1361 fetchedPageCount: 0,
1362 }
1363 }
1364
1365 func (t *TestPagingPodsLW) List(options metav1.ListOptions) (runtime.Object, error) {
1366 firstPodName, exemptPodName, list := getPodListItems(t.fetchedPageCount*fakeItemsNum, fakeItemsNum)
1367 t.detectedObjectNameList = append(t.detectedObjectNameList, firstPodName)
1368 t.exemptObjectNameList = append(t.exemptObjectNameList, exemptPodName)
1369 t.fetchedPageCount++
1370 if t.fetchedPageCount >= t.totalPageCount {
1371 return list, nil
1372 }
1373 list.SetContinue("true")
1374 return list, nil
1375 }
1376
1377 func (t *TestPagingPodsLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
1378 return nil, nil
1379 }
1380
1381 func TestReflectorListExtract(t *testing.T) {
1382 store := NewStore(func(obj interface{}) (string, error) {
1383 pod, ok := obj.(*v1.Pod)
1384 if !ok {
1385 return "", fmt.Errorf("expect *v1.Pod, but got %T", obj)
1386 }
1387 return pod.GetName(), nil
1388 })
1389
1390 lw := newPageTestLW(5)
1391 reflector := NewReflector(lw, &v1.Pod{}, store, 0)
1392 reflector.WatchListPageSize = fakeItemsNum
1393
1394
1395 stopCh := make(chan struct{})
1396 if err := reflector.list(stopCh); err != nil {
1397 t.Fatal(err)
1398 }
1399
1400
1401
1402 for _, podName := range lw.exemptObjectNameList {
1403 _, exist, err := store.GetByKey(podName)
1404 if err != nil || !exist {
1405 t.Fatalf("%s should exist in pod store", podName)
1406 }
1407 }
1408
1409
1410
1411
1412 detectedPodAlreadyBeCleared := make(chan struct{}, len(lw.detectedObjectNameList))
1413
1414 for _, firstPodName := range lw.detectedObjectNameList {
1415 _, exist, err := store.GetByKey(firstPodName)
1416 if err != nil || !exist {
1417 t.Fatalf("%s should exist in pod store", firstPodName)
1418 }
1419 firstPod, exist, err := store.GetByKey(firstPodName)
1420 if err != nil || !exist {
1421 t.Fatalf("%s should exist in pod store", firstPodName)
1422 }
1423 goruntime.SetFinalizer(firstPod, func(obj interface{}) {
1424 t.Logf("%s already be gc\n", obj.(*v1.Pod).GetName())
1425 detectedPodAlreadyBeCleared <- struct{}{}
1426 })
1427 }
1428
1429 storedObjectKeys := store.ListKeys()
1430 for _, k := range storedObjectKeys {
1431
1432 if sets.NewString(lw.exemptObjectNameList...).Has(k) {
1433 continue
1434 }
1435 obj, exist, err := store.GetByKey(k)
1436 if err != nil || !exist {
1437 t.Fatalf("%s should exist in pod store", k)
1438 }
1439
1440 if err := store.Delete(obj); err != nil {
1441 t.Fatalf("delete object: %v", err)
1442 }
1443 goruntime.GC()
1444 }
1445
1446 clearedNum := 0
1447 for {
1448 select {
1449 case <-detectedPodAlreadyBeCleared:
1450 clearedNum++
1451 if clearedNum == len(lw.detectedObjectNameList) {
1452 return
1453 }
1454 }
1455 }
1456 }
1457
1458 func BenchmarkExtractList(b *testing.B) {
1459 _, _, podList := getPodListItems(0, fakeItemsNum)
1460 _, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
1461 tests := []struct {
1462 name string
1463 list runtime.Object
1464 }{
1465 {
1466 name: "PodList",
1467 list: podList,
1468 },
1469 {
1470 name: "ConfigMapList",
1471 list: configMapList,
1472 },
1473 }
1474
1475 for _, tc := range tests {
1476 b.Run(tc.name, func(b *testing.B) {
1477 b.ResetTimer()
1478 for i := 0; i < b.N; i++ {
1479 _, err := meta.ExtractList(tc.list)
1480 if err != nil {
1481 b.Errorf("extract list: %v", err)
1482 }
1483 }
1484 b.StopTimer()
1485 })
1486 }
1487 }
1488
1489 func BenchmarkEachListItem(b *testing.B) {
1490 _, _, podList := getPodListItems(0, fakeItemsNum)
1491 _, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
1492 tests := []struct {
1493 name string
1494 list runtime.Object
1495 }{
1496 {
1497 name: "PodList",
1498 list: podList,
1499 },
1500 {
1501 name: "ConfigMapList",
1502 list: configMapList,
1503 },
1504 }
1505
1506 for _, tc := range tests {
1507 b.Run(tc.name, func(b *testing.B) {
1508 b.ResetTimer()
1509 for i := 0; i < b.N; i++ {
1510 err := meta.EachListItem(tc.list, func(object runtime.Object) error {
1511 return nil
1512 })
1513 if err != nil {
1514 b.Errorf("each list: %v", err)
1515 }
1516 }
1517 b.StopTimer()
1518 })
1519 }
1520 }
1521
1522 func BenchmarkExtractListWithAlloc(b *testing.B) {
1523 _, _, podList := getPodListItems(0, fakeItemsNum)
1524 _, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
1525 tests := []struct {
1526 name string
1527 list runtime.Object
1528 }{
1529 {
1530 name: "PodList",
1531 list: podList,
1532 },
1533 {
1534 name: "ConfigMapList",
1535 list: configMapList,
1536 },
1537 }
1538
1539 for _, tc := range tests {
1540 b.Run(tc.name, func(b *testing.B) {
1541 b.ResetTimer()
1542 for i := 0; i < b.N; i++ {
1543 _, err := meta.ExtractListWithAlloc(tc.list)
1544 if err != nil {
1545 b.Errorf("extract list with alloc: %v", err)
1546 }
1547 }
1548 b.StopTimer()
1549 })
1550 }
1551 }
1552
1553 func BenchmarkEachListItemWithAlloc(b *testing.B) {
1554 _, _, podList := getPodListItems(0, fakeItemsNum)
1555 _, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
1556 tests := []struct {
1557 name string
1558 list runtime.Object
1559 }{
1560 {
1561 name: "PodList",
1562 list: podList,
1563 },
1564 {
1565 name: "ConfigMapList",
1566 list: configMapList,
1567 },
1568 }
1569
1570 for _, tc := range tests {
1571 b.Run(tc.name, func(b *testing.B) {
1572 b.ResetTimer()
1573 for i := 0; i < b.N; i++ {
1574 err := meta.EachListItemWithAlloc(tc.list, func(object runtime.Object) error {
1575 return nil
1576 })
1577 if err != nil {
1578 b.Errorf("each list with alloc: %v", err)
1579 }
1580 }
1581 b.StopTimer()
1582 })
1583 }
1584 }
1585
1586 func BenchmarkReflectorList(b *testing.B) {
1587 ctx, cancel := context.WithTimeout(context.Background(), wait.ForeverTestTimeout)
1588 defer cancel()
1589
1590 store := NewStore(func(obj interface{}) (string, error) {
1591 o, err := meta.Accessor(obj)
1592 if err != nil {
1593 return "", err
1594 }
1595 return o.GetName(), nil
1596 })
1597
1598 _, _, podList := getPodListItems(0, fakeItemsNum)
1599 _, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
1600 tests := []struct {
1601 name string
1602 sample func() interface{}
1603 list runtime.Object
1604 }{
1605 {
1606 name: "PodList",
1607 sample: func() interface{} {
1608 return v1.Pod{}
1609 },
1610 list: podList,
1611 },
1612 {
1613 name: "ConfigMapList",
1614 sample: func() interface{} {
1615 return v1.ConfigMap{}
1616 },
1617 list: configMapList,
1618 },
1619 }
1620
1621 for _, tc := range tests {
1622 b.Run(tc.name, func(b *testing.B) {
1623
1624 sample := tc.sample()
1625 reflector := NewReflector(newPageTestLW(pageNum), &sample, store, 0)
1626 reflector.WatchListPageSize = fakeItemsNum
1627
1628 b.ResetTimer()
1629 for i := 0; i < b.N; i++ {
1630 err := reflector.list(ctx.Done())
1631 if err != nil {
1632 b.Fatalf("reflect list: %v", err)
1633 }
1634 }
1635 b.StopTimer()
1636 })
1637 }
1638 }
1639
View as plain text