...

Source file src/sigs.k8s.io/cli-utils/pkg/apply/taskrunner/task.go

Documentation: sigs.k8s.io/cli-utils/pkg/apply/taskrunner

     1  // Copyright 2020 The Kubernetes Authors.
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package taskrunner
     5  
     6  import (
     7  	"context"
     8  	"fmt"
     9  	"sync"
    10  	"time"
    11  
    12  	"k8s.io/apimachinery/pkg/api/meta"
    13  	"k8s.io/apimachinery/pkg/runtime/schema"
    14  	"k8s.io/apimachinery/pkg/types"
    15  	"k8s.io/klog/v2"
    16  	"sigs.k8s.io/cli-utils/pkg/apply/event"
    17  	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
    18  	"sigs.k8s.io/cli-utils/pkg/object"
    19  )
    20  
    21  var (
    22  	crdGK = schema.GroupKind{Group: "apiextensions.k8s.io", Kind: "CustomResourceDefinition"}
    23  )
    24  
    25  // Task is the interface that must be implemented by
    26  // all tasks that will be executed by the taskrunner.
    27  type Task interface {
    28  	Name() string
    29  	Action() event.ResourceAction
    30  	Identifiers() object.ObjMetadataSet
    31  	Start(*TaskContext)
    32  	StatusUpdate(*TaskContext, object.ObjMetadata)
    33  	Cancel(*TaskContext)
    34  }
    35  
    36  // NewWaitTask creates a new wait task where we will wait until
    37  // the resources specifies by ids all meet the specified condition.
    38  func NewWaitTask(name string, ids object.ObjMetadataSet, cond Condition, timeout time.Duration, mapper meta.RESTMapper) *WaitTask {
    39  	return &WaitTask{
    40  		TaskName:  name,
    41  		Ids:       ids,
    42  		Condition: cond,
    43  		Timeout:   timeout,
    44  		Mapper:    mapper,
    45  	}
    46  }
    47  
// WaitTask is an implementation of the Task interface that is used
// to wait for a set of resources (identified by a slice of ObjMetadata)
// to all meet the specified condition. It also specifies a timeout
// for how long we are willing to wait for this to happen.
// Unlike other implementations of the Task interface, the wait task
// is handled in a special way by the taskrunner and is a part of the core
// package.
type WaitTask struct {
	// TaskName allows providing a name for the task.
	TaskName string
	// Ids is the full list of resources that we are waiting for.
	Ids object.ObjMetadataSet
	// Condition defines the status we want all resources to reach.
	Condition Condition
	// Timeout defines how long we are willing to wait for the condition
	// to be met. Start only applies a deadline when Timeout > 0; otherwise
	// the wait ends only on completion or cancellation.
	Timeout time.Duration
	// Mapper is the RESTMapper to reset after CRDs have been reconciled,
	// so that later tasks can use the newly registered resource types.
	Mapper meta.RESTMapper
	// cancelFunc cancels the context created in Start, which ends the
	// task. It is assigned by Start.
	cancelFunc context.CancelFunc
	// pending is the set of resources that we are still waiting for.
	pending object.ObjMetadataSet
	// failed is the set of resources that we are waiting for, but that are
	// considered failed, i.e. unlikely to successfully reconcile.
	failed object.ObjMetadataSet
	// mu protects the pending and failed ObjMetadataSets.
	mu sync.RWMutex
}
    78  
    79  func (w *WaitTask) Name() string {
    80  	return w.TaskName
    81  }
    82  
    83  func (w *WaitTask) Action() event.ResourceAction {
    84  	return event.WaitAction
    85  }
    86  
    87  func (w *WaitTask) Identifiers() object.ObjMetadataSet {
    88  	return w.Ids
    89  }
    90  
    91  // Start kicks off the task. For the wait task, this just means
    92  // setting up the timeout timer.
    93  func (w *WaitTask) Start(taskContext *TaskContext) {
    94  	klog.V(2).Infof("wait task starting (name: %q, objects: %d)",
    95  		w.Name(), len(w.Ids))
    96  
    97  	// TODO: inherit context from task runner, passed through the TaskContext
    98  	ctx := context.Background()
    99  
   100  	// use a context wrapper to handle complete/cancel/timeout
   101  	if w.Timeout > 0 {
   102  		ctx, w.cancelFunc = context.WithTimeout(ctx, w.Timeout)
   103  	} else {
   104  		ctx, w.cancelFunc = context.WithCancel(ctx)
   105  	}
   106  
   107  	w.startInner(taskContext)
   108  
   109  	// A goroutine to handle ending the WaitTask.
   110  	go func() {
   111  		// Block until complete/cancel/timeout
   112  		<-ctx.Done()
   113  		// Err is always non-nil when Done channel is closed.
   114  		err := ctx.Err()
   115  
   116  		klog.V(2).Infof("wait task completing (name: %q,): %v", w.TaskName, err)
   117  
   118  		switch err {
   119  		case context.Canceled:
   120  			// happy path - cancelled or completed (not considered an error)
   121  		case context.DeadlineExceeded:
   122  			// timed out
   123  			w.sendTimeoutEvents(taskContext)
   124  		}
   125  
   126  		// Update RESTMapper to pick up new custom resource types
   127  		w.updateRESTMapper(taskContext)
   128  
   129  		// Done here. signal completion to the task runner
   130  		taskContext.TaskChannel() <- TaskResult{}
   131  	}()
   132  }
   133  
   134  func (w *WaitTask) sendEvent(taskContext *TaskContext, id object.ObjMetadata, status event.WaitEventStatus) {
   135  	taskContext.SendEvent(event.Event{
   136  		Type: event.WaitType,
   137  		WaitEvent: event.WaitEvent{
   138  			GroupName:  w.Name(),
   139  			Identifier: id,
   140  			Status:     status,
   141  		},
   142  	})
   143  }
   144  
   145  // startInner sends initial pending, skipped, an reconciled events.
   146  // If all objects are reconciled or skipped, cancelFunc is called.
   147  // The pending set is write locked during execution of startInner.
   148  func (w *WaitTask) startInner(taskContext *TaskContext) {
   149  	w.mu.Lock()
   150  	defer w.mu.Unlock()
   151  
   152  	klog.V(3).Infof("wait task progress: %d/%d", 0, len(w.Ids))
   153  
   154  	pending := object.ObjMetadataSet{}
   155  	for _, id := range w.Ids {
   156  		switch {
   157  		case w.skipped(taskContext, id):
   158  			err := taskContext.InventoryManager().SetSkippedReconcile(id)
   159  			if err != nil {
   160  				// Object never applied or deleted!
   161  				klog.Errorf("Failed to mark object as skipped reconcile: %v", err)
   162  			}
   163  			w.sendEvent(taskContext, id, event.ReconcileSkipped)
   164  		case w.changedUID(taskContext, id):
   165  			// replaced
   166  			w.handleChangedUID(taskContext, id)
   167  		case w.reconciledByID(taskContext, id):
   168  			err := taskContext.InventoryManager().SetSuccessfulReconcile(id)
   169  			if err != nil {
   170  				// Object never applied or deleted!
   171  				klog.Errorf("Failed to mark object as successful reconcile: %v", err)
   172  			}
   173  			w.sendEvent(taskContext, id, event.ReconcileSuccessful)
   174  		default:
   175  			err := taskContext.InventoryManager().SetPendingReconcile(id)
   176  			if err != nil {
   177  				// Object never applied or deleted!
   178  				klog.Errorf("Failed to mark object as pending reconcile: %v", err)
   179  			}
   180  			pending = append(pending, id)
   181  			w.sendEvent(taskContext, id, event.ReconcilePending)
   182  		}
   183  	}
   184  	w.pending = pending
   185  
   186  	klog.V(3).Infof("wait task progress: %d/%d", len(w.Ids)-len(w.pending), len(w.Ids))
   187  
   188  	if len(pending) == 0 {
   189  		// all reconciled - clear pending and exit
   190  		klog.V(3).Infof("all objects reconciled or skipped (name: %q)", w.TaskName)
   191  		w.cancelFunc()
   192  	}
   193  }
   194  
   195  // sendTimeoutEvents sends a timeout event for every remaining pending object
   196  // The pending set is read locked during execution of sendTimeoutEvents.
   197  func (w *WaitTask) sendTimeoutEvents(taskContext *TaskContext) {
   198  	w.mu.RLock()
   199  	defer w.mu.RUnlock()
   200  
   201  	for _, id := range w.pending {
   202  		err := taskContext.InventoryManager().SetTimeoutReconcile(id)
   203  		if err != nil {
   204  			// Object never applied or deleted!
   205  			klog.Errorf("Failed to mark object as pending reconcile: %v", err)
   206  		}
   207  		w.sendEvent(taskContext, id, event.ReconcileTimeout)
   208  	}
   209  }
   210  
   211  // reconciledByID checks whether the condition set in the task is currently met
   212  // for the specified object given the status of resource in the cache.
   213  func (w *WaitTask) reconciledByID(taskContext *TaskContext, id object.ObjMetadata) bool {
   214  	return conditionMet(taskContext, object.ObjMetadataSet{id}, w.Condition)
   215  }
   216  
   217  // skipped returns true if the object failed or was skipped by a preceding
   218  // apply/delete/prune task.
   219  func (w *WaitTask) skipped(taskContext *TaskContext, id object.ObjMetadata) bool {
   220  	im := taskContext.InventoryManager()
   221  	if w.Condition == AllCurrent &&
   222  		im.IsFailedApply(id) || im.IsSkippedApply(id) {
   223  		return true
   224  	}
   225  	if w.Condition == AllNotFound &&
   226  		im.IsFailedDelete(id) || im.IsSkippedDelete(id) {
   227  		return true
   228  	}
   229  	return false
   230  }
   231  
   232  // failedByID returns true if the resource is failed.
   233  func (w *WaitTask) failedByID(taskContext *TaskContext, id object.ObjMetadata) bool {
   234  	cached := taskContext.ResourceCache().Get(id)
   235  	return cached.Status == status.FailedStatus
   236  }
   237  
   238  // changedUID returns true if the UID of the object has changed since it was
   239  // applied or deleted. This indicates that the object was deleted and recreated.
   240  func (w *WaitTask) changedUID(taskContext *TaskContext, id object.ObjMetadata) bool {
   241  	var oldUID, newUID types.UID
   242  
   243  	// Get the uid from the ApplyTask/PruneTask
   244  	taskObj, found := taskContext.InventoryManager().ObjectStatus(id)
   245  	if !found {
   246  		klog.Errorf("Unknown object UID from InventoryManager: %v", id)
   247  		return false
   248  	}
   249  	oldUID = taskObj.UID
   250  	if oldUID == "" {
   251  		// All objects should have been given a UID by the apiserver
   252  		klog.Errorf("Empty object UID from InventoryManager: %v", id)
   253  		return false
   254  	}
   255  
   256  	// Get the uid from the StatusPoller
   257  	pollerObj := taskContext.ResourceCache().Get(id)
   258  	if pollerObj.Resource == nil {
   259  		switch pollerObj.Status {
   260  		case status.UnknownStatus:
   261  			// Resource is expected to be nil when Unknown.
   262  		case status.NotFoundStatus:
   263  			// Resource is expected to be nil when NotFound.
   264  			// K8s DELETE API doesn't always return an object.
   265  		default:
   266  			// For all other statuses, nil Resource is probably a bug.
   267  			klog.Errorf("Unknown object UID from ResourceCache (status: %v): %v", pollerObj.Status, id)
   268  		}
   269  		return false
   270  	}
   271  	newUID = pollerObj.Resource.GetUID()
   272  	if newUID == "" {
   273  		// All objects should have been given a UID by the apiserver
   274  		klog.Errorf("Empty object UID from ResourceCache (status: %v): %v", pollerObj.Status, id)
   275  		return false
   276  	}
   277  
   278  	return (oldUID != newUID)
   279  }
   280  
   281  // handleChangedUID updates the object status and sends an event
   282  func (w *WaitTask) handleChangedUID(taskContext *TaskContext, id object.ObjMetadata) {
   283  	switch w.Condition {
   284  	case AllNotFound:
   285  		// Object recreated by another actor after deletion.
   286  		// Treat as success.
   287  		klog.Infof("UID change detected: deleted object have been recreated: marking reconcile successful: %v", id)
   288  		err := taskContext.InventoryManager().SetSuccessfulReconcile(id)
   289  		if err != nil {
   290  			// Object never applied or deleted!
   291  			klog.Errorf("Failed to mark object as successful reconcile: %v", err)
   292  		}
   293  		w.sendEvent(taskContext, id, event.ReconcileSuccessful)
   294  	case AllCurrent:
   295  		// Object deleted and recreated by another actor after apply.
   296  		// Treat as failure (unverifiable).
   297  		klog.Infof("UID change detected: applied object has been deleted and recreated: marking reconcile failed: %v", id)
   298  		err := taskContext.InventoryManager().SetFailedReconcile(id)
   299  		if err != nil {
   300  			// Object never applied or deleted!
   301  			klog.Errorf("Failed to mark object as failed reconcile: %v", err)
   302  		}
   303  		w.sendEvent(taskContext, id, event.ReconcileFailed)
   304  	default:
   305  		panic(fmt.Sprintf("Invalid wait condition: %v", w.Condition))
   306  	}
   307  }
   308  
   309  // Cancel exits early with a timeout error
   310  func (w *WaitTask) Cancel(_ *TaskContext) {
   311  	w.cancelFunc()
   312  }
   313  
   314  // StatusUpdate records objects status updates and sends WaitEvents.
   315  // If all objects are reconciled or skipped, cancelFunc is called.
   316  // The pending set is write locked during execution of StatusUpdate.
   317  func (w *WaitTask) StatusUpdate(taskContext *TaskContext, id object.ObjMetadata) {
   318  	w.mu.Lock()
   319  	defer w.mu.Unlock()
   320  
   321  	if klog.V(5).Enabled() {
   322  		status := taskContext.ResourceCache().Get(id).Status
   323  		klog.Infof("status update (object: %q, status: %q)", id, status)
   324  	}
   325  
   326  	switch {
   327  	case w.pending.Contains(id):
   328  		switch {
   329  		case w.changedUID(taskContext, id):
   330  			// replaced
   331  			w.handleChangedUID(taskContext, id)
   332  			w.pending = w.pending.Remove(id)
   333  		case w.reconciledByID(taskContext, id):
   334  			// reconciled - remove from pending & send event
   335  			err := taskContext.InventoryManager().SetSuccessfulReconcile(id)
   336  			if err != nil {
   337  				// Object never applied or deleted!
   338  				klog.Errorf("Failed to mark object as successful reconcile: %v", err)
   339  			}
   340  			w.pending = w.pending.Remove(id)
   341  			w.sendEvent(taskContext, id, event.ReconcileSuccessful)
   342  		case w.failedByID(taskContext, id):
   343  			// failed - remove from pending & send event
   344  			err := taskContext.InventoryManager().SetFailedReconcile(id)
   345  			if err != nil {
   346  				// Object never applied or deleted!
   347  				klog.Errorf("Failed to mark object as failed reconcile: %v", err)
   348  			}
   349  			w.pending = w.pending.Remove(id)
   350  			w.failed = append(w.failed, id)
   351  			w.sendEvent(taskContext, id, event.ReconcileFailed)
   352  			// default - still pending
   353  		}
   354  	case !w.Ids.Contains(id):
   355  		// not in wait group - ignore
   356  		return
   357  	case w.skipped(taskContext, id):
   358  		// skipped - ignore
   359  		return
   360  	case w.failed.Contains(id):
   361  		// If a failed resource becomes current before other
   362  		// resources have completed/timed out, we consider it
   363  		// current.
   364  		if w.reconciledByID(taskContext, id) {
   365  			// reconciled - remove from pending & send event
   366  			err := taskContext.InventoryManager().SetSuccessfulReconcile(id)
   367  			if err != nil {
   368  				// Object never applied or deleted!
   369  				klog.Errorf("Failed to mark object as successful reconcile: %v", err)
   370  			}
   371  			w.failed = w.failed.Remove(id)
   372  			w.sendEvent(taskContext, id, event.ReconcileSuccessful)
   373  		} else if !w.failedByID(taskContext, id) {
   374  			// If a resource is no longer reported as Failed and is not Reconciled,
   375  			// they should just go back to InProgress.
   376  			err := taskContext.InventoryManager().SetPendingReconcile(id)
   377  			if err != nil {
   378  				// Object never applied or deleted!
   379  				klog.Errorf("Failed to mark object as pending reconcile: %v", err)
   380  			}
   381  			w.failed = w.failed.Remove(id)
   382  			w.pending = append(w.pending, id)
   383  			w.sendEvent(taskContext, id, event.ReconcilePending)
   384  		}
   385  		// else - still failed
   386  	default:
   387  		// reconciled - check if unreconciled
   388  		if !w.reconciledByID(taskContext, id) {
   389  			// unreconciled - add to pending & send event
   390  			err := taskContext.InventoryManager().SetPendingReconcile(id)
   391  			if err != nil {
   392  				// Object never applied or deleted!
   393  				klog.Errorf("Failed to mark object as pending reconcile: %v", err)
   394  			}
   395  			w.pending = append(w.pending, id)
   396  			w.sendEvent(taskContext, id, event.ReconcilePending)
   397  		}
   398  		// else - still reconciled
   399  	}
   400  
   401  	klog.V(3).Infof("wait task progress: %d/%d", len(w.Ids)-len(w.pending), len(w.Ids))
   402  
   403  	// If we no longer have any pending resources, the WaitTask
   404  	// can be completed.
   405  	if len(w.pending) == 0 {
   406  		// all reconciled, so exit
   407  		klog.V(3).Infof("all objects reconciled or skipped (name: %q)", w.TaskName)
   408  		w.cancelFunc()
   409  	}
   410  }
   411  
   412  // updateRESTMapper resets the RESTMapper if CRDs were applied, so that new
   413  // resource types can be applied by subsequent tasks.
   414  // TODO: find a way to add/remove mappers without resetting the entire mapper
   415  // Resetting the mapper requires all CRDs to be queried again.
   416  func (w *WaitTask) updateRESTMapper(taskContext *TaskContext) {
   417  	foundCRD := false
   418  	for _, id := range w.Ids {
   419  		if id.GroupKind == crdGK && !w.skipped(taskContext, id) {
   420  			foundCRD = true
   421  			break
   422  		}
   423  	}
   424  	if !foundCRD {
   425  		// no update required
   426  		return
   427  	}
   428  
   429  	klog.V(3).Infof("Resetting RESTMapper")
   430  	meta.MaybeResetRESTMapper(w.Mapper)
   431  }
   432  

View as plain text