...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package controllers
16
17 import (
18 "context"
19
20 customizev1alpha1 "github.com/GoogleCloudPlatform/k8s-config-connector/operator/pkg/apis/core/customize/v1alpha1"
21 corekcck8s "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
22
23 "github.com/go-logr/logr"
24 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
26 "k8s.io/apimachinery/pkg/runtime/schema"
27 "k8s.io/apimachinery/pkg/types"
28 "k8s.io/apimachinery/pkg/watch"
29 "k8s.io/client-go/dynamic"
30 "sigs.k8s.io/controller-runtime/pkg/event"
31 )
32
33 var (
34
35 CustomizationCRDsToWatch = []schema.GroupVersionResource{
36 corekcck8s.ToGVR(customizev1alpha1.ControllerResourceGroupVersionKind),
37 }
38 )
39
40
41 type CustomizationWatcher struct {
42
43 triggerGVRs []schema.GroupVersionResource
44 targetNN types.NamespacedName
45
46 watchRegistered map[string]struct{}
47
48
49 events chan event.GenericEvent
50 dynamicClient dynamic.Interface
51 log logr.Logger
52 }
53
54 func NewWithDynamicClient(dc dynamic.Interface, gvrs []schema.GroupVersionResource, logger logr.Logger) *CustomizationWatcher {
55 return &CustomizationWatcher{
56 dynamicClient: dc,
57 triggerGVRs: gvrs,
58 watchRegistered: make(map[string]struct{}),
59 events: make(chan event.GenericEvent),
60 log: logger.WithName("customization-watcher"),
61 }
62 }
63
64
65 func (w *CustomizationWatcher) Events() chan event.GenericEvent {
66 return w.events
67 }
68
69
70 func (w *CustomizationWatcher) EnsureWatchStarted(ctx context.Context, nn types.NamespacedName) error {
71 w.targetNN = nn
72 for _, gvr := range w.triggerGVRs {
73 if _, started := w.watchRegistered[gvr.String()]; started {
74 continue
75 }
76 w.log.Info("starting watch", "trigger GVR", gvr.String(), "target NamespacedName", nn)
77 go w.startWatch(ctx, gvr)
78 }
79 return nil
80 }
81
82
83 func (w *CustomizationWatcher) startWatch(ctx context.Context, triggerGVR schema.GroupVersionResource) {
84 log := w.log.WithValues("trigger GVR", triggerGVR, "target NamespacedName", w.targetNN)
85 opts := metav1.ListOptions{AllowWatchBookmarks: true}
86 triggerEvents, err := w.dynamicClient.Resource(triggerGVR).Namespace(w.targetNN.Namespace).Watch(ctx, opts)
87 if err != nil {
88 log.Error(err, "failed to start watch")
89 return
90 }
91 log.Info("watch started")
92 w.watchRegistered[triggerGVR.String()] = struct{}{}
93 defer func() {
94 triggerEvents.Stop()
95 delete(w.watchRegistered, triggerGVR.String())
96 }()
97 for {
98 select {
99 case <-ctx.Done():
100 log.Info("watch context cancelled")
101 return
102 case triggerEvent, ok := <-triggerEvents.ResultChan():
103 if !ok {
104 log.Info("watch channel closed")
105 return
106 }
107 switch triggerEvent.Type {
108 case watch.Bookmark:
109 continue
110 case watch.Error:
111 log.Error(err, "unexpected error from watch")
112 }
113 genEvent := event.GenericEvent{}
114 genEvent.Object = &unstructured.Unstructured{}
115 genEvent.Object.SetNamespace(w.targetNN.Namespace)
116 genEvent.Object.SetName(w.targetNN.Name)
117 w.events <- genEvent
118 }
119 }
120 }
121
View as plain text