1
16
17 package cache
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "io"
24 "math/rand"
25 "os"
26 "reflect"
27 "strings"
28 "sync"
29 "time"
30
31 apierrors "k8s.io/apimachinery/pkg/api/errors"
32 "k8s.io/apimachinery/pkg/api/meta"
33 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
35 "k8s.io/apimachinery/pkg/runtime"
36 "k8s.io/apimachinery/pkg/runtime/schema"
37 "k8s.io/apimachinery/pkg/util/naming"
38 utilnet "k8s.io/apimachinery/pkg/util/net"
39 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
40 "k8s.io/apimachinery/pkg/util/wait"
41 "k8s.io/apimachinery/pkg/watch"
42 "k8s.io/client-go/tools/pager"
43 "k8s.io/klog/v2"
44 "k8s.io/utils/clock"
45 "k8s.io/utils/pointer"
46 "k8s.io/utils/ptr"
47 "k8s.io/utils/trace"
48 )
49
50 const defaultExpectedTypeName = "<unspecified>"
51
52
53 type Reflector struct {
54
55 name string
56
57
58
59
60 typeDescription string
61
62
63
64
65 expectedType reflect.Type
66
67 expectedGVK *schema.GroupVersionKind
68
69 store Store
70
71 listerWatcher ListerWatcher
72
73 backoffManager wait.BackoffManager
74 resyncPeriod time.Duration
75
76 clock clock.Clock
77
78
79 paginatedResult bool
80
81
82
83 lastSyncResourceVersion string
84
85
86 isLastSyncResourceVersionUnavailable bool
87
88 lastSyncResourceVersionMutex sync.RWMutex
89
90 watchErrorHandler WatchErrorHandler
91
92
93
94
95
96
97
98 WatchListPageSize int64
99
100 ShouldResync func() bool
101
102 MaxInternalErrorRetryDuration time.Duration
103
104
105
106
107
108
109
110
111
112
113 UseWatchList *bool
114 }
115
116
117
118
119 type ResourceVersionUpdater interface {
120
121
122 UpdateResourceVersion(resourceVersion string)
123 }
124
125
126
127
128
129
130
131
132
133
134
135 type WatchErrorHandler func(r *Reflector, err error)
136
137
138 func DefaultWatchErrorHandler(r *Reflector, err error) {
139 switch {
140 case isExpiredError(err):
141
142
143
144 klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
145 case err == io.EOF:
146
147 case err == io.ErrUnexpectedEOF:
148 klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.typeDescription, err)
149 default:
150 utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.typeDescription, err))
151 }
152 }
153
154 var (
155
156
157 minWatchTimeout = 5 * time.Minute
158 )
159
160
161
162 func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
163 indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
164 reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
165 return indexer, reflector
166 }
167
168
169
170 func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
171 return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{ResyncPeriod: resyncPeriod})
172 }
173
174
175
176 func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
177 return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{Name: name, ResyncPeriod: resyncPeriod})
178 }
179
180
181 type ReflectorOptions struct {
182
183
184 Name string
185
186
187
188
189
190
191 TypeDescription string
192
193
194
195 ResyncPeriod time.Duration
196
197
198 Clock clock.Clock
199 }
200
201
202
203
204
205
206
207
208
209
210
211 func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector {
212 reflectorClock := options.Clock
213 if reflectorClock == nil {
214 reflectorClock = clock.RealClock{}
215 }
216 r := &Reflector{
217 name: options.Name,
218 resyncPeriod: options.ResyncPeriod,
219 typeDescription: options.TypeDescription,
220 listerWatcher: lw,
221 store: store,
222
223
224
225 backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock),
226 clock: reflectorClock,
227 watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
228 expectedType: reflect.TypeOf(expectedType),
229 }
230
231 if r.name == "" {
232 r.name = naming.GetNameFromCallsite(internalPackages...)
233 }
234
235 if r.typeDescription == "" {
236 r.typeDescription = getTypeDescriptionFromObject(expectedType)
237 }
238
239 if r.expectedGVK == nil {
240 r.expectedGVK = getExpectedGVKFromObject(expectedType)
241 }
242
243
244
245 if r.UseWatchList == nil {
246 if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 {
247 r.UseWatchList = ptr.To(true)
248 }
249 }
250
251 return r
252 }
253
254 func getTypeDescriptionFromObject(expectedType interface{}) string {
255 if expectedType == nil {
256 return defaultExpectedTypeName
257 }
258
259 reflectDescription := reflect.TypeOf(expectedType).String()
260
261 obj, ok := expectedType.(*unstructured.Unstructured)
262 if !ok {
263 return reflectDescription
264 }
265
266 gvk := obj.GroupVersionKind()
267 if gvk.Empty() {
268 return reflectDescription
269 }
270
271 return gvk.String()
272 }
273
274 func getExpectedGVKFromObject(expectedType interface{}) *schema.GroupVersionKind {
275 obj, ok := expectedType.(*unstructured.Unstructured)
276 if !ok {
277 return nil
278 }
279
280 gvk := obj.GroupVersionKind()
281 if gvk.Empty() {
282 return nil
283 }
284
285 return &gvk
286 }
287
288
289
290 var internalPackages = []string{"client-go/tools/cache/"}
291
292
293
294
295 func (r *Reflector) Run(stopCh <-chan struct{}) {
296 klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
297 wait.BackoffUntil(func() {
298 if err := r.ListAndWatch(stopCh); err != nil {
299 r.watchErrorHandler(r, err)
300 }
301 }, r.backoffManager, true, stopCh)
302 klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
303 }
304
305 var (
306
307 neverExitWatch <-chan time.Time = make(chan time.Time)
308
309
310
311 errorStopRequested = errors.New("stop requested")
312 )
313
314
315
316 func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
317 if r.resyncPeriod == 0 {
318 return neverExitWatch, func() bool { return false }
319 }
320
321
322
323
324 t := r.clock.NewTimer(r.resyncPeriod)
325 return t.C(), t.Stop
326 }
327
328
329
330
331 func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
332 klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
333 var err error
334 var w watch.Interface
335 useWatchList := ptr.Deref(r.UseWatchList, false)
336 fallbackToList := !useWatchList
337
338 if useWatchList {
339 w, err = r.watchList(stopCh)
340 if w == nil && err == nil {
341
342 return nil
343 }
344 if err != nil {
345 klog.Warningf("The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking, err = %v", err)
346 fallbackToList = true
347
348 w = nil
349 }
350 }
351
352 if fallbackToList {
353 err = r.list(stopCh)
354 if err != nil {
355 return err
356 }
357 }
358
359 klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name)
360
361 resyncerrc := make(chan error, 1)
362 cancelCh := make(chan struct{})
363 defer close(cancelCh)
364 go r.startResync(stopCh, cancelCh, resyncerrc)
365 return r.watch(w, stopCh, resyncerrc)
366 }
367
368
369
370
371 func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}, resyncerrc chan error) {
372 resyncCh, cleanup := r.resyncChan()
373 defer func() {
374 cleanup()
375 }()
376 for {
377 select {
378 case <-resyncCh:
379 case <-stopCh:
380 return
381 case <-cancelCh:
382 return
383 }
384 if r.ShouldResync == nil || r.ShouldResync() {
385 klog.V(4).Infof("%s: forcing resync", r.name)
386 if err := r.store.Resync(); err != nil {
387 resyncerrc <- err
388 return
389 }
390 }
391 cleanup()
392 resyncCh, cleanup = r.resyncChan()
393 }
394 }
395
396
397 func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
398 var err error
399 retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
400
401 for {
402
403 select {
404 case <-stopCh:
405
406
407 if w != nil {
408 w.Stop()
409 }
410 return nil
411 default:
412 }
413
414
415 start := r.clock.Now()
416
417 if w == nil {
418 timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
419 options := metav1.ListOptions{
420 ResourceVersion: r.LastSyncResourceVersion(),
421
422
423 TimeoutSeconds: &timeoutSeconds,
424
425
426
427 AllowWatchBookmarks: true,
428 }
429
430 w, err = r.listerWatcher.Watch(options)
431 if err != nil {
432 if canRetry := isWatchErrorRetriable(err); canRetry {
433 klog.V(4).Infof("%s: watch of %v returned %v - backing off", r.name, r.typeDescription, err)
434 select {
435 case <-stopCh:
436 return nil
437 case <-r.backoffManager.Backoff().C():
438 continue
439 }
440 }
441 return err
442 }
443 }
444
445 err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
446
447 w.Stop()
448 w = nil
449 retry.After(err)
450 if err != nil {
451 if err != errorStopRequested {
452 switch {
453 case isExpiredError(err):
454
455
456
457 klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
458 case apierrors.IsTooManyRequests(err):
459 klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.typeDescription)
460 select {
461 case <-stopCh:
462 return nil
463 case <-r.backoffManager.Backoff().C():
464 continue
465 }
466 case apierrors.IsInternalError(err) && retry.ShouldRetry():
467 klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.typeDescription, err)
468 continue
469 default:
470 klog.Warningf("%s: watch of %v ended with: %v", r.name, r.typeDescription, err)
471 }
472 }
473 return nil
474 }
475 }
476 }
477
478
479
480 func (r *Reflector) list(stopCh <-chan struct{}) error {
481 var resourceVersion string
482 options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
483
484 initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
485 defer initTrace.LogIfLong(10 * time.Second)
486 var list runtime.Object
487 var paginatedResult bool
488 var err error
489 listCh := make(chan struct{}, 1)
490 panicCh := make(chan interface{}, 1)
491 go func() {
492 defer func() {
493 if r := recover(); r != nil {
494 panicCh <- r
495 }
496 }()
497
498
499 pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
500 return r.listerWatcher.List(opts)
501 }))
502 switch {
503 case r.WatchListPageSize != 0:
504 pager.PageSize = r.WatchListPageSize
505 case r.paginatedResult:
506
507
508
509 case options.ResourceVersion != "" && options.ResourceVersion != "0":
510
511
512
513
514
515
516
517
518
519
520
521
522 pager.PageSize = 0
523 }
524
525 list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
526 if isExpiredError(err) || isTooLargeResourceVersionError(err) {
527 r.setIsLastSyncResourceVersionUnavailable(true)
528
529
530
531
532
533
534 list, paginatedResult, err = pager.ListWithAlloc(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
535 }
536 close(listCh)
537 }()
538 select {
539 case <-stopCh:
540 return nil
541 case r := <-panicCh:
542 panic(r)
543 case <-listCh:
544 }
545 initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
546 if err != nil {
547 klog.Warningf("%s: failed to list %v: %v", r.name, r.typeDescription, err)
548 return fmt.Errorf("failed to list %v: %w", r.typeDescription, err)
549 }
550
551
552
553
554
555
556
557
558
559
560
561 if options.ResourceVersion == "0" && paginatedResult {
562 r.paginatedResult = true
563 }
564
565 r.setIsLastSyncResourceVersionUnavailable(false)
566 listMetaInterface, err := meta.ListAccessor(list)
567 if err != nil {
568 return fmt.Errorf("unable to understand list result %#v: %v", list, err)
569 }
570 resourceVersion = listMetaInterface.GetResourceVersion()
571 initTrace.Step("Resource version extracted")
572 items, err := meta.ExtractListWithAlloc(list)
573 if err != nil {
574 return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
575 }
576 initTrace.Step("Objects extracted")
577 if err := r.syncWith(items, resourceVersion); err != nil {
578 return fmt.Errorf("unable to sync list result: %v", err)
579 }
580 initTrace.Step("SyncWith done")
581 r.setLastSyncResourceVersion(resourceVersion)
582 initTrace.Step("Resource version updated")
583 return nil
584 }
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605 func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
606 var w watch.Interface
607 var err error
608 var temporaryStore Store
609 var resourceVersion string
610
611
612
613 isErrorRetriableWithSideEffectsFn := func(err error) bool {
614 if canRetry := isWatchErrorRetriable(err); canRetry {
615 klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err)
616 <-r.backoffManager.Backoff().C()
617 return true
618 }
619 if isExpiredError(err) || isTooLargeResourceVersionError(err) {
620
621
622
623
624 r.setIsLastSyncResourceVersionUnavailable(true)
625 return true
626 }
627 return false
628 }
629
630 initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
631 defer initTrace.LogIfLong(10 * time.Second)
632 for {
633 select {
634 case <-stopCh:
635 return nil, nil
636 default:
637 }
638
639 resourceVersion = ""
640 lastKnownRV := r.rewatchResourceVersion()
641 temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
642
643
644
645 timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
646 options := metav1.ListOptions{
647 ResourceVersion: lastKnownRV,
648 AllowWatchBookmarks: true,
649 SendInitialEvents: pointer.Bool(true),
650 ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
651 TimeoutSeconds: &timeoutSeconds,
652 }
653 start := r.clock.Now()
654
655 w, err = r.listerWatcher.Watch(options)
656 if err != nil {
657 if isErrorRetriableWithSideEffectsFn(err) {
658 continue
659 }
660 return nil, err
661 }
662 bookmarkReceived := pointer.Bool(false)
663 err = watchHandler(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
664 func(rv string) { resourceVersion = rv },
665 bookmarkReceived,
666 r.clock, make(chan error), stopCh)
667 if err != nil {
668 w.Stop()
669 if err == errorStopRequested {
670 return nil, nil
671 }
672 if isErrorRetriableWithSideEffectsFn(err) {
673 continue
674 }
675 return nil, err
676 }
677 if *bookmarkReceived {
678 break
679 }
680 }
681
682
683 initTrace.Step("Objects streamed", trace.Field{Key: "count", Value: len(temporaryStore.List())})
684 r.setIsLastSyncResourceVersionUnavailable(false)
685
686
687
688
689 checkWatchListConsistencyIfRequested(stopCh, r.name, resourceVersion, r.listerWatcher, temporaryStore)
690
691 if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
692 return nil, fmt.Errorf("unable to sync watch-list result: %v", err)
693 }
694 initTrace.Step("SyncWith done")
695 r.setLastSyncResourceVersion(resourceVersion)
696
697 return w, nil
698 }
699
700
701 func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
702 found := make([]interface{}, 0, len(items))
703 for _, item := range items {
704 found = append(found, item)
705 }
706 return r.store.Replace(found, resourceVersion)
707 }
708
709
710 func watchHandler(start time.Time,
711 w watch.Interface,
712 store Store,
713 expectedType reflect.Type,
714 expectedGVK *schema.GroupVersionKind,
715 name string,
716 expectedTypeName string,
717 setLastSyncResourceVersion func(string),
718 exitOnInitialEventsEndBookmark *bool,
719 clock clock.Clock,
720 errc chan error,
721 stopCh <-chan struct{},
722 ) error {
723 eventCount := 0
724 if exitOnInitialEventsEndBookmark != nil {
725
726
727 *exitOnInitialEventsEndBookmark = false
728 }
729
730 loop:
731 for {
732 select {
733 case <-stopCh:
734 return errorStopRequested
735 case err := <-errc:
736 return err
737 case event, ok := <-w.ResultChan():
738 if !ok {
739 break loop
740 }
741 if event.Type == watch.Error {
742 return apierrors.FromObject(event.Object)
743 }
744 if expectedType != nil {
745 if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
746 utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
747 continue
748 }
749 }
750 if expectedGVK != nil {
751 if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
752 utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
753 continue
754 }
755 }
756 meta, err := meta.Accessor(event.Object)
757 if err != nil {
758 utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
759 continue
760 }
761 resourceVersion := meta.GetResourceVersion()
762 switch event.Type {
763 case watch.Added:
764 err := store.Add(event.Object)
765 if err != nil {
766 utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
767 }
768 case watch.Modified:
769 err := store.Update(event.Object)
770 if err != nil {
771 utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
772 }
773 case watch.Deleted:
774
775
776
777 err := store.Delete(event.Object)
778 if err != nil {
779 utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
780 }
781 case watch.Bookmark:
782
783 if meta.GetAnnotations()["k8s.io/initial-events-end"] == "true" {
784 if exitOnInitialEventsEndBookmark != nil {
785 *exitOnInitialEventsEndBookmark = true
786 }
787 }
788 default:
789 utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
790 }
791 setLastSyncResourceVersion(resourceVersion)
792 if rvu, ok := store.(ResourceVersionUpdater); ok {
793 rvu.UpdateResourceVersion(resourceVersion)
794 }
795 eventCount++
796 if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark {
797 watchDuration := clock.Since(start)
798 klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration)
799 return nil
800 }
801 }
802 }
803
804 watchDuration := clock.Since(start)
805 if watchDuration < 1*time.Second && eventCount == 0 {
806 return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)
807 }
808 klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)
809 return nil
810 }
811
812
813
814 func (r *Reflector) LastSyncResourceVersion() string {
815 r.lastSyncResourceVersionMutex.RLock()
816 defer r.lastSyncResourceVersionMutex.RUnlock()
817 return r.lastSyncResourceVersion
818 }
819
820 func (r *Reflector) setLastSyncResourceVersion(v string) {
821 r.lastSyncResourceVersionMutex.Lock()
822 defer r.lastSyncResourceVersionMutex.Unlock()
823 r.lastSyncResourceVersion = v
824 }
825
826
827
828
829
830
831 func (r *Reflector) relistResourceVersion() string {
832 r.lastSyncResourceVersionMutex.RLock()
833 defer r.lastSyncResourceVersionMutex.RUnlock()
834
835 if r.isLastSyncResourceVersionUnavailable {
836
837
838
839 return ""
840 }
841 if r.lastSyncResourceVersion == "" {
842
843
844 return "0"
845 }
846 return r.lastSyncResourceVersion
847 }
848
849
850 func (r *Reflector) rewatchResourceVersion() string {
851 r.lastSyncResourceVersionMutex.RLock()
852 defer r.lastSyncResourceVersionMutex.RUnlock()
853 if r.isLastSyncResourceVersionUnavailable {
854
855
856 return ""
857 }
858 return r.lastSyncResourceVersion
859 }
860
861
862
863 func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
864 r.lastSyncResourceVersionMutex.Lock()
865 defer r.lastSyncResourceVersionMutex.Unlock()
866 r.isLastSyncResourceVersionUnavailable = isUnavailable
867 }
868
869 func isExpiredError(err error) bool {
870
871
872
873
874 return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
875 }
876
877 func isTooLargeResourceVersionError(err error) bool {
878 if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) {
879 return true
880 }
881
882
883
884
885
886 if !apierrors.IsTimeout(err) {
887 return false
888 }
889 apierr, ok := err.(apierrors.APIStatus)
890 if !ok || apierr == nil || apierr.Status().Details == nil {
891 return false
892 }
893 for _, cause := range apierr.Status().Details.Causes {
894
895 if cause.Message == "Too large resource version" {
896 return true
897 }
898 }
899
900
901 if strings.Contains(apierr.Status().Message, "Too large resource version") {
902 return true
903 }
904
905 return false
906 }
907
908
909
910 func isWatchErrorRetriable(err error) bool {
911
912
913
914
915
916 if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
917 return true
918 }
919 return false
920 }
921
View as plain text