1
2
3
4 package table
5
6 import (
7 "fmt"
8 "sort"
9 "sync"
10
11 "k8s.io/klog/v2"
12 "sigs.k8s.io/cli-utils/pkg/apply/event"
13 pe "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
14 "sigs.k8s.io/cli-utils/pkg/kstatus/status"
15 "sigs.k8s.io/cli-utils/pkg/object"
16 "sigs.k8s.io/cli-utils/pkg/object/validation"
17 "sigs.k8s.io/cli-utils/pkg/print/stats"
18 "sigs.k8s.io/cli-utils/pkg/print/table"
19 )
20
21 const InvalidStatus status.Status = "Invalid"
22
23 func newResourceStateCollector(resourceGroups []event.ActionGroup) *resourceStateCollector {
24 resourceInfos := make(map[object.ObjMetadata]*resourceInfo)
25 for _, group := range resourceGroups {
26 action := group.Action
27
28
29 if action == event.WaitAction {
30 continue
31 }
32 for _, identifier := range group.Identifiers {
33 resourceInfos[identifier] = &resourceInfo{
34 identifier: identifier,
35 resourceStatus: &pe.ResourceStatus{
36 Identifier: identifier,
37 Status: status.UnknownStatus,
38 },
39 ResourceAction: action,
40 }
41 }
42 }
43 return &resourceStateCollector{
44 resourceInfos: resourceInfos,
45 }
46 }
47
48
49
50
51
52
53 type resourceStateCollector struct {
54 mux sync.RWMutex
55
56
57
58
59 resourceInfos map[object.ObjMetadata]*resourceInfo
60
61
62 stats stats.Stats
63
64 err error
65 }
66
67
68
69
70 type resourceInfo struct {
71
72
73 identifier object.ObjMetadata
74
75
76
77 resourceStatus *pe.ResourceStatus
78
79
80
81
82 ResourceAction event.ResourceAction
83
84
85
86 Error error
87
88
89
90 ApplyStatus event.ApplyEventStatus
91
92
93
94 PruneStatus event.PruneEventStatus
95
96
97
98 DeleteStatus event.DeleteEventStatus
99
100
101
102 WaitStatus event.WaitEventStatus
103 }
104
105
106 func (r *resourceInfo) Identifier() object.ObjMetadata {
107 return r.identifier
108 }
109
110
111
112 func (r *resourceInfo) ResourceStatus() *pe.ResourceStatus {
113 return r.resourceStatus
114 }
115
116
117
118 func (r *resourceInfo) SubResources() []table.Resource {
119 var resources []table.Resource
120 for _, res := range r.resourceStatus.GeneratedResources {
121 resources = append(resources, &subResourceInfo{
122 resourceStatus: res,
123 })
124 }
125 return resources
126 }
127
128
129
130
131
132 type subResourceInfo struct {
133
134
135 resourceStatus *pe.ResourceStatus
136 }
137
138
139 func (r *subResourceInfo) Identifier() object.ObjMetadata {
140 return r.resourceStatus.Identifier
141 }
142
143
144
145 func (r *subResourceInfo) ResourceStatus() *pe.ResourceStatus {
146 return r.resourceStatus
147 }
148
149
150
151 func (r *subResourceInfo) SubResources() []table.Resource {
152 var resources []table.Resource
153 for _, res := range r.resourceStatus.GeneratedResources {
154 resources = append(resources, &subResourceInfo{
155 resourceStatus: res,
156 })
157 }
158 return resources
159 }
160
161
162
163
164
165
166
167
168 func (r *resourceStateCollector) Listen(eventChannel <-chan event.Event) <-chan listenerResult {
169 completed := make(chan listenerResult)
170 go func() {
171 defer close(completed)
172 for ev := range eventChannel {
173 if err := r.processEvent(ev); err != nil {
174 completed <- listenerResult{err: err}
175 return
176 }
177 }
178 }()
179 return completed
180 }
181
182 type listenerResult struct {
183 err error
184 }
185
186
187 func (r *resourceStateCollector) processEvent(ev event.Event) error {
188 r.mux.Lock()
189 defer r.mux.Unlock()
190 switch ev.Type {
191 case event.ValidationType:
192 return r.processValidationEvent(ev.ValidationEvent)
193 case event.StatusType:
194 r.processStatusEvent(ev.StatusEvent)
195 case event.ApplyType:
196 r.processApplyEvent(ev.ApplyEvent)
197 case event.PruneType:
198 r.processPruneEvent(ev.PruneEvent)
199 case event.DeleteType:
200 r.processDeleteEvent(ev.DeleteEvent)
201 case event.WaitType:
202 r.processWaitEvent(ev.WaitEvent)
203 case event.ErrorType:
204 return ev.ErrorEvent.Err
205 }
206 return nil
207 }
208
209
210
211 func (r *resourceStateCollector) processValidationEvent(e event.ValidationEvent) error {
212 klog.V(7).Infoln("processing validation event")
213
214 err := e.Error
215 if vErr, ok := err.(*validation.Error); ok {
216 err = vErr.Unwrap()
217 }
218 if len(e.Identifiers) == 0 {
219
220 return fmt.Errorf("invalid validation event: no identifiers: %w", err)
221 }
222 for _, id := range e.Identifiers {
223 previous, found := r.resourceInfos[id]
224 if !found {
225 klog.V(4).Infof("%s status event not found in ResourceInfos; no processing", id)
226 continue
227 }
228 previous.resourceStatus = &pe.ResourceStatus{
229 Identifier: id,
230 Status: InvalidStatus,
231 Message: e.Error.Error(),
232 }
233 }
234 return nil
235 }
236
237
238
239 func (r *resourceStateCollector) processStatusEvent(e event.StatusEvent) {
240 klog.V(7).Infoln("processing status event")
241 previous, found := r.resourceInfos[e.Identifier]
242 if !found {
243 klog.V(4).Infof("%s status event not found in ResourceInfos; no processing", e.Identifier)
244 return
245 }
246 previous.resourceStatus = e.PollResourceInfo
247 }
248
249
250 func (r *resourceStateCollector) processApplyEvent(e event.ApplyEvent) {
251 identifier := e.Identifier
252 klog.V(7).Infof("processing apply event for %s", identifier)
253 previous, found := r.resourceInfos[identifier]
254 if !found {
255 klog.V(4).Infof("%s apply event not found in ResourceInfos; no processing", identifier)
256 return
257 }
258 if e.Error != nil {
259 previous.Error = e.Error
260 }
261 previous.ApplyStatus = e.Status
262 r.stats.ApplyStats.Inc(e.Status)
263 }
264
265
266 func (r *resourceStateCollector) processPruneEvent(e event.PruneEvent) {
267 identifier := e.Identifier
268 klog.V(7).Infof("processing prune event for %s", identifier)
269 previous, found := r.resourceInfos[identifier]
270 if !found {
271 klog.V(4).Infof("%s prune event not found in ResourceInfos; no processing", identifier)
272 return
273 }
274 if e.Error != nil {
275 previous.Error = e.Error
276 }
277 previous.PruneStatus = e.Status
278 r.stats.PruneStats.Inc(e.Status)
279 }
280
281
282 func (r *resourceStateCollector) processDeleteEvent(e event.DeleteEvent) {
283 identifier := e.Identifier
284 klog.V(7).Infof("processing delete event for %s", identifier)
285 previous, found := r.resourceInfos[identifier]
286 if !found {
287 klog.V(4).Infof("%s delete event not found in ResourceInfos; no processing", identifier)
288 return
289 }
290 if e.Error != nil {
291 previous.Error = e.Error
292 }
293 previous.DeleteStatus = e.Status
294 r.stats.DeleteStats.Inc(e.Status)
295 }
296
297
298 func (r *resourceStateCollector) processWaitEvent(e event.WaitEvent) {
299 identifier := e.Identifier
300 klog.V(7).Infof("processing wait event for %s", identifier)
301 previous, found := r.resourceInfos[identifier]
302 if !found {
303 klog.V(4).Infof("%s wait event not found in ResourceInfos; no processing", identifier)
304 return
305 }
306 previous.WaitStatus = e.Status
307 r.stats.WaitStats.Inc(e.Status)
308 }
309
310
311 type ResourceState struct {
312 resourceInfos ResourceInfos
313
314 err error
315 }
316
317
318
319 func (r *ResourceState) Resources() []table.Resource {
320 var resources []table.Resource
321 for _, res := range r.resourceInfos {
322 resources = append(resources, res)
323 }
324 return resources
325 }
326
327 func (r *ResourceState) Error() error {
328 return r.err
329 }
330
331
332
333 func (r *resourceStateCollector) LatestState() *ResourceState {
334 r.mux.RLock()
335 defer r.mux.RUnlock()
336
337 var resourceInfos ResourceInfos
338 for _, ri := range r.resourceInfos {
339 resourceInfos = append(resourceInfos, &resourceInfo{
340 identifier: ri.identifier,
341 resourceStatus: ri.resourceStatus,
342 ResourceAction: ri.ResourceAction,
343 ApplyStatus: ri.ApplyStatus,
344 PruneStatus: ri.PruneStatus,
345 DeleteStatus: ri.DeleteStatus,
346 WaitStatus: ri.WaitStatus,
347 })
348 }
349 sort.Sort(resourceInfos)
350
351 return &ResourceState{
352 resourceInfos: resourceInfos,
353 err: r.err,
354 }
355 }
356
357 type ResourceInfos []*resourceInfo
358
359 func (g ResourceInfos) Len() int {
360 return len(g)
361 }
362
363 func (g ResourceInfos) Less(i, j int) bool {
364 idI := g[i].identifier
365 idJ := g[j].identifier
366
367 if idI.Namespace != idJ.Namespace {
368 return idI.Namespace < idJ.Namespace
369 }
370 if idI.GroupKind.Group != idJ.GroupKind.Group {
371 return idI.GroupKind.Group < idJ.GroupKind.Group
372 }
373 if idI.GroupKind.Kind != idJ.GroupKind.Kind {
374 return idI.GroupKind.Kind < idJ.GroupKind.Kind
375 }
376 return idI.Name < idJ.Name
377 }
378
379 func (g ResourceInfos) Swap(i, j int) {
380 g[i], g[j] = g[j], g[i]
381 }
382
View as plain text