1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package solver
16
17 import (
18 "fmt"
19 "time"
20
21 "k8s.io/apimachinery/pkg/api/meta"
22 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23 "k8s.io/client-go/discovery"
24 "k8s.io/client-go/dynamic"
25 "k8s.io/klog/v2"
26 "sigs.k8s.io/cli-utils/pkg/apply/event"
27 "sigs.k8s.io/cli-utils/pkg/apply/filter"
28 "sigs.k8s.io/cli-utils/pkg/apply/info"
29 "sigs.k8s.io/cli-utils/pkg/apply/mutator"
30 "sigs.k8s.io/cli-utils/pkg/apply/prune"
31 "sigs.k8s.io/cli-utils/pkg/apply/task"
32 "sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
33 "sigs.k8s.io/cli-utils/pkg/common"
34 "sigs.k8s.io/cli-utils/pkg/inventory"
35 "sigs.k8s.io/cli-utils/pkg/object"
36 "sigs.k8s.io/cli-utils/pkg/object/graph"
37 "sigs.k8s.io/cli-utils/pkg/object/validation"
38 )
39
40 type TaskQueueBuilder struct {
41 Pruner *prune.Pruner
42 DynamicClient dynamic.Interface
43 OpenAPIGetter discovery.OpenAPISchemaInterface
44 InfoHelper info.Helper
45 Mapper meta.RESTMapper
46 InvClient inventory.Client
47
48
49 Collector *validation.Collector
50 ApplyFilters []filter.ValidationFilter
51 ApplyMutators []mutator.Interface
52 PruneFilters []filter.ValidationFilter
53
54
55 applyCounter int
56 pruneCounter int
57 waitCounter int
58
59 invInfo inventory.Info
60 applyObjs object.UnstructuredSet
61 pruneObjs object.UnstructuredSet
62 }
63
64 type TaskQueue struct {
65 tasks []taskrunner.Task
66 }
67
68 func (tq *TaskQueue) ToChannel() chan taskrunner.Task {
69 taskQueue := make(chan taskrunner.Task, len(tq.tasks))
70 for _, t := range tq.tasks {
71 taskQueue <- t
72 }
73 return taskQueue
74 }
75
76 func (tq *TaskQueue) ToActionGroups() []event.ActionGroup {
77 var ags []event.ActionGroup
78
79 for _, t := range tq.tasks {
80 ags = append(ags, event.ActionGroup{
81 Name: t.Name(),
82 Action: t.Action(),
83 Identifiers: t.Identifiers(),
84 })
85 }
86 return ags
87 }
88
89 type Options struct {
90 ServerSideOptions common.ServerSideOptions
91 ReconcileTimeout time.Duration
92
93
94 Destroy bool
95
96 Prune bool
97 DryRunStrategy common.DryRunStrategy
98 PrunePropagationPolicy metav1.DeletionPropagation
99 PruneTimeout time.Duration
100 InventoryPolicy inventory.Policy
101 }
102
103
104 func (t *TaskQueueBuilder) WithInventory(inv inventory.Info) *TaskQueueBuilder {
105 t.invInfo = inv
106 return t
107 }
108
109
110 func (t *TaskQueueBuilder) WithApplyObjects(applyObjs object.UnstructuredSet) *TaskQueueBuilder {
111 t.applyObjs = applyObjs
112 return t
113 }
114
115
116 func (t *TaskQueueBuilder) WithPruneObjects(pruneObjs object.UnstructuredSet) *TaskQueueBuilder {
117 t.pruneObjs = pruneObjs
118 return t
119 }
120
121
122 func (t *TaskQueueBuilder) Build(taskContext *taskrunner.TaskContext, o Options) *TaskQueue {
123 var tasks []taskrunner.Task
124
125
126 t.applyCounter = 0
127 t.pruneCounter = 0
128 t.waitCounter = 0
129
130
131 applyObjs := t.Collector.FilterInvalidObjects(t.applyObjs)
132 pruneObjs := t.Collector.FilterInvalidObjects(t.pruneObjs)
133
134
135
136
137 allObjs := make(object.UnstructuredSet, 0, len(applyObjs)+len(pruneObjs))
138 allObjs = append(allObjs, applyObjs...)
139 allObjs = append(allObjs, pruneObjs...)
140 g, err := graph.DependencyGraph(allObjs)
141 if err != nil {
142 t.Collector.Collect(err)
143 }
144
145 taskContext.SetGraph(g)
146
147
148 idSetList, err := g.Sort()
149 if err != nil {
150 t.Collector.Collect(err)
151 }
152
153
154 applyObjs = t.Collector.FilterInvalidObjects(applyObjs)
155 pruneObjs = t.Collector.FilterInvalidObjects(pruneObjs)
156
157 if !o.Destroy {
158
159 klog.V(2).Infof("adding inventory add task (%d objects)", len(applyObjs))
160 tasks = append(tasks, &task.InvAddTask{
161 TaskName: "inventory-add-0",
162 InvClient: t.InvClient,
163 InvInfo: t.invInfo,
164 Objects: applyObjs,
165 DryRun: o.DryRunStrategy,
166 })
167 }
168
169 if len(applyObjs) > 0 {
170
171 for _, id := range object.UnstructuredSetToObjMetadataSet(applyObjs) {
172 taskContext.InventoryManager().AddPendingApply(id)
173 }
174
175
176 applySets := graph.HydrateSetList(idSetList, applyObjs)
177
178 for _, applySet := range applySets {
179 tasks = append(tasks,
180 t.newApplyTask(applySet, t.ApplyFilters, t.ApplyMutators, o))
181
182 if !o.DryRunStrategy.ClientOrServerDryRun() {
183 applyIds := object.UnstructuredSetToObjMetadataSet(applySet)
184 tasks = append(tasks,
185 t.newWaitTask(applyIds, taskrunner.AllCurrent, o.ReconcileTimeout))
186 }
187 }
188 }
189
190 if o.Prune && len(pruneObjs) > 0 {
191
192 for _, id := range object.UnstructuredSetToObjMetadataSet(pruneObjs) {
193 taskContext.InventoryManager().AddPendingDelete(id)
194 }
195
196
197 pruneSets := graph.HydrateSetList(idSetList, pruneObjs)
198
199
200 graph.ReverseSetList(pruneSets)
201
202 for _, pruneSet := range pruneSets {
203 tasks = append(tasks,
204 t.newPruneTask(pruneSet, t.PruneFilters, o))
205
206 if !o.DryRunStrategy.ClientOrServerDryRun() {
207 pruneIds := object.UnstructuredSetToObjMetadataSet(pruneSet)
208 tasks = append(tasks,
209 t.newWaitTask(pruneIds, taskrunner.AllNotFound, o.PruneTimeout))
210 }
211 }
212 }
213
214
215 if !o.Destroy {
216 klog.V(2).Infoln("adding inventory set task")
217 prevInvIds, _ := t.InvClient.GetClusterObjs(t.invInfo)
218 tasks = append(tasks, &task.InvSetTask{
219 TaskName: "inventory-set-0",
220 InvClient: t.InvClient,
221 InvInfo: t.invInfo,
222 PrevInventory: prevInvIds,
223 DryRun: o.DryRunStrategy,
224 })
225 } else {
226 klog.V(2).Infoln("adding delete inventory task")
227 tasks = append(tasks, &task.DeleteInvTask{
228 TaskName: "delete-inventory-0",
229 InvClient: t.InvClient,
230 InvInfo: t.invInfo,
231 DryRun: o.DryRunStrategy,
232 })
233 }
234
235 return &TaskQueue{tasks: tasks}
236 }
237
238
239
240 func (t *TaskQueueBuilder) newApplyTask(applyObjs object.UnstructuredSet,
241 applyFilters []filter.ValidationFilter, applyMutators []mutator.Interface, o Options) taskrunner.Task {
242 applyObjs = t.Collector.FilterInvalidObjects(applyObjs)
243 klog.V(2).Infof("adding apply task (%d objects)", len(applyObjs))
244 task := &task.ApplyTask{
245 TaskName: fmt.Sprintf("apply-%d", t.applyCounter),
246 Objects: applyObjs,
247 Filters: applyFilters,
248 Mutators: applyMutators,
249 ServerSideOptions: o.ServerSideOptions,
250 DryRunStrategy: o.DryRunStrategy,
251 DynamicClient: t.DynamicClient,
252 OpenAPIGetter: t.OpenAPIGetter,
253 InfoHelper: t.InfoHelper,
254 Mapper: t.Mapper,
255 }
256 t.applyCounter++
257 return task
258 }
259
260
261
262 func (t *TaskQueueBuilder) newWaitTask(waitIds object.ObjMetadataSet, condition taskrunner.Condition,
263 waitTimeout time.Duration) taskrunner.Task {
264 waitIds = t.Collector.FilterInvalidIds(waitIds)
265 klog.V(2).Infoln("adding wait task")
266 task := taskrunner.NewWaitTask(
267 fmt.Sprintf("wait-%d", t.waitCounter),
268 waitIds,
269 condition,
270 waitTimeout,
271 t.Mapper,
272 )
273 t.waitCounter++
274 return task
275 }
276
277
278
279 func (t *TaskQueueBuilder) newPruneTask(pruneObjs object.UnstructuredSet,
280 pruneFilters []filter.ValidationFilter, o Options) taskrunner.Task {
281 pruneObjs = t.Collector.FilterInvalidObjects(pruneObjs)
282 klog.V(2).Infof("adding prune task (%d objects)", len(pruneObjs))
283 task := &task.PruneTask{
284 TaskName: fmt.Sprintf("prune-%d", t.pruneCounter),
285 Objects: pruneObjs,
286 Filters: pruneFilters,
287 Pruner: t.Pruner,
288 PropagationPolicy: o.PrunePropagationPolicy,
289 DryRunStrategy: o.DryRunStrategy,
290 Destroy: o.Destroy,
291 }
292 t.pruneCounter++
293 return task
294 }
295
View as plain text