1
2
3
4 package engine
5
6 import (
7 "context"
8 "errors"
9 "fmt"
10 "time"
11
12 "k8s.io/apimachinery/pkg/api/meta"
13 "k8s.io/apimachinery/pkg/runtime/schema"
14 "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
15 "sigs.k8s.io/cli-utils/pkg/object"
16 "sigs.k8s.io/controller-runtime/pkg/client"
17 )
18
19
20
21 type ClusterReaderFactory interface {
22 New(reader client.Reader, mapper meta.RESTMapper, identifiers object.ObjMetadataSet) (ClusterReader, error)
23 }
24
25 type ClusterReaderFactoryFunc func(client.Reader, meta.RESTMapper, object.ObjMetadataSet) (ClusterReader, error)
26
27 func (c ClusterReaderFactoryFunc) New(r client.Reader, m meta.RESTMapper, ids object.ObjMetadataSet) (ClusterReader, error) {
28 return c(r, m, ids)
29 }
30
31
32 type PollerEngine struct {
33 Reader client.Reader
34 Mapper meta.RESTMapper
35 StatusReaders []StatusReader
36 DefaultStatusReader StatusReader
37 ClusterReaderFactory ClusterReaderFactory
38 }
39
40
41
42
43
44
45 func (s *PollerEngine) Poll(ctx context.Context, identifiers object.ObjMetadataSet, options Options) <-chan event.Event {
46 eventChannel := make(chan event.Event)
47
48 go func() {
49 defer close(eventChannel)
50
51 err := s.validateIdentifiers(identifiers)
52 if err != nil {
53 handleError(eventChannel, err)
54 return
55 }
56
57 clusterReader, err := s.ClusterReaderFactory.New(s.Reader, s.Mapper, identifiers)
58 if err != nil {
59 handleError(eventChannel, fmt.Errorf("error creating new ClusterReader: %w", err))
60 return
61 }
62
63 runner := &statusPollerRunner{
64 clusterReader: clusterReader,
65 statusReaders: s.StatusReaders,
66 defaultStatusReader: s.DefaultStatusReader,
67 identifiers: identifiers,
68 previousResourceStatuses: make(map[object.ObjMetadata]*event.ResourceStatus),
69 eventChannel: eventChannel,
70 pollingInterval: options.PollInterval,
71 }
72 runner.Run(ctx)
73 }()
74
75 return eventChannel
76 }
77
78 func handleError(eventChannel chan event.Event, err error) {
79 eventChannel <- event.Event{
80 Type: event.ErrorEvent,
81 Error: err,
82 }
83 }
84
85
86
87 func (s *PollerEngine) validateIdentifiers(identifiers object.ObjMetadataSet) error {
88 for _, id := range identifiers {
89 mapping, err := s.Mapper.RESTMapping(id.GroupKind)
90 if err != nil {
91
92
93 if meta.IsNoMatchError(err) {
94 continue
95 }
96 return err
97 }
98 if mapping.Scope.Name() == meta.RESTScopeNameNamespace && id.Namespace == "" {
99 return fmt.Errorf("resource %s %s is namespace scoped, but namespace is not set",
100 id.GroupKind.String(), id.Name)
101 }
102 }
103 return nil
104 }
105
106
107
108
109
// Options carries the settings for a single PollerEngine.Poll call.
type Options struct {
	// PollInterval is the period of the ticker that triggers each polling
	// round after the initial one. It must be greater than zero.
	PollInterval time.Duration
}
116
117
118
119
120
121
122
123
124 type statusPollerRunner struct {
125
126
127 clusterReader ClusterReader
128
129
130
131
132 statusReaders []StatusReader
133
134
135
136 defaultStatusReader StatusReader
137
138
139
140 identifiers object.ObjMetadataSet
141
142
143
144
145 previousResourceStatuses map[object.ObjMetadata]*event.ResourceStatus
146
147
148
149 eventChannel chan event.Event
150
151
152
153 pollingInterval time.Duration
154 }
155
156
157 func (r *statusPollerRunner) Run(ctx context.Context) {
158
159 ticker := time.NewTicker(r.pollingInterval)
160 defer func() {
161 ticker.Stop()
162 }()
163
164 err := r.syncAndPoll(ctx)
165 if err != nil {
166 r.handleSyncAndPollErr(err)
167 return
168 }
169
170 for {
171 select {
172 case <-ctx.Done():
173 return
174 case <-ticker.C:
175
176 err := r.syncAndPoll(ctx)
177 if err != nil {
178 r.handleSyncAndPollErr(err)
179 return
180 }
181 }
182 }
183 }
184
185
186
187
188
189
190 func (r *statusPollerRunner) handleSyncAndPollErr(err error) {
191 if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
192 return
193 }
194 r.eventChannel <- event.Event{
195 Type: event.ErrorEvent,
196 Error: err,
197 }
198 }
199
200 func (r *statusPollerRunner) syncAndPoll(ctx context.Context) error {
201
202
203
204
205 err := r.clusterReader.Sync(ctx)
206 if err != nil {
207 return err
208 }
209
210
211
212 return r.pollStatusForAllResources(ctx)
213 }
214
215
216
217 func (r *statusPollerRunner) pollStatusForAllResources(ctx context.Context) error {
218 for _, id := range r.identifiers {
219
220 select {
221 case <-ctx.Done():
222 return ctx.Err()
223 default:
224 }
225 gk := id.GroupKind
226 statusReader := r.statusReaderForGroupKind(gk)
227 resourceStatus, err := statusReader.ReadStatus(ctx, r.clusterReader, id)
228 if err != nil {
229 return err
230 }
231 if r.isUpdatedResourceStatus(resourceStatus) {
232 r.previousResourceStatuses[id] = resourceStatus
233 r.eventChannel <- event.Event{
234 Type: event.ResourceUpdateEvent,
235 Resource: resourceStatus,
236 }
237 }
238 }
239 return nil
240 }
241
242 func (r *statusPollerRunner) statusReaderForGroupKind(gk schema.GroupKind) StatusReader {
243 for _, sr := range r.statusReaders {
244 if sr.Supports(gk) {
245 return sr
246 }
247 }
248 return r.defaultStatusReader
249 }
250
251 func (r *statusPollerRunner) isUpdatedResourceStatus(resourceStatus *event.ResourceStatus) bool {
252 oldResourceStatus, found := r.previousResourceStatuses[resourceStatus.Identifier]
253 if !found {
254 return true
255 }
256 return !event.ResourceStatusEqual(resourceStatus, oldResourceStatus)
257 }
258
View as plain text