1
16
17 package watch
18
19 import (
20 "errors"
21 "flag"
22 "fmt"
23 "reflect"
24 "strconv"
25 "sync/atomic"
26 "testing"
27 "time"
28
29 "github.com/google/go-cmp/cmp"
30
31 apierrors "k8s.io/apimachinery/pkg/api/errors"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/runtime"
34 "k8s.io/apimachinery/pkg/runtime/schema"
35 "k8s.io/apimachinery/pkg/util/dump"
36 "k8s.io/apimachinery/pkg/util/wait"
37 "k8s.io/apimachinery/pkg/watch"
38 "k8s.io/client-go/tools/cache"
39 "k8s.io/klog/v2"
40 )
41
42 func init() {
43
44 klog.InitFlags(nil)
45 flag.Set("logtostderr", "true")
46 flag.Set("v", "9")
47 }
48
49 type testObject struct {
50 resourceVersion string
51 }
52
53 func (o testObject) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind }
54 func (o testObject) DeepCopyObject() runtime.Object { return o }
55 func (o testObject) GetResourceVersion() string { return o.resourceVersion }
56
57 func withCounter(w cache.Watcher) (*uint32, cache.Watcher) {
58 var counter uint32
59 return &counter, &cache.ListWatch{
60 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
61 atomic.AddUint32(&counter, 1)
62 return w.Watch(options)
63 },
64 }
65 }
66
67 func makeTestEvent(rv int) watch.Event {
68 return watch.Event{
69 Type: watch.Added,
70 Object: testObject{
71 resourceVersion: fmt.Sprintf("%d", rv),
72 },
73 }
74 }
75
76 func arrayToChannel(array []watch.Event) chan watch.Event {
77 ch := make(chan watch.Event, len(array))
78
79 for _, event := range array {
80 ch <- event
81 }
82
83 return ch
84 }
85
86
87 func parseResourceVersionOrDie(resourceVersion string) uint64 {
88
89
90 if resourceVersion == "" {
91 return 0
92 }
93 version, err := strconv.ParseUint(resourceVersion, 10, 64)
94 if err != nil {
95 panic(fmt.Errorf("failed to parse resourceVersion %q", resourceVersion))
96 }
97 return version
98 }
99
100 func fromRV(resourceVersion string, array []watch.Event) []watch.Event {
101 var result []watch.Event
102 rv := parseResourceVersionOrDie(resourceVersion)
103 for _, event := range array {
104 if event.Type == watch.Error {
105 if len(result) == 0 {
106
107 continue
108 }
109 } else {
110 rvGetter, ok := event.Object.(resourceVersionGetter)
111 if ok {
112 if parseResourceVersionOrDie(rvGetter.GetResourceVersion()) <= rv {
113 continue
114 }
115 }
116 }
117
118 result = append(result, event)
119 }
120
121 return result
122 }
123
124 func closeAfterN(n int, source chan watch.Event) chan watch.Event {
125 result := make(chan watch.Event, 0)
126 go func() {
127 defer close(result)
128 defer close(source)
129 for i := 0; i < n; i++ {
130 result <- <-source
131 }
132 }()
133 return result
134 }
135
136 type unexpectedError struct {
137
138 metav1.Status
139 }
140
141 var _ runtime.Object = &unexpectedError{}
142
143 func TestNewRetryWatcher(t *testing.T) {
144 tt := []struct {
145 name string
146 initialRV string
147 err error
148 }{
149 {
150 name: "empty RV should fail",
151 initialRV: "",
152 err: errors.New("initial RV \"\" is not supported due to issues with underlying WATCH"),
153 },
154 {
155 name: "RV \"0\" should fail",
156 initialRV: "0",
157 err: errors.New("initial RV \"0\" is not supported due to issues with underlying WATCH"),
158 },
159 }
160 for _, tc := range tt {
161 t.Run(tc.name, func(t *testing.T) {
162 _, err := NewRetryWatcher(tc.initialRV, nil)
163 if !reflect.DeepEqual(err, tc.err) {
164 t.Errorf("Expected error: %v, got: %v", tc.err, err)
165 }
166 })
167 }
168 }
169
170 func TestRetryWatcher(t *testing.T) {
171 tt := []struct {
172 name string
173 initialRV string
174 watchClient cache.Watcher
175 watchCount uint32
176 expected []watch.Event
177 }{
178 {
179 name: "recovers if watchClient returns error",
180 initialRV: "1",
181 watchClient: &cache.ListWatch{
182 WatchFunc: func() func(options metav1.ListOptions) (watch.Interface, error) {
183 firstRun := true
184 return func(options metav1.ListOptions) (watch.Interface, error) {
185 if firstRun {
186 firstRun = false
187 return nil, fmt.Errorf("test error")
188 }
189
190 return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
191 makeTestEvent(2),
192 }))), nil
193 }
194 }(),
195 },
196 watchCount: 2,
197 expected: []watch.Event{
198 makeTestEvent(2),
199 },
200 },
201 {
202 name: "recovers if watchClient returns nil watcher",
203 initialRV: "1",
204 watchClient: &cache.ListWatch{
205 WatchFunc: func() func(options metav1.ListOptions) (watch.Interface, error) {
206 firstRun := true
207 return func(options metav1.ListOptions) (watch.Interface, error) {
208 if firstRun {
209 firstRun = false
210 return nil, nil
211 }
212
213 return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
214 makeTestEvent(2),
215 }))), nil
216 }
217 }(),
218 },
219 watchCount: 2,
220 expected: []watch.Event{
221 makeTestEvent(2),
222 },
223 },
224 {
225 name: "works with empty initialRV",
226 initialRV: "1",
227 watchClient: &cache.ListWatch{
228 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
229 return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
230 makeTestEvent(2),
231 }))), nil
232 },
233 },
234 watchCount: 1,
235 expected: []watch.Event{
236 makeTestEvent(2),
237 },
238 },
239 {
240 name: "works with initialRV set, skipping the preceding items but reading those directly following",
241 initialRV: "1",
242 watchClient: &cache.ListWatch{
243 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
244 return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
245 makeTestEvent(1),
246 makeTestEvent(2),
247 }))), nil
248 },
249 },
250 watchCount: 1,
251 expected: []watch.Event{
252 makeTestEvent(2),
253 },
254 },
255 {
256 name: "works with initialRV set, skipping the preceding items with none following",
257 initialRV: "3",
258 watchClient: &cache.ListWatch{
259 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
260 return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
261 makeTestEvent(2),
262 }))), nil
263 },
264 },
265 watchCount: 1,
266 expected: nil,
267 },
268 {
269 name: "fails on Gone (RV too old error)",
270 initialRV: "5",
271 watchClient: &cache.ListWatch{
272 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
273 return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
274 makeTestEvent(5),
275 makeTestEvent(6),
276 {Type: watch.Error, Object: &apierrors.NewGone("").ErrStatus},
277 makeTestEvent(7),
278 makeTestEvent(8),
279 }))), nil
280 },
281 },
282 watchCount: 1,
283 expected: []watch.Event{
284 makeTestEvent(6),
285 {
286 Type: watch.Error,
287 Object: &apierrors.NewGone("").ErrStatus,
288 },
289 },
290 },
291 {
292 name: "recovers from timeout error",
293 initialRV: "5",
294 watchClient: &cache.ListWatch{
295 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
296 return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
297 makeTestEvent(6),
298 {
299 Type: watch.Error,
300 Object: &apierrors.NewTimeoutError("", 0).ErrStatus,
301 },
302 makeTestEvent(7),
303 }))), nil
304 },
305 },
306 watchCount: 2,
307 expected: []watch.Event{
308 makeTestEvent(6),
309 makeTestEvent(7),
310 },
311 },
312 {
313 name: "recovers from internal server error",
314 initialRV: "5",
315 watchClient: &cache.ListWatch{
316 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
317 return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
318 makeTestEvent(6),
319 {
320 Type: watch.Error,
321 Object: &apierrors.NewInternalError(errors.New("")).ErrStatus,
322 },
323 makeTestEvent(7),
324 }))), nil
325 },
326 },
327 watchCount: 2,
328 expected: []watch.Event{
329 makeTestEvent(6),
330 makeTestEvent(7),
331 },
332 },
333 {
334 name: "recovers from unexpected error code",
335 initialRV: "5",
336 watchClient: &cache.ListWatch{
337 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
338 return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
339 makeTestEvent(6),
340 {
341 Type: watch.Error,
342 Object: &metav1.Status{
343 Code: 666,
344 },
345 },
346 makeTestEvent(7),
347 }))), nil
348 },
349 },
350 watchCount: 2,
351 expected: []watch.Event{
352 makeTestEvent(6),
353 makeTestEvent(7),
354 },
355 },
356 {
357 name: "recovers from unexpected error type",
358 initialRV: "5",
359 watchClient: &cache.ListWatch{
360 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
361 return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
362 makeTestEvent(6),
363 {
364 Type: watch.Error,
365 Object: &unexpectedError{},
366 },
367 makeTestEvent(7),
368 }))), nil
369 },
370 },
371 watchCount: 2,
372 expected: []watch.Event{
373 makeTestEvent(6),
374 makeTestEvent(7),
375 },
376 },
377 {
378 name: "survives 1 closed watch and reads 1 item",
379 initialRV: "5",
380 watchClient: &cache.ListWatch{
381 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
382 return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
383 makeTestEvent(6),
384 })))), nil
385 },
386 },
387 watchCount: 2,
388 expected: []watch.Event{
389 makeTestEvent(6),
390 },
391 },
392 {
393 name: "survives 2 closed watches and reads 2 items",
394 initialRV: "4",
395 watchClient: &cache.ListWatch{
396 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
397 return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
398 makeTestEvent(5),
399 makeTestEvent(6),
400 })))), nil
401 },
402 },
403 watchCount: 3,
404 expected: []watch.Event{
405 makeTestEvent(5),
406 makeTestEvent(6),
407 },
408 },
409 {
410 name: "survives 2 closed watches and reads 2 items for nonconsecutive RVs",
411 initialRV: "4",
412 watchClient: &cache.ListWatch{
413 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
414 return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
415 makeTestEvent(5),
416 makeTestEvent(7),
417 })))), nil
418 },
419 },
420 watchCount: 3,
421 expected: []watch.Event{
422 makeTestEvent(5),
423 makeTestEvent(7),
424 },
425 },
426 {
427 name: "survives 2 closed watches and reads 2 items for nonconsecutive RVs starting at much lower RV",
428 initialRV: "2",
429 watchClient: &cache.ListWatch{
430 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
431 return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
432 makeTestEvent(5),
433 makeTestEvent(7),
434 })))), nil
435 },
436 },
437 watchCount: 3,
438 expected: []watch.Event{
439 makeTestEvent(5),
440 makeTestEvent(7),
441 },
442 },
443 {
444 name: "survives 4 closed watches and reads 4 items for nonconsecutive, spread RVs",
445 initialRV: "2",
446 watchClient: &cache.ListWatch{
447 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
448 return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
449 makeTestEvent(5),
450 makeTestEvent(6),
451 makeTestEvent(7),
452 makeTestEvent(11),
453 })))), nil
454 },
455 },
456 watchCount: 5,
457 expected: []watch.Event{
458 makeTestEvent(5),
459 makeTestEvent(6),
460 makeTestEvent(7),
461 makeTestEvent(11),
462 },
463 },
464 {
465 name: "survives 4 closed watches and reads 4 items for nonconsecutive, spread RVs and skips those with lower or equal RV",
466 initialRV: "2",
467 watchClient: &cache.ListWatch{
468 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
469 return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
470 makeTestEvent(1),
471 makeTestEvent(2),
472 makeTestEvent(5),
473 makeTestEvent(6),
474 makeTestEvent(7),
475 makeTestEvent(11),
476 })))), nil
477 },
478 },
479 watchCount: 5,
480 expected: []watch.Event{
481 makeTestEvent(5),
482 makeTestEvent(6),
483 makeTestEvent(7),
484 makeTestEvent(11),
485 },
486 },
487 {
488 name: "survives 2 closed watches and reads 2+2+1 items skipping those with equal RV",
489 initialRV: "1",
490 watchClient: &cache.ListWatch{
491 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
492 return watch.NewProxyWatcher(closeAfterN(2, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
493 makeTestEvent(1),
494 makeTestEvent(2),
495 makeTestEvent(5),
496 makeTestEvent(6),
497 makeTestEvent(7),
498 makeTestEvent(11),
499 })))), nil
500 },
501 },
502 watchCount: 3,
503 expected: []watch.Event{
504 makeTestEvent(2),
505 makeTestEvent(5),
506 makeTestEvent(6),
507 makeTestEvent(7),
508 makeTestEvent(11),
509 },
510 },
511 }
512
513 for _, tc := range tt {
514 tc := tc
515 t.Run(tc.name, func(t *testing.T) {
516 t.Parallel()
517
518 atomicCounter, watchFunc := withCounter(tc.watchClient)
519 watcher, err := newRetryWatcher(tc.initialRV, watchFunc, time.Duration(0))
520 if err != nil {
521 t.Fatalf("failed to create a RetryWatcher: %v", err)
522 }
523 defer func() {
524 watcher.Stop()
525 t.Log("Waiting on RetryWatcher to stop...")
526 <-watcher.Done()
527 }()
528
529 var got []watch.Event
530 for i := 0; i < len(tc.expected); i++ {
531 event, ok := <-watcher.ResultChan()
532 if !ok {
533 t.Errorf("expected event %s, but channel is closed", dump.Pretty(tc.expected[i]))
534 break
535 }
536
537 got = append(got, event)
538 }
539
540
541
542
543
544 select {
545 case event, ok := <-watcher.ResultChan():
546 if ok {
547 t.Errorf("Unexpected event received after reading all the expected ones: %s", dump.Pretty(event))
548 }
549 case <-time.After(10 * time.Millisecond):
550 break
551 }
552
553 var counter uint32
554
555
556 err = wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (done bool, err error) {
557 counter = atomic.LoadUint32(atomicCounter)
558 return counter == tc.watchCount, nil
559 })
560 if err == wait.ErrWaitTimeout {
561 t.Errorf("expected %d watcher starts, but it has started %d times", tc.watchCount, counter)
562 } else if err != nil {
563 t.Fatal(err)
564 }
565
566 if !reflect.DeepEqual(tc.expected, got) {
567 t.Fatalf("expected %s, got %s;\ndiff: %s", dump.Pretty(tc.expected), dump.Pretty(got), cmp.Diff(tc.expected, got))
568 }
569 })
570 }
571 }
572
573 func TestRetryWatcherToFinishWithUnreadEvents(t *testing.T) {
574 watcher, err := NewRetryWatcher("1", &cache.ListWatch{
575 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
576 return watch.NewProxyWatcher(arrayToChannel([]watch.Event{
577 makeTestEvent(2),
578 })), nil
579 },
580 })
581 if err != nil {
582 t.Fatalf("failed to create a RetryWatcher: %v", err)
583 }
584
585
586 time.Sleep(10 * time.Millisecond)
587
588 watcher.Stop()
589
590 maxTime := time.Second
591 select {
592 case <-watcher.Done():
593 break
594 case <-time.After(maxTime):
595 t.Errorf("The watcher failed to be closed in %s", maxTime)
596 }
597
598
599 _, ok := <-watcher.ResultChan()
600 if ok {
601 t.Error("ResultChan is not closed")
602 }
603 }
604
View as plain text