1
16
17 package builder
18
19 import (
20 "errors"
21 "fmt"
22 "strings"
23
24 "github.com/go-logr/logr"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 "k8s.io/apimachinery/pkg/runtime/schema"
27 "k8s.io/klog/v2"
28
29 "sigs.k8s.io/controller-runtime/pkg/client"
30 "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
31 "sigs.k8s.io/controller-runtime/pkg/controller"
32 "sigs.k8s.io/controller-runtime/pkg/handler"
33 "sigs.k8s.io/controller-runtime/pkg/manager"
34 "sigs.k8s.io/controller-runtime/pkg/predicate"
35 "sigs.k8s.io/controller-runtime/pkg/reconcile"
36 "sigs.k8s.io/controller-runtime/pkg/source"
37 )
38
39
40 var newController = controller.New
41 var getGvk = apiutil.GVKForObject
42
43
44
45 type objectProjection int
46
47 const (
48
49 projectAsNormal objectProjection = iota
50
51 projectAsMetadata
52 )
53
54
55 type Builder struct {
56 forInput ForInput
57 ownsInput []OwnsInput
58 rawSources []source.Source
59 watchesInput []WatchesInput
60 mgr manager.Manager
61 globalPredicates []predicate.Predicate
62 ctrl controller.Controller
63 ctrlOptions controller.Options
64 name string
65 }
66
67
68 func ControllerManagedBy(m manager.Manager) *Builder {
69 return &Builder{mgr: m}
70 }
71
72
73 type ForInput struct {
74 object client.Object
75 predicates []predicate.Predicate
76 objectProjection objectProjection
77 err error
78 }
79
80
81
82
83
84 func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
85 if blder.forInput.object != nil {
86 blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation")
87 return blder
88 }
89 input := ForInput{object: object}
90 for _, opt := range opts {
91 opt.ApplyToFor(&input)
92 }
93
94 blder.forInput = input
95 return blder
96 }
97
98
99 type OwnsInput struct {
100 matchEveryOwner bool
101 object client.Object
102 predicates []predicate.Predicate
103 objectProjection objectProjection
104 }
105
106
107
108
109
110
111
112
113
114 func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
115 input := OwnsInput{object: object}
116 for _, opt := range opts {
117 opt.ApplyToOwns(&input)
118 }
119
120 blder.ownsInput = append(blder.ownsInput, input)
121 return blder
122 }
123
124
125 type WatchesInput struct {
126 obj client.Object
127 handler handler.EventHandler
128 predicates []predicate.Predicate
129 objectProjection objectProjection
130 }
131
132
133
134
135
136
137 func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
138 input := WatchesInput{
139 obj: object,
140 handler: eventHandler,
141 }
142 for _, opt := range opts {
143 opt.ApplyToWatches(&input)
144 }
145
146 blder.watchesInput = append(blder.watchesInput, input)
147
148 return blder
149 }
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178 func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
179 opts = append(opts, OnlyMetadata)
180 return blder.Watches(object, eventHandler, opts...)
181 }
182
183
184
185
186
187
188
189
190 func (blder *Builder) WatchesRawSource(src source.Source) *Builder {
191 blder.rawSources = append(blder.rawSources, src)
192
193 return blder
194 }
195
196
197
198
199
200 func (blder *Builder) WithEventFilter(p predicate.Predicate) *Builder {
201 blder.globalPredicates = append(blder.globalPredicates, p)
202 return blder
203 }
204
205
206 func (blder *Builder) WithOptions(options controller.Options) *Builder {
207 blder.ctrlOptions = options
208 return blder
209 }
210
211
212 func (blder *Builder) WithLogConstructor(logConstructor func(*reconcile.Request) logr.Logger) *Builder {
213 blder.ctrlOptions.LogConstructor = logConstructor
214 return blder
215 }
216
217
218
219
220
221
222 func (blder *Builder) Named(name string) *Builder {
223 blder.name = name
224 return blder
225 }
226
227
228 func (blder *Builder) Complete(r reconcile.Reconciler) error {
229 _, err := blder.Build(r)
230 return err
231 }
232
233
234 func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
235 if r == nil {
236 return nil, fmt.Errorf("must provide a non-nil Reconciler")
237 }
238 if blder.mgr == nil {
239 return nil, fmt.Errorf("must provide a non-nil Manager")
240 }
241 if blder.forInput.err != nil {
242 return nil, blder.forInput.err
243 }
244
245
246 if err := blder.doController(r); err != nil {
247 return nil, err
248 }
249
250
251 if err := blder.doWatch(); err != nil {
252 return nil, err
253 }
254
255 return blder.ctrl, nil
256 }
257
258 func (blder *Builder) project(obj client.Object, proj objectProjection) (client.Object, error) {
259 switch proj {
260 case projectAsNormal:
261 return obj, nil
262 case projectAsMetadata:
263 metaObj := &metav1.PartialObjectMetadata{}
264 gvk, err := getGvk(obj, blder.mgr.GetScheme())
265 if err != nil {
266 return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err)
267 }
268 metaObj.SetGroupVersionKind(gvk)
269 return metaObj, nil
270 default:
271 panic(fmt.Sprintf("unexpected projection type %v on type %T, should not be possible since this is an internal field", proj, obj))
272 }
273 }
274
275 func (blder *Builder) doWatch() error {
276
277 if blder.forInput.object != nil {
278 obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
279 if err != nil {
280 return err
281 }
282 hdler := &handler.EnqueueRequestForObject{}
283 allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
284 allPredicates = append(allPredicates, blder.forInput.predicates...)
285 src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
286 if err := blder.ctrl.Watch(src); err != nil {
287 return err
288 }
289 }
290
291
292 if len(blder.ownsInput) > 0 && blder.forInput.object == nil {
293 return errors.New("Owns() can only be used together with For()")
294 }
295 for _, own := range blder.ownsInput {
296 obj, err := blder.project(own.object, own.objectProjection)
297 if err != nil {
298 return err
299 }
300 opts := []handler.OwnerOption{}
301 if !own.matchEveryOwner {
302 opts = append(opts, handler.OnlyControllerOwner())
303 }
304 hdler := handler.EnqueueRequestForOwner(
305 blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
306 blder.forInput.object,
307 opts...,
308 )
309 allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
310 allPredicates = append(allPredicates, own.predicates...)
311 src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
312 if err := blder.ctrl.Watch(src); err != nil {
313 return err
314 }
315 }
316
317
318 if len(blder.watchesInput) == 0 && blder.forInput.object == nil && len(blder.rawSources) == 0 {
319 return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns(), Watches() or WatchesRawSource() to set them up")
320 }
321 for _, w := range blder.watchesInput {
322 projected, err := blder.project(w.obj, w.objectProjection)
323 if err != nil {
324 return fmt.Errorf("failed to project for %T: %w", w.obj, err)
325 }
326 allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
327 allPredicates = append(allPredicates, w.predicates...)
328 if err := blder.ctrl.Watch(source.Kind(blder.mgr.GetCache(), projected, w.handler, allPredicates...)); err != nil {
329 return err
330 }
331 }
332 for _, src := range blder.rawSources {
333 if err := blder.ctrl.Watch(src); err != nil {
334 return err
335 }
336 }
337 return nil
338 }
339
340 func (blder *Builder) getControllerName(gvk schema.GroupVersionKind, hasGVK bool) (string, error) {
341 if blder.name != "" {
342 return blder.name, nil
343 }
344 if !hasGVK {
345 return "", errors.New("one of For() or Named() must be called")
346 }
347 return strings.ToLower(gvk.Kind), nil
348 }
349
350 func (blder *Builder) doController(r reconcile.Reconciler) error {
351 globalOpts := blder.mgr.GetControllerOptions()
352
353 ctrlOptions := blder.ctrlOptions
354 if ctrlOptions.Reconciler != nil && r != nil {
355 return errors.New("reconciler was set via WithOptions() and via Build() or Complete()")
356 }
357 if ctrlOptions.Reconciler == nil {
358 ctrlOptions.Reconciler = r
359 }
360
361
362
363 var gvk schema.GroupVersionKind
364 hasGVK := blder.forInput.object != nil
365 if hasGVK {
366 var err error
367 gvk, err = getGvk(blder.forInput.object, blder.mgr.GetScheme())
368 if err != nil {
369 return err
370 }
371 }
372
373
374 if ctrlOptions.MaxConcurrentReconciles == 0 && hasGVK {
375 groupKind := gvk.GroupKind().String()
376
377 if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 {
378 ctrlOptions.MaxConcurrentReconciles = concurrency
379 }
380 }
381
382
383 if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout > 0 {
384 ctrlOptions.CacheSyncTimeout = globalOpts.CacheSyncTimeout
385 }
386
387 controllerName, err := blder.getControllerName(gvk, hasGVK)
388 if err != nil {
389 return err
390 }
391
392
393 if ctrlOptions.LogConstructor == nil {
394 log := blder.mgr.GetLogger().WithValues(
395 "controller", controllerName,
396 )
397 if hasGVK {
398 log = log.WithValues(
399 "controllerGroup", gvk.Group,
400 "controllerKind", gvk.Kind,
401 )
402 }
403
404 ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger {
405 log := log
406 if req != nil {
407 if hasGVK {
408 log = log.WithValues(gvk.Kind, klog.KRef(req.Namespace, req.Name))
409 }
410 log = log.WithValues(
411 "namespace", req.Namespace, "name", req.Name,
412 )
413 }
414 return log
415 }
416 }
417
418
419 blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
420 return err
421 }
422
View as plain text