...
1
2
3
4 package watcher
5
6 import (
7 "context"
8 "fmt"
9 "time"
10
11 "k8s.io/apimachinery/pkg/api/meta"
12 "k8s.io/apimachinery/pkg/runtime/schema"
13 "k8s.io/client-go/dynamic"
14 "k8s.io/klog/v2"
15 "sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader"
16 "sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
17 "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
18 "sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders"
19 "sigs.k8s.io/cli-utils/pkg/object"
20 )
21
22
23
24
25 type DefaultStatusWatcher struct {
26
27 DynamicClient dynamic.Interface
28
29
30 Mapper meta.RESTMapper
31
32
33
34 ResyncPeriod time.Duration
35
36
37
38
39 StatusReader engine.StatusReader
40
41
42
43
44
45 ClusterReader engine.ClusterReader
46 }
47
48 var _ StatusWatcher = &DefaultStatusWatcher{}
49
50
51
52
53 func NewDefaultStatusWatcher(dynamicClient dynamic.Interface, mapper meta.RESTMapper) *DefaultStatusWatcher {
54 return &DefaultStatusWatcher{
55 DynamicClient: dynamicClient,
56 Mapper: mapper,
57 ResyncPeriod: 1 * time.Hour,
58 StatusReader: statusreaders.NewDefaultStatusReader(mapper),
59 ClusterReader: &clusterreader.DynamicClusterReader{
60 DynamicClient: dynamicClient,
61 Mapper: mapper,
62 },
63 }
64 }
65
66
67
68
69 func (w *DefaultStatusWatcher) Watch(ctx context.Context, ids object.ObjMetadataSet, opts Options) <-chan event.Event {
70 strategy := opts.RESTScopeStrategy
71 if strategy == RESTScopeAutomatic {
72 strategy = autoSelectRESTScopeStrategy(ids)
73 }
74
75 var scope meta.RESTScope
76 var targets []GroupKindNamespace
77 switch strategy {
78 case RESTScopeRoot:
79 scope = meta.RESTScopeRoot
80 targets = rootScopeGKNs(ids)
81 klog.V(3).Infof("DynamicStatusWatcher starting in root-scoped mode (targets: %d)", len(targets))
82 case RESTScopeNamespace:
83 scope = meta.RESTScopeNamespace
84 targets = namespaceScopeGKNs(ids)
85 klog.V(3).Infof("DynamicStatusWatcher starting in namespace-scoped mode (targets: %d)", len(targets))
86 default:
87 return handleFatalError(fmt.Errorf("invalid RESTScopeStrategy: %v", strategy))
88 }
89
90 informer := &ObjectStatusReporter{
91 InformerFactory: NewDynamicInformerFactory(w.DynamicClient, w.ResyncPeriod),
92 Mapper: w.Mapper,
93 StatusReader: w.StatusReader,
94 ClusterReader: w.ClusterReader,
95 Targets: targets,
96 ObjectFilter: &AllowListObjectFilter{AllowList: ids},
97 RESTScope: scope,
98 }
99 return informer.Start(ctx)
100 }
101
102 func handleFatalError(err error) <-chan event.Event {
103 eventCh := make(chan event.Event)
104 go func() {
105 defer close(eventCh)
106 eventCh <- event.Event{
107 Type: event.ErrorEvent,
108 Error: err,
109 }
110 }()
111 return eventCh
112 }
113
114 func autoSelectRESTScopeStrategy(ids object.ObjMetadataSet) RESTScopeStrategy {
115 if len(uniqueNamespaces(ids)) > 1 {
116 return RESTScopeRoot
117 }
118 return RESTScopeNamespace
119 }
120
121 func rootScopeGKNs(ids object.ObjMetadataSet) []GroupKindNamespace {
122 gks := uniqueGKs(ids)
123 targets := make([]GroupKindNamespace, len(gks))
124 for i, gk := range gks {
125 targets[i] = GroupKindNamespace{
126 Group: gk.Group,
127 Kind: gk.Kind,
128 Namespace: "",
129 }
130 }
131 return targets
132 }
133
134 func namespaceScopeGKNs(ids object.ObjMetadataSet) []GroupKindNamespace {
135 return uniqueGKNs(ids)
136 }
137
138
139 func uniqueGKNs(ids object.ObjMetadataSet) []GroupKindNamespace {
140 gknMap := make(map[GroupKindNamespace]struct{})
141 for _, id := range ids {
142 gkn := GroupKindNamespace{Group: id.GroupKind.Group, Kind: id.GroupKind.Kind, Namespace: id.Namespace}
143 gknMap[gkn] = struct{}{}
144 }
145 gknList := make([]GroupKindNamespace, 0, len(gknMap))
146 for gk := range gknMap {
147 gknList = append(gknList, gk)
148 }
149 return gknList
150 }
151
152
153 func uniqueGKs(ids object.ObjMetadataSet) []schema.GroupKind {
154 gkMap := make(map[schema.GroupKind]struct{})
155 for _, id := range ids {
156 gkn := schema.GroupKind{Group: id.GroupKind.Group, Kind: id.GroupKind.Kind}
157 gkMap[gkn] = struct{}{}
158 }
159 gkList := make([]schema.GroupKind, 0, len(gkMap))
160 for gk := range gkMap {
161 gkList = append(gkList, gk)
162 }
163 return gkList
164 }
165
166 func uniqueNamespaces(ids object.ObjMetadataSet) []string {
167 nsMap := make(map[string]struct{})
168 for _, id := range ids {
169 nsMap[id.Namespace] = struct{}{}
170 }
171 nsList := make([]string, 0, len(nsMap))
172 for ns := range nsMap {
173 nsList = append(nsList, ns)
174 }
175 return nsList
176 }
177
View as plain text