...

Source file src/sigs.k8s.io/cli-utils/pkg/apply/taskrunner/runner_test.go

Documentation: sigs.k8s.io/cli-utils/pkg/apply/taskrunner

     1  // Copyright 2020 The Kubernetes Authors.
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package taskrunner
     5  
     6  import (
     7  	"context"
     8  	"fmt"
     9  	"sync"
    10  	"testing"
    11  	"time"
    12  
    13  	"github.com/stretchr/testify/assert"
    14  	"k8s.io/apimachinery/pkg/runtime/schema"
    15  	"sigs.k8s.io/cli-utils/pkg/apply/cache"
    16  	"sigs.k8s.io/cli-utils/pkg/apply/event"
    17  	pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
    18  	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
    19  	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
    20  	"sigs.k8s.io/cli-utils/pkg/object"
    21  	"sigs.k8s.io/cli-utils/pkg/testutil"
    22  )
    23  
    24  var (
    25  	depID = object.ObjMetadata{
    26  		GroupKind: schema.GroupKind{
    27  			Group: "apps",
    28  			Kind:  "Deployment",
    29  		},
    30  		Namespace: "default",
    31  		Name:      "dep",
    32  	}
    33  	cmID = object.ObjMetadata{
    34  		GroupKind: schema.GroupKind{
    35  			Group: "",
    36  			Kind:  "ConfigMap",
    37  		},
    38  		Namespace: "default",
    39  		Name:      "cm",
    40  	}
    41  )
    42  
    43  func TestBaseRunner(t *testing.T) {
    44  	testCases := map[string]struct {
    45  		tasks              []Task
    46  		statusEventsDelay  time.Duration
    47  		statusEvents       []pollevent.Event
    48  		expectedEventTypes []event.Type
    49  		expectedWaitEvents []event.WaitEvent
    50  	}{
    51  		"wait task runs until condition is met": {
    52  			tasks: []Task{
    53  				&fakeApplyTask{
    54  					resultEvent: event.Event{
    55  						Type: event.ApplyType,
    56  					},
    57  					duration: 3 * time.Second,
    58  				},
    59  				NewWaitTask("wait", object.ObjMetadataSet{depID, cmID}, AllCurrent,
    60  					1*time.Minute, testutil.NewFakeRESTMapper()),
    61  				&fakeApplyTask{
    62  					resultEvent: event.Event{
    63  						Type: event.PruneType,
    64  					},
    65  					duration: 2 * time.Second,
    66  				},
    67  			},
    68  			statusEventsDelay: 5 * time.Second,
    69  			statusEvents: []pollevent.Event{
    70  				{
    71  					Type: pollevent.ResourceUpdateEvent,
    72  					Resource: &pollevent.ResourceStatus{
    73  						Identifier: cmID,
    74  						Status:     status.CurrentStatus,
    75  					},
    76  				},
    77  				{
    78  					Type: pollevent.ResourceUpdateEvent,
    79  					Resource: &pollevent.ResourceStatus{
    80  						Identifier: depID,
    81  						Status:     status.CurrentStatus,
    82  					},
    83  				},
    84  			},
    85  			expectedEventTypes: []event.Type{
    86  				event.ActionGroupType,
    87  				event.ApplyType,
    88  				event.ActionGroupType,
    89  				event.ActionGroupType,
    90  				event.WaitType,   // deployment pending
    91  				event.WaitType,   // configmap pending
    92  				event.StatusType, // configmap current
    93  				event.WaitType,   // configmap reconciled
    94  				event.StatusType, // deployment current
    95  				event.WaitType,   // deployment reconciled
    96  				event.ActionGroupType,
    97  				event.ActionGroupType,
    98  				event.PruneType,
    99  				event.ActionGroupType,
   100  			},
   101  			expectedWaitEvents: []event.WaitEvent{
   102  				{
   103  					GroupName:  "wait",
   104  					Identifier: depID,
   105  					Status:     event.ReconcilePending,
   106  				},
   107  				{
   108  					GroupName:  "wait",
   109  					Identifier: cmID,
   110  					Status:     event.ReconcilePending,
   111  				},
   112  				{
   113  					GroupName:  "wait",
   114  					Identifier: cmID,
   115  					Status:     event.ReconcileSuccessful,
   116  				},
   117  				{
   118  					GroupName:  "wait",
   119  					Identifier: depID,
   120  					Status:     event.ReconcileSuccessful,
   121  				},
   122  			},
   123  		},
   124  		"wait task times out eventually (Unknown)": {
   125  			tasks: []Task{
   126  				NewWaitTask("wait", object.ObjMetadataSet{depID, cmID}, AllCurrent,
   127  					2*time.Second, testutil.NewFakeRESTMapper()),
   128  			},
   129  			statusEventsDelay: time.Second,
   130  			statusEvents: []pollevent.Event{
   131  				{
   132  					Type: pollevent.ResourceUpdateEvent,
   133  					Resource: &pollevent.ResourceStatus{
   134  						Identifier: cmID,
   135  						Status:     status.CurrentStatus,
   136  					},
   137  				},
   138  			},
   139  			expectedEventTypes: []event.Type{
   140  				event.ActionGroupType,
   141  				event.WaitType,   // configmap pending
   142  				event.WaitType,   // deployment pending
   143  				event.StatusType, // configmap current
   144  				event.WaitType,   // configmap reconciled
   145  				event.WaitType,   // deployment timeout error
   146  				event.ActionGroupType,
   147  			},
   148  			expectedWaitEvents: []event.WaitEvent{
   149  				{
   150  					GroupName:  "wait",
   151  					Identifier: depID,
   152  					Status:     event.ReconcilePending,
   153  				},
   154  				{
   155  					GroupName:  "wait",
   156  					Identifier: cmID,
   157  					Status:     event.ReconcilePending,
   158  				},
   159  				{
   160  					GroupName:  "wait",
   161  					Identifier: cmID,
   162  					Status:     event.ReconcileSuccessful,
   163  				},
   164  				{
   165  					GroupName:  "wait",
   166  					Identifier: depID,
   167  					Status:     event.ReconcileTimeout,
   168  				},
   169  			},
   170  		},
   171  		"wait task times out eventually (InProgress)": {
   172  			tasks: []Task{
   173  				NewWaitTask("wait", object.ObjMetadataSet{depID, cmID}, AllCurrent,
   174  					2*time.Second, testutil.NewFakeRESTMapper()),
   175  			},
   176  			statusEventsDelay: time.Second,
   177  			statusEvents: []pollevent.Event{
   178  				{
   179  					Type: pollevent.ResourceUpdateEvent,
   180  					Resource: &pollevent.ResourceStatus{
   181  						Identifier: cmID,
   182  						Status:     status.CurrentStatus,
   183  					},
   184  				},
   185  				{
   186  					Type: pollevent.ResourceUpdateEvent,
   187  					Resource: &pollevent.ResourceStatus{
   188  						Identifier: depID,
   189  						Status:     status.InProgressStatus,
   190  					},
   191  				},
   192  			},
   193  			expectedEventTypes: []event.Type{
   194  				event.ActionGroupType,
   195  				event.WaitType,   // configmap pending
   196  				event.WaitType,   // deployment pending
   197  				event.StatusType, // configmap current
   198  				event.WaitType,   // configmap reconciled
   199  				event.StatusType, // deployment inprogress
   200  				event.WaitType,   // deployment timeout error
   201  				event.ActionGroupType,
   202  			},
   203  			expectedWaitEvents: []event.WaitEvent{
   204  				{
   205  					GroupName:  "wait",
   206  					Identifier: depID,
   207  					Status:     event.ReconcilePending,
   208  				},
   209  				{
   210  					GroupName:  "wait",
   211  					Identifier: cmID,
   212  					Status:     event.ReconcilePending,
   213  				},
   214  				{
   215  					GroupName:  "wait",
   216  					Identifier: cmID,
   217  					Status:     event.ReconcileSuccessful,
   218  				},
   219  				{
   220  					GroupName:  "wait",
   221  					Identifier: depID,
   222  					Status:     event.ReconcileTimeout,
   223  				},
   224  			},
   225  		},
   226  		"tasks run in order": {
   227  			tasks: []Task{
   228  				&fakeApplyTask{
   229  					resultEvent: event.Event{
   230  						Type: event.ApplyType,
   231  					},
   232  					duration: 1 * time.Second,
   233  				},
   234  				&fakeApplyTask{
   235  					resultEvent: event.Event{
   236  						Type: event.PruneType,
   237  					},
   238  					duration: 1 * time.Second,
   239  				},
   240  				&fakeApplyTask{
   241  					resultEvent: event.Event{
   242  						Type: event.ApplyType,
   243  					},
   244  					duration: 1 * time.Second,
   245  				},
   246  				&fakeApplyTask{
   247  					resultEvent: event.Event{
   248  						Type: event.PruneType,
   249  					},
   250  					duration: 1 * time.Second,
   251  				},
   252  			},
   253  			statusEventsDelay: 1 * time.Second,
   254  			statusEvents:      []pollevent.Event{},
   255  			expectedEventTypes: []event.Type{
   256  				event.ActionGroupType,
   257  				event.ApplyType,
   258  				event.ActionGroupType,
   259  				event.ActionGroupType,
   260  				event.PruneType,
   261  				event.ActionGroupType,
   262  				event.ActionGroupType,
   263  				event.ApplyType,
   264  				event.ActionGroupType,
   265  				event.ActionGroupType,
   266  				event.PruneType,
   267  				event.ActionGroupType,
   268  			},
   269  		},
   270  	}
   271  
   272  	for tn, tc := range testCases {
   273  		t.Run(tn, func(t *testing.T) {
   274  			taskQueue := make(chan Task, len(tc.tasks))
   275  			for _, tsk := range tc.tasks {
   276  				taskQueue <- tsk
   277  			}
   278  
   279  			ids := object.ObjMetadataSet{} // unused by fake statusWatcher
   280  			statusWatcher := newFakeWatcher(tc.statusEvents)
   281  			eventChannel := make(chan event.Event)
   282  			resourceCache := cache.NewResourceCacheMap()
   283  			taskContext := NewTaskContext(eventChannel, resourceCache)
   284  			runner := NewTaskStatusRunner(ids, statusWatcher)
   285  
   286  			// Use a WaitGroup to make sure changes in the goroutines
   287  			// are visible to the main goroutine.
   288  			var wg sync.WaitGroup
   289  
   290  			statusChannel := make(chan pollevent.Event)
   291  			wg.Add(1)
   292  			go func() {
   293  				defer wg.Done()
   294  
   295  				time.Sleep(tc.statusEventsDelay)
   296  				statusWatcher.Start()
   297  			}()
   298  
   299  			var events []event.Event
   300  			wg.Add(1)
   301  			go func() {
   302  				defer wg.Done()
   303  
   304  				for msg := range eventChannel {
   305  					events = append(events, msg)
   306  				}
   307  			}()
   308  
   309  			opts := Options{EmitStatusEvents: true}
   310  			ctx := context.Background()
   311  			err := runner.Run(ctx, taskContext, taskQueue, opts)
   312  			close(statusChannel)
   313  			close(eventChannel)
   314  			wg.Wait()
   315  
   316  			assert.NoError(t, err)
   317  
   318  			if want, got := len(tc.expectedEventTypes), len(events); want != got {
   319  				t.Errorf("expected %d events, but got %d", want, got)
   320  			}
   321  			var waitEvents []event.WaitEvent
   322  			for i, e := range events {
   323  				expectedEventType := tc.expectedEventTypes[i]
   324  				if want, got := expectedEventType, e.Type; want != got {
   325  					t.Errorf("expected event type %s, but got %s",
   326  						want, got)
   327  				}
   328  				if e.Type == event.WaitType {
   329  					waitEvents = append(waitEvents, e.WaitEvent)
   330  				}
   331  			}
   332  			assert.Equal(t, tc.expectedWaitEvents, waitEvents)
   333  		})
   334  	}
   335  }
   336  
   337  func TestBaseRunnerCancellation(t *testing.T) {
   338  	testError := fmt.Errorf("this is a test error")
   339  
   340  	testCases := map[string]struct {
   341  		tasks              []Task
   342  		statusEventsDelay  time.Duration
   343  		statusEvents       []pollevent.Event
   344  		contextTimeout     time.Duration
   345  		expectedError      error
   346  		expectedEventTypes []event.Type
   347  	}{
   348  		"cancellation while custom task is running": {
   349  			tasks: []Task{
   350  				&fakeApplyTask{
   351  					resultEvent: event.Event{
   352  						Type: event.ApplyType,
   353  					},
   354  					duration: 4 * time.Second,
   355  				},
   356  				&fakeApplyTask{
   357  					resultEvent: event.Event{
   358  						Type: event.PruneType,
   359  					},
   360  					duration: 2 * time.Second,
   361  				},
   362  			},
   363  			contextTimeout: 2 * time.Second,
   364  			expectedError:  context.DeadlineExceeded,
   365  			expectedEventTypes: []event.Type{
   366  				event.ActionGroupType,
   367  				event.ApplyType,
   368  				event.ActionGroupType,
   369  			},
   370  		},
   371  		"cancellation while wait task is running": {
   372  			tasks: []Task{
   373  				NewWaitTask("wait", object.ObjMetadataSet{depID}, AllCurrent,
   374  					20*time.Second, testutil.NewFakeRESTMapper()),
   375  				&fakeApplyTask{
   376  					resultEvent: event.Event{
   377  						Type: event.PruneType,
   378  					},
   379  					duration: 2 * time.Second,
   380  				},
   381  			},
   382  			contextTimeout: 2 * time.Second,
   383  			expectedError:  context.DeadlineExceeded,
   384  			expectedEventTypes: []event.Type{
   385  				event.ActionGroupType,
   386  				event.WaitType, // pending
   387  				event.ActionGroupType,
   388  			},
   389  		},
   390  		"error while custom task is running": {
   391  			tasks: []Task{
   392  				&fakeApplyTask{
   393  					name: "apply-0",
   394  					resultEvent: event.Event{
   395  						Type: event.ApplyType,
   396  					},
   397  					duration: 2 * time.Second,
   398  					err:      testError,
   399  				},
   400  				&fakeApplyTask{
   401  					name: "prune-0",
   402  					resultEvent: event.Event{
   403  						Type: event.PruneType,
   404  					},
   405  					duration: 2 * time.Second,
   406  				},
   407  			},
   408  			contextTimeout: 30 * time.Second,
   409  			expectedError:  fmt.Errorf(`task failed (action: "Apply", name: "apply-0"): %w`, testError),
   410  			expectedEventTypes: []event.Type{
   411  				event.ActionGroupType,
   412  				event.ApplyType,
   413  				event.ActionGroupType,
   414  			},
   415  		},
   416  		"error from status watcher while wait task is running": {
   417  			tasks: []Task{
   418  				NewWaitTask("wait", object.ObjMetadataSet{depID}, AllCurrent,
   419  					20*time.Second, testutil.NewFakeRESTMapper()),
   420  				&fakeApplyTask{
   421  					resultEvent: event.Event{
   422  						Type: event.PruneType,
   423  					},
   424  					duration: 2 * time.Second,
   425  				},
   426  			},
   427  			statusEventsDelay: 2 * time.Second,
   428  			statusEvents: []pollevent.Event{
   429  				{
   430  					Type:  pollevent.ErrorEvent,
   431  					Error: testError,
   432  				},
   433  			},
   434  			contextTimeout: 30 * time.Second,
   435  			expectedError:  fmt.Errorf("polling for status failed: %w", testError),
   436  			expectedEventTypes: []event.Type{
   437  				event.ActionGroupType,
   438  				event.WaitType, // pending
   439  				event.ActionGroupType,
   440  			},
   441  		},
   442  	}
   443  
   444  	for tn, tc := range testCases {
   445  		t.Run(tn, func(t *testing.T) {
   446  			taskQueue := make(chan Task, len(tc.tasks))
   447  			for _, tsk := range tc.tasks {
   448  				taskQueue <- tsk
   449  			}
   450  
   451  			ids := object.ObjMetadataSet{} // unused by fake statusWatcher
   452  			statusWatcher := newFakeWatcher(tc.statusEvents)
   453  			eventChannel := make(chan event.Event)
   454  			resourceCache := cache.NewResourceCacheMap()
   455  			taskContext := NewTaskContext(eventChannel, resourceCache)
   456  			runner := NewTaskStatusRunner(ids, statusWatcher)
   457  
   458  			// Use a WaitGroup to make sure changes in the goroutines
   459  			// are visible to the main goroutine.
   460  			var wg sync.WaitGroup
   461  
   462  			statusChannel := make(chan pollevent.Event)
   463  			wg.Add(1)
   464  			go func() {
   465  				defer wg.Done()
   466  
   467  				time.Sleep(tc.statusEventsDelay)
   468  				statusWatcher.Start()
   469  			}()
   470  
   471  			var events []event.Event
   472  			wg.Add(1)
   473  			go func() {
   474  				defer wg.Done()
   475  
   476  				for msg := range eventChannel {
   477  					events = append(events, msg)
   478  				}
   479  			}()
   480  
   481  			ctx, cancel := context.WithTimeout(context.Background(), tc.contextTimeout)
   482  			defer cancel()
   483  
   484  			opts := Options{EmitStatusEvents: true}
   485  			err := runner.Run(ctx, taskContext, taskQueue, opts)
   486  			close(statusChannel)
   487  			close(eventChannel)
   488  			wg.Wait()
   489  
   490  			if tc.expectedError != nil {
   491  				assert.EqualError(t, err, tc.expectedError.Error())
   492  			} else {
   493  				assert.NoError(t, err)
   494  			}
   495  
   496  			if want, got := len(tc.expectedEventTypes), len(events); want != got {
   497  				t.Errorf("expected %d events, but got %d", want, got)
   498  			}
   499  			for i, e := range events {
   500  				expectedEventType := tc.expectedEventTypes[i]
   501  				if want, got := expectedEventType, e.Type; want != got {
   502  					t.Errorf("expected event type %s, but got %s",
   503  						want, got)
   504  				}
   505  			}
   506  		})
   507  	}
   508  }
   509  
   510  type fakeApplyTask struct {
   511  	name        string
   512  	resultEvent event.Event
   513  	duration    time.Duration
   514  	err         error
   515  }
   516  
   517  func (f *fakeApplyTask) Name() string {
   518  	return f.name
   519  }
   520  
   521  func (f *fakeApplyTask) Action() event.ResourceAction {
   522  	return event.ApplyAction
   523  }
   524  
   525  func (f *fakeApplyTask) Identifiers() object.ObjMetadataSet {
   526  	return object.ObjMetadataSet{}
   527  }
   528  
   529  func (f *fakeApplyTask) Start(taskContext *TaskContext) {
   530  	go func() {
   531  		<-time.NewTimer(f.duration).C
   532  		taskContext.SendEvent(f.resultEvent)
   533  		taskContext.TaskChannel() <- TaskResult{
   534  			Err: f.err,
   535  		}
   536  	}()
   537  }
   538  
   539  func (f *fakeApplyTask) Cancel(_ *TaskContext) {}
   540  
   541  func (f *fakeApplyTask) StatusUpdate(_ *TaskContext, _ object.ObjMetadata) {}
   542  
   543  type fakeWatcher struct {
   544  	start  chan struct{}
   545  	events []pollevent.Event
   546  }
   547  
   548  func newFakeWatcher(statusEvents []pollevent.Event) *fakeWatcher {
   549  	return &fakeWatcher{
   550  		events: statusEvents,
   551  		start:  make(chan struct{}),
   552  	}
   553  }
   554  
   555  // Start events being sent on the status channel
   556  func (f *fakeWatcher) Start() {
   557  	close(f.start)
   558  }
   559  
   560  func (f *fakeWatcher) Watch(ctx context.Context, _ object.ObjMetadataSet, _ watcher.Options) <-chan pollevent.Event {
   561  	eventChannel := make(chan pollevent.Event)
   562  	go func() {
   563  		defer close(eventChannel)
   564  		// send sync event immediately
   565  		eventChannel <- pollevent.Event{Type: pollevent.SyncEvent}
   566  		// wait until started to send the events
   567  		<-f.start
   568  		for _, f := range f.events {
   569  			eventChannel <- f
   570  		}
   571  		// wait until cancelled to close the event channel and exit
   572  		<-ctx.Done()
   573  	}()
   574  	return eventChannel
   575  }
   576  

View as plain text