1
2
3
4 package taskrunner
5
6 import (
7 "testing"
8 "time"
9
10 "github.com/stretchr/testify/assert"
11 "sigs.k8s.io/cli-utils/pkg/apis/actuation"
12 "sigs.k8s.io/cli-utils/pkg/apply/cache"
13 "sigs.k8s.io/cli-utils/pkg/apply/event"
14 "sigs.k8s.io/cli-utils/pkg/inventory"
15 "sigs.k8s.io/cli-utils/pkg/kstatus/status"
16 "sigs.k8s.io/cli-utils/pkg/object"
17 "sigs.k8s.io/cli-utils/pkg/testutil"
18 )
19
// testDeployment1YAML is the manifest for test Deployment "a" in the
// "default" namespace (apps/v1, generation 1, one replica). The fields under
// metadata and spec must stay indented: YAML nesting is whitespace-sensitive,
// and without it name/namespace/uid would become top-level keys.
var testDeployment1YAML = `
apiVersion: apps/v1
kind: Deployment
metadata:
  name: a
  namespace: default
  uid: dep-uid-a
  generation: 1
spec:
  replicas: 1
`
31
// testDeployment2YAML is the manifest for test Deployment "b" in the
// "default" namespace (generation 1, two replicas). The fields under metadata
// and spec must stay indented: YAML nesting is whitespace-sensitive.
//
// NOTE(review): this fixture declares apiVersion v1 while testDeployment1YAML
// uses apps/v1 for the same kind. Left as-is because the value feeds the
// object identifiers used by the tests; confirm whether it is intentional.
var testDeployment2YAML = `
apiVersion: v1
kind: Deployment
metadata:
  name: b
  namespace: default
  uid: dep-uid-b
  generation: 1
spec:
  replicas: 2
`
43
// testDeployment3YAML is the manifest for test Deployment "c" in the
// "default" namespace (generation 1, three replicas). The fields under
// metadata and spec must stay indented: YAML nesting is whitespace-sensitive.
//
// NOTE(review): this fixture declares apiVersion v1 while testDeployment1YAML
// uses apps/v1 for the same kind. Left as-is because the value feeds the
// object identifiers used by the tests; confirm whether it is intentional.
var testDeployment3YAML = `
apiVersion: v1
kind: Deployment
metadata:
  name: c
  namespace: default
  uid: dep-uid-c
  generation: 1
spec:
  replicas: 3
`
55
// testDeployment4YAML is the manifest for test Deployment "d" in the
// "default" namespace (generation 1, four replicas). The fields under
// metadata and spec must stay indented: YAML nesting is whitespace-sensitive.
//
// NOTE(review): this fixture declares apiVersion v1 while testDeployment1YAML
// uses apps/v1 for the same kind. Left as-is because the value feeds the
// object identifiers used by the tests; confirm whether it is intentional.
var testDeployment4YAML = `
apiVersion: v1
kind: Deployment
metadata:
  name: d
  namespace: default
  uid: dep-uid-d
  generation: 1
spec:
  replicas: 4
`
67
68 func TestWaitTask_CompleteEventually(t *testing.T) {
69 testDeployment1ID := testutil.ToIdentifier(t, testDeployment1YAML)
70 testDeployment1 := testutil.Unstructured(t, testDeployment1YAML)
71 testDeployment2ID := testutil.ToIdentifier(t, testDeployment2YAML)
72 testDeployment2 := testutil.Unstructured(t, testDeployment2YAML)
73 testDeployment3ID := testutil.ToIdentifier(t, testDeployment3YAML)
74 testDeployment4ID := testutil.ToIdentifier(t, testDeployment4YAML)
75 ids := object.ObjMetadataSet{
76 testDeployment1ID,
77 testDeployment2ID,
78 testDeployment3ID,
79 testDeployment4ID,
80 }
81 waitTimeout := 2 * time.Second
82 taskName := "wait-1"
83 task := NewWaitTask(taskName, ids, AllCurrent,
84 waitTimeout, testutil.NewFakeRESTMapper())
85
86 eventChannel := make(chan event.Event)
87 resourceCache := cache.NewResourceCacheMap()
88 taskContext := NewTaskContext(eventChannel, resourceCache)
89 defer close(eventChannel)
90
91
92 testDeployment1.SetUID("a")
93 testDeployment1.SetGeneration(1)
94 testDeployment2.SetUID("b")
95 testDeployment2.SetGeneration(1)
96
97
98 taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID,
99 testDeployment1.GetUID(), testDeployment1.GetGeneration())
100 taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID,
101 testDeployment2.GetUID(), testDeployment2.GetGeneration())
102
103
104 taskContext.InventoryManager().AddFailedApply(testDeployment3ID)
105
106
107 taskContext.InventoryManager().AddSkippedApply(testDeployment4ID)
108
109
110 go func() {
111
112 task.Start(taskContext)
113
114
115 resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
116 Resource: testDeployment1,
117 Status: status.CurrentStatus,
118 })
119
120 task.StatusUpdate(taskContext, testDeployment1ID)
121
122
123 resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
124 Resource: testDeployment2,
125 Status: status.InProgressStatus,
126 })
127
128 task.StatusUpdate(taskContext, testDeployment2ID)
129
130
131 resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
132 Resource: testDeployment2,
133 Status: status.CurrentStatus,
134 })
135
136 task.StatusUpdate(taskContext, testDeployment2ID)
137 }()
138
139
140 timer := time.NewTimer(5 * time.Second)
141 receivedEvents := []event.Event{}
142 loop:
143 for {
144 select {
145 case e := <-taskContext.EventChannel():
146 receivedEvents = append(receivedEvents, e)
147 case res := <-taskContext.TaskChannel():
148 timer.Stop()
149 assert.NoError(t, res.Err)
150 break loop
151 case <-timer.C:
152 t.Fatalf("timed out waiting for TaskResult")
153 }
154 }
155
156
157 expectedEvents := []event.Event{
158
159
160 {
161 Type: event.WaitType,
162 WaitEvent: event.WaitEvent{
163 GroupName: taskName,
164 Identifier: testDeployment1ID,
165 Status: event.ReconcilePending,
166 },
167 },
168
169 {
170 Type: event.WaitType,
171 WaitEvent: event.WaitEvent{
172 GroupName: taskName,
173 Identifier: testDeployment2ID,
174 Status: event.ReconcilePending,
175 },
176 },
177
178 {
179 Type: event.WaitType,
180 WaitEvent: event.WaitEvent{
181 GroupName: taskName,
182 Identifier: testDeployment3ID,
183 Status: event.ReconcileSkipped,
184 },
185 },
186
187 {
188 Type: event.WaitType,
189 WaitEvent: event.WaitEvent{
190 GroupName: taskName,
191 Identifier: testDeployment4ID,
192 Status: event.ReconcileSkipped,
193 },
194 },
195
196
197 {
198 Type: event.WaitType,
199 WaitEvent: event.WaitEvent{
200 GroupName: taskName,
201 Identifier: testDeployment1ID,
202 Status: event.ReconcileSuccessful,
203 },
204 },
205
206 {
207 Type: event.WaitType,
208 WaitEvent: event.WaitEvent{
209 GroupName: taskName,
210 Identifier: testDeployment2ID,
211 Status: event.ReconcileSuccessful,
212 },
213 },
214 }
215 testutil.AssertEqual(t, expectedEvents, receivedEvents,
216 "Actual events (%d) do not match expected events (%d)",
217 len(receivedEvents), len(expectedEvents))
218
219 expectedInventory := actuation.Inventory{
220 Status: actuation.InventoryStatus{
221 Objects: []actuation.ObjectStatus{
222 {
223 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
224 Strategy: actuation.ActuationStrategyApply,
225 Actuation: actuation.ActuationSucceeded,
226 Reconcile: actuation.ReconcileSucceeded,
227 UID: testDeployment1.GetUID(),
228 Generation: testDeployment1.GetGeneration(),
229 },
230 {
231 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
232 Strategy: actuation.ActuationStrategyApply,
233 Actuation: actuation.ActuationSucceeded,
234 Reconcile: actuation.ReconcileSucceeded,
235 UID: testDeployment2.GetUID(),
236 Generation: testDeployment2.GetGeneration(),
237 },
238 {
239 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment3ID),
240 Strategy: actuation.ActuationStrategyApply,
241 Actuation: actuation.ActuationFailed,
242 Reconcile: actuation.ReconcileSkipped,
243 },
244 {
245 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment4ID),
246 Strategy: actuation.ActuationStrategyApply,
247 Actuation: actuation.ActuationSkipped,
248 Reconcile: actuation.ReconcileSkipped,
249 },
250 },
251 },
252 }
253 testutil.AssertEqual(t, &expectedInventory, taskContext.InventoryManager().Inventory())
254 }
255
256 func TestWaitTask_Timeout(t *testing.T) {
257 testDeployment1ID := testutil.ToIdentifier(t, testDeployment1YAML)
258 testDeployment1 := testutil.Unstructured(t, testDeployment1YAML)
259 testDeployment2ID := testutil.ToIdentifier(t, testDeployment2YAML)
260 testDeployment2 := testutil.Unstructured(t, testDeployment2YAML)
261 testDeployment3ID := testutil.ToIdentifier(t, testDeployment3YAML)
262 testDeployment4ID := testutil.ToIdentifier(t, testDeployment4YAML)
263 ids := object.ObjMetadataSet{
264 testDeployment1ID,
265 testDeployment2ID,
266 testDeployment3ID,
267 testDeployment4ID,
268 }
269 waitTimeout := 2 * time.Second
270 taskName := "wait-2"
271 task := NewWaitTask(taskName, ids, AllCurrent,
272 waitTimeout, testutil.NewFakeRESTMapper())
273
274 eventChannel := make(chan event.Event)
275 resourceCache := cache.NewResourceCacheMap()
276 taskContext := NewTaskContext(eventChannel, resourceCache)
277 defer close(eventChannel)
278
279
280 testDeployment1.SetUID("a")
281 testDeployment1.SetGeneration(1)
282 testDeployment2.SetUID("b")
283 testDeployment2.SetGeneration(1)
284
285
286 taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID,
287 testDeployment1.GetUID(), testDeployment1.GetGeneration())
288 taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID,
289 testDeployment2.GetUID(), testDeployment2.GetGeneration())
290
291
292 taskContext.InventoryManager().AddFailedApply(testDeployment3ID)
293
294
295 taskContext.InventoryManager().AddSkippedApply(testDeployment4ID)
296
297
298 go func() {
299
300 task.Start(taskContext)
301
302 resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
303 Resource: testDeployment1,
304 Status: status.CurrentStatus,
305 })
306
307 task.StatusUpdate(taskContext, testDeployment1ID)
308 }()
309
310
311 timer := time.NewTimer(5 * time.Second)
312 receivedEvents := []event.Event{}
313 loop:
314 for {
315 select {
316 case e := <-taskContext.EventChannel():
317 receivedEvents = append(receivedEvents, e)
318 case res := <-taskContext.TaskChannel():
319 timer.Stop()
320 assert.NoError(t, res.Err)
321 break loop
322 case <-timer.C:
323 t.Fatalf("timed out waiting for TaskResult")
324 }
325 }
326
327
328 expectedEvents := []event.Event{
329
330
331 {
332 Type: event.WaitType,
333 WaitEvent: event.WaitEvent{
334 GroupName: taskName,
335 Identifier: testDeployment1ID,
336 Status: event.ReconcilePending,
337 },
338 },
339
340 {
341 Type: event.WaitType,
342 WaitEvent: event.WaitEvent{
343 GroupName: taskName,
344 Identifier: testDeployment2ID,
345 Status: event.ReconcilePending,
346 },
347 },
348
349 {
350 Type: event.WaitType,
351 WaitEvent: event.WaitEvent{
352 GroupName: taskName,
353 Identifier: testDeployment3ID,
354 Status: event.ReconcileSkipped,
355 },
356 },
357
358 {
359 Type: event.WaitType,
360 WaitEvent: event.WaitEvent{
361 GroupName: taskName,
362 Identifier: testDeployment4ID,
363 Status: event.ReconcileSkipped,
364 },
365 },
366
367
368 {
369 Type: event.WaitType,
370 WaitEvent: event.WaitEvent{
371 GroupName: taskName,
372 Identifier: testDeployment1ID,
373 Status: event.ReconcileSuccessful,
374 },
375 },
376
377
378 {
379 Type: event.WaitType,
380 WaitEvent: event.WaitEvent{
381 GroupName: taskName,
382 Identifier: testDeployment2ID,
383 Status: event.ReconcileTimeout,
384 },
385 },
386 }
387 testutil.AssertEqual(t, expectedEvents, receivedEvents,
388 "Actual events (%d) do not match expected events (%d)",
389 len(receivedEvents), len(expectedEvents))
390
391 expectedInventory := actuation.Inventory{
392 Status: actuation.InventoryStatus{
393 Objects: []actuation.ObjectStatus{
394 {
395 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
396 Strategy: actuation.ActuationStrategyApply,
397 Actuation: actuation.ActuationSucceeded,
398 Reconcile: actuation.ReconcileSucceeded,
399 UID: testDeployment1.GetUID(),
400 Generation: testDeployment1.GetGeneration(),
401 },
402 {
403 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
404 Strategy: actuation.ActuationStrategyApply,
405 Actuation: actuation.ActuationSucceeded,
406 Reconcile: actuation.ReconcileTimeout,
407 UID: testDeployment2.GetUID(),
408 Generation: testDeployment2.GetGeneration(),
409 },
410 {
411 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment3ID),
412 Strategy: actuation.ActuationStrategyApply,
413 Actuation: actuation.ActuationFailed,
414 Reconcile: actuation.ReconcileSkipped,
415 },
416 {
417 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment4ID),
418 Strategy: actuation.ActuationStrategyApply,
419 Actuation: actuation.ActuationSkipped,
420 Reconcile: actuation.ReconcileSkipped,
421 },
422 },
423 },
424 }
425 testutil.AssertEqual(t, &expectedInventory, taskContext.InventoryManager().Inventory())
426 }
427
428 func TestWaitTask_StartAndComplete(t *testing.T) {
429 testDeploymentID := testutil.ToIdentifier(t, testDeployment1YAML)
430 testDeployment := testutil.Unstructured(t, testDeployment1YAML)
431 ids := object.ObjMetadataSet{
432 testDeploymentID,
433 }
434 waitTimeout := 2 * time.Second
435 taskName := "wait-3"
436 task := NewWaitTask(taskName, ids, AllCurrent,
437 waitTimeout, testutil.NewFakeRESTMapper())
438
439 eventChannel := make(chan event.Event)
440 resourceCache := cache.NewResourceCacheMap()
441 taskContext := NewTaskContext(eventChannel, resourceCache)
442 defer close(eventChannel)
443
444
445 testDeployment.SetUID("a")
446 testDeployment.SetGeneration(1)
447
448
449 taskContext.InventoryManager().AddSuccessfulApply(testDeploymentID,
450 testDeployment.GetUID(), testDeployment.GetGeneration())
451
452
453 resourceCache.Put(testDeploymentID, cache.ResourceStatus{
454 Resource: testDeployment,
455 Status: status.CurrentStatus,
456 })
457
458
459 go func() {
460
461 task.Start(taskContext)
462 }()
463
464
465 timer := time.NewTimer(5 * time.Second)
466 receivedEvents := []event.Event{}
467 loop:
468 for {
469 select {
470 case e := <-taskContext.EventChannel():
471 receivedEvents = append(receivedEvents, e)
472 case res := <-taskContext.TaskChannel():
473 timer.Stop()
474 assert.NoError(t, res.Err)
475 break loop
476 case <-timer.C:
477 t.Fatalf("timed out waiting for TaskResult")
478 }
479 }
480
481 expectedEvents := []event.Event{
482
483 {
484 Type: event.WaitType,
485 WaitEvent: event.WaitEvent{
486 GroupName: taskName,
487 Identifier: testDeploymentID,
488 Status: event.ReconcileSuccessful,
489 },
490 },
491 }
492 testutil.AssertEqual(t, expectedEvents, receivedEvents,
493 "Actual events (%d) do not match expected events (%d)",
494 len(receivedEvents), len(expectedEvents))
495
496 expectedInventory := actuation.Inventory{
497 Status: actuation.InventoryStatus{
498 Objects: []actuation.ObjectStatus{
499 {
500 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeploymentID),
501 Strategy: actuation.ActuationStrategyApply,
502 Actuation: actuation.ActuationSucceeded,
503 Reconcile: actuation.ReconcileSucceeded,
504 UID: testDeployment.GetUID(),
505 Generation: testDeployment.GetGeneration(),
506 },
507 },
508 },
509 }
510 testutil.AssertEqual(t, &expectedInventory, taskContext.InventoryManager().Inventory())
511 }
512
513 func TestWaitTask_Cancel(t *testing.T) {
514 testDeploymentID := testutil.ToIdentifier(t, testDeployment1YAML)
515 testDeployment := testutil.Unstructured(t, testDeployment1YAML)
516 ids := object.ObjMetadataSet{
517 testDeploymentID,
518 }
519 waitTimeout := 5 * time.Second
520 taskName := "wait-4"
521 task := NewWaitTask(taskName, ids, AllCurrent,
522 waitTimeout, testutil.NewFakeRESTMapper())
523
524 eventChannel := make(chan event.Event)
525 resourceCache := cache.NewResourceCacheMap()
526 taskContext := NewTaskContext(eventChannel, resourceCache)
527 defer close(eventChannel)
528
529
530 testDeployment.SetUID("a")
531 testDeployment.SetGeneration(1)
532
533
534 taskContext.InventoryManager().AddSuccessfulApply(testDeploymentID,
535 testDeployment.GetUID(), testDeployment.GetGeneration())
536
537
538 go func() {
539
540 task.Start(taskContext)
541
542
543 time.Sleep(1 * time.Second)
544
545
546 task.Cancel(taskContext)
547 }()
548
549
550 timer := time.NewTimer(10 * time.Second)
551 receivedEvents := []event.Event{}
552 loop:
553 for {
554 select {
555 case e := <-taskContext.EventChannel():
556 receivedEvents = append(receivedEvents, e)
557 case res := <-taskContext.TaskChannel():
558 timer.Stop()
559 assert.NoError(t, res.Err)
560 break loop
561 case <-timer.C:
562 t.Fatalf("timed out waiting for TaskResult")
563 }
564 }
565
566
567 expectedEvents := []event.Event{
568
569
570 {
571 Type: event.WaitType,
572 WaitEvent: event.WaitEvent{
573 GroupName: taskName,
574 Identifier: testDeploymentID,
575 Status: event.ReconcilePending,
576 },
577 },
578 }
579 testutil.AssertEqual(t, expectedEvents, receivedEvents,
580 "Actual events (%d) do not match expected events (%d)",
581 len(receivedEvents), len(expectedEvents))
582
583 expectedInventory := actuation.Inventory{
584 Status: actuation.InventoryStatus{
585 Objects: []actuation.ObjectStatus{
586 {
587 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeploymentID),
588 Strategy: actuation.ActuationStrategyApply,
589 Actuation: actuation.ActuationSucceeded,
590 Reconcile: actuation.ReconcilePending,
591 UID: testDeployment.GetUID(),
592 Generation: testDeployment.GetGeneration(),
593 },
594 },
595 },
596 }
597 testutil.AssertEqual(t, &expectedInventory, taskContext.InventoryManager().Inventory())
598 }
599
600 func TestWaitTask_SingleTaskResult(t *testing.T) {
601 testDeploymentID := testutil.ToIdentifier(t, testDeployment1YAML)
602 testDeployment := testutil.Unstructured(t, testDeployment1YAML)
603 ids := object.ObjMetadataSet{
604 testDeploymentID,
605 }
606 waitTimeout := 3 * time.Second
607 taskName := "wait-5"
608 task := NewWaitTask(taskName, ids, AllCurrent,
609 waitTimeout, testutil.NewFakeRESTMapper())
610
611
612 eventChannel := make(chan event.Event, 10)
613 resourceCache := cache.NewResourceCacheMap()
614 taskContext := NewTaskContext(eventChannel, resourceCache)
615 defer close(eventChannel)
616
617
618 testDeployment.SetUID("a")
619 testDeployment.SetGeneration(1)
620
621
622 taskContext.InventoryManager().AddSuccessfulApply(testDeploymentID,
623 testDeployment.GetUID(), testDeployment.GetGeneration())
624
625
626 go func() {
627
628 task.Start(taskContext)
629
630
631 time.Sleep(1 * time.Second)
632
633
634 resourceCache.Put(testDeploymentID, cache.ResourceStatus{
635 Resource: withGeneration(testDeployment, 1),
636 Status: status.CurrentStatus,
637 })
638
639
640 for i := 0; i < 10; i++ {
641 task.StatusUpdate(taskContext, testDeploymentID)
642 }
643 }()
644
645
646 timer := time.NewTimer(5 * time.Second)
647 receivedEvents := []event.Event{}
648 receivedResults := []TaskResult{}
649 loop:
650 for {
651 select {
652 case e := <-taskContext.EventChannel():
653 receivedEvents = append(receivedEvents, e)
654 case res := <-taskContext.TaskChannel():
655 receivedResults = append(receivedResults, res)
656 case <-timer.C:
657 break loop
658 }
659 }
660
661 expectedEvents := []event.Event{
662
663
664 {
665 Type: event.WaitType,
666 WaitEvent: event.WaitEvent{
667 GroupName: taskName,
668 Identifier: testDeploymentID,
669 Status: event.ReconcilePending,
670 },
671 },
672
673 {
674 Type: event.WaitType,
675 WaitEvent: event.WaitEvent{
676 GroupName: taskName,
677 Identifier: testDeploymentID,
678 Status: event.ReconcileSuccessful,
679 },
680 },
681 }
682 testutil.AssertEqual(t, expectedEvents, receivedEvents,
683 "Actual events (%d) do not match expected events (%d)",
684 len(receivedEvents), len(expectedEvents))
685
686 expectedResults := []TaskResult{
687 {},
688 }
689 assert.Equal(t, expectedResults, receivedResults)
690
691 expectedInventory := actuation.Inventory{
692 Status: actuation.InventoryStatus{
693 Objects: []actuation.ObjectStatus{
694 {
695 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeploymentID),
696 Strategy: actuation.ActuationStrategyApply,
697 Actuation: actuation.ActuationSucceeded,
698 Reconcile: actuation.ReconcileSucceeded,
699 UID: testDeployment.GetUID(),
700 Generation: testDeployment.GetGeneration(),
701 },
702 },
703 },
704 }
705 testutil.AssertEqual(t, &expectedInventory, taskContext.InventoryManager().Inventory())
706 }
707
708 func TestWaitTask_Failed(t *testing.T) {
709 taskName := "wait-6"
710 testDeployment1ID := testutil.ToIdentifier(t, testDeployment1YAML)
711 testDeployment1 := testutil.Unstructured(t, testDeployment1YAML)
712 testDeployment2ID := testutil.ToIdentifier(t, testDeployment2YAML)
713 testDeployment2 := testutil.Unstructured(t, testDeployment2YAML)
714
715
716 testDeployment1.SetUID("a")
717 testDeployment1.SetGeneration(1)
718 testDeployment2.SetUID("b")
719 testDeployment2.SetGeneration(1)
720
721 testCases := map[string]struct {
722 configureTaskContextFunc func(taskContext *TaskContext)
723 eventsFunc func(*cache.ResourceCacheMap, *WaitTask, *TaskContext)
724 waitTimeout time.Duration
725 expectedEvents []event.Event
726 expectedInventory *actuation.Inventory
727 }{
728 "continue on failed if others InProgress": {
729 configureTaskContextFunc: func(taskContext *TaskContext) {
730
731 taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID,
732 testDeployment1.GetUID(), testDeployment1.GetGeneration())
733 taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID,
734 testDeployment2.GetUID(), testDeployment2.GetGeneration())
735 },
736 eventsFunc: func(resourceCache *cache.ResourceCacheMap, task *WaitTask, taskContext *TaskContext) {
737 resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
738 Resource: testDeployment1,
739 Status: status.FailedStatus,
740 })
741 task.StatusUpdate(taskContext, testDeployment1ID)
742
743 resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
744 Resource: testDeployment2,
745 Status: status.InProgressStatus,
746 })
747 task.StatusUpdate(taskContext, testDeployment2ID)
748
749 resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
750 Resource: testDeployment2,
751 Status: status.CurrentStatus,
752 })
753 task.StatusUpdate(taskContext, testDeployment2ID)
754 },
755 waitTimeout: 2 * time.Second,
756 expectedEvents: []event.Event{
757
758 {
759 Type: event.WaitType,
760 WaitEvent: event.WaitEvent{
761 GroupName: taskName,
762 Identifier: testDeployment1ID,
763 Status: event.ReconcilePending,
764 },
765 },
766
767 {
768 Type: event.WaitType,
769 WaitEvent: event.WaitEvent{
770 GroupName: taskName,
771 Identifier: testDeployment2ID,
772 Status: event.ReconcilePending,
773 },
774 },
775
776 {
777 Type: event.WaitType,
778 WaitEvent: event.WaitEvent{
779 GroupName: taskName,
780 Identifier: testDeployment1ID,
781 Status: event.ReconcileFailed,
782 },
783 },
784
785 {
786 Type: event.WaitType,
787 WaitEvent: event.WaitEvent{
788 GroupName: taskName,
789 Identifier: testDeployment2ID,
790 Status: event.ReconcileSuccessful,
791 },
792 },
793 },
794 expectedInventory: &actuation.Inventory{
795 Status: actuation.InventoryStatus{
796 Objects: []actuation.ObjectStatus{
797 {
798 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
799 Strategy: actuation.ActuationStrategyApply,
800 Actuation: actuation.ActuationSucceeded,
801 Reconcile: actuation.ReconcileFailed,
802 UID: testDeployment1.GetUID(),
803 Generation: testDeployment1.GetGeneration(),
804 },
805 {
806 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
807 Strategy: actuation.ActuationStrategyApply,
808 Actuation: actuation.ActuationSucceeded,
809 Reconcile: actuation.ReconcileSucceeded,
810 UID: testDeployment2.GetUID(),
811 Generation: testDeployment2.GetGeneration(),
812 },
813 },
814 },
815 },
816 },
817 "complete wait task is last resource becomes failed": {
818 configureTaskContextFunc: func(taskContext *TaskContext) {
819
820 taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID,
821 testDeployment1.GetUID(), testDeployment1.GetGeneration())
822 taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID,
823 testDeployment2.GetUID(), testDeployment2.GetGeneration())
824 },
825 eventsFunc: func(resourceCache *cache.ResourceCacheMap, task *WaitTask, taskContext *TaskContext) {
826 resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
827 Resource: testDeployment2,
828 Status: status.CurrentStatus,
829 })
830 task.StatusUpdate(taskContext, testDeployment2ID)
831
832 resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
833 Resource: testDeployment1,
834 Status: status.FailedStatus,
835 })
836 task.StatusUpdate(taskContext, testDeployment1ID)
837 },
838 waitTimeout: 2 * time.Second,
839 expectedEvents: []event.Event{
840
841 {
842 Type: event.WaitType,
843 WaitEvent: event.WaitEvent{
844 GroupName: taskName,
845 Identifier: testDeployment1ID,
846 Status: event.ReconcilePending,
847 },
848 },
849
850 {
851 Type: event.WaitType,
852 WaitEvent: event.WaitEvent{
853 GroupName: taskName,
854 Identifier: testDeployment2ID,
855 Status: event.ReconcilePending,
856 },
857 },
858
859 {
860 Type: event.WaitType,
861 WaitEvent: event.WaitEvent{
862 GroupName: taskName,
863 Identifier: testDeployment2ID,
864 Status: event.ReconcileSuccessful,
865 },
866 },
867
868 {
869 Type: event.WaitType,
870 WaitEvent: event.WaitEvent{
871 GroupName: taskName,
872 Identifier: testDeployment1ID,
873 Status: event.ReconcileFailed,
874 },
875 },
876 },
877 expectedInventory: &actuation.Inventory{
878 Status: actuation.InventoryStatus{
879 Objects: []actuation.ObjectStatus{
880 {
881 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
882 Strategy: actuation.ActuationStrategyApply,
883 Actuation: actuation.ActuationSucceeded,
884 Reconcile: actuation.ReconcileFailed,
885 UID: testDeployment1.GetUID(),
886 Generation: testDeployment1.GetGeneration(),
887 },
888 {
889 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
890 Strategy: actuation.ActuationStrategyApply,
891 Actuation: actuation.ActuationSucceeded,
892 Reconcile: actuation.ReconcileSucceeded,
893 UID: testDeployment2.GetUID(),
894 Generation: testDeployment2.GetGeneration(),
895 },
896 },
897 },
898 },
899 },
900 "failed resource can become current": {
901 configureTaskContextFunc: func(taskContext *TaskContext) {
902
903 taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID,
904 testDeployment1.GetUID(), testDeployment1.GetGeneration())
905 taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID,
906 testDeployment2.GetUID(), testDeployment2.GetGeneration())
907 },
908 eventsFunc: func(resourceCache *cache.ResourceCacheMap, task *WaitTask, taskContext *TaskContext) {
909 resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
910 Resource: testDeployment1,
911 Status: status.FailedStatus,
912 })
913 task.StatusUpdate(taskContext, testDeployment1ID)
914
915 resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
916 Resource: testDeployment1,
917 Status: status.CurrentStatus,
918 })
919 task.StatusUpdate(taskContext, testDeployment1ID)
920
921 resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
922 Resource: testDeployment2,
923 Status: status.CurrentStatus,
924 })
925 task.StatusUpdate(taskContext, testDeployment2ID)
926 },
927 waitTimeout: 2 * time.Second,
928 expectedEvents: []event.Event{
929
930 {
931 Type: event.WaitType,
932 WaitEvent: event.WaitEvent{
933 GroupName: taskName,
934 Identifier: testDeployment1ID,
935 Status: event.ReconcilePending,
936 },
937 },
938
939 {
940 Type: event.WaitType,
941 WaitEvent: event.WaitEvent{
942 GroupName: taskName,
943 Identifier: testDeployment2ID,
944 Status: event.ReconcilePending,
945 },
946 },
947
948 {
949 Type: event.WaitType,
950 WaitEvent: event.WaitEvent{
951 GroupName: taskName,
952 Identifier: testDeployment1ID,
953 Status: event.ReconcileFailed,
954 },
955 },
956
957 {
958 Type: event.WaitType,
959 WaitEvent: event.WaitEvent{
960 GroupName: taskName,
961 Identifier: testDeployment1ID,
962 Status: event.ReconcileSuccessful,
963 },
964 },
965
966 {
967 Type: event.WaitType,
968 WaitEvent: event.WaitEvent{
969 GroupName: taskName,
970 Identifier: testDeployment2ID,
971 Status: event.ReconcileSuccessful,
972 },
973 },
974 },
975 expectedInventory: &actuation.Inventory{
976 Status: actuation.InventoryStatus{
977 Objects: []actuation.ObjectStatus{
978 {
979 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
980 Strategy: actuation.ActuationStrategyApply,
981 Actuation: actuation.ActuationSucceeded,
982 Reconcile: actuation.ReconcileSucceeded,
983 UID: testDeployment1.GetUID(),
984 Generation: testDeployment1.GetGeneration(),
985 },
986 {
987 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
988 Strategy: actuation.ActuationStrategyApply,
989 Actuation: actuation.ActuationSucceeded,
990 Reconcile: actuation.ReconcileSucceeded,
991 UID: testDeployment2.GetUID(),
992 Generation: testDeployment2.GetGeneration(),
993 },
994 },
995 },
996 },
997 },
998 "failed resource can become InProgress": {
999 configureTaskContextFunc: func(taskContext *TaskContext) {
1000
1001 taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID,
1002 testDeployment1.GetUID(), testDeployment1.GetGeneration())
1003 taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID,
1004 testDeployment2.GetUID(), testDeployment2.GetGeneration())
1005 },
1006 eventsFunc: func(resourceCache *cache.ResourceCacheMap, task *WaitTask, taskContext *TaskContext) {
1007 resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
1008 Resource: testDeployment1,
1009 Status: status.FailedStatus,
1010 })
1011 task.StatusUpdate(taskContext, testDeployment1ID)
1012
1013 resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
1014 Resource: testDeployment1,
1015 Status: status.InProgressStatus,
1016 })
1017 task.StatusUpdate(taskContext, testDeployment1ID)
1018
1019 resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
1020 Resource: testDeployment2,
1021 Status: status.CurrentStatus,
1022 })
1023 task.StatusUpdate(taskContext, testDeployment2ID)
1024 },
1025 waitTimeout: 2 * time.Second,
1026 expectedEvents: []event.Event{
1027
1028 {
1029 Type: event.WaitType,
1030 WaitEvent: event.WaitEvent{
1031 GroupName: taskName,
1032 Identifier: testDeployment1ID,
1033 Status: event.ReconcilePending,
1034 },
1035 },
1036
1037 {
1038 Type: event.WaitType,
1039 WaitEvent: event.WaitEvent{
1040 GroupName: taskName,
1041 Identifier: testDeployment2ID,
1042 Status: event.ReconcilePending,
1043 },
1044 },
1045
1046 {
1047 Type: event.WaitType,
1048 WaitEvent: event.WaitEvent{
1049 GroupName: taskName,
1050 Identifier: testDeployment1ID,
1051 Status: event.ReconcileFailed,
1052 },
1053 },
1054
1055 {
1056 Type: event.WaitType,
1057 WaitEvent: event.WaitEvent{
1058 GroupName: taskName,
1059 Identifier: testDeployment1ID,
1060 Status: event.ReconcilePending,
1061 },
1062 },
1063
1064 {
1065 Type: event.WaitType,
1066 WaitEvent: event.WaitEvent{
1067 GroupName: taskName,
1068 Identifier: testDeployment2ID,
1069 Status: event.ReconcileSuccessful,
1070 },
1071 },
1072
1073 {
1074 Type: event.WaitType,
1075 WaitEvent: event.WaitEvent{
1076 GroupName: taskName,
1077 Identifier: testDeployment1ID,
1078 Status: event.ReconcileTimeout,
1079 },
1080 },
1081 },
1082 expectedInventory: &actuation.Inventory{
1083 Status: actuation.InventoryStatus{
1084 Objects: []actuation.ObjectStatus{
1085 {
1086 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
1087 Strategy: actuation.ActuationStrategyApply,
1088 Actuation: actuation.ActuationSucceeded,
1089 Reconcile: actuation.ReconcileTimeout,
1090 UID: testDeployment1.GetUID(),
1091 Generation: testDeployment1.GetGeneration(),
1092 },
1093 {
1094 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
1095 Strategy: actuation.ActuationStrategyApply,
1096 Actuation: actuation.ActuationSucceeded,
1097 Reconcile: actuation.ReconcileSucceeded,
1098 UID: testDeployment2.GetUID(),
1099 Generation: testDeployment2.GetGeneration(),
1100 },
1101 },
1102 },
1103 },
1104 },
1105 }
1106
1107 for tn, tc := range testCases {
1108 t.Run(tn, func(t *testing.T) {
1109 ids := object.ObjMetadataSet{
1110 testDeployment1ID,
1111 testDeployment2ID,
1112 }
1113 task := NewWaitTask(taskName, ids, AllCurrent,
1114 tc.waitTimeout, testutil.NewFakeRESTMapper())
1115
1116 eventChannel := make(chan event.Event)
1117 resourceCache := cache.NewResourceCacheMap()
1118 taskContext := NewTaskContext(eventChannel, resourceCache)
1119 defer close(eventChannel)
1120
1121 tc.configureTaskContextFunc(taskContext)
1122
1123
1124 go func() {
1125
1126 task.Start(taskContext)
1127
1128 tc.eventsFunc(resourceCache, task, taskContext)
1129 }()
1130
1131
1132 timer := time.NewTimer(5 * time.Second)
1133 receivedEvents := []event.Event{}
1134 loop:
1135 for {
1136 select {
1137 case e := <-taskContext.EventChannel():
1138 receivedEvents = append(receivedEvents, e)
1139 case res := <-taskContext.TaskChannel():
1140 timer.Stop()
1141 assert.NoError(t, res.Err)
1142 break loop
1143 case <-timer.C:
1144 t.Fatalf("timed out waiting for TaskResult")
1145 }
1146 }
1147
1148 testutil.AssertEqual(t, tc.expectedEvents, receivedEvents,
1149 "Actual events (%d) do not match expected events (%d)",
1150 len(receivedEvents), len(tc.expectedEvents))
1151
1152 testutil.AssertEqual(t, tc.expectedInventory, taskContext.InventoryManager().Inventory())
1153 })
1154 }
1155 }
1156
1157 func TestWaitTask_UIDChanged(t *testing.T) {
1158 taskName := "wait-7"
1159 testDeployment1ID := testutil.ToIdentifier(t, testDeployment1YAML)
1160 testDeployment1 := testutil.Unstructured(t, testDeployment1YAML)
1161 testDeployment2ID := testutil.ToIdentifier(t, testDeployment2YAML)
1162 testDeployment2 := testutil.Unstructured(t, testDeployment2YAML)
1163
1164
1165 testDeployment1.SetUID("a")
1166 testDeployment1.SetGeneration(1)
1167 testDeployment2.SetUID("b")
1168 testDeployment2.SetGeneration(1)
1169
1170 replacedDeployment1 := testDeployment1.DeepCopy()
1171 replacedDeployment1.SetUID("replaced")
1172
1173 testCases := map[string]struct {
1174 condition Condition
1175 configureTaskContextFunc func(taskContext *TaskContext)
1176 eventsFunc func(*cache.ResourceCacheMap, *WaitTask, *TaskContext)
1177 waitTimeout time.Duration
1178 expectedEvents []event.Event
1179 expectedInventory *actuation.Inventory
1180 }{
1181 "UID changed after apply means reconcile failure": {
1182 condition: AllCurrent,
1183 configureTaskContextFunc: func(taskContext *TaskContext) {
1184
1185 taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID,
1186 testDeployment1.GetUID(), testDeployment1.GetGeneration())
1187 taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID,
1188 testDeployment2.GetUID(), testDeployment2.GetGeneration())
1189 },
1190 eventsFunc: func(resourceCache *cache.ResourceCacheMap, task *WaitTask, taskContext *TaskContext) {
1191
1192 resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
1193 Resource: replacedDeployment1,
1194 Status: status.CurrentStatus,
1195 })
1196 task.StatusUpdate(taskContext, testDeployment1ID)
1197
1198 resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
1199 Resource: testDeployment2,
1200 Status: status.CurrentStatus,
1201 })
1202 task.StatusUpdate(taskContext, testDeployment2ID)
1203 },
1204 waitTimeout: 2 * time.Second,
1205 expectedEvents: []event.Event{
1206
1207 {
1208 Type: event.WaitType,
1209 WaitEvent: event.WaitEvent{
1210 GroupName: taskName,
1211 Identifier: testDeployment1ID,
1212 Status: event.ReconcilePending,
1213 },
1214 },
1215
1216 {
1217 Type: event.WaitType,
1218 WaitEvent: event.WaitEvent{
1219 GroupName: taskName,
1220 Identifier: testDeployment2ID,
1221 Status: event.ReconcilePending,
1222 },
1223 },
1224
1225 {
1226 Type: event.WaitType,
1227 WaitEvent: event.WaitEvent{
1228 GroupName: taskName,
1229 Identifier: testDeployment1ID,
1230 Status: event.ReconcileFailed,
1231 },
1232 },
1233
1234 {
1235 Type: event.WaitType,
1236 WaitEvent: event.WaitEvent{
1237 GroupName: taskName,
1238 Identifier: testDeployment2ID,
1239 Status: event.ReconcileSuccessful,
1240 },
1241 },
1242 },
1243 expectedInventory: &actuation.Inventory{
1244 Status: actuation.InventoryStatus{
1245 Objects: []actuation.ObjectStatus{
1246 {
1247 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
1248 Strategy: actuation.ActuationStrategyApply,
1249 Actuation: actuation.ActuationSucceeded,
1250
1251 Reconcile: actuation.ReconcileFailed,
1252
1253 UID: testDeployment1.GetUID(),
1254 Generation: testDeployment1.GetGeneration(),
1255 },
1256 {
1257 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
1258 Strategy: actuation.ActuationStrategyApply,
1259 Actuation: actuation.ActuationSucceeded,
1260 Reconcile: actuation.ReconcileSucceeded,
1261 UID: testDeployment2.GetUID(),
1262 Generation: testDeployment2.GetGeneration(),
1263 },
1264 },
1265 },
1266 },
1267 },
1268 "UID changed after delete means reconcile success": {
1269 condition: AllNotFound,
1270 configureTaskContextFunc: func(taskContext *TaskContext) {
1271
1272 taskContext.InventoryManager().AddSuccessfulDelete(testDeployment1ID,
1273 testDeployment1.GetUID())
1274 taskContext.InventoryManager().AddSuccessfulDelete(testDeployment2ID,
1275 testDeployment2.GetUID())
1276 },
1277 eventsFunc: func(resourceCache *cache.ResourceCacheMap, task *WaitTask, taskContext *TaskContext) {
1278
1279 resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
1280 Resource: replacedDeployment1,
1281 Status: status.InProgressStatus,
1282 })
1283 task.StatusUpdate(taskContext, testDeployment1ID)
1284
1285 resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
1286 Resource: testDeployment2,
1287 Status: status.NotFoundStatus,
1288 })
1289 task.StatusUpdate(taskContext, testDeployment2ID)
1290 },
1291 waitTimeout: 2 * time.Second,
1292 expectedEvents: []event.Event{
1293
1294 {
1295 Type: event.WaitType,
1296 WaitEvent: event.WaitEvent{
1297 GroupName: taskName,
1298 Identifier: testDeployment1ID,
1299 Status: event.ReconcilePending,
1300 },
1301 },
1302
1303 {
1304 Type: event.WaitType,
1305 WaitEvent: event.WaitEvent{
1306 GroupName: taskName,
1307 Identifier: testDeployment2ID,
1308 Status: event.ReconcilePending,
1309 },
1310 },
1311
1312 {
1313 Type: event.WaitType,
1314 WaitEvent: event.WaitEvent{
1315 GroupName: taskName,
1316 Identifier: testDeployment1ID,
1317 Status: event.ReconcileSuccessful,
1318 },
1319 },
1320
1321 {
1322 Type: event.WaitType,
1323 WaitEvent: event.WaitEvent{
1324 GroupName: taskName,
1325 Identifier: testDeployment2ID,
1326 Status: event.ReconcileSuccessful,
1327 },
1328 },
1329 },
1330 expectedInventory: &actuation.Inventory{
1331 Status: actuation.InventoryStatus{
1332 Objects: []actuation.ObjectStatus{
1333 {
1334 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
1335 Strategy: actuation.ActuationStrategyDelete,
1336 Actuation: actuation.ActuationSucceeded,
1337
1338 Reconcile: actuation.ReconcileSucceeded,
1339
1340 UID: testDeployment1.GetUID(),
1341
1342 },
1343 {
1344 ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
1345 Strategy: actuation.ActuationStrategyDelete,
1346 Actuation: actuation.ActuationSucceeded,
1347 Reconcile: actuation.ReconcileSucceeded,
1348 UID: testDeployment2.GetUID(),
1349
1350 },
1351 },
1352 },
1353 },
1354 },
1355 }
1356
1357 for tn, tc := range testCases {
1358 t.Run(tn, func(t *testing.T) {
1359 ids := object.ObjMetadataSet{
1360 testDeployment1ID,
1361 testDeployment2ID,
1362 }
1363 task := NewWaitTask(taskName, ids, tc.condition,
1364 tc.waitTimeout, testutil.NewFakeRESTMapper())
1365
1366 eventChannel := make(chan event.Event)
1367 resourceCache := cache.NewResourceCacheMap()
1368 taskContext := NewTaskContext(eventChannel, resourceCache)
1369 defer close(eventChannel)
1370
1371 tc.configureTaskContextFunc(taskContext)
1372
1373
1374 go func() {
1375
1376 task.Start(taskContext)
1377
1378 tc.eventsFunc(resourceCache, task, taskContext)
1379 }()
1380
1381
1382 timer := time.NewTimer(5 * time.Second)
1383 receivedEvents := []event.Event{}
1384 loop:
1385 for {
1386 select {
1387 case e := <-taskContext.EventChannel():
1388 receivedEvents = append(receivedEvents, e)
1389 case res := <-taskContext.TaskChannel():
1390 timer.Stop()
1391 assert.NoError(t, res.Err)
1392 break loop
1393 case <-timer.C:
1394 t.Fatalf("timed out waiting for TaskResult")
1395 }
1396 }
1397
1398 testutil.AssertEqual(t, tc.expectedEvents, receivedEvents,
1399 "Actual events (%d) do not match expected events (%d)",
1400 len(receivedEvents), len(tc.expectedEvents))
1401
1402 testutil.AssertEqual(t, tc.expectedInventory, taskContext.InventoryManager().Inventory())
1403 })
1404 }
1405 }
1406
View as plain text