1
16
17 package cache
18
19 import (
20 "fmt"
21 "sort"
22 "sync"
23 "testing"
24
25 "github.com/google/go-cmp/cmp"
26 "github.com/google/go-cmp/cmp/cmpopts"
27
28 v1 "k8s.io/api/core/v1"
29 apierrors "k8s.io/apimachinery/pkg/api/errors"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/runtime"
32 "k8s.io/apimachinery/pkg/runtime/schema"
33 "k8s.io/apimachinery/pkg/types"
34 "k8s.io/apimachinery/pkg/watch"
35 "k8s.io/utils/pointer"
36 "k8s.io/utils/ptr"
37 )
38
39 func TestWatchList(t *testing.T) {
40 scenarios := []struct {
41 name string
42 disableUseWatchList bool
43
44
45 closeAfterWatchEvents int
46
47 closeAfterWatchRequests int
48
49 closeAfterListRequests int
50
51
52 stopAfterWatchEvents int
53
54 watchOptionsPredicate func(options metav1.ListOptions) error
55 watchEvents []watch.Event
56 podList *v1.PodList
57
58 expectedRequestOptions []metav1.ListOptions
59 expectedWatchRequests int
60 expectedListRequests int
61 expectedStoreContent []v1.Pod
62 expectedError error
63 }{
64 {
65 name: "the reflector won't be synced if the bookmark event has been received",
66 watchEvents: []watch.Event{{Type: watch.Added, Object: makePod("p1", "1")}},
67 closeAfterWatchEvents: 1,
68 expectedWatchRequests: 1,
69 expectedRequestOptions: []metav1.ListOptions{{
70 SendInitialEvents: pointer.Bool(true),
71 AllowWatchBookmarks: true,
72 ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
73 TimeoutSeconds: pointer.Int64(1),
74 }},
75 },
76 {
77 name: "the reflector uses the old LIST/WATCH semantics if the UseWatchList is turned off",
78 disableUseWatchList: true,
79 closeAfterWatchRequests: 1,
80 podList: &v1.PodList{
81 ListMeta: metav1.ListMeta{ResourceVersion: "1"},
82 Items: []v1.Pod{*makePod("p1", "1")},
83 },
84 expectedWatchRequests: 1,
85 expectedListRequests: 1,
86 expectedRequestOptions: []metav1.ListOptions{
87 {
88 ResourceVersion: "0",
89 Limit: 500,
90 },
91 {
92 AllowWatchBookmarks: true,
93 ResourceVersion: "1",
94 TimeoutSeconds: pointer.Int64(1),
95 }},
96 expectedStoreContent: []v1.Pod{*makePod("p1", "1")},
97 },
98 {
99 name: "returning any other error than apierrors.NewInvalid forces fallback",
100 watchOptionsPredicate: func(options metav1.ListOptions) error {
101 if options.SendInitialEvents != nil && *options.SendInitialEvents {
102 return fmt.Errorf("dummy error")
103 }
104 return nil
105 },
106 podList: &v1.PodList{
107 ListMeta: metav1.ListMeta{ResourceVersion: "1"},
108 Items: []v1.Pod{*makePod("p1", "1")},
109 },
110 closeAfterWatchEvents: 1,
111 watchEvents: []watch.Event{{Type: watch.Added, Object: makePod("p2", "2")}},
112 expectedWatchRequests: 2,
113 expectedListRequests: 1,
114 expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")},
115 expectedRequestOptions: []metav1.ListOptions{
116 {
117 SendInitialEvents: pointer.Bool(true),
118 AllowWatchBookmarks: true,
119 ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
120 TimeoutSeconds: pointer.Int64(1),
121 },
122 {
123 ResourceVersion: "0",
124 Limit: 500,
125 },
126 {
127 AllowWatchBookmarks: true,
128 ResourceVersion: "1",
129 TimeoutSeconds: pointer.Int64(1),
130 },
131 },
132 },
133 {
134 name: "the reflector can fall back to old LIST/WATCH semantics when a server doesn't support streaming",
135 watchOptionsPredicate: func(options metav1.ListOptions) error {
136 if options.SendInitialEvents != nil && *options.SendInitialEvents {
137 return apierrors.NewInvalid(schema.GroupKind{}, "streaming is not allowed", nil)
138 }
139 return nil
140 },
141 podList: &v1.PodList{
142 ListMeta: metav1.ListMeta{ResourceVersion: "1"},
143 Items: []v1.Pod{*makePod("p1", "1")},
144 },
145 closeAfterWatchEvents: 1,
146 watchEvents: []watch.Event{{Type: watch.Added, Object: makePod("p2", "2")}},
147 expectedWatchRequests: 2,
148 expectedListRequests: 1,
149 expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")},
150 expectedRequestOptions: []metav1.ListOptions{
151 {
152 SendInitialEvents: pointer.Bool(true),
153 AllowWatchBookmarks: true,
154 ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
155 TimeoutSeconds: pointer.Int64(1),
156 },
157 {
158 ResourceVersion: "0",
159 Limit: 500,
160 },
161 {
162 AllowWatchBookmarks: true,
163 ResourceVersion: "1",
164 TimeoutSeconds: pointer.Int64(1),
165 },
166 },
167 },
168 {
169 name: "prove that the reflector is synced after receiving a bookmark event",
170 closeAfterWatchEvents: 3,
171 watchEvents: []watch.Event{
172 {Type: watch.Added, Object: makePod("p1", "1")},
173 {Type: watch.Added, Object: makePod("p2", "2")},
174 {Type: watch.Bookmark, Object: &v1.Pod{
175 ObjectMeta: metav1.ObjectMeta{
176 ResourceVersion: "2",
177 Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
178 },
179 }},
180 },
181 expectedWatchRequests: 1,
182 expectedRequestOptions: []metav1.ListOptions{{
183 SendInitialEvents: pointer.Bool(true),
184 AllowWatchBookmarks: true,
185 ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
186 TimeoutSeconds: pointer.Int64(1),
187 }},
188 expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")},
189 },
190 {
191 name: "check if Updates and Deletes events are propagated during streaming (until the bookmark is received)",
192 closeAfterWatchEvents: 6,
193 watchEvents: []watch.Event{
194 {Type: watch.Added, Object: makePod("p1", "1")},
195 {Type: watch.Added, Object: makePod("p2", "2")},
196 {Type: watch.Modified, Object: func() runtime.Object {
197 p1 := makePod("p1", "3")
198 p1.Spec.ActiveDeadlineSeconds = pointer.Int64(12)
199 return p1
200 }()},
201 {Type: watch.Added, Object: makePod("p3", "4")},
202 {Type: watch.Deleted, Object: makePod("p3", "5")},
203 {Type: watch.Bookmark, Object: &v1.Pod{
204 ObjectMeta: metav1.ObjectMeta{
205 ResourceVersion: "5",
206 Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
207 },
208 }},
209 },
210 expectedWatchRequests: 1,
211 expectedRequestOptions: []metav1.ListOptions{{
212 SendInitialEvents: pointer.Bool(true),
213 AllowWatchBookmarks: true,
214 ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
215 TimeoutSeconds: pointer.Int64(1),
216 }},
217 expectedStoreContent: []v1.Pod{
218 *makePod("p2", "2"),
219 func() v1.Pod {
220 p1 := *makePod("p1", "3")
221 p1.Spec.ActiveDeadlineSeconds = pointer.Int64(12)
222 return p1
223 }(),
224 },
225 },
226 {
227 name: "checks if the reflector retries 429",
228 watchOptionsPredicate: func() func(options metav1.ListOptions) error {
229 counter := 1
230 return func(options metav1.ListOptions) error {
231 if counter < 3 {
232 counter++
233 return apierrors.NewTooManyRequests("busy, check again later", 1)
234 }
235 return nil
236 }
237 }(),
238 closeAfterWatchEvents: 2,
239 watchEvents: []watch.Event{
240 {Type: watch.Added, Object: makePod("p1", "1")},
241 {Type: watch.Bookmark, Object: &v1.Pod{
242 ObjectMeta: metav1.ObjectMeta{
243 ResourceVersion: "2",
244 Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
245 },
246 }},
247 },
248 expectedWatchRequests: 3,
249 expectedRequestOptions: []metav1.ListOptions{
250 {
251 SendInitialEvents: pointer.Bool(true),
252 AllowWatchBookmarks: true,
253 ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
254 TimeoutSeconds: pointer.Int64(1),
255 },
256 {
257 SendInitialEvents: pointer.Bool(true),
258 AllowWatchBookmarks: true,
259 ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
260 TimeoutSeconds: pointer.Int64(1),
261 },
262 {
263 SendInitialEvents: pointer.Bool(true),
264 AllowWatchBookmarks: true,
265 ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
266 TimeoutSeconds: pointer.Int64(1),
267 },
268 },
269 expectedStoreContent: []v1.Pod{*makePod("p1", "1")},
270 },
271 {
272 name: "check if stopping a watcher before sync results in creating a new watch-list request",
273 stopAfterWatchEvents: 1,
274 closeAfterWatchEvents: 3,
275 watchEvents: []watch.Event{
276 {Type: watch.Added, Object: makePod("p1", "1")},
277
278 {Type: watch.Added, Object: makePod("p1", "1")},
279 {Type: watch.Bookmark, Object: &v1.Pod{
280 ObjectMeta: metav1.ObjectMeta{
281 ResourceVersion: "1",
282 Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
283 },
284 }},
285 },
286 expectedWatchRequests: 2,
287 expectedRequestOptions: []metav1.ListOptions{
288 {
289 SendInitialEvents: pointer.Bool(true),
290 AllowWatchBookmarks: true,
291 ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
292 TimeoutSeconds: pointer.Int64(1),
293 },
294 {
295 SendInitialEvents: pointer.Bool(true),
296 AllowWatchBookmarks: true,
297 ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
298 TimeoutSeconds: pointer.Int64(1),
299 },
300 },
301 expectedStoreContent: []v1.Pod{*makePod("p1", "1")},
302 },
303 {
304 name: "stopping a watcher after synchronization results in creating a new watch request",
305 stopAfterWatchEvents: 4,
306 closeAfterWatchEvents: 5,
307 watchEvents: []watch.Event{
308 {Type: watch.Added, Object: makePod("p1", "1")},
309 {Type: watch.Added, Object: makePod("p2", "2")},
310 {Type: watch.Bookmark, Object: &v1.Pod{
311 ObjectMeta: metav1.ObjectMeta{
312 ResourceVersion: "2",
313 Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
314 },
315 }},
316 {Type: watch.Added, Object: makePod("p3", "3")},
317
318 {Type: watch.Added, Object: makePod("p4", "4")},
319 },
320 expectedWatchRequests: 2,
321 expectedRequestOptions: []metav1.ListOptions{
322 {
323 SendInitialEvents: pointer.Bool(true),
324 AllowWatchBookmarks: true,
325 ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
326 TimeoutSeconds: pointer.Int64(1),
327 },
328 {
329 AllowWatchBookmarks: true,
330 ResourceVersion: "3",
331 TimeoutSeconds: pointer.Int64(1),
332 },
333 },
334 expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3"), *makePod("p4", "4")},
335 },
336 {
337 name: "expiring an established watcher results in returning an error from the reflector",
338 watchOptionsPredicate: func() func(options metav1.ListOptions) error {
339 counter := 0
340 return func(options metav1.ListOptions) error {
341 counter++
342 if counter == 2 {
343 return apierrors.NewResourceExpired("rv already expired")
344 }
345 return nil
346 }
347 }(),
348 stopAfterWatchEvents: 3,
349 watchEvents: []watch.Event{
350 {Type: watch.Added, Object: makePod("p1", "1")},
351 {Type: watch.Bookmark, Object: &v1.Pod{
352 ObjectMeta: metav1.ObjectMeta{
353 ResourceVersion: "2",
354 Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
355 },
356 }},
357 {Type: watch.Added, Object: makePod("p3", "3")},
358 },
359 expectedWatchRequests: 2,
360 expectedRequestOptions: []metav1.ListOptions{
361 {
362 SendInitialEvents: pointer.Bool(true),
363 AllowWatchBookmarks: true,
364 ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
365 TimeoutSeconds: pointer.Int64(1),
366 },
367 {
368 AllowWatchBookmarks: true,
369 ResourceVersion: "3",
370 TimeoutSeconds: pointer.Int64(1),
371 },
372 },
373 expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p3", "3")},
374 expectedError: apierrors.NewResourceExpired("rv already expired"),
375 },
376 {
377 name: "prove that the reflector is checking the value of the initialEventsEnd annotation",
378 closeAfterWatchEvents: 3,
379 watchEvents: []watch.Event{
380 {Type: watch.Added, Object: makePod("p1", "1")},
381 {Type: watch.Added, Object: makePod("p2", "2")},
382 {Type: watch.Bookmark, Object: &v1.Pod{
383 ObjectMeta: metav1.ObjectMeta{
384 ResourceVersion: "2",
385 Annotations: map[string]string{"k8s.io/initial-events-end": "false"},
386 },
387 }},
388 },
389 expectedWatchRequests: 1,
390 expectedRequestOptions: []metav1.ListOptions{{
391 SendInitialEvents: pointer.Bool(true),
392 AllowWatchBookmarks: true,
393 ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
394 TimeoutSeconds: pointer.Int64(1),
395 }},
396 },
397 }
398 for _, s := range scenarios {
399 t.Run(s.name, func(t *testing.T) {
400 scenario := s
401 listWatcher, store, reflector, stopCh := testData()
402 go func() {
403 for i, e := range scenario.watchEvents {
404 listWatcher.fakeWatcher.Action(e.Type, e.Object)
405 if i+1 == scenario.stopAfterWatchEvents {
406 listWatcher.StopAndRecreateWatch()
407 continue
408 }
409 if i+1 == scenario.closeAfterWatchEvents {
410 close(stopCh)
411 }
412 }
413 }()
414 listWatcher.watchOptionsPredicate = scenario.watchOptionsPredicate
415 listWatcher.closeAfterWatchRequests = scenario.closeAfterWatchRequests
416 listWatcher.customListResponse = scenario.podList
417 listWatcher.closeAfterListRequests = scenario.closeAfterListRequests
418 if scenario.disableUseWatchList {
419 reflector.UseWatchList = ptr.To(false)
420 }
421
422 err := reflector.ListAndWatch(stopCh)
423 if scenario.expectedError != nil && err == nil {
424 t.Fatalf("expected error %q, got nil", scenario.expectedError)
425 }
426 if scenario.expectedError == nil && err != nil {
427 t.Fatalf("unexpected error: %v", err)
428 }
429 if scenario.expectedError != nil && err.Error() != scenario.expectedError.Error() {
430 t.Fatalf("expected error %q, got %q", scenario.expectedError, err.Error())
431 }
432
433 verifyWatchCounter(t, listWatcher, scenario.expectedWatchRequests)
434 verifyListCounter(t, listWatcher, scenario.expectedListRequests)
435 verifyRequestOptions(t, listWatcher, scenario.expectedRequestOptions)
436 verifyStore(t, store, scenario.expectedStoreContent)
437 })
438 }
439 }
440
441 func verifyRequestOptions(t *testing.T, lw *fakeListWatcher, expectedRequestOptions []metav1.ListOptions) {
442 if len(lw.requestOptions) != len(expectedRequestOptions) {
443 t.Fatalf("expected to receive exactly %v requests, got %v", len(expectedRequestOptions), len(lw.requestOptions))
444 }
445
446 for index, expectedRequestOption := range expectedRequestOptions {
447 actualRequestOption := lw.requestOptions[index]
448 if actualRequestOption.TimeoutSeconds == nil && expectedRequestOption.TimeoutSeconds != nil {
449 t.Fatalf("expected the request to specify TimeoutSeconds option but it didn't, actual = %#v, expected = %#v", actualRequestOption, expectedRequestOption)
450 }
451 if actualRequestOption.TimeoutSeconds != nil && expectedRequestOption.TimeoutSeconds == nil {
452 t.Fatalf("unexpected TimeoutSeconds option specified, actual = %#v, expected = %#v", actualRequestOption, expectedRequestOption)
453 }
454
455 actualRequestOption.TimeoutSeconds = nil
456 expectedRequestOption.TimeoutSeconds = nil
457 if !cmp.Equal(actualRequestOption, expectedRequestOption) {
458 t.Fatalf("expected %#v, got %#v", expectedRequestOption, actualRequestOption)
459 }
460 }
461 }
462
463 func verifyListCounter(t *testing.T, lw *fakeListWatcher, expectedListCounter int) {
464 if lw.listCounter != expectedListCounter {
465 t.Fatalf("unexpected number of LIST requests, got: %v, expected: %v", lw.listCounter, expectedListCounter)
466 }
467 }
468
469 func verifyWatchCounter(t *testing.T, lw *fakeListWatcher, expectedWatchCounter int) {
470 if lw.watchCounter != expectedWatchCounter {
471 t.Fatalf("unexpected number of WATCH requests, got: %v, expected: %v", lw.watchCounter, expectedWatchCounter)
472 }
473 }
474
475 type byName []v1.Pod
476
477 func (a byName) Len() int { return len(a) }
478 func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
479 func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
480
481 func verifyStore(t *testing.T, s Store, expectedPods []v1.Pod) {
482 rawPods := s.List()
483 actualPods := []v1.Pod{}
484 for _, p := range rawPods {
485 actualPods = append(actualPods, *p.(*v1.Pod))
486 }
487
488 sort.Sort(byName(actualPods))
489 sort.Sort(byName(expectedPods))
490 if !cmp.Equal(actualPods, expectedPods, cmpopts.EquateEmpty()) {
491 t.Fatalf("unexpected store content, diff: %s", cmp.Diff(actualPods, expectedPods))
492 }
493 }
494
495 func makePod(name, rv string) *v1.Pod {
496 return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: rv, UID: types.UID(name)}}
497 }
498
499 func testData() (*fakeListWatcher, Store, *Reflector, chan struct{}) {
500 s := NewStore(MetaNamespaceKeyFunc)
501 stopCh := make(chan struct{})
502 lw := &fakeListWatcher{
503 fakeWatcher: watch.NewFake(),
504 stop: func() {
505 close(stopCh)
506 },
507 }
508 r := NewReflector(lw, &v1.Pod{}, s, 0)
509 r.UseWatchList = ptr.To(true)
510
511 return lw, s, r, stopCh
512 }
513
514 type fakeListWatcher struct {
515 lock sync.Mutex
516 fakeWatcher *watch.FakeWatcher
517 listCounter int
518 watchCounter int
519 closeAfterWatchRequests int
520 closeAfterListRequests int
521 stop func()
522
523 requestOptions []metav1.ListOptions
524
525 customListResponse *v1.PodList
526 watchOptionsPredicate func(options metav1.ListOptions) error
527 }
528
529 func (lw *fakeListWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
530 lw.listCounter++
531 lw.requestOptions = append(lw.requestOptions, options)
532 if lw.listCounter == lw.closeAfterListRequests {
533 lw.stop()
534 }
535 if lw.customListResponse != nil {
536 return lw.customListResponse, nil
537 }
538 return nil, fmt.Errorf("not implemented")
539 }
540
541 func (lw *fakeListWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
542 lw.watchCounter++
543 lw.requestOptions = append(lw.requestOptions, options)
544 if lw.watchCounter == lw.closeAfterWatchRequests {
545 lw.stop()
546 }
547 if lw.watchOptionsPredicate != nil {
548 if err := lw.watchOptionsPredicate(options); err != nil {
549 return nil, err
550 }
551 }
552 lw.lock.Lock()
553 defer lw.lock.Unlock()
554 return lw.fakeWatcher, nil
555 }
556
557 func (lw *fakeListWatcher) StopAndRecreateWatch() {
558 lw.lock.Lock()
559 defer lw.lock.Unlock()
560 lw.fakeWatcher.Stop()
561 lw.fakeWatcher = watch.NewFake()
562 }
563
View as plain text