1
2
3
4 package watcher
5
6 import (
7 "context"
8 "errors"
9 "fmt"
10 "io"
11 "sync"
12 "time"
13
14 apierrors "k8s.io/apimachinery/pkg/api/errors"
15 "k8s.io/apimachinery/pkg/api/meta"
16 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
17 "k8s.io/apimachinery/pkg/runtime/schema"
18 "k8s.io/apimachinery/pkg/util/wait"
19 "k8s.io/client-go/tools/cache"
20 "k8s.io/klog/v2"
21 "k8s.io/utils/clock"
22 "sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
23 "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
24 "sigs.k8s.io/cli-utils/pkg/kstatus/status"
25 "sigs.k8s.io/cli-utils/pkg/object"
26 )
27
28
29
30
// GroupKindNamespace identifies one watch target: objects of a single API
// Group and Kind within a single Namespace.
type GroupKindNamespace struct {
	// Group is the API group of the watched objects.
	Group string
	// Kind is the kind of the watched objects.
	Kind string
	// Namespace is passed to the informer factory when the watch starts
	// (an empty value presumably means all namespaces — confirm against
	// DynamicInformerFactory).
	Namespace string
}

// String renders the target as "<group>/<kind>/namespaces/<namespace>".
func (gkn GroupKindNamespace) String() string {
	const layout = "%s/%s/namespaces/%s"
	return fmt.Sprintf(layout, gkn.Group, gkn.Kind, gkn.Namespace)
}
42
43 func (gkn GroupKindNamespace) GroupKind() schema.GroupKind {
44 return schema.GroupKind{Group: gkn.Group, Kind: gkn.Kind}
45 }
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// ObjectStatusReporter runs one informer per entry in Targets and reports the
// status of the watched objects as events on the channel returned by Start.
// Informers are started and stopped as CRDs and namespaces come and go.
type ObjectStatusReporter struct {
	// InformerFactory builds a new informer for each target
	// (see startInformerNow).
	InformerFactory *DynamicInformerFactory

	// Mapper resolves each target's GroupKind to a REST mapping. It is reset
	// via meta.MaybeResetRESTMapper when CRDs are added, updated or deleted.
	Mapper meta.RESTMapper

	// StatusReader computes object status, either from an object received by
	// an informer or by reading the object from the cluster.
	StatusReader engine.StatusReader

	// ClusterReader is passed to StatusReader for every status computation.
	ClusterReader engine.ClusterReader

	// Targets lists what to watch; each entry gets its own informer.
	Targets []GroupKindNamespace

	// ObjectFilter is consulted for every informer event; objects for which
	// Filter returns true are skipped.
	ObjectFilter ObjectFilter

	// RESTScope, when meta.RESTScopeRoot, makes namespace add/update/delete
	// events a no-op for informer lifecycle.
	RESTScope meta.RESTScope

	// lock serializes Start and HasSynced and guards the started/stopped
	// flags.
	lock sync.Mutex

	// gk2gkn indexes Targets by GroupKind (built in Start).
	gk2gkn map[schema.GroupKind]map[GroupKindNamespace]struct{}

	// ns2gkn indexes Targets by namespace (built in Start).
	ns2gkn map[string]map[GroupKindNamespace]struct{}

	// informerRefs holds one informerReference per target (built in Start).
	informerRefs map[GroupKindNamespace]*informerReference

	// context is the reporter-wide context created in Start; every informer
	// context is derived from it.
	context context.Context

	// cancel cancels context; invoked by Stop.
	cancel context.CancelFunc

	// funnel merges all per-informer event channels into the single output
	// channel returned by Start.
	funnel *eventFunnel

	// taskManager runs delayed status re-checks for unschedulable objects.
	taskManager *taskManager

	// started is set once Start has run; stopped is set after the funnel
	// reports Done.
	started bool
	stopped bool
}
128
129 func (w *ObjectStatusReporter) Start(ctx context.Context) <-chan event.Event {
130 w.lock.Lock()
131 defer w.lock.Unlock()
132
133 if w.started {
134 panic("ObjectStatusInformer cannot be restarted")
135 }
136
137 w.taskManager = &taskManager{}
138
139
140
141
142 w.gk2gkn = make(map[schema.GroupKind]map[GroupKindNamespace]struct{})
143 for _, gkn := range w.Targets {
144 gk := gkn.GroupKind()
145 m, found := w.gk2gkn[gk]
146 if !found {
147 m = make(map[GroupKindNamespace]struct{})
148 w.gk2gkn[gk] = m
149 }
150 m[gkn] = struct{}{}
151 }
152
153
154
155
156 w.ns2gkn = make(map[string]map[GroupKindNamespace]struct{})
157 for _, gkn := range w.Targets {
158 ns := gkn.Namespace
159 m, found := w.ns2gkn[ns]
160 if !found {
161 m = make(map[GroupKindNamespace]struct{})
162 w.ns2gkn[ns] = m
163 }
164 m[gkn] = struct{}{}
165 }
166
167
168
169
170 w.informerRefs = make(map[GroupKindNamespace]*informerReference, len(w.Targets))
171 for _, gkn := range w.Targets {
172 w.informerRefs[gkn] = &informerReference{}
173 }
174
175 ctx, cancel := context.WithCancel(ctx)
176 w.context = ctx
177 w.cancel = cancel
178
179
180
181
182
183 w.funnel = newEventFunnel(ctx)
184
185
186 for _, gkn := range w.Targets {
187 w.startInformer(gkn)
188 }
189
190 w.started = true
191
192
193
194
195
196 go func() {
197 <-w.funnel.Done()
198
199 w.lock.Lock()
200 defer w.lock.Unlock()
201 w.stopped = true
202 }()
203
204
205 syncEventCh := make(chan event.Event)
206 err := w.funnel.AddInputChannel(syncEventCh)
207 if err != nil {
208
209 return handleFatalError(fmt.Errorf("reporter failed to start: %v", err))
210 }
211 go func() {
212 defer close(syncEventCh)
213
214 if cache.WaitForCacheSync(ctx.Done(), w.HasSynced) {
215 syncEventCh <- event.Event{
216 Type: event.SyncEvent,
217 }
218 }
219 }()
220
221 return w.funnel.OutputChannel()
222 }
223
224
225
226 func (w *ObjectStatusReporter) Stop() {
227 klog.V(4).Info("Stopping reporter")
228 w.cancel()
229 }
230
231
232
233
234
235 func (w *ObjectStatusReporter) HasSynced() bool {
236 w.lock.Lock()
237 defer w.lock.Unlock()
238
239 if w.stopped || !w.started {
240 return false
241 }
242
243 pending := make([]GroupKindNamespace, 0, len(w.informerRefs))
244 for gke, informer := range w.informerRefs {
245 if informer.HasStarted() && !informer.HasSynced() {
246 pending = append(pending, gke)
247 }
248 }
249 if len(pending) > 0 {
250 klog.V(5).Infof("Informers pending synchronization: %v", pending)
251 return false
252 }
253 return true
254 }
255
256
257
258 func (w *ObjectStatusReporter) startInformer(gkn GroupKindNamespace) {
259 ctx, ok := w.informerRefs[gkn].Start(w.context)
260 if !ok {
261 klog.V(5).Infof("Watch start skipped (already started): %v", gkn)
262
263 return
264 }
265 go w.startInformerWithRetry(ctx, gkn)
266 }
267
268
269 func (w *ObjectStatusReporter) stopInformer(gkn GroupKindNamespace) {
270 w.informerRefs[gkn].Stop()
271 }
272
273 func (w *ObjectStatusReporter) startInformerWithRetry(ctx context.Context, gkn GroupKindNamespace) {
274 realClock := &clock.RealClock{}
275 backoffManager := wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock)
276 retryCtx, retryCancel := context.WithCancel(ctx)
277
278 wait.BackoffUntil(func() {
279 err := w.startInformerNow(
280 ctx,
281 gkn,
282 )
283 if err != nil {
284 if meta.IsNoMatchError(err) {
285
286
287 klog.V(3).Infof("Watch start error (blocking until CRD is added): %v: %v", gkn, err)
288
289 w.stopInformer(gkn)
290 return
291 }
292
293
294 eventCh := make(chan event.Event)
295 defer close(eventCh)
296 err := w.funnel.AddInputChannel(eventCh)
297 if err != nil {
298
299
300 klog.V(5).Infof("Informer failed to start: %v", err)
301 return
302 }
303
304 w.handleFatalError(eventCh, err)
305 return
306 }
307
308 retryCancel()
309 }, backoffManager, true, retryCtx.Done())
310 }
311
312
313
314
315
316 func (w *ObjectStatusReporter) startInformerNow(
317 ctx context.Context,
318 gkn GroupKindNamespace,
319 ) error {
320
321
322 gk := gkn.GroupKind()
323 mapping, err := w.Mapper.RESTMapping(gk)
324 if err != nil {
325
326 return err
327 }
328
329 informer := w.InformerFactory.NewInformer(ctx, mapping, gkn.Namespace)
330
331 w.informerRefs[gkn].SetInformer(informer)
332
333 eventCh := make(chan event.Event)
334
335
336 err = w.funnel.AddInputChannel(eventCh)
337 if err != nil {
338
339 return fmt.Errorf("informer failed to build event handler: %w", err)
340 }
341
342
343
344 err = informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
345 w.watchErrorHandler(gkn, eventCh, err)
346 })
347 if err != nil {
348
349
350 return fmt.Errorf("failed to set error handler on new informer for %v: %v", mapping.Resource, err)
351 }
352
353 informer.AddEventHandler(w.eventHandler(ctx, eventCh))
354
355
356
357 go func() {
358 klog.V(3).Infof("Watch starting: %v", gkn)
359 informer.Run(ctx.Done())
360 klog.V(3).Infof("Watch stopped: %v", gkn)
361
362 close(eventCh)
363 }()
364
365 return nil
366 }
367
368 func (w *ObjectStatusReporter) forEachTargetWithGroupKind(gk schema.GroupKind, fn func(GroupKindNamespace)) {
369 for gkn := range w.gk2gkn[gk] {
370 fn(gkn)
371 }
372 }
373
374 func (w *ObjectStatusReporter) forEachTargetWithNamespace(ns string, fn func(GroupKindNamespace)) {
375 for gkn := range w.ns2gkn[ns] {
376 fn(gkn)
377 }
378 }
379
380
381
382 func (w *ObjectStatusReporter) readStatusFromObject(
383 ctx context.Context,
384 obj *unstructured.Unstructured,
385 ) (*event.ResourceStatus, error) {
386 return w.StatusReader.ReadStatusForObject(ctx, w.ClusterReader, obj)
387 }
388
389
390
391
392 func (w *ObjectStatusReporter) readStatusFromCluster(
393 ctx context.Context,
394 id object.ObjMetadata,
395 ) (*event.ResourceStatus, error) {
396 return w.StatusReader.ReadStatus(ctx, w.ClusterReader, id)
397 }
398
399
400
401
402
403
404 func deletedStatus(id object.ObjMetadata) *event.ResourceStatus {
405
406
407 result := &event.ResourceStatus{
408 Identifier: id,
409 Status: status.NotFoundStatus,
410 Message: "Resource not found",
411 }
412
413 return &event.ResourceStatus{
414 Identifier: id,
415 Resource: nil,
416 Status: result.Status,
417 Message: result.Message,
418
419
420
421 GeneratedResources: nil,
422 }
423 }
424
425
426
427 func (w *ObjectStatusReporter) eventHandler(
428 ctx context.Context,
429 eventCh chan<- event.Event,
430 ) cache.ResourceEventHandler {
431 var handler cache.ResourceEventHandlerFuncs
432
433 handler.AddFunc = func(iobj interface{}) {
434
435 if ctx.Err() != nil {
436 return
437 }
438
439 obj, ok := iobj.(*unstructured.Unstructured)
440 if !ok {
441 panic(fmt.Sprintf("AddFunc received unexpected object type %T", iobj))
442 }
443 id := object.UnstructuredToObjMetadata(obj)
444 if w.ObjectFilter.Filter(obj) {
445 klog.V(7).Infof("Watch Event Skipped: AddFunc: %s", id)
446 return
447 }
448 klog.V(5).Infof("AddFunc: Computing status for object: %s", id)
449
450
451 w.taskManager.Cancel(id)
452
453 rs, err := w.readStatusFromObject(ctx, obj)
454 if err != nil {
455
456 w.handleFatalError(eventCh, fmt.Errorf("failed to compute object status: %s: %w", id, err))
457 return
458 }
459
460 if object.IsNamespace(obj) {
461 klog.V(5).Infof("AddFunc: Namespace added: %v", id)
462 w.onNamespaceAdd(obj)
463 } else if object.IsCRD(obj) {
464 klog.V(5).Infof("AddFunc: CRD added: %v", id)
465 w.onCRDAdd(obj)
466 }
467
468 if isObjectUnschedulable(rs) {
469 klog.V(5).Infof("AddFunc: object unschedulable: %v", id)
470
471 w.taskManager.Schedule(ctx, id, status.ScheduleWindow,
472 w.newStatusCheckTaskFunc(ctx, eventCh, id))
473 }
474
475 klog.V(7).Infof("AddFunc: sending update event: %v", rs)
476 eventCh <- event.Event{
477 Type: event.ResourceUpdateEvent,
478 Resource: rs,
479 }
480 }
481
482 handler.UpdateFunc = func(_, iobj interface{}) {
483
484 if ctx.Err() != nil {
485 return
486 }
487
488 obj, ok := iobj.(*unstructured.Unstructured)
489 if !ok {
490 panic(fmt.Sprintf("UpdateFunc received unexpected object type %T", iobj))
491 }
492 id := object.UnstructuredToObjMetadata(obj)
493 if w.ObjectFilter.Filter(obj) {
494 klog.V(7).Infof("UpdateFunc: Watch Event Skipped: %s", id)
495 return
496 }
497 klog.V(5).Infof("UpdateFunc: Computing status for object: %s", id)
498
499
500 w.taskManager.Cancel(id)
501
502 rs, err := w.readStatusFromObject(ctx, obj)
503 if err != nil {
504
505 w.handleFatalError(eventCh, fmt.Errorf("failed to compute object status: %s: %w", id, err))
506 return
507 }
508
509 if object.IsNamespace(obj) {
510 klog.V(5).Infof("UpdateFunc: Namespace updated: %v", id)
511 w.onNamespaceUpdate(obj)
512 } else if object.IsCRD(obj) {
513 klog.V(5).Infof("UpdateFunc: CRD updated: %v", id)
514 w.onCRDUpdate(obj)
515 }
516
517 if isObjectUnschedulable(rs) {
518 klog.V(5).Infof("UpdateFunc: object unschedulable: %v", id)
519
520 w.taskManager.Schedule(ctx, id, status.ScheduleWindow,
521 w.newStatusCheckTaskFunc(ctx, eventCh, id))
522 }
523
524 klog.V(7).Infof("UpdateFunc: sending update event: %v", rs)
525 eventCh <- event.Event{
526 Type: event.ResourceUpdateEvent,
527 Resource: rs,
528 }
529 }
530
531 handler.DeleteFunc = func(iobj interface{}) {
532
533 if ctx.Err() != nil {
534 return
535 }
536
537 if tombstone, ok := iobj.(cache.DeletedFinalStateUnknown); ok {
538
539
540 iobj = tombstone.Obj
541 }
542 obj, ok := iobj.(*unstructured.Unstructured)
543 if !ok {
544 panic(fmt.Sprintf("DeleteFunc received unexpected object type %T", iobj))
545 }
546 id := object.UnstructuredToObjMetadata(obj)
547 if w.ObjectFilter.Filter(obj) {
548 klog.V(7).Infof("DeleteFunc: Watch Event Skipped: %s", id)
549 return
550 }
551 klog.V(5).Infof("DeleteFunc: Computing status for object: %s", id)
552
553
554 w.taskManager.Cancel(id)
555
556 if object.IsNamespace(obj) {
557 klog.V(5).Infof("DeleteFunc: Namespace deleted: %v", id)
558 w.onNamespaceDelete(obj)
559 } else if object.IsCRD(obj) {
560 klog.V(5).Infof("DeleteFunc: CRD deleted: %v", id)
561 w.onCRDDelete(obj)
562 }
563
564 rs := deletedStatus(id)
565 klog.V(7).Infof("DeleteFunc: sending update event: %v", rs)
566 eventCh <- event.Event{
567 Type: event.ResourceUpdateEvent,
568 Resource: rs,
569 }
570 }
571
572 return handler
573 }
574
575
576 func (w *ObjectStatusReporter) onCRDAdd(obj *unstructured.Unstructured) {
577 gk, found := object.GetCRDGroupKind(obj)
578 if !found {
579 id := object.UnstructuredToObjMetadata(obj)
580 klog.Warningf("Invalid CRD added: missing group and/or kind: %v", id)
581
582
583 return
584 }
585 klog.V(3).Infof("CRD added for %s", gk)
586
587 klog.V(3).Info("Resetting RESTMapper")
588
589 meta.MaybeResetRESTMapper(w.Mapper)
590
591 w.forEachTargetWithGroupKind(gk, func(gkn GroupKindNamespace) {
592 w.startInformer(gkn)
593 })
594 }
595
596
597 func (w *ObjectStatusReporter) onCRDUpdate(newObj *unstructured.Unstructured) {
598 gk, found := object.GetCRDGroupKind(newObj)
599 if !found {
600 id := object.UnstructuredToObjMetadata(newObj)
601 klog.Warningf("Invalid CRD updated: missing group and/or kind: %v", id)
602
603
604 return
605 }
606 klog.V(3).Infof("CRD updated for %s", gk)
607
608 klog.V(3).Info("Resetting RESTMapper")
609
610 meta.MaybeResetRESTMapper(w.Mapper)
611
612 w.forEachTargetWithGroupKind(gk, func(gkn GroupKindNamespace) {
613 w.startInformer(gkn)
614 })
615 }
616
617
618 func (w *ObjectStatusReporter) onCRDDelete(oldObj *unstructured.Unstructured) {
619 gk, found := object.GetCRDGroupKind(oldObj)
620 if !found {
621 id := object.UnstructuredToObjMetadata(oldObj)
622 klog.Warningf("Invalid CRD deleted: missing group and/or kind: %v", id)
623
624
625 return
626 }
627 klog.V(3).Infof("CRD deleted for %s", gk)
628
629 w.forEachTargetWithGroupKind(gk, func(gkn GroupKindNamespace) {
630 w.stopInformer(gkn)
631 })
632
633 klog.V(3).Info("Resetting RESTMapper")
634
635 meta.MaybeResetRESTMapper(w.Mapper)
636 }
637
638
639 func (w *ObjectStatusReporter) onNamespaceAdd(obj *unstructured.Unstructured) {
640 if w.RESTScope == meta.RESTScopeRoot {
641
642
643
644 return
645 }
646 namespace := obj.GetName()
647 w.forEachTargetWithNamespace(namespace, func(gkn GroupKindNamespace) {
648 w.startInformer(gkn)
649 })
650 }
651
652
653 func (w *ObjectStatusReporter) onNamespaceUpdate(obj *unstructured.Unstructured) {
654 if w.RESTScope == meta.RESTScopeRoot {
655
656
657
658 return
659 }
660 namespace := obj.GetName()
661 w.forEachTargetWithNamespace(namespace, func(gkn GroupKindNamespace) {
662 w.startInformer(gkn)
663 })
664 }
665
666
667 func (w *ObjectStatusReporter) onNamespaceDelete(obj *unstructured.Unstructured) {
668 if w.RESTScope == meta.RESTScopeRoot {
669
670
671
672 return
673 }
674 namespace := obj.GetName()
675 w.forEachTargetWithNamespace(namespace, func(gkn GroupKindNamespace) {
676 w.stopInformer(gkn)
677 })
678 }
679
680
681
682
683
684
685 func (w *ObjectStatusReporter) newStatusCheckTaskFunc(
686 ctx context.Context,
687 eventCh chan<- event.Event,
688 id object.ObjMetadata,
689 ) taskFunc {
690 return func() {
691 klog.V(5).Infof("Re-reading object status: %s", status.ScheduleWindow, id)
692
693 rs, err := w.readStatusFromCluster(ctx, id)
694 if err != nil {
695
696
697 w.handleFatalError(eventCh, err)
698 return
699 }
700 eventCh <- event.Event{
701 Type: event.ResourceUpdateEvent,
702 Resource: rs,
703 }
704 }
705 }
706
707 func (w *ObjectStatusReporter) handleFatalError(eventCh chan<- event.Event, err error) {
708 klog.V(5).Infof("Reporter error: %v", err)
709 if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
710 return
711 }
712 eventCh <- event.Event{
713 Type: event.ErrorEvent,
714 Error: err,
715 }
716 w.Stop()
717 }
718
719
720
721
722 func (w *ObjectStatusReporter) watchErrorHandler(gkn GroupKindNamespace, eventCh chan<- event.Event, err error) {
723 switch {
724
725 case err == io.EOF:
726 klog.V(5).Infof("ListAndWatch error (termination expected): %v: %v", gkn, err)
727
728
729 case err == io.ErrUnexpectedEOF:
730 klog.V(1).Infof("ListAndWatch error (retry expected): %v: %v", gkn, err)
731
732
733 case errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded):
734 klog.V(5).Infof("ListAndWatch error (termination expected): %v: %v", gkn, err)
735
736
737 case apierrors.IsResourceExpired(err):
738
739 klog.V(5).Infof("ListAndWatch error (retry expected): %v: %v", gkn, err)
740
741
742 case apierrors.IsGone(err):
743 klog.V(5).Infof("ListAndWatch error (retry expected): %v: %v", gkn, err)
744
745
746 case apierrors.IsNotFound(err):
747 klog.V(3).Infof("ListAndWatch error (termination expected): %v: stopping all informers for this GroupKind: %v", gkn, err)
748 w.forEachTargetWithGroupKind(gkn.GroupKind(), func(gkn GroupKindNamespace) {
749 w.stopInformer(gkn)
750 })
751
752
753 case apierrors.IsForbidden(err):
754 klog.V(3).Infof("ListAndWatch error (termination expected): %v: stopping all informers: %v", gkn, err)
755 w.handleFatalError(eventCh, err)
756
757
758 default:
759 klog.Warningf("ListAndWatch error (retry expected): %v: %v", gkn, err)
760 }
761 }
762
763
// informerReference tracks the lifecycle of the informer for one target, so
// the informer can be started, stopped, and started again later (for
// example, when a CRD or namespace is deleted and re-created).
type informerReference struct {
	// lock guards all of the fields below.
	lock sync.Mutex

	// informer is the most recently built informer; nil until SetInformer.
	informer cache.SharedIndexInformer
	// context is the informer's context while started; nil otherwise.
	context context.Context
	// cancel cancels context; set by Start and invoked by Stop.
	cancel context.CancelFunc
	// started reports whether Start has been called since the last Stop.
	started bool
}
773
774
775
776 func (ir *informerReference) Start(ctx context.Context) (context.Context, bool) {
777 ir.lock.Lock()
778 defer ir.lock.Unlock()
779
780 if ir.started {
781 return nil, false
782 }
783
784 ctx, cancel := context.WithCancel(ctx)
785 ir.context = ctx
786 ir.cancel = cancel
787 ir.started = true
788
789 return ctx, true
790 }
791
792 func (ir *informerReference) SetInformer(informer cache.SharedIndexInformer) {
793 ir.lock.Lock()
794 defer ir.lock.Unlock()
795
796 ir.informer = informer
797 }
798
799 func (ir *informerReference) HasSynced() bool {
800 ir.lock.Lock()
801 defer ir.lock.Unlock()
802
803 if !ir.started {
804 return false
805 }
806 if ir.informer == nil {
807 return false
808 }
809 return ir.informer.HasSynced()
810 }
811
812 func (ir *informerReference) HasStarted() bool {
813 ir.lock.Lock()
814 defer ir.lock.Unlock()
815
816 return ir.started
817 }
818
819
820 func (ir *informerReference) Stop() {
821 ir.lock.Lock()
822 defer ir.lock.Unlock()
823
824 if !ir.started {
825 return
826 }
827
828 ir.cancel()
829 ir.started = false
830 ir.context = nil
831 }
832
// taskFunc is a unit of delayed work run by taskManager.Schedule once its
// delay has elapsed.
type taskFunc func()
834
835
836
// taskManager keeps one cancel function per object for delayed tasks.
// Scheduling a new task for an object cancels the previous one, and Cancel
// removes it. The zero value is ready to use.
type taskManager struct {
	// lock guards cancelFuncs.
	lock sync.Mutex
	// cancelFuncs maps each object to the cancel func of its most recently
	// scheduled task. It is nil until the first Schedule, and is reset to nil
	// when Cancel removes the last entry.
	cancelFuncs map[object.ObjMetadata]context.CancelFunc
}
841
842 func (tm *taskManager) Schedule(parentCtx context.Context, id object.ObjMetadata, delay time.Duration, task taskFunc) {
843 tm.lock.Lock()
844 defer tm.lock.Unlock()
845
846 if tm.cancelFuncs == nil {
847 tm.cancelFuncs = make(map[object.ObjMetadata]context.CancelFunc)
848 }
849
850 cancel, found := tm.cancelFuncs[id]
851 if found {
852
853 cancel()
854 }
855
856 taskCtx, cancel := context.WithTimeout(context.Background(), delay)
857 tm.cancelFuncs[id] = cancel
858
859 go func() {
860 klog.V(5).Infof("Task scheduled (%v) for object (%s)", delay, id)
861 select {
862 case <-parentCtx.Done():
863
864 cancel()
865 case <-taskCtx.Done():
866 if taskCtx.Err() == context.DeadlineExceeded {
867 klog.V(5).Infof("Task executing (after %v) for object (%v)", delay, id)
868 task()
869 }
870
871 }
872 }()
873 }
874
875 func (tm *taskManager) Cancel(id object.ObjMetadata) {
876 tm.lock.Lock()
877 defer tm.lock.Unlock()
878
879 cancelFunc, found := tm.cancelFuncs[id]
880 if !found {
881
882 return
883 }
884 delete(tm.cancelFuncs, id)
885 cancelFunc()
886 if len(tm.cancelFuncs) == 0 {
887 tm.cancelFuncs = nil
888 }
889 }
890
View as plain text