1
16
17 package framework
18
19 import (
20 "sync"
21 "testing"
22
23 v1 "k8s.io/api/core/v1"
24 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25 "k8s.io/apimachinery/pkg/watch"
26 )
27
28
29 func consume(t *testing.T, w watch.Interface, rvs []string, done *sync.WaitGroup) {
30 defer done.Done()
31 for _, rv := range rvs {
32 got, ok := <-w.ResultChan()
33 if !ok {
34 t.Errorf("%#v: unexpected channel close, wanted %v", rvs, rv)
35 return
36 }
37 gotRV := got.Object.(*v1.Pod).ObjectMeta.ResourceVersion
38 if e, a := rv, gotRV; e != a {
39 t.Errorf("wanted %v, got %v", e, a)
40 } else {
41 t.Logf("Got %v as expected", gotRV)
42 }
43 }
44
45 got, open := <-w.ResultChan()
46 if open {
47 t.Errorf("%#v: unwanted object %#v", rvs, got)
48 }
49 }
50
51 func TestRCNumber(t *testing.T) {
52 pod := func(name string) *v1.Pod {
53 return &v1.Pod{
54 ObjectMeta: metav1.ObjectMeta{
55 Name: name,
56 },
57 }
58 }
59
60 wg := &sync.WaitGroup{}
61 wg.Add(3)
62
63 source := NewFakeControllerSource()
64 source.Add(pod("foo"))
65 source.Modify(pod("foo"))
66 source.Modify(pod("foo"))
67
68 w, err := source.Watch(metav1.ListOptions{ResourceVersion: "1"})
69 if err != nil {
70 t.Fatalf("Unexpected error: %v", err)
71 }
72 go consume(t, w, []string{"2", "3"}, wg)
73
74 list, err := source.List(metav1.ListOptions{})
75 if err != nil {
76 t.Fatalf("Unexpected error: %v", err)
77 }
78 if e, a := "3", list.(*v1.List).ResourceVersion; e != a {
79 t.Errorf("wanted %v, got %v", e, a)
80 }
81
82 w2, err := source.Watch(metav1.ListOptions{ResourceVersion: "2"})
83 if err != nil {
84 t.Fatalf("Unexpected error: %v", err)
85 }
86 go consume(t, w2, []string{"3"}, wg)
87
88 w3, err := source.Watch(metav1.ListOptions{ResourceVersion: "3"})
89 if err != nil {
90 t.Fatalf("Unexpected error: %v", err)
91 }
92 go consume(t, w3, []string{}, wg)
93 source.Shutdown()
94 wg.Wait()
95 }
96
97
98
99 func TestResetWatch(t *testing.T) {
100 pod := func(name string) *v1.Pod {
101 return &v1.Pod{
102 ObjectMeta: metav1.ObjectMeta{
103 Name: name,
104 },
105 }
106 }
107
108 wg := &sync.WaitGroup{}
109 wg.Add(1)
110
111 source := NewFakeControllerSource()
112 source.Add(pod("foo"))
113 source.Modify(pod("foo"))
114 source.Modify(pod("foo"))
115
116
117 source.ResetWatch()
118
119
120 _, err := source.Watch(metav1.ListOptions{ResourceVersion: "1"})
121 if err == nil {
122 t.Fatalf("Unexpected non-error")
123 }
124
125
126 w, err := source.Watch(metav1.ListOptions{ResourceVersion: "3"})
127 if err != nil {
128 t.Fatalf("Unexpected error: %v", err)
129 }
130
131
132 source.Modify(pod("foo"))
133 go consume(t, w, []string{"4"}, wg)
134 source.Shutdown()
135 wg.Wait()
136 }
137
View as plain text