...

Source file src/sigs.k8s.io/cli-utils/pkg/apply/taskrunner/runner.go

Documentation: sigs.k8s.io/cli-utils/pkg/apply/taskrunner

     1  // Copyright 2020 The Kubernetes Authors.
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package taskrunner
     5  
     6  import (
     7  	"context"
     8  	"fmt"
     9  
    10  	"k8s.io/klog/v2"
    11  	"sigs.k8s.io/cli-utils/pkg/apply/cache"
    12  	"sigs.k8s.io/cli-utils/pkg/apply/event"
    13  	pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
    14  	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
    15  	"sigs.k8s.io/cli-utils/pkg/object"
    16  )
    17  
    18  // NewTaskStatusRunner returns a new TaskStatusRunner.
    19  func NewTaskStatusRunner(identifiers object.ObjMetadataSet, statusWatcher watcher.StatusWatcher) *TaskStatusRunner {
    20  	return &TaskStatusRunner{
    21  		Identifiers:   identifiers,
    22  		StatusWatcher: statusWatcher,
    23  	}
    24  }
    25  
// TaskStatusRunner executes a queue of tasks while at the same time using
// the StatusWatcher to keep track of the status of the resources.
type TaskStatusRunner struct {
	// Identifiers is the set of objects whose status Run asks the
	// StatusWatcher to watch.
	Identifiers   object.ObjMetadataSet
	// StatusWatcher produces the status events Run uses to update the
	// resource cache and to notify the currently running task.
	StatusWatcher watcher.StatusWatcher
}
    33  
// Options defines properties that control the behavior of
// TaskStatusRunner.Run.
type Options struct {
	// EmitStatusEvents controls whether Run forwards each resource status
	// update received from the StatusWatcher as an event.StatusType event
	// via taskContext.SendEvent.
	EmitStatusEvents bool
}
    39  
    40  // Run executes the tasks in the taskqueue, with the statusPoller running in the
    41  // background.
    42  //
    43  // The tasks run in a loop where a single goroutine will process events from
    44  // three different channels.
    45  //   - taskQueue is read to allow updating the task queue at runtime.
    46  //   - statusChannel is read to allow updates to the resource cache and triggering
    47  //     validation of wait conditions.
    48  //   - eventChannel is written to with events based on status updates, if
    49  //     emitStatusEvents is true.
    50  func (tsr *TaskStatusRunner) Run(
    51  	ctx context.Context,
    52  	taskContext *TaskContext,
    53  	taskQueue chan Task,
    54  	opts Options,
    55  ) error {
    56  	// Give the poller its own context and run it in the background.
    57  	// If taskStatusRunner.Run is cancelled, baseRunner.run will exit early,
    58  	// causing the poller to be cancelled.
    59  	statusCtx, cancelFunc := context.WithCancel(context.Background())
    60  	statusChannel := tsr.StatusWatcher.Watch(statusCtx, tsr.Identifiers, watcher.Options{})
    61  
    62  	// complete stops the statusPoller, drains the statusChannel, and returns
    63  	// the provided error.
    64  	// Run this before returning!
    65  	// Avoid using defer, otherwise the statusPoller will hang. It needs to be
    66  	// drained synchronously before return, instead of asynchronously after.
    67  	complete := func(err error) error {
    68  		klog.V(7).Info("Runner cancelled status watcher")
    69  		cancelFunc()
    70  		for statusEvent := range statusChannel {
    71  			klog.V(7).Infof("Runner ignored status event: %v", statusEvent)
    72  		}
    73  		return err
    74  	}
    75  
    76  	// Wait until the StatusWatcher is sychronized to start the first task.
    77  	var currentTask Task
    78  	done := false
    79  
    80  	// abort is used to signal that something has failed, and
    81  	// the task processing should end as soon as is possible. Only
    82  	// wait tasks can be interrupted, so for all other tasks we need
    83  	// to wait for the currently running one to finish before we can
    84  	// exit.
    85  	abort := false
    86  	var abortReason error
    87  
    88  	// We do this so we can set the doneCh to a nil channel after
    89  	// it has been closed. This is needed to avoid a busy loop.
    90  	doneCh := ctx.Done()
    91  
    92  	for {
    93  		select {
    94  		// This processes status events from a channel, most likely
    95  		// driven by the StatusPoller. All normal resource status update
    96  		// events are passed through to the eventChannel. This means
    97  		// that listeners of the eventChannel will get updates on status
    98  		// even while other tasks (like apply tasks) are running.
    99  		case statusEvent, ok := <-statusChannel:
   100  			// If the statusChannel has closed or we are preparing
   101  			// to abort the task processing, we just ignore all
   102  			// statusEvents.
   103  			// TODO(mortent): Check if a closed statusChannel might
   104  			// create a busy loop here.
   105  			if !ok {
   106  				continue
   107  			}
   108  
   109  			if abort {
   110  				klog.V(7).Infof("Runner ignored status event: %v", statusEvent)
   111  				continue
   112  			}
   113  			klog.V(7).Infof("Runner received status event: %v", statusEvent)
   114  
   115  			// An error event on the statusChannel means the StatusWatcher
   116  			// has encountered a problem so it can't continue. This means
   117  			// the statusChannel will be closed soon.
   118  			if statusEvent.Type == pollevent.ErrorEvent {
   119  				abort = true
   120  				abortReason = fmt.Errorf("polling for status failed: %v",
   121  					statusEvent.Error)
   122  				if currentTask != nil {
   123  					currentTask.Cancel(taskContext)
   124  				} else {
   125  					// tasks not started yet - abort now
   126  					return complete(abortReason)
   127  				}
   128  				continue
   129  			}
   130  
   131  			// The StatusWatcher is synchronized.
   132  			// Tasks may commence!
   133  			if statusEvent.Type == pollevent.SyncEvent {
   134  				// Find and start the first task in the queue.
   135  				currentTask, done = nextTask(taskQueue, taskContext)
   136  				if done {
   137  					return complete(nil)
   138  				}
   139  				continue
   140  			}
   141  
   142  			if opts.EmitStatusEvents {
   143  				// Forward all normal events to the eventChannel
   144  				taskContext.SendEvent(event.Event{
   145  					Type: event.StatusType,
   146  					StatusEvent: event.StatusEvent{
   147  						Identifier:       statusEvent.Resource.Identifier,
   148  						PollResourceInfo: statusEvent.Resource,
   149  						Resource:         statusEvent.Resource.Resource,
   150  						Error:            statusEvent.Error,
   151  					},
   152  				})
   153  			}
   154  
   155  			id := statusEvent.Resource.Identifier
   156  
   157  			// Update the cache to track the latest resource spec & status.
   158  			// Status is computed from the resource on-demand.
   159  			// Warning: Resource may be nil!
   160  			taskContext.ResourceCache().Put(id, cache.ResourceStatus{
   161  				Resource:      statusEvent.Resource.Resource,
   162  				Status:        statusEvent.Resource.Status,
   163  				StatusMessage: statusEvent.Resource.Message,
   164  			})
   165  
   166  			// send a status update to the running task, but only if the status
   167  			// has changed and the task is tracking the object.
   168  			if currentTask != nil {
   169  				if currentTask.Identifiers().Contains(id) {
   170  					currentTask.StatusUpdate(taskContext, id)
   171  				}
   172  			}
   173  		// A message on the taskChannel means that the current task
   174  		// has either completed or failed.
   175  		// If it has failed, we return the error.
   176  		// If the abort flag is true, which means something
   177  		// else has gone wrong and we are waiting for the current task to
   178  		// finish, we exit.
   179  		// If everything is ok, we fetch and start the next task.
   180  		case msg := <-taskContext.TaskChannel():
   181  			taskContext.SendEvent(event.Event{
   182  				Type: event.ActionGroupType,
   183  				ActionGroupEvent: event.ActionGroupEvent{
   184  					GroupName: currentTask.Name(),
   185  					Action:    currentTask.Action(),
   186  					Status:    event.Finished,
   187  				},
   188  			})
   189  			if msg.Err != nil {
   190  				return complete(
   191  					fmt.Errorf("task failed (action: %q, name: %q): %w",
   192  						currentTask.Action(), currentTask.Name(), msg.Err))
   193  			}
   194  			if abort {
   195  				return complete(abortReason)
   196  			}
   197  			currentTask, done = nextTask(taskQueue, taskContext)
   198  			// If there are no more tasks, we are done. So just
   199  			// return.
   200  			if done {
   201  				return complete(nil)
   202  			}
   203  		// The doneCh will be closed if the passed in context is cancelled.
   204  		// If so, we just set the abort flag and wait for the currently running
   205  		// task to complete before we exit.
   206  		case <-doneCh:
   207  			doneCh = nil // Set doneCh to nil so we don't enter a busy loop.
   208  			abort = true
   209  			abortReason = ctx.Err() // always non-nil when doneCh is closed
   210  			klog.V(7).Infof("Runner aborting: %v", abortReason)
   211  			if currentTask != nil {
   212  				currentTask.Cancel(taskContext)
   213  			} else {
   214  				// tasks not started yet - abort now
   215  				return complete(abortReason)
   216  			}
   217  		}
   218  	}
   219  }
   220  
   221  // nextTask fetches the latest task from the taskQueue and
   222  // starts it. If the taskQueue is empty, it the second
   223  // return value will be true.
   224  func nextTask(taskQueue chan Task, taskContext *TaskContext) (Task, bool) {
   225  	var tsk Task
   226  	select {
   227  	// If there is any tasks left in the queue, this
   228  	// case statement will be executed.
   229  	case t := <-taskQueue:
   230  		tsk = t
   231  	default:
   232  		// Only happens when the channel is empty.
   233  		return nil, true
   234  	}
   235  
   236  	taskContext.SendEvent(event.Event{
   237  		Type: event.ActionGroupType,
   238  		ActionGroupEvent: event.ActionGroupEvent{
   239  			GroupName: tsk.Name(),
   240  			Action:    tsk.Action(),
   241  			Status:    event.Started,
   242  		},
   243  	})
   244  
   245  	tsk.Start(taskContext)
   246  
   247  	return tsk, false
   248  }
   249  
// TaskResult is the type returned from tasks once they have completed
// or failed. If the task has failed or timed out, Err will be set;
// otherwise it is nil.
type TaskResult struct {
	// Err is the reason the task failed, or nil if it succeeded.
	Err error
}
   256  

View as plain text