...

Source file src/sigs.k8s.io/cli-utils/pkg/apply/solver/solver.go

Documentation: sigs.k8s.io/cli-utils/pkg/apply/solver

     1  // Copyright 2020 The Kubernetes Authors.
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  // The solver package is responsible for constructing a
     5  // taskqueue based on the set of resources that should be
     6  // applied.
     7  // This involves setting up the appropriate sequence of
     8  // apply, wait and prune tasks so any dependencies between
     9  // resources doesn't cause a later apply operation to
    10  // fail.
    11  // Currently this package assumes that the resources have
    12  // already been sorted in the appropriate order. We might
    13  // want to consider moving the sorting functionality into
    14  // this package.
    15  package solver
    16  
    17  import (
    18  	"fmt"
    19  	"time"
    20  
    21  	"k8s.io/apimachinery/pkg/api/meta"
    22  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    23  	"k8s.io/client-go/discovery"
    24  	"k8s.io/client-go/dynamic"
    25  	"k8s.io/klog/v2"
    26  	"sigs.k8s.io/cli-utils/pkg/apply/event"
    27  	"sigs.k8s.io/cli-utils/pkg/apply/filter"
    28  	"sigs.k8s.io/cli-utils/pkg/apply/info"
    29  	"sigs.k8s.io/cli-utils/pkg/apply/mutator"
    30  	"sigs.k8s.io/cli-utils/pkg/apply/prune"
    31  	"sigs.k8s.io/cli-utils/pkg/apply/task"
    32  	"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
    33  	"sigs.k8s.io/cli-utils/pkg/common"
    34  	"sigs.k8s.io/cli-utils/pkg/inventory"
    35  	"sigs.k8s.io/cli-utils/pkg/object"
    36  	"sigs.k8s.io/cli-utils/pkg/object/graph"
    37  	"sigs.k8s.io/cli-utils/pkg/object/validation"
    38  )
    39  
    40  type TaskQueueBuilder struct {
    41  	Pruner        *prune.Pruner
    42  	DynamicClient dynamic.Interface
    43  	OpenAPIGetter discovery.OpenAPISchemaInterface
    44  	InfoHelper    info.Helper
    45  	Mapper        meta.RESTMapper
    46  	InvClient     inventory.Client
    47  	// Collector is used to collect validation errors and invalid objects.
    48  	// Invalid objects will be filtered and not be injected into tasks.
    49  	Collector     *validation.Collector
    50  	ApplyFilters  []filter.ValidationFilter
    51  	ApplyMutators []mutator.Interface
    52  	PruneFilters  []filter.ValidationFilter
    53  
    54  	// The accumulated tasks and counter variables to name tasks.
    55  	applyCounter int
    56  	pruneCounter int
    57  	waitCounter  int
    58  
    59  	invInfo   inventory.Info
    60  	applyObjs object.UnstructuredSet
    61  	pruneObjs object.UnstructuredSet
    62  }
    63  
    64  type TaskQueue struct {
    65  	tasks []taskrunner.Task
    66  }
    67  
    68  func (tq *TaskQueue) ToChannel() chan taskrunner.Task {
    69  	taskQueue := make(chan taskrunner.Task, len(tq.tasks))
    70  	for _, t := range tq.tasks {
    71  		taskQueue <- t
    72  	}
    73  	return taskQueue
    74  }
    75  
    76  func (tq *TaskQueue) ToActionGroups() []event.ActionGroup {
    77  	var ags []event.ActionGroup
    78  
    79  	for _, t := range tq.tasks {
    80  		ags = append(ags, event.ActionGroup{
    81  			Name:        t.Name(),
    82  			Action:      t.Action(),
    83  			Identifiers: t.Identifiers(),
    84  		})
    85  	}
    86  	return ags
    87  }
    88  
    89  type Options struct {
    90  	ServerSideOptions common.ServerSideOptions
    91  	ReconcileTimeout  time.Duration
    92  	// True if we are destroying, which deletes the inventory object
    93  	// as well (possibly) the inventory namespace.
    94  	Destroy bool
    95  	// True if we're deleting prune objects
    96  	Prune                  bool
    97  	DryRunStrategy         common.DryRunStrategy
    98  	PrunePropagationPolicy metav1.DeletionPropagation
    99  	PruneTimeout           time.Duration
   100  	InventoryPolicy        inventory.Policy
   101  }
   102  
   103  // WithInventory sets the inventory info and returns the builder for chaining.
   104  func (t *TaskQueueBuilder) WithInventory(inv inventory.Info) *TaskQueueBuilder {
   105  	t.invInfo = inv
   106  	return t
   107  }
   108  
   109  // WithApplyObjects sets the apply objects and returns the builder for chaining.
   110  func (t *TaskQueueBuilder) WithApplyObjects(applyObjs object.UnstructuredSet) *TaskQueueBuilder {
   111  	t.applyObjs = applyObjs
   112  	return t
   113  }
   114  
   115  // WithPruneObjects sets the prune objects and returns the builder for chaining.
   116  func (t *TaskQueueBuilder) WithPruneObjects(pruneObjs object.UnstructuredSet) *TaskQueueBuilder {
   117  	t.pruneObjs = pruneObjs
   118  	return t
   119  }
   120  
   121  // Build returns the queue of tasks that have been created
   122  func (t *TaskQueueBuilder) Build(taskContext *taskrunner.TaskContext, o Options) *TaskQueue {
   123  	var tasks []taskrunner.Task
   124  
   125  	// reset counters
   126  	t.applyCounter = 0
   127  	t.pruneCounter = 0
   128  	t.waitCounter = 0
   129  
   130  	// Filter objects that failed earlier validation
   131  	applyObjs := t.Collector.FilterInvalidObjects(t.applyObjs)
   132  	pruneObjs := t.Collector.FilterInvalidObjects(t.pruneObjs)
   133  
   134  	// Merge applyObjs & pruneObjs and graph them together.
   135  	// This detects implicit and explicit dependencies.
   136  	// Invalid dependency annotations will be treated as validation errors.
   137  	allObjs := make(object.UnstructuredSet, 0, len(applyObjs)+len(pruneObjs))
   138  	allObjs = append(allObjs, applyObjs...)
   139  	allObjs = append(allObjs, pruneObjs...)
   140  	g, err := graph.DependencyGraph(allObjs)
   141  	if err != nil {
   142  		t.Collector.Collect(err)
   143  	}
   144  	// Store graph for use by DependencyFilter
   145  	taskContext.SetGraph(g)
   146  	// Sort objects into phases (apply order).
   147  	// Cycles will be treated as validation errors.
   148  	idSetList, err := g.Sort()
   149  	if err != nil {
   150  		t.Collector.Collect(err)
   151  	}
   152  
   153  	// Filter objects with cycles or invalid dependency annotations
   154  	applyObjs = t.Collector.FilterInvalidObjects(applyObjs)
   155  	pruneObjs = t.Collector.FilterInvalidObjects(pruneObjs)
   156  
   157  	if !o.Destroy {
   158  		// InvAddTask creates the inventory and adds any objects being applied
   159  		klog.V(2).Infof("adding inventory add task (%d objects)", len(applyObjs))
   160  		tasks = append(tasks, &task.InvAddTask{
   161  			TaskName:  "inventory-add-0",
   162  			InvClient: t.InvClient,
   163  			InvInfo:   t.invInfo,
   164  			Objects:   applyObjs,
   165  			DryRun:    o.DryRunStrategy,
   166  		})
   167  	}
   168  
   169  	if len(applyObjs) > 0 {
   170  		// Register actuation plan in the inventory
   171  		for _, id := range object.UnstructuredSetToObjMetadataSet(applyObjs) {
   172  			taskContext.InventoryManager().AddPendingApply(id)
   173  		}
   174  
   175  		// Filter idSetList down to just apply objects
   176  		applySets := graph.HydrateSetList(idSetList, applyObjs)
   177  
   178  		for _, applySet := range applySets {
   179  			tasks = append(tasks,
   180  				t.newApplyTask(applySet, t.ApplyFilters, t.ApplyMutators, o))
   181  			// dry-run skips wait tasks
   182  			if !o.DryRunStrategy.ClientOrServerDryRun() {
   183  				applyIds := object.UnstructuredSetToObjMetadataSet(applySet)
   184  				tasks = append(tasks,
   185  					t.newWaitTask(applyIds, taskrunner.AllCurrent, o.ReconcileTimeout))
   186  			}
   187  		}
   188  	}
   189  
   190  	if o.Prune && len(pruneObjs) > 0 {
   191  		// Register actuation plan in the inventory
   192  		for _, id := range object.UnstructuredSetToObjMetadataSet(pruneObjs) {
   193  			taskContext.InventoryManager().AddPendingDelete(id)
   194  		}
   195  
   196  		// Filter idSetList down to just prune objects
   197  		pruneSets := graph.HydrateSetList(idSetList, pruneObjs)
   198  
   199  		// Reverse apply order to get prune order
   200  		graph.ReverseSetList(pruneSets)
   201  
   202  		for _, pruneSet := range pruneSets {
   203  			tasks = append(tasks,
   204  				t.newPruneTask(pruneSet, t.PruneFilters, o))
   205  			// dry-run skips wait tasks
   206  			if !o.DryRunStrategy.ClientOrServerDryRun() {
   207  				pruneIds := object.UnstructuredSetToObjMetadataSet(pruneSet)
   208  				tasks = append(tasks,
   209  					t.newWaitTask(pruneIds, taskrunner.AllNotFound, o.PruneTimeout))
   210  			}
   211  		}
   212  	}
   213  
   214  	// TODO: add InvSetTask when Destroy=true to retain undeleted objects
   215  	if !o.Destroy {
   216  		klog.V(2).Infoln("adding inventory set task")
   217  		prevInvIds, _ := t.InvClient.GetClusterObjs(t.invInfo)
   218  		tasks = append(tasks, &task.InvSetTask{
   219  			TaskName:      "inventory-set-0",
   220  			InvClient:     t.InvClient,
   221  			InvInfo:       t.invInfo,
   222  			PrevInventory: prevInvIds,
   223  			DryRun:        o.DryRunStrategy,
   224  		})
   225  	} else {
   226  		klog.V(2).Infoln("adding delete inventory task")
   227  		tasks = append(tasks, &task.DeleteInvTask{
   228  			TaskName:  "delete-inventory-0",
   229  			InvClient: t.InvClient,
   230  			InvInfo:   t.invInfo,
   231  			DryRun:    o.DryRunStrategy,
   232  		})
   233  	}
   234  
   235  	return &TaskQueue{tasks: tasks}
   236  }
   237  
   238  // AppendApplyTask appends a task to the task queue to apply the passed objects
   239  // to the cluster. Returns a pointer to the Builder to chain function calls.
   240  func (t *TaskQueueBuilder) newApplyTask(applyObjs object.UnstructuredSet,
   241  	applyFilters []filter.ValidationFilter, applyMutators []mutator.Interface, o Options) taskrunner.Task {
   242  	applyObjs = t.Collector.FilterInvalidObjects(applyObjs)
   243  	klog.V(2).Infof("adding apply task (%d objects)", len(applyObjs))
   244  	task := &task.ApplyTask{
   245  		TaskName:          fmt.Sprintf("apply-%d", t.applyCounter),
   246  		Objects:           applyObjs,
   247  		Filters:           applyFilters,
   248  		Mutators:          applyMutators,
   249  		ServerSideOptions: o.ServerSideOptions,
   250  		DryRunStrategy:    o.DryRunStrategy,
   251  		DynamicClient:     t.DynamicClient,
   252  		OpenAPIGetter:     t.OpenAPIGetter,
   253  		InfoHelper:        t.InfoHelper,
   254  		Mapper:            t.Mapper,
   255  	}
   256  	t.applyCounter++
   257  	return task
   258  }
   259  
   260  // AppendWaitTask appends a task to wait on the passed objects to the task queue.
   261  // Returns a pointer to the Builder to chain function calls.
   262  func (t *TaskQueueBuilder) newWaitTask(waitIds object.ObjMetadataSet, condition taskrunner.Condition,
   263  	waitTimeout time.Duration) taskrunner.Task {
   264  	waitIds = t.Collector.FilterInvalidIds(waitIds)
   265  	klog.V(2).Infoln("adding wait task")
   266  	task := taskrunner.NewWaitTask(
   267  		fmt.Sprintf("wait-%d", t.waitCounter),
   268  		waitIds,
   269  		condition,
   270  		waitTimeout,
   271  		t.Mapper,
   272  	)
   273  	t.waitCounter++
   274  	return task
   275  }
   276  
   277  // AppendPruneTask appends a task to delete objects from the cluster to the task queue.
   278  // Returns a pointer to the Builder to chain function calls.
   279  func (t *TaskQueueBuilder) newPruneTask(pruneObjs object.UnstructuredSet,
   280  	pruneFilters []filter.ValidationFilter, o Options) taskrunner.Task {
   281  	pruneObjs = t.Collector.FilterInvalidObjects(pruneObjs)
   282  	klog.V(2).Infof("adding prune task (%d objects)", len(pruneObjs))
   283  	task := &task.PruneTask{
   284  		TaskName:          fmt.Sprintf("prune-%d", t.pruneCounter),
   285  		Objects:           pruneObjs,
   286  		Filters:           pruneFilters,
   287  		Pruner:            t.Pruner,
   288  		PropagationPolicy: o.PrunePropagationPolicy,
   289  		DryRunStrategy:    o.DryRunStrategy,
   290  		Destroy:           o.Destroy,
   291  	}
   292  	t.pruneCounter++
   293  	return task
   294  }
   295  

View as plain text