...

Source file src/sigs.k8s.io/cli-utils/pkg/apply/applier.go

Documentation: sigs.k8s.io/cli-utils/pkg/apply

     1  // Copyright 2019 The Kubernetes Authors.
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package apply
     5  
     6  import (
     7  	"context"
     8  	"fmt"
     9  	"time"
    10  
    11  	"k8s.io/apimachinery/pkg/api/meta"
    12  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    13  	"k8s.io/apimachinery/pkg/util/sets"
    14  	"k8s.io/client-go/discovery"
    15  	"k8s.io/client-go/dynamic"
    16  	"k8s.io/klog/v2"
    17  	"sigs.k8s.io/cli-utils/pkg/apis/actuation"
    18  	"sigs.k8s.io/cli-utils/pkg/apply/cache"
    19  	"sigs.k8s.io/cli-utils/pkg/apply/event"
    20  	"sigs.k8s.io/cli-utils/pkg/apply/filter"
    21  	"sigs.k8s.io/cli-utils/pkg/apply/info"
    22  	"sigs.k8s.io/cli-utils/pkg/apply/mutator"
    23  	"sigs.k8s.io/cli-utils/pkg/apply/prune"
    24  	"sigs.k8s.io/cli-utils/pkg/apply/solver"
    25  	"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
    26  	"sigs.k8s.io/cli-utils/pkg/common"
    27  	"sigs.k8s.io/cli-utils/pkg/inventory"
    28  	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
    29  	"sigs.k8s.io/cli-utils/pkg/object"
    30  	"sigs.k8s.io/cli-utils/pkg/object/validation"
    31  )
    32  
    33  // Applier performs the step of applying a set of resources into a cluster,
    34  // conditionally waits for all of them to be fully reconciled and finally
    35  // performs prune to clean up any resources that has been deleted.
    36  // The applier performs its function by executing a list queue of tasks,
    37  // each of which is one of the steps in the process of applying a set
    38  // of resources to the cluster. The actual execution of these tasks are
    39  // handled by a StatusRunner. So the taskqueue is effectively a
    40  // specification that is executed by the StatusRunner. Based on input
    41  // parameters and/or the set of resources that needs to be applied to the
    42  // cluster, different sets of tasks might be needed.
    43  type Applier struct {
    44  	pruner        *prune.Pruner
    45  	statusWatcher watcher.StatusWatcher
    46  	invClient     inventory.Client
    47  	client        dynamic.Interface
    48  	openAPIGetter discovery.OpenAPISchemaInterface
    49  	mapper        meta.RESTMapper
    50  	infoHelper    info.Helper
    51  }
    52  
    53  // prepareObjects returns the set of objects to apply and to prune or
    54  // an error if one occurred.
    55  func (a *Applier) prepareObjects(localInv inventory.Info, localObjs object.UnstructuredSet,
    56  	o ApplierOptions) (object.UnstructuredSet, object.UnstructuredSet, error) {
    57  	if localInv == nil {
    58  		return nil, nil, fmt.Errorf("the local inventory can't be nil")
    59  	}
    60  	if err := inventory.ValidateNoInventory(localObjs); err != nil {
    61  		return nil, nil, err
    62  	}
    63  	// Add the inventory annotation to the resources being applied.
    64  	for _, localObj := range localObjs {
    65  		inventory.AddInventoryIDAnnotation(localObj, localInv)
    66  	}
    67  	// If the inventory uses the Name strategy and an inventory ID is provided,
    68  	// verify that the existing inventory object (if there is one) has an ID
    69  	// label that matches.
    70  	// TODO(seans): This inventory id validation should happen in destroy and status.
    71  	if localInv.Strategy() == inventory.NameStrategy && localInv.ID() != "" {
    72  		prevInvObjs, err := a.invClient.GetClusterInventoryObjs(localInv)
    73  		if err != nil {
    74  			return nil, nil, err
    75  		}
    76  		if len(prevInvObjs) > 1 {
    77  			panic(fmt.Errorf("found %d inv objects with Name strategy", len(prevInvObjs)))
    78  		}
    79  		if len(prevInvObjs) == 1 {
    80  			invObj := prevInvObjs[0]
    81  			val := invObj.GetLabels()[common.InventoryLabel]
    82  			if val != localInv.ID() {
    83  				return nil, nil, fmt.Errorf("inventory-id of inventory object in cluster doesn't match provided id %q", localInv.ID())
    84  			}
    85  		}
    86  	}
    87  	pruneObjs, err := a.pruner.GetPruneObjs(localInv, localObjs, prune.Options{
    88  		DryRunStrategy: o.DryRunStrategy,
    89  	})
    90  	if err != nil {
    91  		return nil, nil, err
    92  	}
    93  	return localObjs, pruneObjs, nil
    94  }
    95  
    96  // Run performs the Apply step. This happens asynchronously with updates
    97  // on progress and any errors reported back on the event channel.
    98  // Cancelling the operation or setting timeout on how long to Wait
    99  // for it complete can be done with the passed in context.
   100  // Note: There isn't currently any way to interrupt the operation
   101  // before all the given resources have been applied to the cluster. Any
   102  // cancellation or timeout will only affect how long we Wait for the
   103  // resources to become current.
   104  func (a *Applier) Run(ctx context.Context, invInfo inventory.Info, objects object.UnstructuredSet, options ApplierOptions) <-chan event.Event {
   105  	klog.V(4).Infof("apply run for %d objects", len(objects))
   106  	eventChannel := make(chan event.Event)
   107  	setDefaults(&options)
   108  	go func() {
   109  		defer close(eventChannel)
   110  		// Validate the resources to make sure we catch those problems early
   111  		// before anything has been updated in the cluster.
   112  		vCollector := &validation.Collector{}
   113  		validator := &validation.Validator{
   114  			Collector: vCollector,
   115  			Mapper:    a.mapper,
   116  		}
   117  		validator.Validate(objects)
   118  
   119  		// Decide which objects to apply and which to prune
   120  		applyObjs, pruneObjs, err := a.prepareObjects(invInfo, objects, options)
   121  		if err != nil {
   122  			handleError(eventChannel, err)
   123  			return
   124  		}
   125  		klog.V(4).Infof("calculated %d apply objs; %d prune objs", len(applyObjs), len(pruneObjs))
   126  
   127  		// Build a TaskContext for passing info between tasks
   128  		resourceCache := cache.NewResourceCacheMap()
   129  		taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
   130  
   131  		// Fetch the queue (channel) of tasks that should be executed.
   132  		klog.V(4).Infoln("applier building task queue...")
   133  		// Build list of apply validation filters.
   134  		applyFilters := []filter.ValidationFilter{
   135  			filter.InventoryPolicyApplyFilter{
   136  				Client:    a.client,
   137  				Mapper:    a.mapper,
   138  				Inv:       invInfo,
   139  				InvPolicy: options.InventoryPolicy,
   140  			},
   141  			filter.DependencyFilter{
   142  				TaskContext:       taskContext,
   143  				ActuationStrategy: actuation.ActuationStrategyApply,
   144  				DryRunStrategy:    options.DryRunStrategy,
   145  			},
   146  		}
   147  		// Build list of prune validation filters.
   148  		pruneFilters := []filter.ValidationFilter{
   149  			filter.PreventRemoveFilter{},
   150  			filter.InventoryPolicyPruneFilter{
   151  				Inv:       invInfo,
   152  				InvPolicy: options.InventoryPolicy,
   153  			},
   154  			filter.LocalNamespacesFilter{
   155  				LocalNamespaces: localNamespaces(invInfo, object.UnstructuredSetToObjMetadataSet(objects)),
   156  			},
   157  			filter.DependencyFilter{
   158  				TaskContext:       taskContext,
   159  				ActuationStrategy: actuation.ActuationStrategyDelete,
   160  				DryRunStrategy:    options.DryRunStrategy,
   161  			},
   162  		}
   163  		// Build list of apply mutators.
   164  		applyMutators := []mutator.Interface{
   165  			&mutator.ApplyTimeMutator{
   166  				Client:        a.client,
   167  				Mapper:        a.mapper,
   168  				ResourceCache: resourceCache,
   169  			},
   170  		}
   171  		taskBuilder := &solver.TaskQueueBuilder{
   172  			Pruner:        a.pruner,
   173  			DynamicClient: a.client,
   174  			OpenAPIGetter: a.openAPIGetter,
   175  			InfoHelper:    a.infoHelper,
   176  			Mapper:        a.mapper,
   177  			InvClient:     a.invClient,
   178  			Collector:     vCollector,
   179  			ApplyFilters:  applyFilters,
   180  			ApplyMutators: applyMutators,
   181  			PruneFilters:  pruneFilters,
   182  		}
   183  		opts := solver.Options{
   184  			ServerSideOptions:      options.ServerSideOptions,
   185  			ReconcileTimeout:       options.ReconcileTimeout,
   186  			Destroy:                false,
   187  			Prune:                  !options.NoPrune,
   188  			DryRunStrategy:         options.DryRunStrategy,
   189  			PrunePropagationPolicy: options.PrunePropagationPolicy,
   190  			PruneTimeout:           options.PruneTimeout,
   191  			InventoryPolicy:        options.InventoryPolicy,
   192  		}
   193  
   194  		// Build the ordered set of tasks to execute.
   195  		taskQueue := taskBuilder.
   196  			WithApplyObjects(applyObjs).
   197  			WithPruneObjects(pruneObjs).
   198  			WithInventory(invInfo).
   199  			Build(taskContext, opts)
   200  
   201  		klog.V(4).Infof("validation errors: %d", len(vCollector.Errors))
   202  		klog.V(4).Infof("invalid objects: %d", len(vCollector.InvalidIds))
   203  
   204  		// Handle validation errors
   205  		switch options.ValidationPolicy {
   206  		case validation.ExitEarly:
   207  			err = vCollector.ToError()
   208  			if err != nil {
   209  				handleError(eventChannel, err)
   210  				return
   211  			}
   212  		case validation.SkipInvalid:
   213  			for _, err := range vCollector.Errors {
   214  				handleValidationError(eventChannel, err)
   215  			}
   216  		default:
   217  			handleError(eventChannel, fmt.Errorf("invalid ValidationPolicy: %q", options.ValidationPolicy))
   218  			return
   219  		}
   220  
   221  		// Register invalid objects to be retained in the inventory, if present.
   222  		for _, id := range vCollector.InvalidIds {
   223  			taskContext.AddInvalidObject(id)
   224  		}
   225  
   226  		// Send event to inform the caller about the resources that
   227  		// will be applied/pruned.
   228  		eventChannel <- event.Event{
   229  			Type: event.InitType,
   230  			InitEvent: event.InitEvent{
   231  				ActionGroups: taskQueue.ToActionGroups(),
   232  			},
   233  		}
   234  		// Create a new TaskStatusRunner to execute the taskQueue.
   235  		klog.V(4).Infoln("applier building TaskStatusRunner...")
   236  		allIds := object.UnstructuredSetToObjMetadataSet(append(applyObjs, pruneObjs...))
   237  		statusWatcher := a.statusWatcher
   238  		// Disable watcher for dry runs
   239  		if opts.DryRunStrategy.ClientOrServerDryRun() {
   240  			statusWatcher = watcher.BlindStatusWatcher{}
   241  		}
   242  		runner := taskrunner.NewTaskStatusRunner(allIds, statusWatcher)
   243  		klog.V(4).Infoln("applier running TaskStatusRunner...")
   244  		err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
   245  			EmitStatusEvents: options.EmitStatusEvents,
   246  		})
   247  		if err != nil {
   248  			handleError(eventChannel, err)
   249  			return
   250  		}
   251  	}()
   252  	return eventChannel
   253  }
   254  
   255  type ApplierOptions struct {
   256  	// Encapsulates the fields for server-side apply.
   257  	ServerSideOptions common.ServerSideOptions
   258  
   259  	// ReconcileTimeout defines whether the applier should wait
   260  	// until all applied resources have been reconciled, and if so,
   261  	// how long to wait.
   262  	ReconcileTimeout time.Duration
   263  
   264  	// EmitStatusEvents defines whether status events should be
   265  	// emitted on the eventChannel to the caller.
   266  	EmitStatusEvents bool
   267  
   268  	// NoPrune defines whether pruning of previously applied
   269  	// objects should happen after apply.
   270  	NoPrune bool
   271  
   272  	// DryRunStrategy defines whether changes should actually be performed,
   273  	// or if it is just talk and no action.
   274  	DryRunStrategy common.DryRunStrategy
   275  
   276  	// PrunePropagationPolicy defines the deletion propagation policy
   277  	// that should be used for pruning. If this is not provided, the
   278  	// default is to use the Background policy.
   279  	PrunePropagationPolicy metav1.DeletionPropagation
   280  
   281  	// PruneTimeout defines whether we should wait for all resources
   282  	// to be fully deleted after pruning, and if so, how long we should
   283  	// wait.
   284  	PruneTimeout time.Duration
   285  
   286  	// InventoryPolicy defines the inventory policy of apply.
   287  	InventoryPolicy inventory.Policy
   288  
   289  	// ValidationPolicy defines how to handle invalid objects.
   290  	ValidationPolicy validation.Policy
   291  }
   292  
   293  // setDefaults set the options to the default values if they
   294  // have not been provided.
   295  func setDefaults(o *ApplierOptions) {
   296  	if o.PrunePropagationPolicy == "" {
   297  		o.PrunePropagationPolicy = metav1.DeletePropagationBackground
   298  	}
   299  }
   300  
   301  func handleError(eventChannel chan event.Event, err error) {
   302  	eventChannel <- event.Event{
   303  		Type: event.ErrorType,
   304  		ErrorEvent: event.ErrorEvent{
   305  			Err: err,
   306  		},
   307  	}
   308  }
   309  
   310  // localNamespaces stores a set of strings of all the namespaces
   311  // for the passed non cluster-scoped localObjs, plus the namespace
   312  // of the passed inventory object. This is used to skip deleting
   313  // namespaces which have currently applied objects in them.
   314  func localNamespaces(localInv inventory.Info, localObjs []object.ObjMetadata) sets.String {
   315  	namespaces := sets.NewString()
   316  	for _, obj := range localObjs {
   317  		if obj.Namespace != "" {
   318  			namespaces.Insert(obj.Namespace)
   319  		}
   320  	}
   321  	invNamespace := localInv.Namespace()
   322  	if invNamespace != "" {
   323  		namespaces.Insert(invNamespace)
   324  	}
   325  	return namespaces
   326  }
   327  
   328  func handleValidationError(eventChannel chan<- event.Event, err error) {
   329  	switch tErr := err.(type) {
   330  	case *validation.Error:
   331  		// handle validation error about one or more specific objects
   332  		eventChannel <- event.Event{
   333  			Type: event.ValidationType,
   334  			ValidationEvent: event.ValidationEvent{
   335  				Identifiers: tErr.Identifiers(),
   336  				Error:       tErr,
   337  			},
   338  		}
   339  	default:
   340  		// handle general validation error (no specific object)
   341  		eventChannel <- event.Event{
   342  			Type: event.ValidationType,
   343  			ValidationEvent: event.ValidationEvent{
   344  				Error: tErr,
   345  			},
   346  		}
   347  	}
   348  }
   349  

View as plain text