1
2
3
4 package task
5
6 import (
7 "context"
8 "errors"
9 "fmt"
10 "io"
11 "strings"
12
13 "k8s.io/apimachinery/pkg/api/meta"
14 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
15 "k8s.io/apimachinery/pkg/util/sets"
16 "k8s.io/cli-runtime/pkg/genericclioptions"
17 "k8s.io/cli-runtime/pkg/resource"
18 "k8s.io/client-go/discovery"
19 "k8s.io/client-go/dynamic"
20 "k8s.io/klog/v2"
21 "k8s.io/kubectl/pkg/cmd/apply"
22 cmddelete "k8s.io/kubectl/pkg/cmd/delete"
23 applyerror "sigs.k8s.io/cli-utils/pkg/apply/error"
24 "sigs.k8s.io/cli-utils/pkg/apply/event"
25 "sigs.k8s.io/cli-utils/pkg/apply/filter"
26 "sigs.k8s.io/cli-utils/pkg/apply/info"
27 "sigs.k8s.io/cli-utils/pkg/apply/mutator"
28 "sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
29 "sigs.k8s.io/cli-utils/pkg/common"
30 "sigs.k8s.io/cli-utils/pkg/object"
31 )
32
33
34
// applyOptions is the behavior ApplyTask needs from an apply implementation.
// For every object it applies, ApplyTask.Start calls SetObjects and then Run.
// The implementation returned by newApplyOptions is kubectl's
// *apply.ApplyOptions.
type applyOptions interface {
	// Run executes the apply. ApplyTask reports a non-nil error as a failed
	// apply for the object being processed.
	Run() error

	// SetObjects supplies the resource infos that a subsequent call to Run
	// operates on.
	SetObjects([]*resource.Info)
}
45
46
47
// ApplyTask applies the given Objects to the cluster. Each object is turned
// into a resource.Info, checked against Filters, passed through Mutators and
// then applied with an applyOptions built by applyOptionsFactoryFunc.
type ApplyTask struct {
	// TaskName is returned by Name and used as the GroupName of emitted events.
	TaskName string

	// DynamicClient and OpenAPIGetter are handed to the apply options factory.
	DynamicClient dynamic.Interface
	OpenAPIGetter discovery.OpenAPISchemaInterface
	// InfoHelper builds the resource.Info for each object before it is applied.
	InfoHelper info.Helper
	// NOTE(review): Mapper is not referenced by the methods visible in this
	// file; confirm whether it is still needed.
	Mapper meta.RESTMapper
	// Objects is the set of objects this task applies.
	Objects object.UnstructuredSet
	// Filters are evaluated in order for each object; the first filter that
	// returns an error stops the object from being applied.
	Filters []filter.ValidationFilter
	// Mutators are run in order on each object that passed all filters.
	Mutators []mutator.Interface
	// DryRunStrategy and ServerSideOptions are handed to the apply options
	// factory.
	DryRunStrategy    common.DryRunStrategy
	ServerSideOptions common.ServerSideOptions
}
61
62
63
// applyOptionsFactoryFunc creates the applyOptions used by Start and
// clientSideApply. It defaults to newApplyOptions; being a package-level
// variable it can be replaced (presumably by tests - confirm against callers).
var applyOptionsFactoryFunc = newApplyOptions
65
// Name returns the task's name (the TaskName field).
func (a *ApplyTask) Name() string {
	return a.TaskName
}
69
// Action returns the resource action performed by this task, which is always
// event.ApplyAction.
func (a *ApplyTask) Action() event.ResourceAction {
	return event.ApplyAction
}
73
// Identifiers returns the object metadata identifiers of all objects in
// a.Objects.
func (a *ApplyTask) Identifiers() object.ObjMetadataSet {
	return object.UnstructuredSetToObjMetadataSet(a.Objects)
}
77
78
79
80
81
82
83
84
85
86 func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
87 go func() {
88
89 ctx := context.TODO()
90 objects := a.Objects
91 klog.V(2).Infof("apply task starting (name: %q, objects: %d)",
92 a.Name(), len(objects))
93 for _, obj := range objects {
94
95
96 info, err := a.InfoHelper.BuildInfo(obj)
97
98
99 obj = info.Object.(*unstructured.Unstructured)
100 id := object.UnstructuredToObjMetadata(obj)
101 if err != nil {
102 err = applyerror.NewUnknownTypeError(err)
103 if klog.V(4).Enabled() {
104
105 klog.Errorf("apply task errored (object: %s): unable to convert obj to info: %v", id, err)
106 }
107 taskContext.SendEvent(a.createApplyFailedEvent(id, err))
108 taskContext.InventoryManager().AddFailedApply(id)
109 continue
110 }
111
112
113 var filterErr error
114 for _, applyFilter := range a.Filters {
115 klog.V(6).Infof("apply filter evaluating (filter: %s, object: %s)", applyFilter.Name(), id)
116 filterErr = applyFilter.Filter(obj)
117 if filterErr != nil {
118 var fatalErr *filter.FatalError
119 if errors.As(filterErr, &fatalErr) {
120 if klog.V(4).Enabled() {
121
122 klog.Errorf("apply filter errored (filter: %s, object: %s): %v", applyFilter.Name(), id, fatalErr.Err)
123 }
124 taskContext.SendEvent(a.createApplyFailedEvent(id, err))
125 taskContext.InventoryManager().AddFailedApply(id)
126 break
127 }
128 klog.V(4).Infof("apply filtered (filter: %s, object: %s): %v", applyFilter.Name(), id, filterErr)
129 taskContext.SendEvent(a.createApplySkippedEvent(id, obj, filterErr))
130 taskContext.InventoryManager().AddSkippedApply(id)
131 break
132 }
133 }
134 if filterErr != nil {
135 continue
136 }
137
138
139 err = a.mutate(ctx, obj)
140 if err != nil {
141 if klog.V(4).Enabled() {
142
143 klog.Errorf("apply mutation errored (object: %s): %v", id, err)
144 }
145 taskContext.SendEvent(a.createApplyFailedEvent(id, err))
146 taskContext.InventoryManager().AddFailedApply(id)
147 continue
148 }
149
150
151
152 ao := applyOptionsFactoryFunc(a.Name(), taskContext.EventChannel(),
153 a.ServerSideOptions, a.DryRunStrategy, a.DynamicClient, a.OpenAPIGetter)
154 ao.SetObjects([]*resource.Info{info})
155 klog.V(5).Infof("applying object: %v", id)
156 err = ao.Run()
157 if err != nil && a.ServerSideOptions.ServerSideApply && isAPIService(obj) && isStreamError(err) {
158
159
160
161 err = a.clientSideApply(info, taskContext.EventChannel())
162 }
163 if err != nil {
164 err = applyerror.NewApplyRunError(err)
165 if klog.V(4).Enabled() {
166
167 klog.Errorf("apply errored (object: %s): %v", id, err)
168 }
169 taskContext.SendEvent(a.createApplyFailedEvent(id, err))
170 taskContext.InventoryManager().AddFailedApply(id)
171 } else if info.Object != nil {
172 acc, err := meta.Accessor(info.Object)
173 if err == nil {
174 uid := acc.GetUID()
175 gen := acc.GetGeneration()
176 taskContext.InventoryManager().AddSuccessfulApply(id, uid, gen)
177 }
178 }
179 }
180 a.sendTaskResult(taskContext)
181 }()
182 }
183
184 func newApplyOptions(taskName string, eventChannel chan<- event.Event, serverSideOptions common.ServerSideOptions,
185 strategy common.DryRunStrategy, dynamicClient dynamic.Interface,
186 openAPIGetter discovery.OpenAPISchemaInterface) applyOptions {
187 emptyString := ""
188 return &apply.ApplyOptions{
189 VisitedNamespaces: sets.NewString(),
190 VisitedUids: sets.NewString(),
191 Overwrite: true,
192 OpenAPIPatch: true,
193 Recorder: genericclioptions.NoopRecorder{},
194 IOStreams: genericclioptions.IOStreams{
195 Out: io.Discard,
196 ErrOut: io.Discard,
197
198
199 },
200
201
202 DeleteOptions: &cmddelete.DeleteOptions{},
203 PrintFlags: &genericclioptions.PrintFlags{
204 OutputFormat: &emptyString,
205 },
206
207 ServerSideApply: strategy.ServerDryRun() || serverSideOptions.ServerSideApply,
208 ForceConflicts: serverSideOptions.ForceConflicts,
209 FieldManager: serverSideOptions.FieldManager,
210 DryRunStrategy: strategy.Strategy(),
211 ToPrinter: (&KubectlPrinterAdapter{
212 ch: eventChannel,
213 groupName: taskName,
214 }).toPrinterFunc(),
215 DynamicClient: dynamicClient,
216 DryRunVerifier: resource.NewQueryParamVerifier(dynamicClient, openAPIGetter, resource.QueryParamDryRun),
217 }
218 }
219
220 func (a *ApplyTask) sendTaskResult(taskContext *taskrunner.TaskContext) {
221 klog.V(2).Infof("apply task completing (name: %q)", a.Name())
222 taskContext.TaskChannel() <- taskrunner.TaskResult{}
223 }
224
225
// Cancel is a no-op: ApplyTask does nothing when asked to cancel.
func (a *ApplyTask) Cancel(_ *taskrunner.TaskContext) {}
227
228
// StatusUpdate is a no-op: ApplyTask ignores status updates.
func (a *ApplyTask) StatusUpdate(_ *taskrunner.TaskContext, _ object.ObjMetadata) {}
230
231
232 func (a *ApplyTask) mutate(ctx context.Context, obj *unstructured.Unstructured) error {
233 id := object.UnstructuredToObjMetadata(obj)
234 for _, mutator := range a.Mutators {
235 klog.V(6).Infof("apply mutator %s: %s", mutator.Name(), id)
236 mutated, reason, err := mutator.Mutate(ctx, obj)
237 if err != nil {
238 return fmt.Errorf("failed to mutate %q with %q: %w", id, mutator.Name(), err)
239 }
240 if mutated {
241 klog.V(4).Infof("resource mutated (mutator: %q, resource: %q, reason: %q)", mutator.Name(), id, reason)
242 }
243 }
244 return nil
245 }
246
247 func (a *ApplyTask) createApplyFailedEvent(id object.ObjMetadata, err error) event.Event {
248 return event.Event{
249 Type: event.ApplyType,
250 ApplyEvent: event.ApplyEvent{
251 GroupName: a.Name(),
252 Identifier: id,
253 Status: event.ApplyFailed,
254 Error: err,
255 },
256 }
257 }
258
259 func (a *ApplyTask) createApplySkippedEvent(id object.ObjMetadata, resource *unstructured.Unstructured, err error) event.Event {
260 return event.Event{
261 Type: event.ApplyType,
262 ApplyEvent: event.ApplyEvent{
263 GroupName: a.Name(),
264 Identifier: id,
265 Status: event.ApplySkipped,
266 Resource: resource,
267 Error: err,
268 },
269 }
270 }
271
272 func isAPIService(obj *unstructured.Unstructured) bool {
273 gk := obj.GroupVersionKind().GroupKind()
274 return gk.Group == "apiregistration.k8s.io" && gk.Kind == "APIService"
275 }
276
277
278
// isStreamError reports whether err's message contains the text
// "stream error: stream ID ". The check is done on the message rather than on
// the error type (presumably because the underlying error is wrapped in a way
// that hides its type - confirm against the caller's error chain).
//
// A nil err is never a stream error.
func isStreamError(err error) bool {
	if err == nil {
		// Calling err.Error() on a nil error would panic.
		return false
	}
	return strings.Contains(err.Error(), "stream error: stream ID ")
}
282
283 func (a *ApplyTask) clientSideApply(info *resource.Info, eventChannel chan<- event.Event) error {
284 ao := applyOptionsFactoryFunc(a.Name(), eventChannel, common.ServerSideOptions{ServerSideApply: false}, a.DryRunStrategy, a.DynamicClient, a.OpenAPIGetter)
285 ao.SetObjects([]*resource.Info{info})
286 return ao.Run()
287 }
288
View as plain text