1
2
3
4 package apply
5
6 import (
7 "context"
8 "fmt"
9 "time"
10
11 "k8s.io/apimachinery/pkg/api/meta"
12 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13 "k8s.io/apimachinery/pkg/util/sets"
14 "k8s.io/client-go/discovery"
15 "k8s.io/client-go/dynamic"
16 "k8s.io/klog/v2"
17 "sigs.k8s.io/cli-utils/pkg/apis/actuation"
18 "sigs.k8s.io/cli-utils/pkg/apply/cache"
19 "sigs.k8s.io/cli-utils/pkg/apply/event"
20 "sigs.k8s.io/cli-utils/pkg/apply/filter"
21 "sigs.k8s.io/cli-utils/pkg/apply/info"
22 "sigs.k8s.io/cli-utils/pkg/apply/mutator"
23 "sigs.k8s.io/cli-utils/pkg/apply/prune"
24 "sigs.k8s.io/cli-utils/pkg/apply/solver"
25 "sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
26 "sigs.k8s.io/cli-utils/pkg/common"
27 "sigs.k8s.io/cli-utils/pkg/inventory"
28 "sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
29 "sigs.k8s.io/cli-utils/pkg/object"
30 "sigs.k8s.io/cli-utils/pkg/object/validation"
31 )
32
33
34
35
36
37
38
39
40
41
42
// Applier bundles the collaborators needed to apply a set of objects to a
// cluster and prune previously applied objects. Run is the entry point: it
// builds a task queue from the fields below and executes it with a
// taskrunner.
type Applier struct {
	// pruner computes the set of objects to prune (see prepareObjects) and
	// is handed to the task queue builder.
	pruner *prune.Pruner
	// statusWatcher is given to the task status runner; Run swaps it for a
	// watcher.BlindStatusWatcher when a dry-run strategy is selected.
	statusWatcher watcher.StatusWatcher
	// invClient reads inventory objects from the cluster and is handed to
	// the task queue builder.
	invClient inventory.Client
	// client is the dynamic client passed to the apply filter, the
	// apply-time mutator and the task queue builder.
	client dynamic.Interface
	// openAPIGetter is passed through to the task queue builder.
	openAPIGetter discovery.OpenAPISchemaInterface
	// mapper is the RESTMapper used by the validator, the filters, the
	// mutator and the task queue builder.
	mapper meta.RESTMapper
	// infoHelper is passed through to the task queue builder.
	infoHelper info.Helper
}
52
53
54
55 func (a *Applier) prepareObjects(localInv inventory.Info, localObjs object.UnstructuredSet,
56 o ApplierOptions) (object.UnstructuredSet, object.UnstructuredSet, error) {
57 if localInv == nil {
58 return nil, nil, fmt.Errorf("the local inventory can't be nil")
59 }
60 if err := inventory.ValidateNoInventory(localObjs); err != nil {
61 return nil, nil, err
62 }
63
64 for _, localObj := range localObjs {
65 inventory.AddInventoryIDAnnotation(localObj, localInv)
66 }
67
68
69
70
71 if localInv.Strategy() == inventory.NameStrategy && localInv.ID() != "" {
72 prevInvObjs, err := a.invClient.GetClusterInventoryObjs(localInv)
73 if err != nil {
74 return nil, nil, err
75 }
76 if len(prevInvObjs) > 1 {
77 panic(fmt.Errorf("found %d inv objects with Name strategy", len(prevInvObjs)))
78 }
79 if len(prevInvObjs) == 1 {
80 invObj := prevInvObjs[0]
81 val := invObj.GetLabels()[common.InventoryLabel]
82 if val != localInv.ID() {
83 return nil, nil, fmt.Errorf("inventory-id of inventory object in cluster doesn't match provided id %q", localInv.ID())
84 }
85 }
86 }
87 pruneObjs, err := a.pruner.GetPruneObjs(localInv, localObjs, prune.Options{
88 DryRunStrategy: o.DryRunStrategy,
89 })
90 if err != nil {
91 return nil, nil, err
92 }
93 return localObjs, pruneObjs, nil
94 }
95
96
97
98
99
100
101
102
103
104 func (a *Applier) Run(ctx context.Context, invInfo inventory.Info, objects object.UnstructuredSet, options ApplierOptions) <-chan event.Event {
105 klog.V(4).Infof("apply run for %d objects", len(objects))
106 eventChannel := make(chan event.Event)
107 setDefaults(&options)
108 go func() {
109 defer close(eventChannel)
110
111
112 vCollector := &validation.Collector{}
113 validator := &validation.Validator{
114 Collector: vCollector,
115 Mapper: a.mapper,
116 }
117 validator.Validate(objects)
118
119
120 applyObjs, pruneObjs, err := a.prepareObjects(invInfo, objects, options)
121 if err != nil {
122 handleError(eventChannel, err)
123 return
124 }
125 klog.V(4).Infof("calculated %d apply objs; %d prune objs", len(applyObjs), len(pruneObjs))
126
127
128 resourceCache := cache.NewResourceCacheMap()
129 taskContext := taskrunner.NewTaskContext(eventChannel, resourceCache)
130
131
132 klog.V(4).Infoln("applier building task queue...")
133
134 applyFilters := []filter.ValidationFilter{
135 filter.InventoryPolicyApplyFilter{
136 Client: a.client,
137 Mapper: a.mapper,
138 Inv: invInfo,
139 InvPolicy: options.InventoryPolicy,
140 },
141 filter.DependencyFilter{
142 TaskContext: taskContext,
143 ActuationStrategy: actuation.ActuationStrategyApply,
144 DryRunStrategy: options.DryRunStrategy,
145 },
146 }
147
148 pruneFilters := []filter.ValidationFilter{
149 filter.PreventRemoveFilter{},
150 filter.InventoryPolicyPruneFilter{
151 Inv: invInfo,
152 InvPolicy: options.InventoryPolicy,
153 },
154 filter.LocalNamespacesFilter{
155 LocalNamespaces: localNamespaces(invInfo, object.UnstructuredSetToObjMetadataSet(objects)),
156 },
157 filter.DependencyFilter{
158 TaskContext: taskContext,
159 ActuationStrategy: actuation.ActuationStrategyDelete,
160 DryRunStrategy: options.DryRunStrategy,
161 },
162 }
163
164 applyMutators := []mutator.Interface{
165 &mutator.ApplyTimeMutator{
166 Client: a.client,
167 Mapper: a.mapper,
168 ResourceCache: resourceCache,
169 },
170 }
171 taskBuilder := &solver.TaskQueueBuilder{
172 Pruner: a.pruner,
173 DynamicClient: a.client,
174 OpenAPIGetter: a.openAPIGetter,
175 InfoHelper: a.infoHelper,
176 Mapper: a.mapper,
177 InvClient: a.invClient,
178 Collector: vCollector,
179 ApplyFilters: applyFilters,
180 ApplyMutators: applyMutators,
181 PruneFilters: pruneFilters,
182 }
183 opts := solver.Options{
184 ServerSideOptions: options.ServerSideOptions,
185 ReconcileTimeout: options.ReconcileTimeout,
186 Destroy: false,
187 Prune: !options.NoPrune,
188 DryRunStrategy: options.DryRunStrategy,
189 PrunePropagationPolicy: options.PrunePropagationPolicy,
190 PruneTimeout: options.PruneTimeout,
191 InventoryPolicy: options.InventoryPolicy,
192 }
193
194
195 taskQueue := taskBuilder.
196 WithApplyObjects(applyObjs).
197 WithPruneObjects(pruneObjs).
198 WithInventory(invInfo).
199 Build(taskContext, opts)
200
201 klog.V(4).Infof("validation errors: %d", len(vCollector.Errors))
202 klog.V(4).Infof("invalid objects: %d", len(vCollector.InvalidIds))
203
204
205 switch options.ValidationPolicy {
206 case validation.ExitEarly:
207 err = vCollector.ToError()
208 if err != nil {
209 handleError(eventChannel, err)
210 return
211 }
212 case validation.SkipInvalid:
213 for _, err := range vCollector.Errors {
214 handleValidationError(eventChannel, err)
215 }
216 default:
217 handleError(eventChannel, fmt.Errorf("invalid ValidationPolicy: %q", options.ValidationPolicy))
218 return
219 }
220
221
222 for _, id := range vCollector.InvalidIds {
223 taskContext.AddInvalidObject(id)
224 }
225
226
227
228 eventChannel <- event.Event{
229 Type: event.InitType,
230 InitEvent: event.InitEvent{
231 ActionGroups: taskQueue.ToActionGroups(),
232 },
233 }
234
235 klog.V(4).Infoln("applier building TaskStatusRunner...")
236 allIds := object.UnstructuredSetToObjMetadataSet(append(applyObjs, pruneObjs...))
237 statusWatcher := a.statusWatcher
238
239 if opts.DryRunStrategy.ClientOrServerDryRun() {
240 statusWatcher = watcher.BlindStatusWatcher{}
241 }
242 runner := taskrunner.NewTaskStatusRunner(allIds, statusWatcher)
243 klog.V(4).Infoln("applier running TaskStatusRunner...")
244 err = runner.Run(ctx, taskContext, taskQueue.ToChannel(), taskrunner.Options{
245 EmitStatusEvents: options.EmitStatusEvents,
246 })
247 if err != nil {
248 handleError(eventChannel, err)
249 return
250 }
251 }()
252 return eventChannel
253 }
254
// ApplierOptions holds the per-invocation settings accepted by Applier.Run.
type ApplierOptions struct {
	// ServerSideOptions is forwarded unchanged to the task queue builder
	// options.
	ServerSideOptions common.ServerSideOptions

	// ReconcileTimeout is forwarded unchanged to the task queue builder
	// options.
	// NOTE(review): presumably bounds how long to wait for applied objects to
	// reconcile — confirm against the solver package.
	ReconcileTimeout time.Duration

	// EmitStatusEvents is forwarded to the task status runner options.
	EmitStatusEvents bool

	// NoPrune disables pruning when true; Run passes its negation as the
	// builder's Prune option.
	NoPrune bool

	// DryRunStrategy is forwarded to the pruner, the dependency filters and
	// the builder options. When it reports a client or server dry run, Run
	// uses a watcher.BlindStatusWatcher instead of the configured watcher.
	DryRunStrategy common.DryRunStrategy

	// PrunePropagationPolicy is the deletion propagation policy forwarded to
	// the builder options. When left empty, setDefaults replaces it with
	// metav1.DeletePropagationBackground.
	PrunePropagationPolicy metav1.DeletionPropagation

	// PruneTimeout is forwarded unchanged to the task queue builder options.
	// NOTE(review): presumably bounds how long to wait for pruned objects to
	// be deleted — confirm against the solver package.
	PruneTimeout time.Duration

	// InventoryPolicy is forwarded to the inventory policy filters and the
	// builder options.
	InventoryPolicy inventory.Policy

	// ValidationPolicy selects how Run reacts to validation errors:
	// validation.ExitEarly aborts with an error event if any were collected,
	// validation.SkipInvalid emits one validation event per error and
	// continues. Any other value makes Run emit an error event and stop.
	ValidationPolicy validation.Policy
}
292
293
294
295 func setDefaults(o *ApplierOptions) {
296 if o.PrunePropagationPolicy == "" {
297 o.PrunePropagationPolicy = metav1.DeletePropagationBackground
298 }
299 }
300
301 func handleError(eventChannel chan event.Event, err error) {
302 eventChannel <- event.Event{
303 Type: event.ErrorType,
304 ErrorEvent: event.ErrorEvent{
305 Err: err,
306 },
307 }
308 }
309
310
311
312
313
314 func localNamespaces(localInv inventory.Info, localObjs []object.ObjMetadata) sets.String {
315 namespaces := sets.NewString()
316 for _, obj := range localObjs {
317 if obj.Namespace != "" {
318 namespaces.Insert(obj.Namespace)
319 }
320 }
321 invNamespace := localInv.Namespace()
322 if invNamespace != "" {
323 namespaces.Insert(invNamespace)
324 }
325 return namespaces
326 }
327
328 func handleValidationError(eventChannel chan<- event.Event, err error) {
329 switch tErr := err.(type) {
330 case *validation.Error:
331
332 eventChannel <- event.Event{
333 Type: event.ValidationType,
334 ValidationEvent: event.ValidationEvent{
335 Identifiers: tErr.Identifiers(),
336 Error: tErr,
337 },
338 }
339 default:
340
341 eventChannel <- event.Event{
342 Type: event.ValidationType,
343 ValidationEvent: event.ValidationEvent{
344 Error: tErr,
345 },
346 }
347 }
348 }
349
View as plain text