1
16
17 package controller
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "sync"
24 "time"
25
26 "github.com/go-logr/logr"
27 "k8s.io/apimachinery/pkg/types"
28 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
29 "k8s.io/apimachinery/pkg/util/uuid"
30 "k8s.io/client-go/util/workqueue"
31
32 ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
33 logf "sigs.k8s.io/controller-runtime/pkg/log"
34 "sigs.k8s.io/controller-runtime/pkg/ratelimiter"
35 "sigs.k8s.io/controller-runtime/pkg/reconcile"
36 "sigs.k8s.io/controller-runtime/pkg/source"
37 )
38
39
// Controller owns a rate-limited work queue, starts the event sources that are
// handed that queue, and runs worker goroutines that pass queued
// reconcile.Requests to the wrapped Reconciler.
type Controller struct {
	// Name labels this controller's metrics and appears in log and error
	// messages (e.g. the cache-sync failure error built in Start).
	Name string

	// MaxConcurrentReconciles is the number of worker goroutines Start
	// launches to process queue items.
	MaxConcurrentReconciles int

	// Do is the Reconciler that Reconcile delegates to.
	Do reconcile.Reconciler

	// RateLimiter is passed to NewQueue when Start builds the work queue.
	RateLimiter ratelimiter.RateLimiter

	// NewQueue constructs the work queue; Start calls it with Name and
	// RateLimiter.
	NewQueue func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface

	// Queue is the work queue created by Start. Sources are started with it,
	// workers drain it, and it is shut down once the context passed to Start
	// is done.
	Queue workqueue.RateLimitingInterface

	// mu guards Started and startWatches and serializes Watch with Start.
	mu sync.Mutex

	// Started becomes true once Start has started all pending sources and
	// launched the workers. Guarded by mu.
	Started bool

	// ctx is the context passed to Start. It is retained so that sources
	// registered through Watch after the controller has started are started
	// with that same context (Watch itself takes no context parameter).
	ctx context.Context

	// CacheSyncTimeout bounds how long Start waits for each SyncingSource to
	// sync. NOTE(review): a zero value makes that wait context expire
	// immediately; presumably a default is applied by the constructor — confirm.
	CacheSyncTimeout time.Duration

	// startWatches holds sources registered via Watch before Start ran.
	// Start starts them and then clears the slice. Guarded by mu.
	startWatches []source.Source

	// LogConstructor builds loggers for this controller. It is called with a
	// nil request for controller-level messages and with the current request
	// while reconciling it.
	LogConstructor func(request *reconcile.Request) logr.Logger

	// RecoverPanic, when non-nil and true, makes Reconcile turn a panic in the
	// Reconciler into a returned error instead of re-panicking.
	RecoverPanic *bool

	// LeaderElected is the value NeedLeaderElection reports; nil is treated
	// as true.
	LeaderElected *bool
}
96
97
98 func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
99 defer func() {
100 if r := recover(); r != nil {
101 if c.RecoverPanic != nil && *c.RecoverPanic {
102 for _, fn := range utilruntime.PanicHandlers {
103 fn(r)
104 }
105 err = fmt.Errorf("panic: %v [recovered]", r)
106 return
107 }
108
109 log := logf.FromContext(ctx)
110 log.Info(fmt.Sprintf("Observed a panic in reconciler: %v", r))
111 panic(r)
112 }
113 }()
114 return c.Do.Reconcile(ctx, req)
115 }
116
117
118 func (c *Controller) Watch(src source.Source) error {
119 c.mu.Lock()
120 defer c.mu.Unlock()
121
122
123
124
125 if !c.Started {
126 c.startWatches = append(c.startWatches, src)
127 return nil
128 }
129
130 c.LogConstructor(nil).Info("Starting EventSource", "source", src)
131 return src.Start(c.ctx, c.Queue)
132 }
133
134
135 func (c *Controller) NeedLeaderElection() bool {
136 if c.LeaderElected == nil {
137 return true
138 }
139 return *c.LeaderElected
140 }
141
142
143 func (c *Controller) Start(ctx context.Context) error {
144
145
146 c.mu.Lock()
147 if c.Started {
148 return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
149 }
150
151 c.initMetrics()
152
153
154 c.ctx = ctx
155
156 c.Queue = c.NewQueue(c.Name, c.RateLimiter)
157 go func() {
158 <-ctx.Done()
159 c.Queue.ShutDown()
160 }()
161
162 wg := &sync.WaitGroup{}
163 err := func() error {
164 defer c.mu.Unlock()
165
166
167 defer utilruntime.HandleCrash()
168
169
170
171
172 for _, watch := range c.startWatches {
173 c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch))
174
175 if err := watch.Start(ctx, c.Queue); err != nil {
176 return err
177 }
178 }
179
180
181 c.LogConstructor(nil).Info("Starting Controller")
182
183 for _, watch := range c.startWatches {
184 syncingSource, ok := watch.(source.SyncingSource)
185 if !ok {
186 continue
187 }
188
189 if err := func() error {
190
191 sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
192 defer cancel()
193
194
195
196 if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
197 err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
198 c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
199 return err
200 }
201
202 return nil
203 }(); err != nil {
204 return err
205 }
206 }
207
208
209
210
211
212 c.startWatches = nil
213
214
215 c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
216 wg.Add(c.MaxConcurrentReconciles)
217 for i := 0; i < c.MaxConcurrentReconciles; i++ {
218 go func() {
219 defer wg.Done()
220
221
222 for c.processNextWorkItem(ctx) {
223 }
224 }()
225 }
226
227 c.Started = true
228 return nil
229 }()
230 if err != nil {
231 return err
232 }
233
234 <-ctx.Done()
235 c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
236 wg.Wait()
237 c.LogConstructor(nil).Info("All workers finished")
238 return nil
239 }
240
241
242
243 func (c *Controller) processNextWorkItem(ctx context.Context) bool {
244 obj, shutdown := c.Queue.Get()
245 if shutdown {
246
247 return false
248 }
249
250
251
252
253
254
255
256 defer c.Queue.Done(obj)
257
258 ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
259 defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
260
261 c.reconcileHandler(ctx, obj)
262 return true
263 }
264
// Outcome label values passed to ctrlmetrics.ReconcileTotal alongside the
// controller name, one per branch of reconcileHandler.
const (
	labelSuccess      = "success"
	labelError        = "error"
	labelRequeue      = "requeue"
	labelRequeueAfter = "requeue_after"
)
271
272 func (c *Controller) initMetrics() {
273 ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Set(0)
274 ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Add(0)
275 ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Add(0)
276 ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Add(0)
277 ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Add(0)
278 ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Add(0)
279 ctrlmetrics.WorkerCount.WithLabelValues(c.Name).Set(float64(c.MaxConcurrentReconciles))
280 }
281
282 func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
283
284 reconcileStartTS := time.Now()
285 defer func() {
286 c.updateMetrics(time.Since(reconcileStartTS))
287 }()
288
289
290 req, ok := obj.(reconcile.Request)
291 if !ok {
292
293
294
295 c.Queue.Forget(obj)
296 c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
297
298 return
299 }
300
301 log := c.LogConstructor(&req)
302 reconcileID := uuid.NewUUID()
303
304 log = log.WithValues("reconcileID", reconcileID)
305 ctx = logf.IntoContext(ctx, log)
306 ctx = addReconcileID(ctx, reconcileID)
307
308
309
310 log.V(5).Info("Reconciling")
311 result, err := c.Reconcile(ctx, req)
312 switch {
313 case err != nil:
314 if errors.Is(err, reconcile.TerminalError(nil)) {
315 ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()
316 } else {
317 c.Queue.AddRateLimited(req)
318 }
319 ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
320 ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
321 if !result.IsZero() {
322 log.Info("Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes reqeueuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler")
323 }
324 log.Error(err, "Reconciler error")
325 case result.RequeueAfter > 0:
326 log.V(5).Info(fmt.Sprintf("Reconcile done, requeueing after %s", result.RequeueAfter))
327
328
329
330
331 c.Queue.Forget(obj)
332 c.Queue.AddAfter(req, result.RequeueAfter)
333 ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
334 case result.Requeue:
335 log.V(5).Info("Reconcile done, requeueing")
336 c.Queue.AddRateLimited(req)
337 ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
338 default:
339 log.V(5).Info("Reconcile successful")
340
341
342 c.Queue.Forget(obj)
343 ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
344 }
345 }
346
347
348 func (c *Controller) GetLogger() logr.Logger {
349 return c.LogConstructor(nil)
350 }
351
352
353 func (c *Controller) updateMetrics(reconcileTime time.Duration) {
354 ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds())
355 }
356
357
358 func ReconcileIDFromContext(ctx context.Context) types.UID {
359 r, ok := ctx.Value(reconcileIDKey{}).(types.UID)
360 if !ok {
361 return ""
362 }
363
364 return r
365 }
366
367
368
369 type reconcileIDKey struct{}
370
371 func addReconcileID(ctx context.Context, reconcileID types.UID) context.Context {
372 return context.WithValue(ctx, reconcileIDKey{}, reconcileID)
373 }
374
View as plain text