1
16
17 package cache
18
19 import (
20 "fmt"
21 "math/rand"
22 "sync"
23 "testing"
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 apiequality "k8s.io/apimachinery/pkg/api/equality"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/runtime"
30 "k8s.io/apimachinery/pkg/util/sets"
31 "k8s.io/apimachinery/pkg/util/wait"
32 "k8s.io/apimachinery/pkg/watch"
33 fcache "k8s.io/client-go/tools/cache/testing"
34
35 fuzz "github.com/google/gofuzz"
36 )
37
38 func Example() {
39
40 source := fcache.NewFakeControllerSource()
41
42
43 downstream := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
44
45
46
47
48 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
49 KeyFunction: MetaNamespaceKeyFunc,
50 KnownObjects: downstream,
51 })
52
53
54 deletionCounter := make(chan string, 1000)
55
56 cfg := &Config{
57 Queue: fifo,
58 ListerWatcher: source,
59 ObjectType: &v1.Pod{},
60 FullResyncPeriod: time.Millisecond * 100,
61 RetryOnError: false,
62
63
64
65 Process: func(obj interface{}, isInInitialList bool) error {
66
67 newest := obj.(Deltas).Newest()
68
69 if newest.Type != Deleted {
70
71 err := downstream.Add(newest.Object)
72 if err != nil {
73 return err
74 }
75
76
77 source.Delete(newest.Object.(runtime.Object))
78 } else {
79
80 err := downstream.Delete(newest.Object)
81 if err != nil {
82 return err
83 }
84
85
86
87 key, err := fifo.KeyOf(newest.Object)
88 if err != nil {
89 return err
90 }
91
92
93 deletionCounter <- key
94 }
95 return nil
96 },
97 }
98
99
100 stop := make(chan struct{})
101 defer close(stop)
102 go New(cfg).Run(stop)
103
104
105 testIDs := []string{"a-hello", "b-controller", "c-framework"}
106 for _, name := range testIDs {
107
108
109 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}})
110 }
111
112
113 outputSet := sets.String{}
114 for i := 0; i < len(testIDs); i++ {
115 outputSet.Insert(<-deletionCounter)
116 }
117
118 for _, key := range outputSet.List() {
119 fmt.Println(key)
120 }
121
122
123
124
125 }
126
127 func ExampleNewInformer() {
128
129 source := fcache.NewFakeControllerSource()
130
131
132 deletionCounter := make(chan string, 1000)
133
134
135
136 _, controller := NewInformer(
137 source,
138 &v1.Pod{},
139 time.Millisecond*100,
140 ResourceEventHandlerDetailedFuncs{
141 AddFunc: func(obj interface{}, isInInitialList bool) {
142 source.Delete(obj.(runtime.Object))
143 },
144 DeleteFunc: func(obj interface{}) {
145 key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
146 if err != nil {
147 key = "oops something went wrong with the key"
148 }
149
150
151 deletionCounter <- key
152 },
153 },
154 )
155
156
157 stop := make(chan struct{})
158 defer close(stop)
159 go controller.Run(stop)
160
161
162 testIDs := []string{"a-hello", "b-controller", "c-framework"}
163 for _, name := range testIDs {
164
165
166 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}})
167 }
168
169
170 outputSet := sets.String{}
171 for i := 0; i < len(testIDs); i++ {
172 outputSet.Insert(<-deletionCounter)
173 }
174
175 for _, key := range outputSet.List() {
176 fmt.Println(key)
177 }
178
179
180
181
182 }
183
184 func TestHammerController(t *testing.T) {
185
186
187
188
189
190
191 source := fcache.NewFakeControllerSource()
192
193
194 outputSetLock := sync.Mutex{}
195
196 outputSet := map[string][]string{}
197
198 recordFunc := func(eventType string, obj interface{}) {
199 key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
200 if err != nil {
201 t.Errorf("something wrong with key: %v", err)
202 key = "oops something went wrong with the key"
203 }
204
205
206 outputSetLock.Lock()
207 defer outputSetLock.Unlock()
208 outputSet[key] = append(outputSet[key], eventType)
209 }
210
211
212 _, controller := NewInformer(
213 source,
214 &v1.Pod{},
215 time.Millisecond*100,
216 ResourceEventHandlerDetailedFuncs{
217 AddFunc: func(obj interface{}, isInInitialList bool) { recordFunc("add", obj) },
218 UpdateFunc: func(oldObj, newObj interface{}) { recordFunc("update", newObj) },
219 DeleteFunc: func(obj interface{}) { recordFunc("delete", obj) },
220 },
221 )
222
223 if controller.HasSynced() {
224 t.Errorf("Expected HasSynced() to return false before we started the controller")
225 }
226
227
228 stop := make(chan struct{})
229 go controller.Run(stop)
230
231
232 wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
233 return controller.HasSynced(), nil
234 })
235 if !controller.HasSynced() {
236 t.Errorf("Expected HasSynced() to return true after the initial sync")
237 }
238
239 wg := sync.WaitGroup{}
240 const threads = 3
241 wg.Add(threads)
242 for i := 0; i < threads; i++ {
243 go func() {
244 defer wg.Done()
245
246 currentNames := sets.String{}
247 rs := rand.NewSource(rand.Int63())
248 f := fuzz.New().NilChance(.5).NumElements(0, 2).RandSource(rs)
249 for i := 0; i < 100; i++ {
250 var name string
251 var isNew bool
252 if currentNames.Len() == 0 || rand.Intn(3) == 1 {
253 f.Fuzz(&name)
254 isNew = true
255 } else {
256 l := currentNames.List()
257 name = l[rand.Intn(len(l))]
258 }
259
260 pod := &v1.Pod{}
261 f.Fuzz(pod)
262 pod.ObjectMeta.Name = name
263 pod.ObjectMeta.Namespace = "default"
264
265
266
267 if isNew {
268 currentNames.Insert(name)
269 source.Add(pod)
270 continue
271 }
272 switch rand.Intn(2) {
273 case 0:
274 currentNames.Insert(name)
275 source.Modify(pod)
276 case 1:
277 currentNames.Delete(name)
278 source.Delete(pod)
279 }
280 }
281 }()
282 }
283 wg.Wait()
284
285
286
287 time.Sleep(100 * time.Millisecond)
288 close(stop)
289
290
291
292
293 outputSetLock.Lock()
294 t.Logf("got: %#v", outputSet)
295 }
296
297 func TestUpdate(t *testing.T) {
298
299
300
301
302 source := fcache.NewFakeControllerSource()
303
304 const (
305 FROM = "from"
306 TO = "to"
307 )
308
309
310
311 type pair struct{ from, to string }
312 allowedTransitions := map[pair]bool{
313 {FROM, TO}: true,
314
315
316
317 {TO, TO}: true,
318
319 {FROM, FROM}: true,
320 }
321
322 pod := func(name, check string, final bool) *v1.Pod {
323 p := &v1.Pod{
324 ObjectMeta: metav1.ObjectMeta{
325 Name: name,
326 Labels: map[string]string{"check": check},
327 },
328 }
329 if final {
330 p.Labels["final"] = "true"
331 }
332 return p
333 }
334 deletePod := func(p *v1.Pod) bool {
335 return p.Labels["final"] == "true"
336 }
337
338 tests := []func(string){
339 func(name string) {
340 name = "a-" + name
341 source.Add(pod(name, FROM, false))
342 source.Modify(pod(name, TO, true))
343 },
344 }
345
346 const threads = 3
347
348 var testDoneWG sync.WaitGroup
349 testDoneWG.Add(threads * len(tests))
350
351
352
353
354 watchCh := make(chan struct{})
355 _, controller := NewInformer(
356 &testLW{
357 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
358 watch, err := source.Watch(options)
359 close(watchCh)
360 return watch, err
361 },
362 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
363 return source.List(options)
364 },
365 },
366 &v1.Pod{},
367 0,
368 ResourceEventHandlerFuncs{
369 UpdateFunc: func(oldObj, newObj interface{}) {
370 o, n := oldObj.(*v1.Pod), newObj.(*v1.Pod)
371 from, to := o.Labels["check"], n.Labels["check"]
372 if !allowedTransitions[pair{from, to}] {
373 t.Errorf("observed transition %q -> %q for %v", from, to, n.Name)
374 }
375 if deletePod(n) {
376 source.Delete(n)
377 }
378 },
379 DeleteFunc: func(obj interface{}) {
380 testDoneWG.Done()
381 },
382 },
383 )
384
385
386
387
388 stop := make(chan struct{})
389 go controller.Run(stop)
390 <-watchCh
391
392
393 var wg sync.WaitGroup
394 wg.Add(threads * len(tests))
395 for i := 0; i < threads; i++ {
396 for j, f := range tests {
397 go func(name string, f func(string)) {
398 defer wg.Done()
399 f(name)
400 }(fmt.Sprintf("%v-%v", i, j), f)
401 }
402 }
403 wg.Wait()
404
405
406 testDoneWG.Wait()
407 close(stop)
408 }
409
410 func TestPanicPropagated(t *testing.T) {
411
412 source := fcache.NewFakeControllerSource()
413
414
415 _, controller := NewInformer(
416 source,
417 &v1.Pod{},
418 time.Millisecond*100,
419 ResourceEventHandlerDetailedFuncs{
420 AddFunc: func(obj interface{}, isInInitialList bool) {
421
422 panic("Just panic.")
423 },
424 },
425 )
426
427
428 stop := make(chan struct{})
429 defer close(stop)
430
431 propagated := make(chan interface{})
432 go func() {
433 defer func() {
434 if r := recover(); r != nil {
435 propagated <- r
436 }
437 }()
438 controller.Run(stop)
439 }()
440
441 source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test"}})
442
443
444 select {
445 case p := <-propagated:
446 if p == "Just panic." {
447 t.Logf("Test Passed")
448 } else {
449 t.Errorf("unrecognized panic in controller run: %v", p)
450 }
451 case <-time.After(wait.ForeverTestTimeout):
452 t.Errorf("timeout: the panic failed to propagate from the controller run method!")
453 }
454 }
455
456 func TestTransformingInformer(t *testing.T) {
457
458 source := fcache.NewFakeControllerSource()
459
460 makePod := func(name, generation string) *v1.Pod {
461 return &v1.Pod{
462 ObjectMeta: metav1.ObjectMeta{
463 Name: name,
464 Namespace: "namespace",
465 Labels: map[string]string{"generation": generation},
466 },
467 Spec: v1.PodSpec{
468 Hostname: "hostname",
469 Subdomain: "subdomain",
470 },
471 }
472 }
473 expectedPod := func(name, generation string) *v1.Pod {
474 pod := makePod(name, generation)
475 pod.Spec.Hostname = "new-hostname"
476 pod.Spec.Subdomain = ""
477 pod.Spec.NodeName = "nodename"
478 return pod
479 }
480
481 source.Add(makePod("pod1", "1"))
482 source.Modify(makePod("pod1", "2"))
483
484 type event struct {
485 eventType watch.EventType
486 previous interface{}
487 current interface{}
488 }
489 events := make(chan event, 10)
490 recordEvent := func(eventType watch.EventType, previous, current interface{}) {
491 events <- event{eventType: eventType, previous: previous, current: current}
492 }
493 verifyEvent := func(eventType watch.EventType, previous, current interface{}) {
494 select {
495 case event := <-events:
496 if event.eventType != eventType {
497 t.Errorf("expected type %v, got %v", eventType, event.eventType)
498 }
499 if !apiequality.Semantic.DeepEqual(event.previous, previous) {
500 t.Errorf("expected previous object %#v, got %#v", previous, event.previous)
501 }
502 if !apiequality.Semantic.DeepEqual(event.current, current) {
503 t.Errorf("expected object %#v, got %#v", current, event.current)
504 }
505 case <-time.After(wait.ForeverTestTimeout):
506 t.Errorf("failed to get event")
507 }
508 }
509
510 podTransformer := func(obj interface{}) (interface{}, error) {
511 pod, ok := obj.(*v1.Pod)
512 if !ok {
513 return nil, fmt.Errorf("unexpected object type: %T", obj)
514 }
515 pod.Spec.Hostname = "new-hostname"
516 pod.Spec.Subdomain = ""
517 pod.Spec.NodeName = "nodename"
518
519
520 pod.ResourceVersion = ""
521
522 return pod, nil
523 }
524
525 store, controller := NewTransformingInformer(
526 source,
527 &v1.Pod{},
528 0,
529 ResourceEventHandlerDetailedFuncs{
530 AddFunc: func(obj interface{}, isInInitialList bool) { recordEvent(watch.Added, nil, obj) },
531 UpdateFunc: func(oldObj, newObj interface{}) { recordEvent(watch.Modified, oldObj, newObj) },
532 DeleteFunc: func(obj interface{}) { recordEvent(watch.Deleted, obj, nil) },
533 },
534 podTransformer,
535 )
536
537 verifyStore := func(expectedItems []interface{}) {
538 items := store.List()
539 if len(items) != len(expectedItems) {
540 t.Errorf("unexpected items %v, expected %v", items, expectedItems)
541 }
542 for _, expectedItem := range expectedItems {
543 found := false
544 for _, item := range items {
545 if apiequality.Semantic.DeepEqual(item, expectedItem) {
546 found = true
547 }
548 }
549 if !found {
550 t.Errorf("expected item %v not found in %v", expectedItem, items)
551 }
552 }
553 }
554
555 stopCh := make(chan struct{})
556 go controller.Run(stopCh)
557
558 verifyEvent(watch.Added, nil, expectedPod("pod1", "2"))
559 verifyStore([]interface{}{expectedPod("pod1", "2")})
560
561 source.Add(makePod("pod2", "1"))
562 verifyEvent(watch.Added, nil, expectedPod("pod2", "1"))
563 verifyStore([]interface{}{expectedPod("pod1", "2"), expectedPod("pod2", "1")})
564
565 source.Add(makePod("pod3", "1"))
566 verifyEvent(watch.Added, nil, expectedPod("pod3", "1"))
567
568 source.Modify(makePod("pod2", "2"))
569 verifyEvent(watch.Modified, expectedPod("pod2", "1"), expectedPod("pod2", "2"))
570
571 source.Delete(makePod("pod1", "2"))
572 verifyEvent(watch.Deleted, expectedPod("pod1", "2"), nil)
573 verifyStore([]interface{}{expectedPod("pod2", "2"), expectedPod("pod3", "1")})
574
575 close(stopCh)
576 }
577
578 func TestDeletionHandlingObjectToName(t *testing.T) {
579 cm := &v1.ConfigMap{
580 ObjectMeta: metav1.ObjectMeta{
581 Name: "testname",
582 Namespace: "testnamespace",
583 },
584 }
585 stringKey, err := MetaNamespaceKeyFunc(cm)
586 if err != nil {
587 t.Error(err)
588 }
589 deleted := DeletedFinalStateUnknown{
590 Key: stringKey,
591 Obj: cm,
592 }
593 expected, err := ObjectToName(cm)
594 if err != nil {
595 t.Error(err)
596 }
597 actual, err := DeletionHandlingObjectToName(deleted)
598 if err != nil {
599 t.Error(err)
600 }
601 if expected != actual {
602 t.Errorf("Expected %#v, got %#v", expected, actual)
603 }
604 }
605
View as plain text