1
16
17 package watch
18
19 import (
20 "context"
21 "reflect"
22 goruntime "runtime"
23 "sort"
24 "testing"
25 "time"
26
27 "github.com/google/go-cmp/cmp"
28
29 corev1 "k8s.io/api/core/v1"
30 apierrors "k8s.io/apimachinery/pkg/api/errors"
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 "k8s.io/apimachinery/pkg/runtime"
33 "k8s.io/apimachinery/pkg/runtime/schema"
34 "k8s.io/apimachinery/pkg/util/dump"
35 "k8s.io/apimachinery/pkg/watch"
36 fakeclientset "k8s.io/client-go/kubernetes/fake"
37 testcore "k8s.io/client-go/testing"
38 "k8s.io/client-go/tools/cache"
39 )
40
41
42
43 func TestEventProcessorExit(t *testing.T) {
44 event := watch.Event{}
45
46 tests := []struct {
47 name string
48 write func(e *eventProcessor)
49 }{
50 {
51 name: "exit on blocked read",
52 write: func(e *eventProcessor) {
53 e.push(event)
54 },
55 },
56 {
57 name: "exit on blocked write",
58 write: func(e *eventProcessor) {
59 e.push(event)
60 e.push(event)
61 },
62 },
63 }
64 for _, test := range tests {
65 t.Run(test.name, func(t *testing.T) {
66 out := make(chan watch.Event)
67 e := newEventProcessor(out)
68
69 test.write(e)
70
71 exited := make(chan struct{})
72 go func() {
73 e.run()
74 close(exited)
75 }()
76
77 <-out
78 e.stop()
79 goruntime.Gosched()
80 <-exited
81 })
82 }
83 }
84
85 type apiInt int
86
87 func (apiInt) GetObjectKind() schema.ObjectKind { return nil }
88 func (apiInt) DeepCopyObject() runtime.Object { return nil }
89
90 func TestEventProcessorOrdersEvents(t *testing.T) {
91 out := make(chan watch.Event)
92 e := newEventProcessor(out)
93 go e.run()
94
95 numProcessed := 0
96 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
97 go func() {
98 for i := 0; i < 1000; i++ {
99 e := <-out
100 if got, want := int(e.Object.(apiInt)), i; got != want {
101 t.Errorf("unexpected event: got=%d, want=%d", got, want)
102 }
103 numProcessed++
104 }
105 cancel()
106 }()
107
108 for i := 0; i < 1000; i++ {
109 e.push(watch.Event{Object: apiInt(i)})
110 }
111
112 <-ctx.Done()
113 e.stop()
114
115 if numProcessed != 1000 {
116 t.Errorf("unexpected number of events processed: %d", numProcessed)
117 }
118
119 }
120
121 type byEventTypeAndName []watch.Event
122
123 func (a byEventTypeAndName) Len() int { return len(a) }
124 func (a byEventTypeAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
125 func (a byEventTypeAndName) Less(i, j int) bool {
126 if a[i].Type < a[j].Type {
127 return true
128 }
129
130 if a[i].Type > a[j].Type {
131 return false
132 }
133
134 return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name
135 }
136
137 func TestNewInformerWatcher(t *testing.T) {
138
139 tt := []struct {
140 name string
141 objects []runtime.Object
142 events []watch.Event
143 }{
144 {
145 name: "basic test",
146 objects: []runtime.Object{
147 &corev1.Secret{
148 ObjectMeta: metav1.ObjectMeta{
149 Name: "pod-1",
150 },
151 StringData: map[string]string{
152 "foo-1": "initial",
153 },
154 },
155 &corev1.Secret{
156 ObjectMeta: metav1.ObjectMeta{
157 Name: "pod-2",
158 },
159 StringData: map[string]string{
160 "foo-2": "initial",
161 },
162 },
163 &corev1.Secret{
164 ObjectMeta: metav1.ObjectMeta{
165 Name: "pod-3",
166 },
167 StringData: map[string]string{
168 "foo-3": "initial",
169 },
170 },
171 },
172 events: []watch.Event{
173 {
174 Type: watch.Added,
175 Object: &corev1.Secret{
176 ObjectMeta: metav1.ObjectMeta{
177 Name: "pod-4",
178 },
179 StringData: map[string]string{
180 "foo-4": "initial",
181 },
182 },
183 },
184 {
185 Type: watch.Modified,
186 Object: &corev1.Secret{
187 ObjectMeta: metav1.ObjectMeta{
188 Name: "pod-2",
189 },
190 StringData: map[string]string{
191 "foo-2": "new",
192 },
193 },
194 },
195 {
196 Type: watch.Deleted,
197 Object: &corev1.Secret{
198 ObjectMeta: metav1.ObjectMeta{
199 Name: "pod-3",
200 },
201 },
202 },
203 },
204 },
205 }
206
207 for _, tc := range tt {
208 t.Run(tc.name, func(t *testing.T) {
209 var expected []watch.Event
210 for _, o := range tc.objects {
211 expected = append(expected, watch.Event{
212 Type: watch.Added,
213 Object: o.DeepCopyObject(),
214 })
215 }
216 for _, e := range tc.events {
217 expected = append(expected, *e.DeepCopy())
218 }
219
220 fake := fakeclientset.NewSimpleClientset(tc.objects...)
221 fakeWatch := watch.NewFakeWithChanSize(len(tc.events), false)
222 fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(fakeWatch, nil))
223
224 for _, e := range tc.events {
225 fakeWatch.Action(e.Type, e.Object)
226 }
227
228 lw := &cache.ListWatch{
229 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
230 return fake.CoreV1().Secrets("").List(context.TODO(), options)
231 },
232 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
233 return fake.CoreV1().Secrets("").Watch(context.TODO(), options)
234 },
235 }
236 _, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{})
237
238 var result []watch.Event
239 loop:
240 for {
241 var event watch.Event
242 var ok bool
243 select {
244 case event, ok = <-w.ResultChan():
245 if !ok {
246 t.Errorf("Failed to read event: channel is already closed!")
247 return
248 }
249
250 result = append(result, *event.DeepCopy())
251 case <-time.After(time.Second * 1):
252
253
254 break loop
255 }
256 }
257
258
259 sort.Sort(byEventTypeAndName(expected))
260 sort.Sort(byEventTypeAndName(result))
261
262 if !reflect.DeepEqual(expected, result) {
263 t.Errorf("\nexpected: %s,\ngot: %s,\ndiff: %s", dump.Pretty(expected), dump.Pretty(result), cmp.Diff(expected, result))
264 return
265 }
266
267
268 for _, e := range tc.events {
269 fakeWatch.Action(e.Type, e.Object)
270 }
271
272
273 w.Stop()
274
275 <-done
276 })
277 }
278
279 }
280
281
282
283
284
285
286
287 func TestInformerWatcherDeletedFinalStateUnknown(t *testing.T) {
288 listCalls := 0
289 watchCalls := 0
290 lw := &cache.ListWatch{
291 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
292 retval := &corev1.SecretList{}
293 if listCalls == 0 {
294
295 retval.ResourceVersion = "1"
296 retval.Items = []corev1.Secret{{ObjectMeta: metav1.ObjectMeta{Name: "secret1", Namespace: "ns1", ResourceVersion: "123"}}}
297 } else {
298
299 retval.ResourceVersion = "2"
300 }
301 listCalls++
302 return retval, nil
303 },
304 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
305 w := watch.NewRaceFreeFake()
306 if options.ResourceVersion == "1" {
307 go func() {
308
309 w.Error(&apierrors.NewGone("gone").ErrStatus)
310 w.Stop()
311 }()
312 }
313 watchCalls++
314 return w, nil
315 },
316 }
317 _, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{})
318 defer w.Stop()
319
320
321 select {
322 case event, ok := <-w.ResultChan():
323 if !ok {
324 t.Fatal("unexpected close")
325 }
326 if event.Type != watch.Added {
327 t.Fatalf("expected Added event, got %#v", event)
328 }
329 if event.Object.(*corev1.Secret).ResourceVersion != "123" {
330 t.Fatalf("expected added Secret with rv=123, got %#v", event.Object)
331 }
332 case <-time.After(time.Second * 10):
333 t.Fatal("timeout")
334 }
335
336
337 select {
338 case event, ok := <-w.ResultChan():
339 if !ok {
340 t.Fatal("unexpected close")
341 }
342 if event.Type != watch.Deleted {
343 t.Fatalf("expected Deleted event, got %#v", event)
344 }
345 if event.Object.(*corev1.Secret).ResourceVersion != "123" {
346 t.Fatalf("expected deleted Secret with rv=123, got %#v", event.Object)
347 }
348 case <-time.After(time.Second * 10):
349 t.Fatal("timeout")
350 }
351
352 w.Stop()
353 select {
354 case <-done:
355 case <-time.After(time.Second * 10):
356 t.Fatal("timeout")
357 }
358
359 if listCalls < 2 {
360 t.Fatalf("expected at least 2 list calls, got %d", listCalls)
361 }
362 if watchCalls < 1 {
363 t.Fatalf("expected at least 1 watch call, got %d", watchCalls)
364 }
365 }
366
View as plain text