1
2
3
4 package taskrunner
5
6 import (
7 "context"
8 "fmt"
9 "sync"
10 "time"
11
12 "k8s.io/apimachinery/pkg/api/meta"
13 "k8s.io/apimachinery/pkg/runtime/schema"
14 "k8s.io/apimachinery/pkg/types"
15 "k8s.io/klog/v2"
16 "sigs.k8s.io/cli-utils/pkg/apply/event"
17 "sigs.k8s.io/cli-utils/pkg/kstatus/status"
18 "sigs.k8s.io/cli-utils/pkg/object"
19 )
20
21 var (
22 crdGK = schema.GroupKind{Group: "apiextensions.k8s.io", Kind: "CustomResourceDefinition"}
23 )
24
25
26
// Task is the interface implemented by the units of work in this package.
//
// NOTE(review): the code that drives these methods is not visible in this
// file; the per-method notes below are based on how WaitTask implements them
// and should be confirmed against the caller.
type Task interface {
	// Name returns the name of the task.
	Name() string
	// Action returns the event.ResourceAction associated with the task.
	Action() event.ResourceAction
	// Identifiers returns the set of objects the task covers.
	Identifiers() object.ObjMetadataSet
	// Start begins execution of the task.
	Start(*TaskContext)
	// StatusUpdate informs the task that the status of the given object may
	// have changed.
	StatusUpdate(*TaskContext, object.ObjMetadata)
	// Cancel asks the task to stop.
	Cancel(*TaskContext)
}
35
36
37
38 func NewWaitTask(name string, ids object.ObjMetadataSet, cond Condition, timeout time.Duration, mapper meta.RESTMapper) *WaitTask {
39 return &WaitTask{
40 TaskName: name,
41 Ids: ids,
42 Condition: cond,
43 Timeout: timeout,
44 Mapper: mapper,
45 }
46 }
47
48
49
50
51
52
53
54
// WaitTask is a Task that waits until every object in Ids either satisfies
// Condition or is skipped, or until the optional Timeout expires. Progress is
// reported through wait events sent on the TaskContext.
type WaitTask struct {
	// TaskName is the name returned by Name and used as the GroupName of
	// emitted wait events.
	TaskName string

	// Ids is the full set of objects this task waits on.
	Ids object.ObjMetadataSet

	// Condition is the condition each object must meet. The methods of this
	// type handle AllCurrent and AllNotFound.
	Condition Condition

	// Timeout bounds how long the task waits. A value <= 0 means no
	// deadline: the wait only ends when cancelFunc is called.
	Timeout time.Duration

	// Mapper is the RESTMapper that is reset after the wait ends when a
	// non-skipped CRD is among Ids.
	Mapper meta.RESTMapper

	// cancelFunc ends the wait. It is assigned in Start and is nil before
	// that.
	cancelFunc context.CancelFunc

	// pending holds the objects that have not yet met Condition.
	pending object.ObjMetadataSet

	// failed holds objects whose cached status was Failed on a status
	// update. They are re-evaluated on later status updates and may move
	// back to pending or be marked successful.
	failed object.ObjMetadataSet

	// mu is held by startInner, StatusUpdate and sendTimeoutEvents while
	// they read or modify pending and failed.
	mu sync.RWMutex
}
78
// Name returns the task's name (the TaskName field).
func (w *WaitTask) Name() string {
	return w.TaskName
}
82
// Action always returns event.WaitAction for a WaitTask.
func (w *WaitTask) Action() event.ResourceAction {
	return event.WaitAction
}
86
// Identifiers returns the set of objects this task waits on. The Ids field is
// returned directly, not copied.
func (w *WaitTask) Identifiers() object.ObjMetadataSet {
	return w.Ids
}
90
91
92
93 func (w *WaitTask) Start(taskContext *TaskContext) {
94 klog.V(2).Infof("wait task starting (name: %q, objects: %d)",
95 w.Name(), len(w.Ids))
96
97
98 ctx := context.Background()
99
100
101 if w.Timeout > 0 {
102 ctx, w.cancelFunc = context.WithTimeout(ctx, w.Timeout)
103 } else {
104 ctx, w.cancelFunc = context.WithCancel(ctx)
105 }
106
107 w.startInner(taskContext)
108
109
110 go func() {
111
112 <-ctx.Done()
113
114 err := ctx.Err()
115
116 klog.V(2).Infof("wait task completing (name: %q,): %v", w.TaskName, err)
117
118 switch err {
119 case context.Canceled:
120
121 case context.DeadlineExceeded:
122
123 w.sendTimeoutEvents(taskContext)
124 }
125
126
127 w.updateRESTMapper(taskContext)
128
129
130 taskContext.TaskChannel() <- TaskResult{}
131 }()
132 }
133
134 func (w *WaitTask) sendEvent(taskContext *TaskContext, id object.ObjMetadata, status event.WaitEventStatus) {
135 taskContext.SendEvent(event.Event{
136 Type: event.WaitType,
137 WaitEvent: event.WaitEvent{
138 GroupName: w.Name(),
139 Identifier: id,
140 Status: status,
141 },
142 })
143 }
144
145
146
147
148 func (w *WaitTask) startInner(taskContext *TaskContext) {
149 w.mu.Lock()
150 defer w.mu.Unlock()
151
152 klog.V(3).Infof("wait task progress: %d/%d", 0, len(w.Ids))
153
154 pending := object.ObjMetadataSet{}
155 for _, id := range w.Ids {
156 switch {
157 case w.skipped(taskContext, id):
158 err := taskContext.InventoryManager().SetSkippedReconcile(id)
159 if err != nil {
160
161 klog.Errorf("Failed to mark object as skipped reconcile: %v", err)
162 }
163 w.sendEvent(taskContext, id, event.ReconcileSkipped)
164 case w.changedUID(taskContext, id):
165
166 w.handleChangedUID(taskContext, id)
167 case w.reconciledByID(taskContext, id):
168 err := taskContext.InventoryManager().SetSuccessfulReconcile(id)
169 if err != nil {
170
171 klog.Errorf("Failed to mark object as successful reconcile: %v", err)
172 }
173 w.sendEvent(taskContext, id, event.ReconcileSuccessful)
174 default:
175 err := taskContext.InventoryManager().SetPendingReconcile(id)
176 if err != nil {
177
178 klog.Errorf("Failed to mark object as pending reconcile: %v", err)
179 }
180 pending = append(pending, id)
181 w.sendEvent(taskContext, id, event.ReconcilePending)
182 }
183 }
184 w.pending = pending
185
186 klog.V(3).Infof("wait task progress: %d/%d", len(w.Ids)-len(w.pending), len(w.Ids))
187
188 if len(pending) == 0 {
189
190 klog.V(3).Infof("all objects reconciled or skipped (name: %q)", w.TaskName)
191 w.cancelFunc()
192 }
193 }
194
195
196
197 func (w *WaitTask) sendTimeoutEvents(taskContext *TaskContext) {
198 w.mu.RLock()
199 defer w.mu.RUnlock()
200
201 for _, id := range w.pending {
202 err := taskContext.InventoryManager().SetTimeoutReconcile(id)
203 if err != nil {
204
205 klog.Errorf("Failed to mark object as pending reconcile: %v", err)
206 }
207 w.sendEvent(taskContext, id, event.ReconcileTimeout)
208 }
209 }
210
211
212
213 func (w *WaitTask) reconciledByID(taskContext *TaskContext, id object.ObjMetadata) bool {
214 return conditionMet(taskContext, object.ObjMetadataSet{id}, w.Condition)
215 }
216
217
218
219 func (w *WaitTask) skipped(taskContext *TaskContext, id object.ObjMetadata) bool {
220 im := taskContext.InventoryManager()
221 if w.Condition == AllCurrent &&
222 im.IsFailedApply(id) || im.IsSkippedApply(id) {
223 return true
224 }
225 if w.Condition == AllNotFound &&
226 im.IsFailedDelete(id) || im.IsSkippedDelete(id) {
227 return true
228 }
229 return false
230 }
231
232
233 func (w *WaitTask) failedByID(taskContext *TaskContext, id object.ObjMetadata) bool {
234 cached := taskContext.ResourceCache().Get(id)
235 return cached.Status == status.FailedStatus
236 }
237
238
239
240 func (w *WaitTask) changedUID(taskContext *TaskContext, id object.ObjMetadata) bool {
241 var oldUID, newUID types.UID
242
243
244 taskObj, found := taskContext.InventoryManager().ObjectStatus(id)
245 if !found {
246 klog.Errorf("Unknown object UID from InventoryManager: %v", id)
247 return false
248 }
249 oldUID = taskObj.UID
250 if oldUID == "" {
251
252 klog.Errorf("Empty object UID from InventoryManager: %v", id)
253 return false
254 }
255
256
257 pollerObj := taskContext.ResourceCache().Get(id)
258 if pollerObj.Resource == nil {
259 switch pollerObj.Status {
260 case status.UnknownStatus:
261
262 case status.NotFoundStatus:
263
264
265 default:
266
267 klog.Errorf("Unknown object UID from ResourceCache (status: %v): %v", pollerObj.Status, id)
268 }
269 return false
270 }
271 newUID = pollerObj.Resource.GetUID()
272 if newUID == "" {
273
274 klog.Errorf("Empty object UID from ResourceCache (status: %v): %v", pollerObj.Status, id)
275 return false
276 }
277
278 return (oldUID != newUID)
279 }
280
281
282 func (w *WaitTask) handleChangedUID(taskContext *TaskContext, id object.ObjMetadata) {
283 switch w.Condition {
284 case AllNotFound:
285
286
287 klog.Infof("UID change detected: deleted object have been recreated: marking reconcile successful: %v", id)
288 err := taskContext.InventoryManager().SetSuccessfulReconcile(id)
289 if err != nil {
290
291 klog.Errorf("Failed to mark object as successful reconcile: %v", err)
292 }
293 w.sendEvent(taskContext, id, event.ReconcileSuccessful)
294 case AllCurrent:
295
296
297 klog.Infof("UID change detected: applied object has been deleted and recreated: marking reconcile failed: %v", id)
298 err := taskContext.InventoryManager().SetFailedReconcile(id)
299 if err != nil {
300
301 klog.Errorf("Failed to mark object as failed reconcile: %v", err)
302 }
303 w.sendEvent(taskContext, id, event.ReconcileFailed)
304 default:
305 panic(fmt.Sprintf("Invalid wait condition: %v", w.Condition))
306 }
307 }
308
309
// Cancel ends the wait by calling cancelFunc. The TaskContext argument is
// unused. cancelFunc is only assigned in Start, so calling Cancel before
// Start would invoke a nil function.
func (w *WaitTask) Cancel(_ *TaskContext) {
	w.cancelFunc()
}
313
314
315
316
// StatusUpdate re-evaluates a single object after a status change and moves
// it between the pending and failed sets as needed, recording the new
// reconcile status in the InventoryManager and sending the matching wait
// event. It holds the write lock for its whole duration. When no objects
// remain pending, it calls cancelFunc to end the wait.
//
// Updates for objects that are not in Ids, or that are skipped, return early
// without touching any state (and without the progress log or the completion
// check at the end).
func (w *WaitTask) StatusUpdate(taskContext *TaskContext, id object.ObjMetadata) {
	w.mu.Lock()
	defer w.mu.Unlock()

	if klog.V(5).Enabled() {
		// Note: this local variable shadows the imported status package,
		// but only inside this block.
		status := taskContext.ResourceCache().Get(id).Status
		klog.Infof("status update (object: %q, status: %q)", id, status)
	}

	// The order of the cases below matters: the first match wins.
	switch {
	case w.pending.Contains(id):
		// The object is currently being waited on.
		switch {
		case w.changedUID(taskContext, id):
			// handleChangedUID records the status and sends the event;
			// the object is dropped from pending but not added to failed.
			w.handleChangedUID(taskContext, id)
			w.pending = w.pending.Remove(id)
		case w.reconciledByID(taskContext, id):
			// pending -> successful
			err := taskContext.InventoryManager().SetSuccessfulReconcile(id)
			if err != nil {
				// Failure to record the status is logged; the transition still happens.
				klog.Errorf("Failed to mark object as successful reconcile: %v", err)
			}
			w.pending = w.pending.Remove(id)
			w.sendEvent(taskContext, id, event.ReconcileSuccessful)
		case w.failedByID(taskContext, id):
			// pending -> failed; remembered in w.failed so that a later
			// update can move it back to pending or to successful.
			err := taskContext.InventoryManager().SetFailedReconcile(id)
			if err != nil {
				klog.Errorf("Failed to mark object as failed reconcile: %v", err)
			}
			w.pending = w.pending.Remove(id)
			w.failed = append(w.failed, id)
			w.sendEvent(taskContext, id, event.ReconcileFailed)
			// No default case: otherwise the object simply stays pending.
		}
	case !w.Ids.Contains(id):
		// Not one of this task's objects: ignore the update.
		return
	case w.skipped(taskContext, id):
		// Skipped objects are not tracked: ignore the update.
		return
	case w.failed.Contains(id):
		// The object previously failed; check whether it has recovered.
		if w.reconciledByID(taskContext, id) {
			// failed -> successful
			err := taskContext.InventoryManager().SetSuccessfulReconcile(id)
			if err != nil {
				klog.Errorf("Failed to mark object as successful reconcile: %v", err)
			}
			w.failed = w.failed.Remove(id)
			w.sendEvent(taskContext, id, event.ReconcileSuccessful)
		} else if !w.failedByID(taskContext, id) {
			// No longer failed but not reconciled either:
			// failed -> pending
			err := taskContext.InventoryManager().SetPendingReconcile(id)
			if err != nil {
				klog.Errorf("Failed to mark object as pending reconcile: %v", err)
			}
			w.failed = w.failed.Remove(id)
			w.pending = append(w.pending, id)
			w.sendEvent(taskContext, id, event.ReconcilePending)
		}
		// Otherwise the object is still failed; nothing changes.
	default:
		// The object belongs to this task but is neither pending, skipped
		// nor failed. If it does not (or no longer does) meet the
		// condition, it is put (back) into the pending set.
		if !w.reconciledByID(taskContext, id) {
			err := taskContext.InventoryManager().SetPendingReconcile(id)
			if err != nil {
				klog.Errorf("Failed to mark object as pending reconcile: %v", err)
			}
			w.pending = append(w.pending, id)
			w.sendEvent(taskContext, id, event.ReconcilePending)
		}
		// Otherwise the object still meets the condition; nothing changes.
	}

	klog.V(3).Infof("wait task progress: %d/%d", len(w.Ids)-len(w.pending), len(w.Ids))

	// Objects in the failed set do not keep the wait open: only pending
	// objects are counted here.
	if len(w.pending) == 0 {
		klog.V(3).Infof("all objects reconciled or skipped (name: %q)", w.TaskName)
		w.cancelFunc()
	}
}
411
412
413
414
415
416 func (w *WaitTask) updateRESTMapper(taskContext *TaskContext) {
417 foundCRD := false
418 for _, id := range w.Ids {
419 if id.GroupKind == crdGK && !w.skipped(taskContext, id) {
420 foundCRD = true
421 break
422 }
423 }
424 if !foundCRD {
425
426 return
427 }
428
429 klog.V(3).Infof("Resetting RESTMapper")
430 meta.MaybeResetRESTMapper(w.Mapper)
431 }
432
View as plain text