...

Source file src/sigs.k8s.io/cli-utils/pkg/apply/task/apply_task.go

Documentation: sigs.k8s.io/cli-utils/pkg/apply/task

     1  // Copyright 2020 The Kubernetes Authors.
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package task
     5  
     6  import (
     7  	"context"
     8  	"errors"
     9  	"fmt"
    10  	"io"
    11  	"strings"
    12  
    13  	"k8s.io/apimachinery/pkg/api/meta"
    14  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    15  	"k8s.io/apimachinery/pkg/util/sets"
    16  	"k8s.io/cli-runtime/pkg/genericclioptions"
    17  	"k8s.io/cli-runtime/pkg/resource"
    18  	"k8s.io/client-go/discovery"
    19  	"k8s.io/client-go/dynamic"
    20  	"k8s.io/klog/v2"
    21  	"k8s.io/kubectl/pkg/cmd/apply"
    22  	cmddelete "k8s.io/kubectl/pkg/cmd/delete"
    23  	applyerror "sigs.k8s.io/cli-utils/pkg/apply/error"
    24  	"sigs.k8s.io/cli-utils/pkg/apply/event"
    25  	"sigs.k8s.io/cli-utils/pkg/apply/filter"
    26  	"sigs.k8s.io/cli-utils/pkg/apply/info"
    27  	"sigs.k8s.io/cli-utils/pkg/apply/mutator"
    28  	"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
    29  	"sigs.k8s.io/cli-utils/pkg/common"
    30  	"sigs.k8s.io/cli-utils/pkg/object"
    31  )
    32  
// applyOptions defines the two key functions on the ApplyOptions
// struct that are used by the ApplyTask. Keeping the dependency behind
// this small interface lets tests substitute a fake implementation
// through applyOptionsFactoryFunc.
type applyOptions interface {

	// Run applies the resources previously provided through SetObjects
	// to the cluster.
	Run() error

	// SetObjects sets the slice of resources (in the form of resource.Info
	// objects) that will be applied upon invoking the Run function.
	SetObjects([]*resource.Info)
}
    45  
// ApplyTask applies the given Objects to the cluster
// by using the ApplyOptions.
//
// Fields, as used by the methods in this file:
//   - TaskName is returned by Name and used as the group name on emitted
//     apply events.
//   - DynamicClient and OpenAPIGetter are handed to the applyOptions factory
//     for every object that is applied.
//   - InfoHelper builds the resource.Info for each object before it is
//     filtered, mutated and applied.
//   - Mapper is not referenced by the code in this file.
//     NOTE(review): presumably consumed elsewhere in the package - confirm.
//   - Objects is the set of objects applied by Start.
//   - Filters are evaluated in order for each object; the first filter that
//     returns an error prevents the object from being applied.
//   - Mutators are executed in order on each object that passes the filters.
//   - DryRunStrategy and ServerSideOptions configure how each apply is run.
type ApplyTask struct {
	TaskName string

	DynamicClient     dynamic.Interface
	OpenAPIGetter     discovery.OpenAPISchemaInterface
	InfoHelper        info.Helper
	Mapper            meta.RESTMapper
	Objects           object.UnstructuredSet
	Filters           []filter.ValidationFilter
	Mutators          []mutator.Interface
	DryRunStrategy    common.DryRunStrategy
	ServerSideOptions common.ServerSideOptions
}
    61  
// applyOptionsFactoryFunc is a factory function for creating a new
// applyOptions implementation. It defaults to newApplyOptions and is a
// package-level variable so that unit tests can swap in a fake.
var applyOptionsFactoryFunc = newApplyOptions
    65  
// Name returns the name of the task, as configured in TaskName.
func (a *ApplyTask) Name() string {
	return a.TaskName
}
    69  
// Action returns the resource action performed by this task, which is
// always event.ApplyAction.
func (a *ApplyTask) Action() event.ResourceAction {
	return event.ApplyAction
}
    73  
// Identifiers returns the object metadata identifiers for all of the
// Objects that this task is configured to apply.
func (a *ApplyTask) Identifiers() object.ObjMetadataSet {
	return object.UnstructuredSetToObjMetadataSet(a.Objects)
}
    77  
    78  // Start creates a new goroutine that will invoke
    79  // the Run function on the ApplyOptions to update
    80  // the cluster. It will push a TaskResult on the taskChannel
    81  // to signal to the taskrunner that the task has completed (or failed).
    82  // It will also fetch the Generation from each of the applied resources
    83  // after the Run function has completed. This information is then added
    84  // to the taskContext. The generation is increased every time
    85  // the desired state of a resource is changed.
    86  func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
    87  	go func() {
    88  		// TODO: pipe Context through TaskContext
    89  		ctx := context.TODO()
    90  		objects := a.Objects
    91  		klog.V(2).Infof("apply task starting (name: %q, objects: %d)",
    92  			a.Name(), len(objects))
    93  		for _, obj := range objects {
    94  			// Set the client and mapping fields on the provided
    95  			// info so they can be applied to the cluster.
    96  			info, err := a.InfoHelper.BuildInfo(obj)
    97  			// BuildInfo strips path annotations.
    98  			// Use modified object for filters, mutations, and events.
    99  			obj = info.Object.(*unstructured.Unstructured)
   100  			id := object.UnstructuredToObjMetadata(obj)
   101  			if err != nil {
   102  				err = applyerror.NewUnknownTypeError(err)
   103  				if klog.V(4).Enabled() {
   104  					// only log event emitted errors if the verbosity > 4
   105  					klog.Errorf("apply task errored (object: %s): unable to convert obj to info: %v", id, err)
   106  				}
   107  				taskContext.SendEvent(a.createApplyFailedEvent(id, err))
   108  				taskContext.InventoryManager().AddFailedApply(id)
   109  				continue
   110  			}
   111  
   112  			// Check filters to see if we're prevented from applying.
   113  			var filterErr error
   114  			for _, applyFilter := range a.Filters {
   115  				klog.V(6).Infof("apply filter evaluating (filter: %s, object: %s)", applyFilter.Name(), id)
   116  				filterErr = applyFilter.Filter(obj)
   117  				if filterErr != nil {
   118  					var fatalErr *filter.FatalError
   119  					if errors.As(filterErr, &fatalErr) {
   120  						if klog.V(4).Enabled() {
   121  							// only log event emitted errors if the verbosity > 4
   122  							klog.Errorf("apply filter errored (filter: %s, object: %s): %v", applyFilter.Name(), id, fatalErr.Err)
   123  						}
   124  						taskContext.SendEvent(a.createApplyFailedEvent(id, err))
   125  						taskContext.InventoryManager().AddFailedApply(id)
   126  						break
   127  					}
   128  					klog.V(4).Infof("apply filtered (filter: %s, object: %s): %v", applyFilter.Name(), id, filterErr)
   129  					taskContext.SendEvent(a.createApplySkippedEvent(id, obj, filterErr))
   130  					taskContext.InventoryManager().AddSkippedApply(id)
   131  					break
   132  				}
   133  			}
   134  			if filterErr != nil {
   135  				continue
   136  			}
   137  
   138  			// Execute mutators, if any apply
   139  			err = a.mutate(ctx, obj)
   140  			if err != nil {
   141  				if klog.V(4).Enabled() {
   142  					// only log event emitted errors if the verbosity > 4
   143  					klog.Errorf("apply mutation errored (object: %s): %v", id, err)
   144  				}
   145  				taskContext.SendEvent(a.createApplyFailedEvent(id, err))
   146  				taskContext.InventoryManager().AddFailedApply(id)
   147  				continue
   148  			}
   149  
   150  			// Create a new instance of the applyOptions interface and use it
   151  			// to apply the objects.
   152  			ao := applyOptionsFactoryFunc(a.Name(), taskContext.EventChannel(),
   153  				a.ServerSideOptions, a.DryRunStrategy, a.DynamicClient, a.OpenAPIGetter)
   154  			ao.SetObjects([]*resource.Info{info})
   155  			klog.V(5).Infof("applying object: %v", id)
   156  			err = ao.Run()
   157  			if err != nil && a.ServerSideOptions.ServerSideApply && isAPIService(obj) && isStreamError(err) {
   158  				// Server-side Apply doesn't work with APIService before k8s 1.21
   159  				// https://github.com/kubernetes/kubernetes/issues/89264
   160  				// Thus APIService is handled specially using client-side apply.
   161  				err = a.clientSideApply(info, taskContext.EventChannel())
   162  			}
   163  			if err != nil {
   164  				err = applyerror.NewApplyRunError(err)
   165  				if klog.V(4).Enabled() {
   166  					// only log event emitted errors if the verbosity > 4
   167  					klog.Errorf("apply errored (object: %s): %v", id, err)
   168  				}
   169  				taskContext.SendEvent(a.createApplyFailedEvent(id, err))
   170  				taskContext.InventoryManager().AddFailedApply(id)
   171  			} else if info.Object != nil {
   172  				acc, err := meta.Accessor(info.Object)
   173  				if err == nil {
   174  					uid := acc.GetUID()
   175  					gen := acc.GetGeneration()
   176  					taskContext.InventoryManager().AddSuccessfulApply(id, uid, gen)
   177  				}
   178  			}
   179  		}
   180  		a.sendTaskResult(taskContext)
   181  	}()
   182  }
   183  
   184  func newApplyOptions(taskName string, eventChannel chan<- event.Event, serverSideOptions common.ServerSideOptions,
   185  	strategy common.DryRunStrategy, dynamicClient dynamic.Interface,
   186  	openAPIGetter discovery.OpenAPISchemaInterface) applyOptions {
   187  	emptyString := ""
   188  	return &apply.ApplyOptions{
   189  		VisitedNamespaces: sets.NewString(),
   190  		VisitedUids:       sets.NewString(),
   191  		Overwrite:         true, // Normally set in apply.NewApplyOptions
   192  		OpenAPIPatch:      true, // Normally set in apply.NewApplyOptions
   193  		Recorder:          genericclioptions.NoopRecorder{},
   194  		IOStreams: genericclioptions.IOStreams{
   195  			Out:    io.Discard,
   196  			ErrOut: io.Discard, // TODO: Warning for no lastConfigurationAnnotation
   197  			// is printed directly to stderr in ApplyOptions. We
   198  			// should turn that into a warning on the event channel.
   199  		},
   200  		// FilenameOptions are not needed since we don't use the ApplyOptions
   201  		// to read manifests.
   202  		DeleteOptions: &cmddelete.DeleteOptions{},
   203  		PrintFlags: &genericclioptions.PrintFlags{
   204  			OutputFormat: &emptyString,
   205  		},
   206  		// Server-side apply if flag set or server-side dry run.
   207  		ServerSideApply: strategy.ServerDryRun() || serverSideOptions.ServerSideApply,
   208  		ForceConflicts:  serverSideOptions.ForceConflicts,
   209  		FieldManager:    serverSideOptions.FieldManager,
   210  		DryRunStrategy:  strategy.Strategy(),
   211  		ToPrinter: (&KubectlPrinterAdapter{
   212  			ch:        eventChannel,
   213  			groupName: taskName,
   214  		}).toPrinterFunc(),
   215  		DynamicClient:  dynamicClient,
   216  		DryRunVerifier: resource.NewQueryParamVerifier(dynamicClient, openAPIGetter, resource.QueryParamDryRun),
   217  	}
   218  }
   219  
   220  func (a *ApplyTask) sendTaskResult(taskContext *taskrunner.TaskContext) {
   221  	klog.V(2).Infof("apply task completing (name: %q)", a.Name())
   222  	taskContext.TaskChannel() <- taskrunner.TaskResult{}
   223  }
   224  
// Cancel is not supported by the ApplyTask; calling it is a no-op.
func (a *ApplyTask) Cancel(_ *taskrunner.TaskContext) {}
   227  
// StatusUpdate is not supported by the ApplyTask; calling it is a no-op.
func (a *ApplyTask) StatusUpdate(_ *taskrunner.TaskContext, _ object.ObjMetadata) {}
   230  
   231  // mutate loops through the mutator list and executes them on the object.
   232  func (a *ApplyTask) mutate(ctx context.Context, obj *unstructured.Unstructured) error {
   233  	id := object.UnstructuredToObjMetadata(obj)
   234  	for _, mutator := range a.Mutators {
   235  		klog.V(6).Infof("apply mutator %s: %s", mutator.Name(), id)
   236  		mutated, reason, err := mutator.Mutate(ctx, obj)
   237  		if err != nil {
   238  			return fmt.Errorf("failed to mutate %q with %q: %w", id, mutator.Name(), err)
   239  		}
   240  		if mutated {
   241  			klog.V(4).Infof("resource mutated (mutator: %q, resource: %q, reason: %q)", mutator.Name(), id, reason)
   242  		}
   243  	}
   244  	return nil
   245  }
   246  
   247  func (a *ApplyTask) createApplyFailedEvent(id object.ObjMetadata, err error) event.Event {
   248  	return event.Event{
   249  		Type: event.ApplyType,
   250  		ApplyEvent: event.ApplyEvent{
   251  			GroupName:  a.Name(),
   252  			Identifier: id,
   253  			Status:     event.ApplyFailed,
   254  			Error:      err,
   255  		},
   256  	}
   257  }
   258  
   259  func (a *ApplyTask) createApplySkippedEvent(id object.ObjMetadata, resource *unstructured.Unstructured, err error) event.Event {
   260  	return event.Event{
   261  		Type: event.ApplyType,
   262  		ApplyEvent: event.ApplyEvent{
   263  			GroupName:  a.Name(),
   264  			Identifier: id,
   265  			Status:     event.ApplySkipped,
   266  			Resource:   resource,
   267  			Error:      err,
   268  		},
   269  	}
   270  }
   271  
   272  func isAPIService(obj *unstructured.Unstructured) bool {
   273  	gk := obj.GroupVersionKind().GroupKind()
   274  	return gk.Group == "apiregistration.k8s.io" && gk.Kind == "APIService"
   275  }
   276  
   277  // isStreamError checks if the error is a StreamError. Since kubectl wraps the actual StreamError,
   278  // we can't check the error type.
   279  func isStreamError(err error) bool {
   280  	return strings.Contains(err.Error(), "stream error: stream ID ")
   281  }
   282  
   283  func (a *ApplyTask) clientSideApply(info *resource.Info, eventChannel chan<- event.Event) error {
   284  	ao := applyOptionsFactoryFunc(a.Name(), eventChannel, common.ServerSideOptions{ServerSideApply: false}, a.DryRunStrategy, a.DynamicClient, a.OpenAPIGetter)
   285  	ao.SetObjects([]*resource.Info{info})
   286  	return ao.Run()
   287  }
   288  

View as plain text