1
2
3
4 package taskrunner
5
6 import (
7 "context"
8 "fmt"
9 "sync"
10 "testing"
11 "time"
12
13 "github.com/stretchr/testify/assert"
14 "k8s.io/apimachinery/pkg/runtime/schema"
15 "sigs.k8s.io/cli-utils/pkg/apply/cache"
16 "sigs.k8s.io/cli-utils/pkg/apply/event"
17 pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
18 "sigs.k8s.io/cli-utils/pkg/kstatus/status"
19 "sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
20 "sigs.k8s.io/cli-utils/pkg/object"
21 "sigs.k8s.io/cli-utils/pkg/testutil"
22 )
23
24 var (
25 depID = object.ObjMetadata{
26 GroupKind: schema.GroupKind{
27 Group: "apps",
28 Kind: "Deployment",
29 },
30 Namespace: "default",
31 Name: "dep",
32 }
33 cmID = object.ObjMetadata{
34 GroupKind: schema.GroupKind{
35 Group: "",
36 Kind: "ConfigMap",
37 },
38 Namespace: "default",
39 Name: "cm",
40 }
41 )
42
43 func TestBaseRunner(t *testing.T) {
44 testCases := map[string]struct {
45 tasks []Task
46 statusEventsDelay time.Duration
47 statusEvents []pollevent.Event
48 expectedEventTypes []event.Type
49 expectedWaitEvents []event.WaitEvent
50 }{
51 "wait task runs until condition is met": {
52 tasks: []Task{
53 &fakeApplyTask{
54 resultEvent: event.Event{
55 Type: event.ApplyType,
56 },
57 duration: 3 * time.Second,
58 },
59 NewWaitTask("wait", object.ObjMetadataSet{depID, cmID}, AllCurrent,
60 1*time.Minute, testutil.NewFakeRESTMapper()),
61 &fakeApplyTask{
62 resultEvent: event.Event{
63 Type: event.PruneType,
64 },
65 duration: 2 * time.Second,
66 },
67 },
68 statusEventsDelay: 5 * time.Second,
69 statusEvents: []pollevent.Event{
70 {
71 Type: pollevent.ResourceUpdateEvent,
72 Resource: &pollevent.ResourceStatus{
73 Identifier: cmID,
74 Status: status.CurrentStatus,
75 },
76 },
77 {
78 Type: pollevent.ResourceUpdateEvent,
79 Resource: &pollevent.ResourceStatus{
80 Identifier: depID,
81 Status: status.CurrentStatus,
82 },
83 },
84 },
85 expectedEventTypes: []event.Type{
86 event.ActionGroupType,
87 event.ApplyType,
88 event.ActionGroupType,
89 event.ActionGroupType,
90 event.WaitType,
91 event.WaitType,
92 event.StatusType,
93 event.WaitType,
94 event.StatusType,
95 event.WaitType,
96 event.ActionGroupType,
97 event.ActionGroupType,
98 event.PruneType,
99 event.ActionGroupType,
100 },
101 expectedWaitEvents: []event.WaitEvent{
102 {
103 GroupName: "wait",
104 Identifier: depID,
105 Status: event.ReconcilePending,
106 },
107 {
108 GroupName: "wait",
109 Identifier: cmID,
110 Status: event.ReconcilePending,
111 },
112 {
113 GroupName: "wait",
114 Identifier: cmID,
115 Status: event.ReconcileSuccessful,
116 },
117 {
118 GroupName: "wait",
119 Identifier: depID,
120 Status: event.ReconcileSuccessful,
121 },
122 },
123 },
124 "wait task times out eventually (Unknown)": {
125 tasks: []Task{
126 NewWaitTask("wait", object.ObjMetadataSet{depID, cmID}, AllCurrent,
127 2*time.Second, testutil.NewFakeRESTMapper()),
128 },
129 statusEventsDelay: time.Second,
130 statusEvents: []pollevent.Event{
131 {
132 Type: pollevent.ResourceUpdateEvent,
133 Resource: &pollevent.ResourceStatus{
134 Identifier: cmID,
135 Status: status.CurrentStatus,
136 },
137 },
138 },
139 expectedEventTypes: []event.Type{
140 event.ActionGroupType,
141 event.WaitType,
142 event.WaitType,
143 event.StatusType,
144 event.WaitType,
145 event.WaitType,
146 event.ActionGroupType,
147 },
148 expectedWaitEvents: []event.WaitEvent{
149 {
150 GroupName: "wait",
151 Identifier: depID,
152 Status: event.ReconcilePending,
153 },
154 {
155 GroupName: "wait",
156 Identifier: cmID,
157 Status: event.ReconcilePending,
158 },
159 {
160 GroupName: "wait",
161 Identifier: cmID,
162 Status: event.ReconcileSuccessful,
163 },
164 {
165 GroupName: "wait",
166 Identifier: depID,
167 Status: event.ReconcileTimeout,
168 },
169 },
170 },
171 "wait task times out eventually (InProgress)": {
172 tasks: []Task{
173 NewWaitTask("wait", object.ObjMetadataSet{depID, cmID}, AllCurrent,
174 2*time.Second, testutil.NewFakeRESTMapper()),
175 },
176 statusEventsDelay: time.Second,
177 statusEvents: []pollevent.Event{
178 {
179 Type: pollevent.ResourceUpdateEvent,
180 Resource: &pollevent.ResourceStatus{
181 Identifier: cmID,
182 Status: status.CurrentStatus,
183 },
184 },
185 {
186 Type: pollevent.ResourceUpdateEvent,
187 Resource: &pollevent.ResourceStatus{
188 Identifier: depID,
189 Status: status.InProgressStatus,
190 },
191 },
192 },
193 expectedEventTypes: []event.Type{
194 event.ActionGroupType,
195 event.WaitType,
196 event.WaitType,
197 event.StatusType,
198 event.WaitType,
199 event.StatusType,
200 event.WaitType,
201 event.ActionGroupType,
202 },
203 expectedWaitEvents: []event.WaitEvent{
204 {
205 GroupName: "wait",
206 Identifier: depID,
207 Status: event.ReconcilePending,
208 },
209 {
210 GroupName: "wait",
211 Identifier: cmID,
212 Status: event.ReconcilePending,
213 },
214 {
215 GroupName: "wait",
216 Identifier: cmID,
217 Status: event.ReconcileSuccessful,
218 },
219 {
220 GroupName: "wait",
221 Identifier: depID,
222 Status: event.ReconcileTimeout,
223 },
224 },
225 },
226 "tasks run in order": {
227 tasks: []Task{
228 &fakeApplyTask{
229 resultEvent: event.Event{
230 Type: event.ApplyType,
231 },
232 duration: 1 * time.Second,
233 },
234 &fakeApplyTask{
235 resultEvent: event.Event{
236 Type: event.PruneType,
237 },
238 duration: 1 * time.Second,
239 },
240 &fakeApplyTask{
241 resultEvent: event.Event{
242 Type: event.ApplyType,
243 },
244 duration: 1 * time.Second,
245 },
246 &fakeApplyTask{
247 resultEvent: event.Event{
248 Type: event.PruneType,
249 },
250 duration: 1 * time.Second,
251 },
252 },
253 statusEventsDelay: 1 * time.Second,
254 statusEvents: []pollevent.Event{},
255 expectedEventTypes: []event.Type{
256 event.ActionGroupType,
257 event.ApplyType,
258 event.ActionGroupType,
259 event.ActionGroupType,
260 event.PruneType,
261 event.ActionGroupType,
262 event.ActionGroupType,
263 event.ApplyType,
264 event.ActionGroupType,
265 event.ActionGroupType,
266 event.PruneType,
267 event.ActionGroupType,
268 },
269 },
270 }
271
272 for tn, tc := range testCases {
273 t.Run(tn, func(t *testing.T) {
274 taskQueue := make(chan Task, len(tc.tasks))
275 for _, tsk := range tc.tasks {
276 taskQueue <- tsk
277 }
278
279 ids := object.ObjMetadataSet{}
280 statusWatcher := newFakeWatcher(tc.statusEvents)
281 eventChannel := make(chan event.Event)
282 resourceCache := cache.NewResourceCacheMap()
283 taskContext := NewTaskContext(eventChannel, resourceCache)
284 runner := NewTaskStatusRunner(ids, statusWatcher)
285
286
287
288 var wg sync.WaitGroup
289
290 statusChannel := make(chan pollevent.Event)
291 wg.Add(1)
292 go func() {
293 defer wg.Done()
294
295 time.Sleep(tc.statusEventsDelay)
296 statusWatcher.Start()
297 }()
298
299 var events []event.Event
300 wg.Add(1)
301 go func() {
302 defer wg.Done()
303
304 for msg := range eventChannel {
305 events = append(events, msg)
306 }
307 }()
308
309 opts := Options{EmitStatusEvents: true}
310 ctx := context.Background()
311 err := runner.Run(ctx, taskContext, taskQueue, opts)
312 close(statusChannel)
313 close(eventChannel)
314 wg.Wait()
315
316 assert.NoError(t, err)
317
318 if want, got := len(tc.expectedEventTypes), len(events); want != got {
319 t.Errorf("expected %d events, but got %d", want, got)
320 }
321 var waitEvents []event.WaitEvent
322 for i, e := range events {
323 expectedEventType := tc.expectedEventTypes[i]
324 if want, got := expectedEventType, e.Type; want != got {
325 t.Errorf("expected event type %s, but got %s",
326 want, got)
327 }
328 if e.Type == event.WaitType {
329 waitEvents = append(waitEvents, e.WaitEvent)
330 }
331 }
332 assert.Equal(t, tc.expectedWaitEvents, waitEvents)
333 })
334 }
335 }
336
337 func TestBaseRunnerCancellation(t *testing.T) {
338 testError := fmt.Errorf("this is a test error")
339
340 testCases := map[string]struct {
341 tasks []Task
342 statusEventsDelay time.Duration
343 statusEvents []pollevent.Event
344 contextTimeout time.Duration
345 expectedError error
346 expectedEventTypes []event.Type
347 }{
348 "cancellation while custom task is running": {
349 tasks: []Task{
350 &fakeApplyTask{
351 resultEvent: event.Event{
352 Type: event.ApplyType,
353 },
354 duration: 4 * time.Second,
355 },
356 &fakeApplyTask{
357 resultEvent: event.Event{
358 Type: event.PruneType,
359 },
360 duration: 2 * time.Second,
361 },
362 },
363 contextTimeout: 2 * time.Second,
364 expectedError: context.DeadlineExceeded,
365 expectedEventTypes: []event.Type{
366 event.ActionGroupType,
367 event.ApplyType,
368 event.ActionGroupType,
369 },
370 },
371 "cancellation while wait task is running": {
372 tasks: []Task{
373 NewWaitTask("wait", object.ObjMetadataSet{depID}, AllCurrent,
374 20*time.Second, testutil.NewFakeRESTMapper()),
375 &fakeApplyTask{
376 resultEvent: event.Event{
377 Type: event.PruneType,
378 },
379 duration: 2 * time.Second,
380 },
381 },
382 contextTimeout: 2 * time.Second,
383 expectedError: context.DeadlineExceeded,
384 expectedEventTypes: []event.Type{
385 event.ActionGroupType,
386 event.WaitType,
387 event.ActionGroupType,
388 },
389 },
390 "error while custom task is running": {
391 tasks: []Task{
392 &fakeApplyTask{
393 name: "apply-0",
394 resultEvent: event.Event{
395 Type: event.ApplyType,
396 },
397 duration: 2 * time.Second,
398 err: testError,
399 },
400 &fakeApplyTask{
401 name: "prune-0",
402 resultEvent: event.Event{
403 Type: event.PruneType,
404 },
405 duration: 2 * time.Second,
406 },
407 },
408 contextTimeout: 30 * time.Second,
409 expectedError: fmt.Errorf(`task failed (action: "Apply", name: "apply-0"): %w`, testError),
410 expectedEventTypes: []event.Type{
411 event.ActionGroupType,
412 event.ApplyType,
413 event.ActionGroupType,
414 },
415 },
416 "error from status watcher while wait task is running": {
417 tasks: []Task{
418 NewWaitTask("wait", object.ObjMetadataSet{depID}, AllCurrent,
419 20*time.Second, testutil.NewFakeRESTMapper()),
420 &fakeApplyTask{
421 resultEvent: event.Event{
422 Type: event.PruneType,
423 },
424 duration: 2 * time.Second,
425 },
426 },
427 statusEventsDelay: 2 * time.Second,
428 statusEvents: []pollevent.Event{
429 {
430 Type: pollevent.ErrorEvent,
431 Error: testError,
432 },
433 },
434 contextTimeout: 30 * time.Second,
435 expectedError: fmt.Errorf("polling for status failed: %w", testError),
436 expectedEventTypes: []event.Type{
437 event.ActionGroupType,
438 event.WaitType,
439 event.ActionGroupType,
440 },
441 },
442 }
443
444 for tn, tc := range testCases {
445 t.Run(tn, func(t *testing.T) {
446 taskQueue := make(chan Task, len(tc.tasks))
447 for _, tsk := range tc.tasks {
448 taskQueue <- tsk
449 }
450
451 ids := object.ObjMetadataSet{}
452 statusWatcher := newFakeWatcher(tc.statusEvents)
453 eventChannel := make(chan event.Event)
454 resourceCache := cache.NewResourceCacheMap()
455 taskContext := NewTaskContext(eventChannel, resourceCache)
456 runner := NewTaskStatusRunner(ids, statusWatcher)
457
458
459
460 var wg sync.WaitGroup
461
462 statusChannel := make(chan pollevent.Event)
463 wg.Add(1)
464 go func() {
465 defer wg.Done()
466
467 time.Sleep(tc.statusEventsDelay)
468 statusWatcher.Start()
469 }()
470
471 var events []event.Event
472 wg.Add(1)
473 go func() {
474 defer wg.Done()
475
476 for msg := range eventChannel {
477 events = append(events, msg)
478 }
479 }()
480
481 ctx, cancel := context.WithTimeout(context.Background(), tc.contextTimeout)
482 defer cancel()
483
484 opts := Options{EmitStatusEvents: true}
485 err := runner.Run(ctx, taskContext, taskQueue, opts)
486 close(statusChannel)
487 close(eventChannel)
488 wg.Wait()
489
490 if tc.expectedError != nil {
491 assert.EqualError(t, err, tc.expectedError.Error())
492 } else {
493 assert.NoError(t, err)
494 }
495
496 if want, got := len(tc.expectedEventTypes), len(events); want != got {
497 t.Errorf("expected %d events, but got %d", want, got)
498 }
499 for i, e := range events {
500 expectedEventType := tc.expectedEventTypes[i]
501 if want, got := expectedEventType, e.Type; want != got {
502 t.Errorf("expected event type %s, but got %s",
503 want, got)
504 }
505 }
506 })
507 }
508 }
509
510 type fakeApplyTask struct {
511 name string
512 resultEvent event.Event
513 duration time.Duration
514 err error
515 }
516
517 func (f *fakeApplyTask) Name() string {
518 return f.name
519 }
520
521 func (f *fakeApplyTask) Action() event.ResourceAction {
522 return event.ApplyAction
523 }
524
525 func (f *fakeApplyTask) Identifiers() object.ObjMetadataSet {
526 return object.ObjMetadataSet{}
527 }
528
529 func (f *fakeApplyTask) Start(taskContext *TaskContext) {
530 go func() {
531 <-time.NewTimer(f.duration).C
532 taskContext.SendEvent(f.resultEvent)
533 taskContext.TaskChannel() <- TaskResult{
534 Err: f.err,
535 }
536 }()
537 }
538
539 func (f *fakeApplyTask) Cancel(_ *TaskContext) {}
540
541 func (f *fakeApplyTask) StatusUpdate(_ *TaskContext, _ object.ObjMetadata) {}
542
543 type fakeWatcher struct {
544 start chan struct{}
545 events []pollevent.Event
546 }
547
548 func newFakeWatcher(statusEvents []pollevent.Event) *fakeWatcher {
549 return &fakeWatcher{
550 events: statusEvents,
551 start: make(chan struct{}),
552 }
553 }
554
555
556 func (f *fakeWatcher) Start() {
557 close(f.start)
558 }
559
560 func (f *fakeWatcher) Watch(ctx context.Context, _ object.ObjMetadataSet, _ watcher.Options) <-chan pollevent.Event {
561 eventChannel := make(chan pollevent.Event)
562 go func() {
563 defer close(eventChannel)
564
565 eventChannel <- pollevent.Event{Type: pollevent.SyncEvent}
566
567 <-f.start
568 for _, f := range f.events {
569 eventChannel <- f
570 }
571
572 <-ctx.Done()
573 }()
574 return eventChannel
575 }
576
View as plain text