...
1
2
3
4 package collector
5
6 import (
7 "sort"
8 "sync"
9
10 "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
11 "sigs.k8s.io/cli-utils/pkg/kstatus/status"
12 "sigs.k8s.io/cli-utils/pkg/object"
13 )
14
15 func NewResourceStatusCollector(identifiers object.ObjMetadataSet) *ResourceStatusCollector {
16 resourceStatuses := make(map[object.ObjMetadata]*event.ResourceStatus)
17 for _, id := range identifiers {
18 resourceStatuses[id] = &event.ResourceStatus{
19 Identifier: id,
20 Status: status.UnknownStatus,
21 }
22 }
23 return &ResourceStatusCollector{
24 ResourceStatuses: resourceStatuses,
25 }
26 }
27
28
29
30
31
32
33
34 type Observer interface {
35 Notify(*ResourceStatusCollector, event.Event)
36 }
37
38
39
40 type ObserverFunc func(*ResourceStatusCollector, event.Event)
41
42 func (o ObserverFunc) Notify(rsc *ResourceStatusCollector, e event.Event) {
43 o(rsc, e)
44 }
45
46
47
48
49
50
51 type ResourceStatusCollector struct {
52 mux sync.RWMutex
53
54 LastEventType event.Type
55
56 ResourceStatuses map[object.ObjMetadata]*event.ResourceStatus
57
58 Error error
59 }
60
61
62
63 type ListenerResult struct {
64 Err error
65 }
66
67
68
69
70 func (o *ResourceStatusCollector) Listen(eventChannel <-chan event.Event) <-chan ListenerResult {
71 return o.ListenWithObserver(eventChannel, nil)
72 }
73
74
75
76
77
78
79 func (o *ResourceStatusCollector) ListenWithObserver(eventChannel <-chan event.Event,
80 observer Observer) <-chan ListenerResult {
81 completed := make(chan ListenerResult)
82 go func() {
83 defer close(completed)
84 for e := range eventChannel {
85 err := o.processEvent(e)
86 if err != nil {
87 completed <- ListenerResult{
88 Err: err,
89 }
90 }
91 if observer != nil {
92 observer.Notify(o, e)
93 }
94 }
95 }()
96 return completed
97 }
98
99 func (o *ResourceStatusCollector) processEvent(e event.Event) error {
100 o.mux.Lock()
101 defer o.mux.Unlock()
102 o.LastEventType = e.Type
103 if e.Type == event.ErrorEvent {
104 o.Error = e.Error
105 return e.Error
106 }
107 if e.Type == event.ResourceUpdateEvent {
108 resourceStatus := e.Resource
109 o.ResourceStatuses[resourceStatus.Identifier] = resourceStatus
110 }
111 return nil
112 }
113
114
115
116 type Observation struct {
117 LastEventType event.Type
118
119 ResourceStatuses []*event.ResourceStatus
120
121 Error error
122 }
123
124
125
126 func (o *ResourceStatusCollector) LatestObservation() *Observation {
127 o.mux.RLock()
128 defer o.mux.RUnlock()
129
130 var resourceStatuses event.ResourceStatuses
131 for _, resourceStatus := range o.ResourceStatuses {
132 resourceStatuses = append(resourceStatuses, resourceStatus)
133 }
134 sort.Sort(resourceStatuses)
135
136 return &Observation{
137 LastEventType: o.LastEventType,
138 ResourceStatuses: resourceStatuses,
139 Error: o.Error,
140 }
141 }
142
View as plain text