1
16
17 package cache
18
19 import (
20 "context"
21 "fmt"
22 "math/rand"
23 "strconv"
24 "strings"
25 "sync"
26 "testing"
27 "time"
28
29 "github.com/google/go-cmp/cmp"
30 "github.com/google/go-cmp/cmp/cmpopts"
31 "github.com/stretchr/testify/assert"
32 v1 "k8s.io/api/core/v1"
33 "k8s.io/apimachinery/pkg/api/meta"
34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35 "k8s.io/apimachinery/pkg/util/sets"
36 "k8s.io/apimachinery/pkg/util/wait"
37 fcache "k8s.io/client-go/tools/cache/testing"
38 testingclock "k8s.io/utils/clock/testing"
39 )
40
41 type testListener struct {
42 lock sync.RWMutex
43 resyncPeriod time.Duration
44 expectedItemNames sets.String
45 receivedItemNames []string
46 name string
47 }
48
49 func newTestListener(name string, resyncPeriod time.Duration, expected ...string) *testListener {
50 l := &testListener{
51 resyncPeriod: resyncPeriod,
52 expectedItemNames: sets.NewString(expected...),
53 name: name,
54 }
55 return l
56 }
57
58 func (l *testListener) OnAdd(obj interface{}, isInInitialList bool) {
59 l.handle(obj)
60 }
61
62 func (l *testListener) OnUpdate(old, new interface{}) {
63 l.handle(new)
64 }
65
66 func (l *testListener) OnDelete(obj interface{}) {
67 }
68
69 func (l *testListener) handle(obj interface{}) {
70 key, _ := MetaNamespaceKeyFunc(obj)
71 fmt.Printf("%s: handle: %v\n", l.name, key)
72 l.lock.Lock()
73 defer l.lock.Unlock()
74 objectMeta, _ := meta.Accessor(obj)
75 l.receivedItemNames = append(l.receivedItemNames, objectMeta.GetName())
76 }
77
78 func (l *testListener) ok() bool {
79 fmt.Println("polling")
80 err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
81 if l.satisfiedExpectations() {
82 return true, nil
83 }
84 return false, nil
85 })
86 if err != nil {
87 return false
88 }
89
90
91 fmt.Println("sleeping")
92 time.Sleep(1 * time.Second)
93 fmt.Println("final check")
94 return l.satisfiedExpectations()
95 }
96
97 func (l *testListener) satisfiedExpectations() bool {
98 l.lock.RLock()
99 defer l.lock.RUnlock()
100
101 return sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames)
102 }
103
104 func eventHandlerCount(i SharedInformer) int {
105 s := i.(*sharedIndexInformer)
106 s.startedLock.Lock()
107 defer s.startedLock.Unlock()
108 return len(s.processor.listeners)
109 }
110
111 func isStarted(i SharedInformer) bool {
112 s := i.(*sharedIndexInformer)
113 s.startedLock.Lock()
114 defer s.startedLock.Unlock()
115 return s.started
116 }
117
118 func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool {
119 s := i.(*sharedIndexInformer)
120 return s.processor.getListener(h) != nil
121 }
122
123 func TestIndexer(t *testing.T) {
124 assert := assert.New(t)
125
126 source := fcache.NewFakeControllerSource()
127 pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Labels: map[string]string{"a": "a-val", "b": "b-val1"}}}
128 pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Labels: map[string]string{"b": "b-val2"}}}
129 pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Labels: map[string]string{"a": "a-val2"}}}
130 source.Add(pod1)
131 source.Add(pod2)
132
133
134 informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
135 err := informer.AddIndexers(map[string]IndexFunc{
136 "labels": func(obj interface{}) ([]string, error) {
137 res := []string{}
138 for k := range obj.(*v1.Pod).Labels {
139 res = append(res, k)
140 }
141 return res, nil
142 },
143 })
144 if err != nil {
145 t.Fatal(err)
146 }
147 stop := make(chan struct{})
148 defer close(stop)
149
150 go informer.Run(stop)
151 WaitForCacheSync(stop, informer.HasSynced)
152
153 cmpOps := cmpopts.SortSlices(func(a, b any) bool {
154 return a.(*v1.Pod).Name < b.(*v1.Pod).Name
155 })
156
157
158 res, err := informer.GetIndexer().ByIndex("labels", "a")
159 assert.NoError(err)
160 if diff := cmp.Diff([]any{pod1}, res); diff != "" {
161 t.Fatal(diff)
162 }
163
164
165 source.Add(pod3)
166
167 assert.Eventually(func() bool {
168 res, _ := informer.GetIndexer().ByIndex("labels", "a")
169 return cmp.Diff([]any{pod1, pod3}, res, cmpOps) == ""
170 }, time.Second*3, time.Millisecond)
171
172
173 err = informer.AddIndexers(map[string]IndexFunc{
174 "labels-again": func(obj interface{}) ([]string, error) {
175 res := []string{}
176 for k := range obj.(*v1.Pod).Labels {
177 res = append(res, k)
178 }
179 return res, nil
180 },
181 })
182 assert.NoError(err)
183
184
185 res, err = informer.GetIndexer().ByIndex("labels-again", "a")
186 assert.NoError(err)
187 if diff := cmp.Diff([]any{pod1, pod3}, res, cmpOps); diff != "" {
188 t.Fatal(diff)
189 }
190 if got := informer.GetIndexer().ListIndexFuncValues("labels"); !sets.New(got...).Equal(sets.New("a", "b")) {
191 t.Fatalf("got %v", got)
192 }
193 if got := informer.GetIndexer().ListIndexFuncValues("labels-again"); !sets.New(got...).Equal(sets.New("a", "b")) {
194 t.Fatalf("got %v", got)
195 }
196 }
197
198 func TestListenerResyncPeriods(t *testing.T) {
199
200 source := fcache.NewFakeControllerSource()
201 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
202 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
203
204
205 informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
206
207 clock := testingclock.NewFakeClock(time.Now())
208 informer.clock = clock
209 informer.processor.clock = clock
210
211
212 listener1 := newTestListener("listener1", 0, "pod1", "pod2")
213 informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod)
214
215
216 listener2 := newTestListener("listener2", 2*time.Second, "pod1", "pod2")
217 informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod)
218
219
220 listener3 := newTestListener("listener3", 3*time.Second, "pod1", "pod2")
221 informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
222 listeners := []*testListener{listener1, listener2, listener3}
223
224 stop := make(chan struct{})
225 defer close(stop)
226
227 go informer.Run(stop)
228
229
230 for _, listener := range listeners {
231 if !listener.ok() {
232 t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
233 }
234 }
235
236
237 for _, listener := range listeners {
238 listener.receivedItemNames = []string{}
239 }
240
241
242 clock.Step(2 * time.Second)
243
244
245 if !listener2.ok() {
246 t.Errorf("%s: expected %v, got %v", listener2.name, listener2.expectedItemNames, listener2.receivedItemNames)
247 }
248
249
250 time.Sleep(1 * time.Second)
251
252
253 if len(listener1.receivedItemNames) != 0 {
254 t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames))
255 }
256 if len(listener3.receivedItemNames) != 0 {
257 t.Errorf("listener3: should not have resynced (got %d)", len(listener3.receivedItemNames))
258 }
259
260
261 for _, listener := range listeners {
262 listener.receivedItemNames = []string{}
263 }
264
265
266 clock.Step(1 * time.Second)
267
268
269 if !listener3.ok() {
270 t.Errorf("%s: expected %v, got %v", listener3.name, listener3.expectedItemNames, listener3.receivedItemNames)
271 }
272
273
274 time.Sleep(1 * time.Second)
275
276
277 if len(listener1.receivedItemNames) != 0 {
278 t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames))
279 }
280 if len(listener2.receivedItemNames) != 0 {
281 t.Errorf("listener2: should not have resynced (got %d)", len(listener2.receivedItemNames))
282 }
283 }
284
285 func TestResyncCheckPeriod(t *testing.T) {
286
287 source := fcache.NewFakeControllerSource()
288
289
290 informer := NewSharedInformer(source, &v1.Pod{}, 12*time.Hour).(*sharedIndexInformer)
291 gl := informer.processor.getListener
292
293 clock := testingclock.NewFakeClock(time.Now())
294 informer.clock = clock
295 informer.processor.clock = clock
296
297
298 listener1 := newTestListener("listener1", 0)
299 handler1, _ := informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod)
300
301 if e, a := 12*time.Hour, informer.resyncCheckPeriod; e != a {
302 t.Errorf("expected %d, got %d", e, a)
303 }
304 if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
305 t.Errorf("expected %d, got %d", e, a)
306 }
307
308
309 listener2 := newTestListener("listener2", 1*time.Minute)
310 handler2, _ := informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod)
311 if e, a := 1*time.Minute, informer.resyncCheckPeriod; e != a {
312 t.Errorf("expected %d, got %d", e, a)
313 }
314 if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
315 t.Errorf("expected %d, got %d", e, a)
316 }
317 if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a {
318 t.Errorf("expected %d, got %d", e, a)
319 }
320
321
322 listener3 := newTestListener("listener3", 55*time.Second)
323 handler3, _ := informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
324 if e, a := 55*time.Second, informer.resyncCheckPeriod; e != a {
325 t.Errorf("expected %d, got %d", e, a)
326 }
327 if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
328 t.Errorf("expected %d, got %d", e, a)
329 }
330 if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a {
331 t.Errorf("expected %d, got %d", e, a)
332 }
333 if e, a := 55*time.Second, gl(handler3).resyncPeriod; e != a {
334 t.Errorf("expected %d, got %d", e, a)
335 }
336
337
338 listener4 := newTestListener("listener4", 5*time.Second)
339 handler4, _ := informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod)
340 if e, a := 5*time.Second, informer.resyncCheckPeriod; e != a {
341 t.Errorf("expected %d, got %d", e, a)
342 }
343 if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
344 t.Errorf("expected %d, got %d", e, a)
345 }
346 if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a {
347 t.Errorf("expected %d, got %d", e, a)
348 }
349 if e, a := 55*time.Second, gl(handler3).resyncPeriod; e != a {
350 t.Errorf("expected %d, got %d", e, a)
351 }
352 if e, a := 5*time.Second, gl(handler4).resyncPeriod; e != a {
353 t.Errorf("expected %d, got %d", e, a)
354 }
355 }
356
357
358 func TestSharedInformerInitializationRace(t *testing.T) {
359 source := fcache.NewFakeControllerSource()
360 informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
361 listener := newTestListener("raceListener", 0)
362
363 stop := make(chan struct{})
364 go informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
365 go informer.Run(stop)
366 close(stop)
367 }
368
369
370
371
372 func TestSharedInformerWatchDisruption(t *testing.T) {
373
374 source := fcache.NewFakeControllerSource()
375
376 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
377 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})
378
379
380 informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
381
382 clock := testingclock.NewFakeClock(time.Now())
383 informer.clock = clock
384 informer.processor.clock = clock
385
386
387 listenerNoResync := newTestListener("listenerNoResync", 0, "pod1", "pod2")
388 informer.AddEventHandlerWithResyncPeriod(listenerNoResync, listenerNoResync.resyncPeriod)
389
390 listenerResync := newTestListener("listenerResync", 1*time.Second, "pod1", "pod2")
391 informer.AddEventHandlerWithResyncPeriod(listenerResync, listenerResync.resyncPeriod)
392 listeners := []*testListener{listenerNoResync, listenerResync}
393
394 stop := make(chan struct{})
395 defer close(stop)
396
397 go informer.Run(stop)
398
399 for _, listener := range listeners {
400 if !listener.ok() {
401 t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
402 }
403 }
404
405
406 source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3", ResourceVersion: "3"}})
407 source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "4"}})
408
409
410 for _, listener := range listeners {
411 if !listener.ok() {
412 t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
413 }
414 }
415
416 for _, listener := range listeners {
417 listener.receivedItemNames = []string{}
418 }
419
420 listenerNoResync.expectedItemNames = sets.NewString("pod2", "pod3")
421 listenerResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3")
422
423
424 clock.Step(1 * time.Second)
425
426
427 source.ResetWatch()
428
429
430
431
432 time.Sleep(10 * time.Millisecond)
433
434
435 clock.Step(1601 * time.Millisecond)
436
437
438
439 time.Sleep(10 * time.Millisecond)
440
441 for _, listener := range listeners {
442 if !listener.ok() {
443 t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
444 }
445 }
446 }
447
448 func TestSharedInformerErrorHandling(t *testing.T) {
449 source := fcache.NewFakeControllerSource()
450 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
451 source.ListError = fmt.Errorf("Access Denied")
452
453 informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
454
455 errCh := make(chan error)
456 _ = informer.SetWatchErrorHandler(func(_ *Reflector, err error) {
457 errCh <- err
458 })
459
460 stop := make(chan struct{})
461 go informer.Run(stop)
462
463 select {
464 case err := <-errCh:
465 if !strings.Contains(err.Error(), "Access Denied") {
466 t.Errorf("Expected 'Access Denied' error. Actual: %v", err)
467 }
468 case <-time.After(time.Second):
469 t.Errorf("Timeout waiting for error handler call")
470 }
471 close(stop)
472 }
473
474
475
476 func TestSharedInformerStartRace(t *testing.T) {
477 source := fcache.NewFakeControllerSource()
478 informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
479 stop := make(chan struct{})
480 go func() {
481 for {
482 select {
483 case <-stop:
484 return
485 default:
486 }
487
488 informer.SetTransform(func(i interface{}) (interface{}, error) {
489 return i, nil
490 })
491 informer.SetWatchErrorHandler(func(r *Reflector, err error) {
492 })
493 }
494 }()
495
496 go informer.Run(stop)
497
498 close(stop)
499 }
500
501 func TestSharedInformerTransformer(t *testing.T) {
502
503 source := fcache.NewFakeControllerSource()
504
505 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
506 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})
507
508 informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
509 informer.SetTransform(func(obj interface{}) (interface{}, error) {
510 if pod, ok := obj.(*v1.Pod); ok {
511 name := pod.GetName()
512
513 if upper := strings.ToUpper(name); upper != name {
514 pod.SetName(upper)
515 return pod, nil
516 }
517 }
518 return obj, nil
519 })
520
521 listenerTransformer := newTestListener("listenerTransformer", 0, "POD1", "POD2")
522 informer.AddEventHandler(listenerTransformer)
523
524 stop := make(chan struct{})
525 go informer.Run(stop)
526 defer close(stop)
527
528 if !listenerTransformer.ok() {
529 t.Errorf("%s: expected %v, got %v", listenerTransformer.name, listenerTransformer.expectedItemNames, listenerTransformer.receivedItemNames)
530 }
531 }
532
533 func TestSharedInformerRemoveHandler(t *testing.T) {
534 source := fcache.NewFakeControllerSource()
535 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
536
537 informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
538
539 handler1 := &ResourceEventHandlerFuncs{}
540 handle1, err := informer.AddEventHandler(handler1)
541 if err != nil {
542 t.Errorf("informer did not add handler1: %s", err)
543 return
544 }
545 handler2 := &ResourceEventHandlerFuncs{}
546 handle2, err := informer.AddEventHandler(handler2)
547 if err != nil {
548 t.Errorf("informer did not add handler2: %s", err)
549 return
550 }
551
552 if eventHandlerCount(informer) != 2 {
553 t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
554 }
555
556 if err := informer.RemoveEventHandler(handle2); err != nil {
557 t.Errorf("removing of second pointer handler failed: %s", err)
558 }
559 if eventHandlerCount(informer) != 1 {
560 t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer))
561 }
562
563 if err := informer.RemoveEventHandler(handle1); err != nil {
564 t.Errorf("removing of first pointer handler failed: %s", err)
565 }
566 if eventHandlerCount(informer) != 0 {
567 t.Errorf("informer still has registered handlers after removing both handlers")
568 }
569 }
570
571 func TestSharedInformerRemoveForeignHandler(t *testing.T) {
572 source := fcache.NewFakeControllerSource()
573 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
574
575 informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
576
577 source2 := fcache.NewFakeControllerSource()
578 source2.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
579
580 informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
581
582 handler1 := &ResourceEventHandlerFuncs{}
583 handle1, err := informer.AddEventHandler(handler1)
584 if err != nil {
585 t.Errorf("informer did not add handler1: %s", err)
586 return
587 }
588 handler2 := &ResourceEventHandlerFuncs{}
589 handle2, err := informer.AddEventHandler(handler2)
590 if err != nil {
591 t.Errorf("informer did not add handler2: %s", err)
592 return
593 }
594
595 if eventHandlerCount(informer) != 2 {
596 t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
597 }
598 if eventHandlerCount(informer2) != 0 {
599 t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2))
600 }
601
602
603 if isRegistered(informer2, handle1) {
604 t.Errorf("handle1 registered for informer2")
605 }
606 if isRegistered(informer2, handle2) {
607 t.Errorf("handle2 registered for informer2")
608 }
609 if err := informer2.RemoveEventHandler(handle1); err != nil {
610 t.Errorf("removing of second pointer handler failed: %s", err)
611 }
612 if eventHandlerCount(informer) != 2 {
613 t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
614 }
615 if eventHandlerCount(informer2) != 0 {
616 t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2))
617 }
618 if !isRegistered(informer, handle1) {
619 t.Errorf("handle1 not registered anymore for informer")
620 }
621 if !isRegistered(informer, handle2) {
622 t.Errorf("handle2 not registered anymore for informer")
623 }
624
625 if eventHandlerCount(informer) != 2 {
626 t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
627 }
628 if eventHandlerCount(informer2) != 0 {
629 t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2))
630 }
631 if !isRegistered(informer, handle1) {
632 t.Errorf("handle1 not registered anymore for informer")
633 }
634 if !isRegistered(informer, handle2) {
635 t.Errorf("handle2 not registered anymore for informer")
636 }
637
638 if err := informer.RemoveEventHandler(handle2); err != nil {
639 t.Errorf("removing of second pointer handler failed: %s", err)
640 }
641 if eventHandlerCount(informer) != 1 {
642 t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer))
643 }
644
645 if err := informer.RemoveEventHandler(handle1); err != nil {
646 t.Errorf("removing of first pointer handler failed: %s", err)
647 }
648 if eventHandlerCount(informer) != 0 {
649 t.Errorf("informer still has registered handlers after removing both handlers")
650 }
651 }
652
653 func TestSharedInformerMultipleRegistration(t *testing.T) {
654 source := fcache.NewFakeControllerSource()
655 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
656
657 informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
658
659 handler1 := &ResourceEventHandlerFuncs{}
660 reg1, err := informer.AddEventHandler(handler1)
661 if err != nil {
662 t.Errorf("informer did not add handler for the first time: %s", err)
663 return
664 }
665
666 if !isRegistered(informer, reg1) {
667 t.Errorf("handle1 is not active after successful registration")
668 return
669 }
670
671 reg2, err := informer.AddEventHandler(handler1)
672 if err != nil {
673 t.Errorf("informer did not add handler for the second: %s", err)
674 return
675 }
676
677 if !isRegistered(informer, reg2) {
678 t.Errorf("handle2 is not active after successful registration")
679 return
680 }
681
682 if eventHandlerCount(informer) != 2 {
683 t.Errorf("informer has %d registered handler(s), instead of 2", eventHandlerCount(informer))
684 }
685
686 if err := informer.RemoveEventHandler(reg1); err != nil {
687 t.Errorf("removing of duplicate handler registration failed: %s", err)
688 }
689
690 if isRegistered(informer, reg1) {
691 t.Errorf("handle1 is still active after successful remove")
692 return
693 }
694 if !isRegistered(informer, reg2) {
695 t.Errorf("handle2 is not active after removing handle1")
696 return
697 }
698
699 if eventHandlerCount(informer) != 1 {
700 if eventHandlerCount(informer) == 0 {
701 t.Errorf("informer has no registered handler anymore after removal of duplicate registrations")
702 } else {
703 t.Errorf("informer has unexpected number (%d) of handlers after removal of duplicate handler registration", eventHandlerCount(informer))
704 }
705 }
706
707 if err := informer.RemoveEventHandler(reg2); err != nil {
708 t.Errorf("removing of second handler registration failed: %s", err)
709 }
710
711 if isRegistered(informer, reg2) {
712 t.Errorf("handle2 is still active after successful remove")
713 return
714 }
715
716 if eventHandlerCount(informer) != 0 {
717 t.Errorf("informer has unexpected number (%d) of handlers after removal of second handler registrations", eventHandlerCount(informer))
718 }
719 }
720
721 func TestRemovingRemovedSharedInformer(t *testing.T) {
722 source := fcache.NewFakeControllerSource()
723 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
724
725 informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
726 handler := &ResourceEventHandlerFuncs{}
727 reg, err := informer.AddEventHandler(handler)
728
729 if err != nil {
730 t.Errorf("informer did not add handler for the first time: %s", err)
731 return
732 }
733 if err := informer.RemoveEventHandler(reg); err != nil {
734 t.Errorf("removing of handler registration failed: %s", err)
735 return
736 }
737 if isRegistered(informer, reg) {
738 t.Errorf("handle is still active after successful remove")
739 return
740 }
741 if err := informer.RemoveEventHandler(reg); err != nil {
742 t.Errorf("removing of already removed registration yields unexpected error: %s", err)
743 }
744 if isRegistered(informer, reg) {
745 t.Errorf("handle is still active after second remove")
746 return
747 }
748 }
749
750
751
752
753 func TestSharedInformerHandlerAbuse(t *testing.T) {
754 source := fcache.NewFakeControllerSource()
755 informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
756
757 ctx, cancel := context.WithCancel(context.Background())
758 informerCtx, informerCancel := context.WithCancel(context.Background())
759 go func() {
760 informer.Run(informerCtx.Done())
761 cancel()
762 }()
763
764 worker := func() {
765
766
767 funcs := ResourceEventHandlerDetailedFuncs{
768 AddFunc: func(obj interface{}, isInInitialList bool) {},
769 UpdateFunc: func(oldObj, newObj interface{}) {},
770 DeleteFunc: func(obj interface{}) {},
771 }
772 handles := []ResourceEventHandlerRegistration{}
773
774 for {
775 select {
776 case <-ctx.Done():
777 return
778 default:
779 switch rand.Intn(2) {
780 case 0:
781
782 reg, err := informer.AddEventHandlerWithResyncPeriod(funcs, 1*time.Second)
783 if err != nil {
784 if strings.Contains(err.Error(), "stopped already") {
785
786 return
787 }
788 t.Errorf("failed to add handler: %v", err)
789 return
790 }
791 handles = append(handles, reg)
792 case 1:
793
794 if len(handles) == 0 {
795 continue
796 }
797
798 idx := rand.Intn(len(handles))
799 err := informer.RemoveEventHandler(handles[idx])
800 if err != nil {
801 if strings.Contains(err.Error(), "stopped already") {
802
803 return
804 }
805 t.Errorf("failed to remove handler: %v", err)
806 return
807 }
808 handles = append(handles[:idx], handles[idx+1:]...)
809 }
810 }
811 }
812 }
813
814 wg := sync.WaitGroup{}
815 for i := 0; i < 100; i++ {
816 wg.Add(1)
817 go func() {
818 worker()
819 wg.Done()
820 }()
821 }
822
823 objs := []*v1.Pod{}
824
825
826 for i := 0; i < 10000; i++ {
827 if len(objs) == 0 {
828
829 obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
830 Name: "pod" + strconv.Itoa(i),
831 }}
832 objs = append(objs, obj)
833
834
835 source.Add(obj.DeepCopy())
836 }
837
838 switch rand.Intn(3) {
839 case 0:
840
841 obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
842 Name: "pod" + strconv.Itoa(i),
843 }}
844 objs = append(objs, obj)
845 source.Add(obj.DeepCopy())
846 case 1:
847
848 idx := rand.Intn(len(objs))
849 source.Modify(objs[idx].DeepCopy())
850
851 case 2:
852
853 idx := rand.Intn(len(objs))
854 source.Delete(objs[idx].DeepCopy())
855 objs = append(objs[:idx], objs[idx+1:]...)
856 }
857 }
858
859
860
861 informerCancel()
862
863
864 wg.Wait()
865 }
866
867 func TestStateSharedInformer(t *testing.T) {
868 source := fcache.NewFakeControllerSource()
869 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
870
871 informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
872 listener := newTestListener("listener", 0, "pod1")
873 informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
874
875 if isStarted(informer) {
876 t.Errorf("informer already started after creation")
877 return
878 }
879 if informer.IsStopped() {
880 t.Errorf("informer already stopped after creation")
881 return
882 }
883 stop := make(chan struct{})
884 go informer.Run(stop)
885 if !listener.ok() {
886 t.Errorf("informer did not report initial objects")
887 close(stop)
888 return
889 }
890
891 if !isStarted(informer) {
892 t.Errorf("informer does not report to be started although handling events")
893 close(stop)
894 return
895 }
896 if informer.IsStopped() {
897 t.Errorf("informer reports to be stopped although stop channel not closed")
898 close(stop)
899 return
900 }
901
902 close(stop)
903 fmt.Println("sleeping")
904 time.Sleep(1 * time.Second)
905
906 if !informer.IsStopped() {
907 t.Errorf("informer reports not to be stopped although stop channel closed")
908 return
909 }
910 if !isStarted(informer) {
911 t.Errorf("informer reports not to be started after it has been started and stopped")
912 return
913 }
914 }
915
916 func TestAddOnStoppedSharedInformer(t *testing.T) {
917 source := fcache.NewFakeControllerSource()
918 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
919
920 informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
921 listener := newTestListener("listener", 0, "pod1")
922 stop := make(chan struct{})
923 go informer.Run(stop)
924 close(stop)
925
926 err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
927 if informer.IsStopped() {
928 return true, nil
929 }
930 return false, nil
931 })
932
933 if err != nil {
934 t.Errorf("informer reports not to be stopped although stop channel closed")
935 return
936 }
937
938 _, err = informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
939 if err == nil {
940 t.Errorf("stopped informer did not reject add handler")
941 return
942 }
943 if !strings.HasSuffix(err.Error(), "was not added to shared informer because it has stopped already") {
944 t.Errorf("adding handler to a stopped informer yields unexpected error: %s", err)
945 return
946 }
947 }
948
949 func TestRemoveOnStoppedSharedInformer(t *testing.T) {
950 source := fcache.NewFakeControllerSource()
951 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
952
953 informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
954 listener := newTestListener("listener", 0, "pod1")
955 handle, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
956 if err != nil {
957 t.Errorf("informer did not add handler: %s", err)
958 return
959 }
960 stop := make(chan struct{})
961 go informer.Run(stop)
962 close(stop)
963 fmt.Println("sleeping")
964 time.Sleep(1 * time.Second)
965
966 if !informer.IsStopped() {
967 t.Errorf("informer reports not to be stopped although stop channel closed")
968 return
969 }
970 err = informer.RemoveEventHandler(handle)
971 if err != nil {
972 t.Errorf("informer does not remove handler on stopped informer")
973 return
974 }
975 }
976
977 func TestRemoveWhileActive(t *testing.T) {
978
979 source := fcache.NewFakeControllerSource()
980
981
982 informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
983
984 listener := newTestListener("listener", 0, "pod1")
985 handle, _ := informer.AddEventHandler(listener)
986
987 stop := make(chan struct{})
988 defer close(stop)
989
990 go informer.Run(stop)
991 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
992
993 if !listener.ok() {
994 t.Errorf("event did not occur")
995 return
996 }
997
998 informer.RemoveEventHandler(handle)
999
1000 if isRegistered(informer, handle) {
1001 t.Errorf("handle is still active after successful remove")
1002 return
1003 }
1004
1005 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
1006
1007 if !listener.ok() {
1008 t.Errorf("unexpected event occurred")
1009 return
1010 }
1011 }
1012
1013 func TestAddWhileActive(t *testing.T) {
1014
1015 source := fcache.NewFakeControllerSource()
1016
1017
1018 informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
1019 listener1 := newTestListener("originalListener", 0, "pod1")
1020 listener2 := newTestListener("listener2", 0, "pod1", "pod2")
1021 handle1, _ := informer.AddEventHandler(listener1)
1022
1023 if handle1.HasSynced() {
1024 t.Error("Synced before Run??")
1025 }
1026
1027 stop := make(chan struct{})
1028 defer close(stop)
1029
1030 go informer.Run(stop)
1031 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
1032
1033 if !listener1.ok() {
1034 t.Errorf("events on listener1 did not occur")
1035 return
1036 }
1037
1038 if !handle1.HasSynced() {
1039 t.Error("Not synced after Run??")
1040 }
1041
1042 listener2.lock.Lock()
1043 handle2, _ := informer.AddEventHandler(listener2)
1044 if handle2.HasSynced() {
1045 t.Error("Synced before processing anything?")
1046 }
1047 listener2.lock.Unlock()
1048
1049 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
1050
1051 if !listener2.ok() {
1052 t.Errorf("event on listener2 did not occur")
1053 return
1054 }
1055
1056 if !handle2.HasSynced() {
1057 t.Error("Not synced even after processing?")
1058 }
1059
1060 if !isRegistered(informer, handle1) {
1061 t.Errorf("handle1 is not active")
1062 return
1063 }
1064 if !isRegistered(informer, handle2) {
1065 t.Errorf("handle2 is not active")
1066 return
1067 }
1068
1069 listener1.expectedItemNames = listener2.expectedItemNames
1070 if !listener1.ok() {
1071 t.Errorf("events on listener1 did not occur")
1072 return
1073 }
1074 }
1075
View as plain text