...

Source file src/sigs.k8s.io/cli-utils/pkg/apply/taskrunner/task_test.go

Documentation: sigs.k8s.io/cli-utils/pkg/apply/taskrunner

     1  // Copyright 2020 The Kubernetes Authors.
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package taskrunner
     5  
     6  import (
     7  	"testing"
     8  	"time"
     9  
    10  	"github.com/stretchr/testify/assert"
    11  	"sigs.k8s.io/cli-utils/pkg/apis/actuation"
    12  	"sigs.k8s.io/cli-utils/pkg/apply/cache"
    13  	"sigs.k8s.io/cli-utils/pkg/apply/event"
    14  	"sigs.k8s.io/cli-utils/pkg/inventory"
    15  	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
    16  	"sigs.k8s.io/cli-utils/pkg/object"
    17  	"sigs.k8s.io/cli-utils/pkg/testutil"
    18  )
    19  
    20  var testDeployment1YAML = `
    21  apiVersion: apps/v1
    22  kind: Deployment
    23  metadata:
    24    name: a
    25    namespace: default
    26    uid: dep-uid-a
    27    generation: 1
    28  spec:
    29    replicas: 1
    30  `
    31  
    32  var testDeployment2YAML = `
    33  apiVersion: v1
    34  kind: Deployment
    35  metadata:
    36    name: b
    37    namespace: default
    38    uid: dep-uid-b
    39    generation: 1
    40  spec:
    41    replicas: 2
    42  `
    43  
    44  var testDeployment3YAML = `
    45  apiVersion: v1
    46  kind: Deployment
    47  metadata:
    48    name: c
    49    namespace: default
    50    uid: dep-uid-c
    51    generation: 1
    52  spec:
    53    replicas: 3
    54  `
    55  
    56  var testDeployment4YAML = `
    57  apiVersion: v1
    58  kind: Deployment
    59  metadata:
    60    name: d
    61    namespace: default
    62    uid: dep-uid-d
    63    generation: 1
    64  spec:
    65    replicas: 4
    66  `
    67  
    68  func TestWaitTask_CompleteEventually(t *testing.T) {
    69  	testDeployment1ID := testutil.ToIdentifier(t, testDeployment1YAML)
    70  	testDeployment1 := testutil.Unstructured(t, testDeployment1YAML)
    71  	testDeployment2ID := testutil.ToIdentifier(t, testDeployment2YAML)
    72  	testDeployment2 := testutil.Unstructured(t, testDeployment2YAML)
    73  	testDeployment3ID := testutil.ToIdentifier(t, testDeployment3YAML)
    74  	testDeployment4ID := testutil.ToIdentifier(t, testDeployment4YAML)
    75  	ids := object.ObjMetadataSet{
    76  		testDeployment1ID,
    77  		testDeployment2ID,
    78  		testDeployment3ID,
    79  		testDeployment4ID,
    80  	}
    81  	waitTimeout := 2 * time.Second
    82  	taskName := "wait-1"
    83  	task := NewWaitTask(taskName, ids, AllCurrent,
    84  		waitTimeout, testutil.NewFakeRESTMapper())
    85  
    86  	eventChannel := make(chan event.Event)
    87  	resourceCache := cache.NewResourceCacheMap()
    88  	taskContext := NewTaskContext(eventChannel, resourceCache)
    89  	defer close(eventChannel)
    90  
    91  	// Update metadata on successfully applied objects
    92  	testDeployment1.SetUID("a")
    93  	testDeployment1.SetGeneration(1)
    94  	testDeployment2.SetUID("b")
    95  	testDeployment2.SetGeneration(1)
    96  
    97  	// mark deployment 1 & 2 as apply succeeded
    98  	taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID,
    99  		testDeployment1.GetUID(), testDeployment1.GetGeneration())
   100  	taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID,
   101  		testDeployment2.GetUID(), testDeployment2.GetGeneration())
   102  
   103  	// mark deployment 3 as apply failed
   104  	taskContext.InventoryManager().AddFailedApply(testDeployment3ID)
   105  
   106  	// mark deployment 4 as apply skipped
   107  	taskContext.InventoryManager().AddSkippedApply(testDeployment4ID)
   108  
   109  	// run task async, to let the test collect events
   110  	go func() {
   111  		// start the task
   112  		task.Start(taskContext)
   113  
   114  		// mark deployment1 as Current
   115  		resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
   116  			Resource: testDeployment1,
   117  			Status:   status.CurrentStatus,
   118  		})
   119  		// tell the WaitTask deployment1 has new status
   120  		task.StatusUpdate(taskContext, testDeployment1ID)
   121  
   122  		// mark deployment2 as InProgress
   123  		resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
   124  			Resource: testDeployment2,
   125  			Status:   status.InProgressStatus,
   126  		})
   127  		// tell the WaitTask deployment2 has new status
   128  		task.StatusUpdate(taskContext, testDeployment2ID)
   129  
   130  		// mark deployment2 as Current
   131  		resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
   132  			Resource: testDeployment2,
   133  			Status:   status.CurrentStatus,
   134  		})
   135  		// tell the WaitTask deployment2 has new status
   136  		task.StatusUpdate(taskContext, testDeployment2ID)
   137  	}()
   138  
   139  	// wait for task result
   140  	timer := time.NewTimer(5 * time.Second)
   141  	receivedEvents := []event.Event{}
   142  loop:
   143  	for {
   144  		select {
   145  		case e := <-taskContext.EventChannel():
   146  			receivedEvents = append(receivedEvents, e)
   147  		case res := <-taskContext.TaskChannel():
   148  			timer.Stop()
   149  			assert.NoError(t, res.Err)
   150  			break loop
   151  		case <-timer.C:
   152  			t.Fatalf("timed out waiting for TaskResult")
   153  		}
   154  	}
   155  
   156  	// Expect an event for every object (sorted).
   157  	expectedEvents := []event.Event{
   158  		// skipped/reconciled/pending events first, in the order provided to the WaitTask
   159  		// deployment1 pending
   160  		{
   161  			Type: event.WaitType,
   162  			WaitEvent: event.WaitEvent{
   163  				GroupName:  taskName,
   164  				Identifier: testDeployment1ID,
   165  				Status:     event.ReconcilePending,
   166  			},
   167  		},
   168  		// deployment2 pending
   169  		{
   170  			Type: event.WaitType,
   171  			WaitEvent: event.WaitEvent{
   172  				GroupName:  taskName,
   173  				Identifier: testDeployment2ID,
   174  				Status:     event.ReconcilePending,
   175  			},
   176  		},
   177  		// deployment3 skipped
   178  		{
   179  			Type: event.WaitType,
   180  			WaitEvent: event.WaitEvent{
   181  				GroupName:  taskName,
   182  				Identifier: testDeployment3ID,
   183  				Status:     event.ReconcileSkipped,
   184  			},
   185  		},
   186  		// deployment4 skipped
   187  		{
   188  			Type: event.WaitType,
   189  			WaitEvent: event.WaitEvent{
   190  				GroupName:  taskName,
   191  				Identifier: testDeployment4ID,
   192  				Status:     event.ReconcileSkipped,
   193  			},
   194  		},
   195  		// current events next, in the order of status updates
   196  		// deployment1 current
   197  		{
   198  			Type: event.WaitType,
   199  			WaitEvent: event.WaitEvent{
   200  				GroupName:  taskName,
   201  				Identifier: testDeployment1ID,
   202  				Status:     event.ReconcileSuccessful,
   203  			},
   204  		},
   205  		// deployment2 current
   206  		{
   207  			Type: event.WaitType,
   208  			WaitEvent: event.WaitEvent{
   209  				GroupName:  taskName,
   210  				Identifier: testDeployment2ID,
   211  				Status:     event.ReconcileSuccessful,
   212  			},
   213  		},
   214  	}
   215  	testutil.AssertEqual(t, expectedEvents, receivedEvents,
   216  		"Actual events (%d) do not match expected events (%d)",
   217  		len(receivedEvents), len(expectedEvents))
   218  
   219  	expectedInventory := actuation.Inventory{
   220  		Status: actuation.InventoryStatus{
   221  			Objects: []actuation.ObjectStatus{
   222  				{
   223  					ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
   224  					Strategy:        actuation.ActuationStrategyApply,
   225  					Actuation:       actuation.ActuationSucceeded,
   226  					Reconcile:       actuation.ReconcileSucceeded,
   227  					UID:             testDeployment1.GetUID(),
   228  					Generation:      testDeployment1.GetGeneration(),
   229  				},
   230  				{
   231  					ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
   232  					Strategy:        actuation.ActuationStrategyApply,
   233  					Actuation:       actuation.ActuationSucceeded,
   234  					Reconcile:       actuation.ReconcileSucceeded,
   235  					UID:             testDeployment2.GetUID(),
   236  					Generation:      testDeployment2.GetGeneration(),
   237  				},
   238  				{
   239  					ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment3ID),
   240  					Strategy:        actuation.ActuationStrategyApply,
   241  					Actuation:       actuation.ActuationFailed,
   242  					Reconcile:       actuation.ReconcileSkipped,
   243  				},
   244  				{
   245  					ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment4ID),
   246  					Strategy:        actuation.ActuationStrategyApply,
   247  					Actuation:       actuation.ActuationSkipped,
   248  					Reconcile:       actuation.ReconcileSkipped,
   249  				},
   250  			},
   251  		},
   252  	}
   253  	testutil.AssertEqual(t, &expectedInventory, taskContext.InventoryManager().Inventory())
   254  }
   255  
   256  func TestWaitTask_Timeout(t *testing.T) {
   257  	testDeployment1ID := testutil.ToIdentifier(t, testDeployment1YAML)
   258  	testDeployment1 := testutil.Unstructured(t, testDeployment1YAML)
   259  	testDeployment2ID := testutil.ToIdentifier(t, testDeployment2YAML)
   260  	testDeployment2 := testutil.Unstructured(t, testDeployment2YAML)
   261  	testDeployment3ID := testutil.ToIdentifier(t, testDeployment3YAML)
   262  	testDeployment4ID := testutil.ToIdentifier(t, testDeployment4YAML)
   263  	ids := object.ObjMetadataSet{
   264  		testDeployment1ID,
   265  		testDeployment2ID,
   266  		testDeployment3ID,
   267  		testDeployment4ID,
   268  	}
   269  	waitTimeout := 2 * time.Second
   270  	taskName := "wait-2"
   271  	task := NewWaitTask(taskName, ids, AllCurrent,
   272  		waitTimeout, testutil.NewFakeRESTMapper())
   273  
   274  	eventChannel := make(chan event.Event)
   275  	resourceCache := cache.NewResourceCacheMap()
   276  	taskContext := NewTaskContext(eventChannel, resourceCache)
   277  	defer close(eventChannel)
   278  
   279  	// Update metadata on successfully applied objects
   280  	testDeployment1.SetUID("a")
   281  	testDeployment1.SetGeneration(1)
   282  	testDeployment2.SetUID("b")
   283  	testDeployment2.SetGeneration(1)
   284  
   285  	// mark deployment 1 & 2 as apply succeeded
   286  	taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID,
   287  		testDeployment1.GetUID(), testDeployment1.GetGeneration())
   288  	taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID,
   289  		testDeployment2.GetUID(), testDeployment2.GetGeneration())
   290  
   291  	// mark deployment 3 as apply failed
   292  	taskContext.InventoryManager().AddFailedApply(testDeployment3ID)
   293  
   294  	// mark deployment 4 as apply skipped
   295  	taskContext.InventoryManager().AddSkippedApply(testDeployment4ID)
   296  
   297  	// run task async, to let the test collect events
   298  	go func() {
   299  		// start the task
   300  		task.Start(taskContext)
   301  		// mark deployment1 as Current
   302  		resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
   303  			Resource: testDeployment1,
   304  			Status:   status.CurrentStatus,
   305  		})
   306  		// tell the WaitTask deployment1 has new status
   307  		task.StatusUpdate(taskContext, testDeployment1ID)
   308  	}()
   309  
   310  	// wait for task result
   311  	timer := time.NewTimer(5 * time.Second)
   312  	receivedEvents := []event.Event{}
   313  loop:
   314  	for {
   315  		select {
   316  		case e := <-taskContext.EventChannel():
   317  			receivedEvents = append(receivedEvents, e)
   318  		case res := <-taskContext.TaskChannel():
   319  			timer.Stop()
   320  			assert.NoError(t, res.Err)
   321  			break loop
   322  		case <-timer.C:
   323  			t.Fatalf("timed out waiting for TaskResult")
   324  		}
   325  	}
   326  
   327  	// Expect an event for every object (sorted).
   328  	expectedEvents := []event.Event{
   329  		// skipped/reconciled/pending events first, in the order provided to the WaitTask
   330  		// deployment1 pending
   331  		{
   332  			Type: event.WaitType,
   333  			WaitEvent: event.WaitEvent{
   334  				GroupName:  taskName,
   335  				Identifier: testDeployment1ID,
   336  				Status:     event.ReconcilePending,
   337  			},
   338  		},
   339  		// deployment2 pending
   340  		{
   341  			Type: event.WaitType,
   342  			WaitEvent: event.WaitEvent{
   343  				GroupName:  taskName,
   344  				Identifier: testDeployment2ID,
   345  				Status:     event.ReconcilePending,
   346  			},
   347  		},
   348  		// deployment3 skipped
   349  		{
   350  			Type: event.WaitType,
   351  			WaitEvent: event.WaitEvent{
   352  				GroupName:  taskName,
   353  				Identifier: testDeployment3ID,
   354  				Status:     event.ReconcileSkipped,
   355  			},
   356  		},
   357  		// deployment4 skipped
   358  		{
   359  			Type: event.WaitType,
   360  			WaitEvent: event.WaitEvent{
   361  				GroupName:  taskName,
   362  				Identifier: testDeployment4ID,
   363  				Status:     event.ReconcileSkipped,
   364  			},
   365  		},
   366  		// current events next, in the order of status updates
   367  		// deployment1 current
   368  		{
   369  			Type: event.WaitType,
   370  			WaitEvent: event.WaitEvent{
   371  				GroupName:  taskName,
   372  				Identifier: testDeployment1ID,
   373  				Status:     event.ReconcileSuccessful,
   374  			},
   375  		},
   376  		// timeout events last, in the order provided to the WaitTask
   377  		// deployment2 timeout
   378  		{
   379  			Type: event.WaitType,
   380  			WaitEvent: event.WaitEvent{
   381  				GroupName:  taskName,
   382  				Identifier: testDeployment2ID,
   383  				Status:     event.ReconcileTimeout,
   384  			},
   385  		},
   386  	}
   387  	testutil.AssertEqual(t, expectedEvents, receivedEvents,
   388  		"Actual events (%d) do not match expected events (%d)",
   389  		len(receivedEvents), len(expectedEvents))
   390  
   391  	expectedInventory := actuation.Inventory{
   392  		Status: actuation.InventoryStatus{
   393  			Objects: []actuation.ObjectStatus{
   394  				{
   395  					ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
   396  					Strategy:        actuation.ActuationStrategyApply,
   397  					Actuation:       actuation.ActuationSucceeded,
   398  					Reconcile:       actuation.ReconcileSucceeded,
   399  					UID:             testDeployment1.GetUID(),
   400  					Generation:      testDeployment1.GetGeneration(),
   401  				},
   402  				{
   403  					ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
   404  					Strategy:        actuation.ActuationStrategyApply,
   405  					Actuation:       actuation.ActuationSucceeded,
   406  					Reconcile:       actuation.ReconcileTimeout,
   407  					UID:             testDeployment2.GetUID(),
   408  					Generation:      testDeployment2.GetGeneration(),
   409  				},
   410  				{
   411  					ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment3ID),
   412  					Strategy:        actuation.ActuationStrategyApply,
   413  					Actuation:       actuation.ActuationFailed,
   414  					Reconcile:       actuation.ReconcileSkipped,
   415  				},
   416  				{
   417  					ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment4ID),
   418  					Strategy:        actuation.ActuationStrategyApply,
   419  					Actuation:       actuation.ActuationSkipped,
   420  					Reconcile:       actuation.ReconcileSkipped,
   421  				},
   422  			},
   423  		},
   424  	}
   425  	testutil.AssertEqual(t, &expectedInventory, taskContext.InventoryManager().Inventory())
   426  }
   427  
   428  func TestWaitTask_StartAndComplete(t *testing.T) {
   429  	testDeploymentID := testutil.ToIdentifier(t, testDeployment1YAML)
   430  	testDeployment := testutil.Unstructured(t, testDeployment1YAML)
   431  	ids := object.ObjMetadataSet{
   432  		testDeploymentID,
   433  	}
   434  	waitTimeout := 2 * time.Second
   435  	taskName := "wait-3"
   436  	task := NewWaitTask(taskName, ids, AllCurrent,
   437  		waitTimeout, testutil.NewFakeRESTMapper())
   438  
   439  	eventChannel := make(chan event.Event)
   440  	resourceCache := cache.NewResourceCacheMap()
   441  	taskContext := NewTaskContext(eventChannel, resourceCache)
   442  	defer close(eventChannel)
   443  
   444  	// Update metadata on successfully applied objects
   445  	testDeployment.SetUID("a")
   446  	testDeployment.SetGeneration(1)
   447  
   448  	// mark deployment as apply succeeded
   449  	taskContext.InventoryManager().AddSuccessfulApply(testDeploymentID,
   450  		testDeployment.GetUID(), testDeployment.GetGeneration())
   451  
   452  	// mark the deployment as Current before starting
   453  	resourceCache.Put(testDeploymentID, cache.ResourceStatus{
   454  		Resource: testDeployment,
   455  		Status:   status.CurrentStatus,
   456  	})
   457  
   458  	// run task async, to let the test collect events
   459  	go func() {
   460  		// start the task
   461  		task.Start(taskContext)
   462  	}()
   463  
   464  	// wait for first task result
   465  	timer := time.NewTimer(5 * time.Second)
   466  	receivedEvents := []event.Event{}
   467  loop:
   468  	for {
   469  		select {
   470  		case e := <-taskContext.EventChannel():
   471  			receivedEvents = append(receivedEvents, e)
   472  		case res := <-taskContext.TaskChannel():
   473  			timer.Stop()
   474  			assert.NoError(t, res.Err)
   475  			break loop
   476  		case <-timer.C:
   477  			t.Fatalf("timed out waiting for TaskResult")
   478  		}
   479  	}
   480  
   481  	expectedEvents := []event.Event{
   482  		// deployment1 current (no pending event when Current before start)
   483  		{
   484  			Type: event.WaitType,
   485  			WaitEvent: event.WaitEvent{
   486  				GroupName:  taskName,
   487  				Identifier: testDeploymentID,
   488  				Status:     event.ReconcileSuccessful,
   489  			},
   490  		},
   491  	}
   492  	testutil.AssertEqual(t, expectedEvents, receivedEvents,
   493  		"Actual events (%d) do not match expected events (%d)",
   494  		len(receivedEvents), len(expectedEvents))
   495  
   496  	expectedInventory := actuation.Inventory{
   497  		Status: actuation.InventoryStatus{
   498  			Objects: []actuation.ObjectStatus{
   499  				{
   500  					ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeploymentID),
   501  					Strategy:        actuation.ActuationStrategyApply,
   502  					Actuation:       actuation.ActuationSucceeded,
   503  					Reconcile:       actuation.ReconcileSucceeded,
   504  					UID:             testDeployment.GetUID(),
   505  					Generation:      testDeployment.GetGeneration(),
   506  				},
   507  			},
   508  		},
   509  	}
   510  	testutil.AssertEqual(t, &expectedInventory, taskContext.InventoryManager().Inventory())
   511  }
   512  
   513  func TestWaitTask_Cancel(t *testing.T) {
   514  	testDeploymentID := testutil.ToIdentifier(t, testDeployment1YAML)
   515  	testDeployment := testutil.Unstructured(t, testDeployment1YAML)
   516  	ids := object.ObjMetadataSet{
   517  		testDeploymentID,
   518  	}
   519  	waitTimeout := 5 * time.Second
   520  	taskName := "wait-4"
   521  	task := NewWaitTask(taskName, ids, AllCurrent,
   522  		waitTimeout, testutil.NewFakeRESTMapper())
   523  
   524  	eventChannel := make(chan event.Event)
   525  	resourceCache := cache.NewResourceCacheMap()
   526  	taskContext := NewTaskContext(eventChannel, resourceCache)
   527  	defer close(eventChannel)
   528  
   529  	// Update metadata on successfully applied objects
   530  	testDeployment.SetUID("a")
   531  	testDeployment.SetGeneration(1)
   532  
   533  	// mark deployment as apply succeeded
   534  	taskContext.InventoryManager().AddSuccessfulApply(testDeploymentID,
   535  		testDeployment.GetUID(), testDeployment.GetGeneration())
   536  
   537  	// run task async, to let the test collect events
   538  	go func() {
   539  		// start the task
   540  		task.Start(taskContext)
   541  
   542  		// wait a bit
   543  		time.Sleep(1 * time.Second)
   544  
   545  		// cancel immediately (simulate context cancel from baseRunner)
   546  		task.Cancel(taskContext)
   547  	}()
   548  
   549  	// wait for first task result
   550  	timer := time.NewTimer(10 * time.Second)
   551  	receivedEvents := []event.Event{}
   552  loop:
   553  	for {
   554  		select {
   555  		case e := <-taskContext.EventChannel():
   556  			receivedEvents = append(receivedEvents, e)
   557  		case res := <-taskContext.TaskChannel():
   558  			timer.Stop()
   559  			assert.NoError(t, res.Err)
   560  			break loop
   561  		case <-timer.C:
   562  			t.Fatalf("timed out waiting for TaskResult")
   563  		}
   564  	}
   565  
   566  	// no timeout events sent on cancel
   567  	expectedEvents := []event.Event{
   568  		// skipped/reconciled/pending events first, in the order provided to the WaitTask
   569  		// deployment1 pending
   570  		{
   571  			Type: event.WaitType,
   572  			WaitEvent: event.WaitEvent{
   573  				GroupName:  taskName,
   574  				Identifier: testDeploymentID,
   575  				Status:     event.ReconcilePending,
   576  			},
   577  		},
   578  	}
   579  	testutil.AssertEqual(t, expectedEvents, receivedEvents,
   580  		"Actual events (%d) do not match expected events (%d)",
   581  		len(receivedEvents), len(expectedEvents))
   582  
   583  	expectedInventory := actuation.Inventory{
   584  		Status: actuation.InventoryStatus{
   585  			Objects: []actuation.ObjectStatus{
   586  				{
   587  					ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeploymentID),
   588  					Strategy:        actuation.ActuationStrategyApply,
   589  					Actuation:       actuation.ActuationSucceeded,
   590  					Reconcile:       actuation.ReconcilePending,
   591  					UID:             testDeployment.GetUID(),
   592  					Generation:      testDeployment.GetGeneration(),
   593  				},
   594  			},
   595  		},
   596  	}
   597  	testutil.AssertEqual(t, &expectedInventory, taskContext.InventoryManager().Inventory())
   598  }
   599  
   600  func TestWaitTask_SingleTaskResult(t *testing.T) {
   601  	testDeploymentID := testutil.ToIdentifier(t, testDeployment1YAML)
   602  	testDeployment := testutil.Unstructured(t, testDeployment1YAML)
   603  	ids := object.ObjMetadataSet{
   604  		testDeploymentID,
   605  	}
   606  	waitTimeout := 3 * time.Second
   607  	taskName := "wait-5"
   608  	task := NewWaitTask(taskName, ids, AllCurrent,
   609  		waitTimeout, testutil.NewFakeRESTMapper())
   610  
   611  	// buffer events, because they're sent by StatusUpdate
   612  	eventChannel := make(chan event.Event, 10)
   613  	resourceCache := cache.NewResourceCacheMap()
   614  	taskContext := NewTaskContext(eventChannel, resourceCache)
   615  	defer close(eventChannel)
   616  
   617  	// Update metadata on successfully applied objects
   618  	testDeployment.SetUID("a")
   619  	testDeployment.SetGeneration(1)
   620  
   621  	// mark deployment as apply succeeded
   622  	taskContext.InventoryManager().AddSuccessfulApply(testDeploymentID,
   623  		testDeployment.GetUID(), testDeployment.GetGeneration())
   624  
   625  	// run task async, to let the test collect events
   626  	go func() {
   627  		// start the task
   628  		task.Start(taskContext)
   629  
   630  		// wait a bit
   631  		time.Sleep(1 * time.Second)
   632  
   633  		// mark the deployment as Current
   634  		resourceCache.Put(testDeploymentID, cache.ResourceStatus{
   635  			Resource: withGeneration(testDeployment, 1),
   636  			Status:   status.CurrentStatus,
   637  		})
   638  
   639  		// send multiple status updates
   640  		for i := 0; i < 10; i++ {
   641  			task.StatusUpdate(taskContext, testDeploymentID)
   642  		}
   643  	}()
   644  
   645  	// wait for timeout
   646  	timer := time.NewTimer(5 * time.Second)
   647  	receivedEvents := []event.Event{}
   648  	receivedResults := []TaskResult{}
   649  loop:
   650  	for {
   651  		select {
   652  		case e := <-taskContext.EventChannel():
   653  			receivedEvents = append(receivedEvents, e)
   654  		case res := <-taskContext.TaskChannel():
   655  			receivedResults = append(receivedResults, res)
   656  		case <-timer.C:
   657  			break loop
   658  		}
   659  	}
   660  
   661  	expectedEvents := []event.Event{
   662  		// skipped/reconciled/pending events first, in the order provided to the WaitTask
   663  		// deployment1 pending
   664  		{
   665  			Type: event.WaitType,
   666  			WaitEvent: event.WaitEvent{
   667  				GroupName:  taskName,
   668  				Identifier: testDeploymentID,
   669  				Status:     event.ReconcilePending,
   670  			},
   671  		},
   672  		// deployment1 reconciled
   673  		{
   674  			Type: event.WaitType,
   675  			WaitEvent: event.WaitEvent{
   676  				GroupName:  taskName,
   677  				Identifier: testDeploymentID,
   678  				Status:     event.ReconcileSuccessful,
   679  			},
   680  		},
   681  	}
   682  	testutil.AssertEqual(t, expectedEvents, receivedEvents,
   683  		"Actual events (%d) do not match expected events (%d)",
   684  		len(receivedEvents), len(expectedEvents))
   685  
   686  	expectedResults := []TaskResult{
   687  		{}, // Empty result means success
   688  	}
   689  	assert.Equal(t, expectedResults, receivedResults)
   690  
   691  	expectedInventory := actuation.Inventory{
   692  		Status: actuation.InventoryStatus{
   693  			Objects: []actuation.ObjectStatus{
   694  				{
   695  					ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeploymentID),
   696  					Strategy:        actuation.ActuationStrategyApply,
   697  					Actuation:       actuation.ActuationSucceeded,
   698  					Reconcile:       actuation.ReconcileSucceeded,
   699  					UID:             testDeployment.GetUID(),
   700  					Generation:      testDeployment.GetGeneration(),
   701  				},
   702  			},
   703  		},
   704  	}
   705  	testutil.AssertEqual(t, &expectedInventory, taskContext.InventoryManager().Inventory())
   706  }
   707  
   708  func TestWaitTask_Failed(t *testing.T) {
   709  	taskName := "wait-6"
   710  	testDeployment1ID := testutil.ToIdentifier(t, testDeployment1YAML)
   711  	testDeployment1 := testutil.Unstructured(t, testDeployment1YAML)
   712  	testDeployment2ID := testutil.ToIdentifier(t, testDeployment2YAML)
   713  	testDeployment2 := testutil.Unstructured(t, testDeployment2YAML)
   714  
   715  	// Update metadata on successfully applied objects
   716  	testDeployment1.SetUID("a")
   717  	testDeployment1.SetGeneration(1)
   718  	testDeployment2.SetUID("b")
   719  	testDeployment2.SetGeneration(1)
   720  
   721  	testCases := map[string]struct {
   722  		configureTaskContextFunc func(taskContext *TaskContext)
   723  		eventsFunc               func(*cache.ResourceCacheMap, *WaitTask, *TaskContext)
   724  		waitTimeout              time.Duration
   725  		expectedEvents           []event.Event
   726  		expectedInventory        *actuation.Inventory
   727  	}{
   728  		"continue on failed if others InProgress": {
   729  			configureTaskContextFunc: func(taskContext *TaskContext) {
   730  				// mark deployment as apply succeeded
   731  				taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID,
   732  					testDeployment1.GetUID(), testDeployment1.GetGeneration())
   733  				taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID,
   734  					testDeployment2.GetUID(), testDeployment2.GetGeneration())
   735  			},
   736  			eventsFunc: func(resourceCache *cache.ResourceCacheMap, task *WaitTask, taskContext *TaskContext) {
   737  				resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
   738  					Resource: testDeployment1,
   739  					Status:   status.FailedStatus,
   740  				})
   741  				task.StatusUpdate(taskContext, testDeployment1ID)
   742  
   743  				resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
   744  					Resource: testDeployment2,
   745  					Status:   status.InProgressStatus,
   746  				})
   747  				task.StatusUpdate(taskContext, testDeployment2ID)
   748  
   749  				resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
   750  					Resource: testDeployment2,
   751  					Status:   status.CurrentStatus,
   752  				})
   753  				task.StatusUpdate(taskContext, testDeployment2ID)
   754  			},
   755  			waitTimeout: 2 * time.Second,
   756  			expectedEvents: []event.Event{
   757  				// deployment1 pending
   758  				{
   759  					Type: event.WaitType,
   760  					WaitEvent: event.WaitEvent{
   761  						GroupName:  taskName,
   762  						Identifier: testDeployment1ID,
   763  						Status:     event.ReconcilePending,
   764  					},
   765  				},
   766  				// deployment2 pending
   767  				{
   768  					Type: event.WaitType,
   769  					WaitEvent: event.WaitEvent{
   770  						GroupName:  taskName,
   771  						Identifier: testDeployment2ID,
   772  						Status:     event.ReconcilePending,
   773  					},
   774  				},
   775  				// deployment1 is failed
   776  				{
   777  					Type: event.WaitType,
   778  					WaitEvent: event.WaitEvent{
   779  						GroupName:  taskName,
   780  						Identifier: testDeployment1ID,
   781  						Status:     event.ReconcileFailed,
   782  					},
   783  				},
   784  				// deployment2 current
   785  				{
   786  					Type: event.WaitType,
   787  					WaitEvent: event.WaitEvent{
   788  						GroupName:  taskName,
   789  						Identifier: testDeployment2ID,
   790  						Status:     event.ReconcileSuccessful,
   791  					},
   792  				},
   793  			},
   794  			expectedInventory: &actuation.Inventory{
   795  				Status: actuation.InventoryStatus{
   796  					Objects: []actuation.ObjectStatus{
   797  						{
   798  							ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
   799  							Strategy:        actuation.ActuationStrategyApply,
   800  							Actuation:       actuation.ActuationSucceeded,
   801  							Reconcile:       actuation.ReconcileFailed,
   802  							UID:             testDeployment1.GetUID(),
   803  							Generation:      testDeployment1.GetGeneration(),
   804  						},
   805  						{
   806  							ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
   807  							Strategy:        actuation.ActuationStrategyApply,
   808  							Actuation:       actuation.ActuationSucceeded,
   809  							Reconcile:       actuation.ReconcileSucceeded,
   810  							UID:             testDeployment2.GetUID(),
   811  							Generation:      testDeployment2.GetGeneration(),
   812  						},
   813  					},
   814  				},
   815  			},
   816  		},
   817  		"complete wait task is last resource becomes failed": {
   818  			configureTaskContextFunc: func(taskContext *TaskContext) {
   819  				// mark deployment as apply succeeded
   820  				taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID,
   821  					testDeployment1.GetUID(), testDeployment1.GetGeneration())
   822  				taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID,
   823  					testDeployment2.GetUID(), testDeployment2.GetGeneration())
   824  			},
   825  			eventsFunc: func(resourceCache *cache.ResourceCacheMap, task *WaitTask, taskContext *TaskContext) {
   826  				resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
   827  					Resource: testDeployment2,
   828  					Status:   status.CurrentStatus,
   829  				})
   830  				task.StatusUpdate(taskContext, testDeployment2ID)
   831  
   832  				resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
   833  					Resource: testDeployment1,
   834  					Status:   status.FailedStatus,
   835  				})
   836  				task.StatusUpdate(taskContext, testDeployment1ID)
   837  			},
   838  			waitTimeout: 2 * time.Second,
   839  			expectedEvents: []event.Event{
   840  				// deployment1 pending
   841  				{
   842  					Type: event.WaitType,
   843  					WaitEvent: event.WaitEvent{
   844  						GroupName:  taskName,
   845  						Identifier: testDeployment1ID,
   846  						Status:     event.ReconcilePending,
   847  					},
   848  				},
   849  				// deployment2 pending
   850  				{
   851  					Type: event.WaitType,
   852  					WaitEvent: event.WaitEvent{
   853  						GroupName:  taskName,
   854  						Identifier: testDeployment2ID,
   855  						Status:     event.ReconcilePending,
   856  					},
   857  				},
   858  				// deployment2 current
   859  				{
   860  					Type: event.WaitType,
   861  					WaitEvent: event.WaitEvent{
   862  						GroupName:  taskName,
   863  						Identifier: testDeployment2ID,
   864  						Status:     event.ReconcileSuccessful,
   865  					},
   866  				},
   867  				// deployment1 is failed
   868  				{
   869  					Type: event.WaitType,
   870  					WaitEvent: event.WaitEvent{
   871  						GroupName:  taskName,
   872  						Identifier: testDeployment1ID,
   873  						Status:     event.ReconcileFailed,
   874  					},
   875  				},
   876  			},
   877  			expectedInventory: &actuation.Inventory{
   878  				Status: actuation.InventoryStatus{
   879  					Objects: []actuation.ObjectStatus{
   880  						{
   881  							ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
   882  							Strategy:        actuation.ActuationStrategyApply,
   883  							Actuation:       actuation.ActuationSucceeded,
   884  							Reconcile:       actuation.ReconcileFailed,
   885  							UID:             testDeployment1.GetUID(),
   886  							Generation:      testDeployment1.GetGeneration(),
   887  						},
   888  						{
   889  							ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
   890  							Strategy:        actuation.ActuationStrategyApply,
   891  							Actuation:       actuation.ActuationSucceeded,
   892  							Reconcile:       actuation.ReconcileSucceeded,
   893  							UID:             testDeployment2.GetUID(),
   894  							Generation:      testDeployment2.GetGeneration(),
   895  						},
   896  					},
   897  				},
   898  			},
   899  		},
   900  		"failed resource can become current": {
   901  			configureTaskContextFunc: func(taskContext *TaskContext) {
   902  				// mark deployment as apply succeeded
   903  				taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID,
   904  					testDeployment1.GetUID(), testDeployment1.GetGeneration())
   905  				taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID,
   906  					testDeployment2.GetUID(), testDeployment2.GetGeneration())
   907  			},
   908  			eventsFunc: func(resourceCache *cache.ResourceCacheMap, task *WaitTask, taskContext *TaskContext) {
   909  				resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
   910  					Resource: testDeployment1,
   911  					Status:   status.FailedStatus,
   912  				})
   913  				task.StatusUpdate(taskContext, testDeployment1ID)
   914  
   915  				resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
   916  					Resource: testDeployment1,
   917  					Status:   status.CurrentStatus,
   918  				})
   919  				task.StatusUpdate(taskContext, testDeployment1ID)
   920  
   921  				resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
   922  					Resource: testDeployment2,
   923  					Status:   status.CurrentStatus,
   924  				})
   925  				task.StatusUpdate(taskContext, testDeployment2ID)
   926  			},
   927  			waitTimeout: 2 * time.Second,
   928  			expectedEvents: []event.Event{
   929  				// deployment1 pending
   930  				{
   931  					Type: event.WaitType,
   932  					WaitEvent: event.WaitEvent{
   933  						GroupName:  taskName,
   934  						Identifier: testDeployment1ID,
   935  						Status:     event.ReconcilePending,
   936  					},
   937  				},
   938  				// deployment2 pending
   939  				{
   940  					Type: event.WaitType,
   941  					WaitEvent: event.WaitEvent{
   942  						GroupName:  taskName,
   943  						Identifier: testDeployment2ID,
   944  						Status:     event.ReconcilePending,
   945  					},
   946  				},
   947  				// deployment1 is failed
   948  				{
   949  					Type: event.WaitType,
   950  					WaitEvent: event.WaitEvent{
   951  						GroupName:  taskName,
   952  						Identifier: testDeployment1ID,
   953  						Status:     event.ReconcileFailed,
   954  					},
   955  				},
   956  				// deployment1 is current
   957  				{
   958  					Type: event.WaitType,
   959  					WaitEvent: event.WaitEvent{
   960  						GroupName:  taskName,
   961  						Identifier: testDeployment1ID,
   962  						Status:     event.ReconcileSuccessful,
   963  					},
   964  				},
   965  				// deployment2 current
   966  				{
   967  					Type: event.WaitType,
   968  					WaitEvent: event.WaitEvent{
   969  						GroupName:  taskName,
   970  						Identifier: testDeployment2ID,
   971  						Status:     event.ReconcileSuccessful,
   972  					},
   973  				},
   974  			},
   975  			expectedInventory: &actuation.Inventory{
   976  				Status: actuation.InventoryStatus{
   977  					Objects: []actuation.ObjectStatus{
   978  						{
   979  							ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
   980  							Strategy:        actuation.ActuationStrategyApply,
   981  							Actuation:       actuation.ActuationSucceeded,
   982  							Reconcile:       actuation.ReconcileSucceeded,
   983  							UID:             testDeployment1.GetUID(),
   984  							Generation:      testDeployment1.GetGeneration(),
   985  						},
   986  						{
   987  							ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
   988  							Strategy:        actuation.ActuationStrategyApply,
   989  							Actuation:       actuation.ActuationSucceeded,
   990  							Reconcile:       actuation.ReconcileSucceeded,
   991  							UID:             testDeployment2.GetUID(),
   992  							Generation:      testDeployment2.GetGeneration(),
   993  						},
   994  					},
   995  				},
   996  			},
   997  		},
   998  		"failed resource can become InProgress": {
   999  			configureTaskContextFunc: func(taskContext *TaskContext) {
  1000  				// mark deployment as apply succeeded
  1001  				taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID,
  1002  					testDeployment1.GetUID(), testDeployment1.GetGeneration())
  1003  				taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID,
  1004  					testDeployment2.GetUID(), testDeployment2.GetGeneration())
  1005  			},
  1006  			eventsFunc: func(resourceCache *cache.ResourceCacheMap, task *WaitTask, taskContext *TaskContext) {
  1007  				resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
  1008  					Resource: testDeployment1,
  1009  					Status:   status.FailedStatus,
  1010  				})
  1011  				task.StatusUpdate(taskContext, testDeployment1ID)
  1012  
  1013  				resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
  1014  					Resource: testDeployment1,
  1015  					Status:   status.InProgressStatus,
  1016  				})
  1017  				task.StatusUpdate(taskContext, testDeployment1ID)
  1018  
  1019  				resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
  1020  					Resource: testDeployment2,
  1021  					Status:   status.CurrentStatus,
  1022  				})
  1023  				task.StatusUpdate(taskContext, testDeployment2ID)
  1024  			},
  1025  			waitTimeout: 2 * time.Second,
  1026  			expectedEvents: []event.Event{
  1027  				// deployment1 pending
  1028  				{
  1029  					Type: event.WaitType,
  1030  					WaitEvent: event.WaitEvent{
  1031  						GroupName:  taskName,
  1032  						Identifier: testDeployment1ID,
  1033  						Status:     event.ReconcilePending,
  1034  					},
  1035  				},
  1036  				// deployment2 pending
  1037  				{
  1038  					Type: event.WaitType,
  1039  					WaitEvent: event.WaitEvent{
  1040  						GroupName:  taskName,
  1041  						Identifier: testDeployment2ID,
  1042  						Status:     event.ReconcilePending,
  1043  					},
  1044  				},
  1045  				// deployment1 is failed
  1046  				{
  1047  					Type: event.WaitType,
  1048  					WaitEvent: event.WaitEvent{
  1049  						GroupName:  taskName,
  1050  						Identifier: testDeployment1ID,
  1051  						Status:     event.ReconcileFailed,
  1052  					},
  1053  				},
  1054  				// deployment1 pending
  1055  				{
  1056  					Type: event.WaitType,
  1057  					WaitEvent: event.WaitEvent{
  1058  						GroupName:  taskName,
  1059  						Identifier: testDeployment1ID,
  1060  						Status:     event.ReconcilePending,
  1061  					},
  1062  				},
  1063  				// deployment2 current
  1064  				{
  1065  					Type: event.WaitType,
  1066  					WaitEvent: event.WaitEvent{
  1067  						GroupName:  taskName,
  1068  						Identifier: testDeployment2ID,
  1069  						Status:     event.ReconcileSuccessful,
  1070  					},
  1071  				},
  1072  				// deployment1 timed out
  1073  				{
  1074  					Type: event.WaitType,
  1075  					WaitEvent: event.WaitEvent{
  1076  						GroupName:  taskName,
  1077  						Identifier: testDeployment1ID,
  1078  						Status:     event.ReconcileTimeout,
  1079  					},
  1080  				},
  1081  			},
  1082  			expectedInventory: &actuation.Inventory{
  1083  				Status: actuation.InventoryStatus{
  1084  					Objects: []actuation.ObjectStatus{
  1085  						{
  1086  							ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
  1087  							Strategy:        actuation.ActuationStrategyApply,
  1088  							Actuation:       actuation.ActuationSucceeded,
  1089  							Reconcile:       actuation.ReconcileTimeout,
  1090  							UID:             testDeployment1.GetUID(),
  1091  							Generation:      testDeployment1.GetGeneration(),
  1092  						},
  1093  						{
  1094  							ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
  1095  							Strategy:        actuation.ActuationStrategyApply,
  1096  							Actuation:       actuation.ActuationSucceeded,
  1097  							Reconcile:       actuation.ReconcileSucceeded,
  1098  							UID:             testDeployment2.GetUID(),
  1099  							Generation:      testDeployment2.GetGeneration(),
  1100  						},
  1101  					},
  1102  				},
  1103  			},
  1104  		},
  1105  	}
  1106  
  1107  	for tn, tc := range testCases {
  1108  		t.Run(tn, func(t *testing.T) {
  1109  			ids := object.ObjMetadataSet{
  1110  				testDeployment1ID,
  1111  				testDeployment2ID,
  1112  			}
  1113  			task := NewWaitTask(taskName, ids, AllCurrent,
  1114  				tc.waitTimeout, testutil.NewFakeRESTMapper())
  1115  
  1116  			eventChannel := make(chan event.Event)
  1117  			resourceCache := cache.NewResourceCacheMap()
  1118  			taskContext := NewTaskContext(eventChannel, resourceCache)
  1119  			defer close(eventChannel)
  1120  
  1121  			tc.configureTaskContextFunc(taskContext)
  1122  
  1123  			// run task async, to let the test collect events
  1124  			go func() {
  1125  				// start the task
  1126  				task.Start(taskContext)
  1127  
  1128  				tc.eventsFunc(resourceCache, task, taskContext)
  1129  			}()
  1130  
  1131  			// wait for task result
  1132  			timer := time.NewTimer(5 * time.Second)
  1133  			receivedEvents := []event.Event{}
  1134  		loop:
  1135  			for {
  1136  				select {
  1137  				case e := <-taskContext.EventChannel():
  1138  					receivedEvents = append(receivedEvents, e)
  1139  				case res := <-taskContext.TaskChannel():
  1140  					timer.Stop()
  1141  					assert.NoError(t, res.Err)
  1142  					break loop
  1143  				case <-timer.C:
  1144  					t.Fatalf("timed out waiting for TaskResult")
  1145  				}
  1146  			}
  1147  
  1148  			testutil.AssertEqual(t, tc.expectedEvents, receivedEvents,
  1149  				"Actual events (%d) do not match expected events (%d)",
  1150  				len(receivedEvents), len(tc.expectedEvents))
  1151  
  1152  			testutil.AssertEqual(t, tc.expectedInventory, taskContext.InventoryManager().Inventory())
  1153  		})
  1154  	}
  1155  }
  1156  
  1157  func TestWaitTask_UIDChanged(t *testing.T) {
  1158  	taskName := "wait-7"
  1159  	testDeployment1ID := testutil.ToIdentifier(t, testDeployment1YAML)
  1160  	testDeployment1 := testutil.Unstructured(t, testDeployment1YAML)
  1161  	testDeployment2ID := testutil.ToIdentifier(t, testDeployment2YAML)
  1162  	testDeployment2 := testutil.Unstructured(t, testDeployment2YAML)
  1163  
  1164  	// Update metadata on successfully applied objects
  1165  	testDeployment1.SetUID("a")
  1166  	testDeployment1.SetGeneration(1)
  1167  	testDeployment2.SetUID("b")
  1168  	testDeployment2.SetGeneration(1)
  1169  
  1170  	replacedDeployment1 := testDeployment1.DeepCopy()
  1171  	replacedDeployment1.SetUID("replaced")
  1172  
  1173  	testCases := map[string]struct {
  1174  		condition                Condition
  1175  		configureTaskContextFunc func(taskContext *TaskContext)
  1176  		eventsFunc               func(*cache.ResourceCacheMap, *WaitTask, *TaskContext)
  1177  		waitTimeout              time.Duration
  1178  		expectedEvents           []event.Event
  1179  		expectedInventory        *actuation.Inventory
  1180  	}{
  1181  		"UID changed after apply means reconcile failure": {
  1182  			condition: AllCurrent,
  1183  			configureTaskContextFunc: func(taskContext *TaskContext) {
  1184  				// mark deployment as apply succeeded
  1185  				taskContext.InventoryManager().AddSuccessfulApply(testDeployment1ID,
  1186  					testDeployment1.GetUID(), testDeployment1.GetGeneration())
  1187  				taskContext.InventoryManager().AddSuccessfulApply(testDeployment2ID,
  1188  					testDeployment2.GetUID(), testDeployment2.GetGeneration())
  1189  			},
  1190  			eventsFunc: func(resourceCache *cache.ResourceCacheMap, task *WaitTask, taskContext *TaskContext) {
  1191  				// any status update after apply success should trigger failure if the UID changed
  1192  				resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
  1193  					Resource: replacedDeployment1,
  1194  					Status:   status.CurrentStatus,
  1195  				})
  1196  				task.StatusUpdate(taskContext, testDeployment1ID)
  1197  
  1198  				resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
  1199  					Resource: testDeployment2,
  1200  					Status:   status.CurrentStatus,
  1201  				})
  1202  				task.StatusUpdate(taskContext, testDeployment2ID)
  1203  			},
  1204  			waitTimeout: 2 * time.Second,
  1205  			expectedEvents: []event.Event{
  1206  				// deployment1 pending
  1207  				{
  1208  					Type: event.WaitType,
  1209  					WaitEvent: event.WaitEvent{
  1210  						GroupName:  taskName,
  1211  						Identifier: testDeployment1ID,
  1212  						Status:     event.ReconcilePending,
  1213  					},
  1214  				},
  1215  				// deployment2 pending
  1216  				{
  1217  					Type: event.WaitType,
  1218  					WaitEvent: event.WaitEvent{
  1219  						GroupName:  taskName,
  1220  						Identifier: testDeployment2ID,
  1221  						Status:     event.ReconcilePending,
  1222  					},
  1223  				},
  1224  				// deployment1 is failed
  1225  				{
  1226  					Type: event.WaitType,
  1227  					WaitEvent: event.WaitEvent{
  1228  						GroupName:  taskName,
  1229  						Identifier: testDeployment1ID,
  1230  						Status:     event.ReconcileFailed,
  1231  					},
  1232  				},
  1233  				// deployment2 current
  1234  				{
  1235  					Type: event.WaitType,
  1236  					WaitEvent: event.WaitEvent{
  1237  						GroupName:  taskName,
  1238  						Identifier: testDeployment2ID,
  1239  						Status:     event.ReconcileSuccessful,
  1240  					},
  1241  				},
  1242  			},
  1243  			expectedInventory: &actuation.Inventory{
  1244  				Status: actuation.InventoryStatus{
  1245  					Objects: []actuation.ObjectStatus{
  1246  						{
  1247  							ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
  1248  							Strategy:        actuation.ActuationStrategyApply,
  1249  							Actuation:       actuation.ActuationSucceeded,
  1250  							// UID change causes failure after apply
  1251  							Reconcile: actuation.ReconcileFailed,
  1252  							// Recorded UID should be from the applied object, not the new replacement
  1253  							UID:        testDeployment1.GetUID(),
  1254  							Generation: testDeployment1.GetGeneration(),
  1255  						},
  1256  						{
  1257  							ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
  1258  							Strategy:        actuation.ActuationStrategyApply,
  1259  							Actuation:       actuation.ActuationSucceeded,
  1260  							Reconcile:       actuation.ReconcileSucceeded,
  1261  							UID:             testDeployment2.GetUID(),
  1262  							Generation:      testDeployment2.GetGeneration(),
  1263  						},
  1264  					},
  1265  				},
  1266  			},
  1267  		},
  1268  		"UID changed after delete means reconcile success": {
  1269  			condition: AllNotFound,
  1270  			configureTaskContextFunc: func(taskContext *TaskContext) {
  1271  				// mark deployment as apply succeeded
  1272  				taskContext.InventoryManager().AddSuccessfulDelete(testDeployment1ID,
  1273  					testDeployment1.GetUID())
  1274  				taskContext.InventoryManager().AddSuccessfulDelete(testDeployment2ID,
  1275  					testDeployment2.GetUID())
  1276  			},
  1277  			eventsFunc: func(resourceCache *cache.ResourceCacheMap, task *WaitTask, taskContext *TaskContext) {
  1278  				// any status update after delete should trigger success if the UID changed
  1279  				resourceCache.Put(testDeployment1ID, cache.ResourceStatus{
  1280  					Resource: replacedDeployment1,
  1281  					Status:   status.InProgressStatus,
  1282  				})
  1283  				task.StatusUpdate(taskContext, testDeployment1ID)
  1284  
  1285  				resourceCache.Put(testDeployment2ID, cache.ResourceStatus{
  1286  					Resource: testDeployment2,
  1287  					Status:   status.NotFoundStatus,
  1288  				})
  1289  				task.StatusUpdate(taskContext, testDeployment2ID)
  1290  			},
  1291  			waitTimeout: 2 * time.Second,
  1292  			expectedEvents: []event.Event{
  1293  				// deployment1 pending
  1294  				{
  1295  					Type: event.WaitType,
  1296  					WaitEvent: event.WaitEvent{
  1297  						GroupName:  taskName,
  1298  						Identifier: testDeployment1ID,
  1299  						Status:     event.ReconcilePending,
  1300  					},
  1301  				},
  1302  				// deployment2 pending
  1303  				{
  1304  					Type: event.WaitType,
  1305  					WaitEvent: event.WaitEvent{
  1306  						GroupName:  taskName,
  1307  						Identifier: testDeployment2ID,
  1308  						Status:     event.ReconcilePending,
  1309  					},
  1310  				},
  1311  				// deployment1 is replaced
  1312  				{
  1313  					Type: event.WaitType,
  1314  					WaitEvent: event.WaitEvent{
  1315  						GroupName:  taskName,
  1316  						Identifier: testDeployment1ID,
  1317  						Status:     event.ReconcileSuccessful,
  1318  					},
  1319  				},
  1320  				// deployment2 not found
  1321  				{
  1322  					Type: event.WaitType,
  1323  					WaitEvent: event.WaitEvent{
  1324  						GroupName:  taskName,
  1325  						Identifier: testDeployment2ID,
  1326  						Status:     event.ReconcileSuccessful,
  1327  					},
  1328  				},
  1329  			},
  1330  			expectedInventory: &actuation.Inventory{
  1331  				Status: actuation.InventoryStatus{
  1332  					Objects: []actuation.ObjectStatus{
  1333  						{
  1334  							ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment1ID),
  1335  							Strategy:        actuation.ActuationStrategyDelete,
  1336  							Actuation:       actuation.ActuationSucceeded,
  1337  							// UID change causes success after delete
  1338  							Reconcile: actuation.ReconcileSucceeded,
  1339  							// Recorded UID should be from the deleted object, not the new replacement
  1340  							UID: testDeployment1.GetUID(),
  1341  							// Deleted generation is unknown
  1342  						},
  1343  						{
  1344  							ObjectReference: inventory.ObjectReferenceFromObjMetadata(testDeployment2ID),
  1345  							Strategy:        actuation.ActuationStrategyDelete,
  1346  							Actuation:       actuation.ActuationSucceeded,
  1347  							Reconcile:       actuation.ReconcileSucceeded,
  1348  							UID:             testDeployment2.GetUID(),
  1349  							// Deleted generation is unknown
  1350  						},
  1351  					},
  1352  				},
  1353  			},
  1354  		},
  1355  	}
  1356  
  1357  	for tn, tc := range testCases {
  1358  		t.Run(tn, func(t *testing.T) {
  1359  			ids := object.ObjMetadataSet{
  1360  				testDeployment1ID,
  1361  				testDeployment2ID,
  1362  			}
  1363  			task := NewWaitTask(taskName, ids, tc.condition,
  1364  				tc.waitTimeout, testutil.NewFakeRESTMapper())
  1365  
  1366  			eventChannel := make(chan event.Event)
  1367  			resourceCache := cache.NewResourceCacheMap()
  1368  			taskContext := NewTaskContext(eventChannel, resourceCache)
  1369  			defer close(eventChannel)
  1370  
  1371  			tc.configureTaskContextFunc(taskContext)
  1372  
  1373  			// run task async, to let the test collect events
  1374  			go func() {
  1375  				// start the task
  1376  				task.Start(taskContext)
  1377  
  1378  				tc.eventsFunc(resourceCache, task, taskContext)
  1379  			}()
  1380  
  1381  			// wait for task result
  1382  			timer := time.NewTimer(5 * time.Second)
  1383  			receivedEvents := []event.Event{}
  1384  		loop:
  1385  			for {
  1386  				select {
  1387  				case e := <-taskContext.EventChannel():
  1388  					receivedEvents = append(receivedEvents, e)
  1389  				case res := <-taskContext.TaskChannel():
  1390  					timer.Stop()
  1391  					assert.NoError(t, res.Err)
  1392  					break loop
  1393  				case <-timer.C:
  1394  					t.Fatalf("timed out waiting for TaskResult")
  1395  				}
  1396  			}
  1397  
  1398  			testutil.AssertEqual(t, tc.expectedEvents, receivedEvents,
  1399  				"Actual events (%d) do not match expected events (%d)",
  1400  				len(receivedEvents), len(tc.expectedEvents))
  1401  
  1402  			testutil.AssertEqual(t, tc.expectedInventory, taskContext.InventoryManager().Inventory())
  1403  		})
  1404  	}
  1405  }
  1406  

View as plain text