1
16
17 package controller
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "sync"
24 "time"
25
26 "github.com/go-logr/logr"
27 . "github.com/onsi/ginkgo/v2"
28 . "github.com/onsi/gomega"
29 "github.com/prometheus/client_golang/prometheus"
30 dto "github.com/prometheus/client_model/go"
31 appsv1 "k8s.io/api/apps/v1"
32 corev1 "k8s.io/api/core/v1"
33 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34 "k8s.io/apimachinery/pkg/types"
35 "k8s.io/client-go/util/workqueue"
36 "k8s.io/utils/ptr"
37 "sigs.k8s.io/controller-runtime/pkg/cache"
38 "sigs.k8s.io/controller-runtime/pkg/cache/informertest"
39 "sigs.k8s.io/controller-runtime/pkg/client"
40 "sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
41 "sigs.k8s.io/controller-runtime/pkg/event"
42 "sigs.k8s.io/controller-runtime/pkg/handler"
43 ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
44 "sigs.k8s.io/controller-runtime/pkg/internal/log"
45 "sigs.k8s.io/controller-runtime/pkg/ratelimiter"
46 "sigs.k8s.io/controller-runtime/pkg/reconcile"
47 "sigs.k8s.io/controller-runtime/pkg/source"
48 )
49
50 var _ = Describe("controller", func() {
51 var fakeReconcile *fakeReconciler
52 var ctrl *Controller
53 var queue *controllertest.Queue
54 var reconciled chan reconcile.Request
55 var request = reconcile.Request{
56 NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"},
57 }
58
59 BeforeEach(func() {
60 reconciled = make(chan reconcile.Request)
61 fakeReconcile = &fakeReconciler{
62 Requests: reconciled,
63 results: make(chan fakeReconcileResultPair, 10 ),
64 }
65 queue = &controllertest.Queue{
66 Interface: workqueue.New(),
67 }
68 ctrl = &Controller{
69 MaxConcurrentReconciles: 1,
70 Do: fakeReconcile,
71 NewQueue: func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return queue },
72 LogConstructor: func(_ *reconcile.Request) logr.Logger {
73 return log.RuntimeLog.WithName("controller").WithName("test")
74 },
75 }
76 })
77
78 Describe("Reconciler", func() {
79 It("should call the Reconciler function", func() {
80 ctx, cancel := context.WithCancel(context.Background())
81 defer cancel()
82
83 ctrl.Do = reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
84 return reconcile.Result{Requeue: true}, nil
85 })
86 result, err := ctrl.Reconcile(ctx,
87 reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}})
88 Expect(err).NotTo(HaveOccurred())
89 Expect(result).To(Equal(reconcile.Result{Requeue: true}))
90 })
91
92 It("should not recover panic if RecoverPanic is false by default", func() {
93 ctx, cancel := context.WithCancel(context.Background())
94 defer cancel()
95
96 defer func() {
97 Expect(recover()).ShouldNot(BeNil())
98 }()
99 ctrl.Do = reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
100 var res *reconcile.Result
101 return *res, nil
102 })
103 _, _ = ctrl.Reconcile(ctx,
104 reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}})
105 })
106
107 It("should recover panic if RecoverPanic is true", func() {
108 ctx, cancel := context.WithCancel(context.Background())
109 defer cancel()
110
111 defer func() {
112 Expect(recover()).To(BeNil())
113 }()
114 ctrl.RecoverPanic = ptr.To(true)
115 ctrl.Do = reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
116 var res *reconcile.Result
117 return *res, nil
118 })
119 _, err := ctrl.Reconcile(ctx,
120 reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}})
121 Expect(err).To(HaveOccurred())
122 Expect(err.Error()).To(ContainSubstring("[recovered]"))
123 })
124 })
125
126 Describe("Start", func() {
127 It("should return an error if there is an error waiting for the informers", func() {
128 f := false
129 ctrl.startWatches = []source.Source{
130 source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}),
131 }
132 ctrl.Name = "foo"
133 ctx, cancel := context.WithCancel(context.Background())
134 defer cancel()
135 err := ctrl.Start(ctx)
136 Expect(err).To(HaveOccurred())
137 Expect(err.Error()).To(ContainSubstring("failed to wait for foo caches to sync"))
138 })
139
140 It("should error when cache sync timeout occurs", func() {
141 ctrl.CacheSyncTimeout = 10 * time.Nanosecond
142
143 c, err := cache.New(cfg, cache.Options{})
144 Expect(err).NotTo(HaveOccurred())
145 c = &cacheWithIndefinitelyBlockingGetInformer{c}
146
147 ctrl.startWatches = []source.Source{
148 source.Kind(c, &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{}),
149 }
150 ctrl.Name = "testcontroller"
151
152 err = ctrl.Start(context.TODO())
153 Expect(err).To(HaveOccurred())
154 Expect(err.Error()).To(ContainSubstring("failed to wait for testcontroller caches to sync: timed out waiting for cache to be synced"))
155 })
156
157 It("should not error when context cancelled", func() {
158 ctrl.CacheSyncTimeout = 1 * time.Second
159
160 sourceSynced := make(chan struct{})
161 c, err := cache.New(cfg, cache.Options{})
162 Expect(err).NotTo(HaveOccurred())
163 c = &cacheWithIndefinitelyBlockingGetInformer{c}
164 ctrl.startWatches = []source.Source{
165 &singnallingSourceWrapper{
166 SyncingSource: source.Kind[client.Object](c, &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}),
167 cacheSyncDone: sourceSynced,
168 },
169 }
170 ctrl.Name = "testcontroller"
171
172 ctx, cancel := context.WithCancel(context.TODO())
173 go func() {
174 defer GinkgoRecover()
175 err = ctrl.Start(ctx)
176 Expect(err).To(Succeed())
177 }()
178
179 cancel()
180 <-sourceSynced
181 })
182
183 It("should not error when cache sync timeout is of sufficiently high", func() {
184 ctrl.CacheSyncTimeout = 1 * time.Second
185
186 ctx, cancel := context.WithCancel(context.Background())
187 defer cancel()
188
189 sourceSynced := make(chan struct{})
190 c, err := cache.New(cfg, cache.Options{})
191 Expect(err).NotTo(HaveOccurred())
192 ctrl.startWatches = []source.Source{
193 &singnallingSourceWrapper{
194 SyncingSource: source.Kind[client.Object](c, &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}),
195 cacheSyncDone: sourceSynced,
196 },
197 }
198
199 go func() {
200 defer GinkgoRecover()
201 Expect(c.Start(ctx)).To(Succeed())
202 }()
203
204 go func() {
205 defer GinkgoRecover()
206 Expect(ctrl.Start(ctx)).To(Succeed())
207 }()
208
209 <-sourceSynced
210 })
211
212 It("should process events from source.Channel", func() {
213
214 processed := make(chan struct{})
215
216 ch := make(chan event.GenericEvent, 1)
217
218 ctx, cancel := context.WithCancel(context.TODO())
219 defer cancel()
220
221
222 p := &corev1.Pod{
223 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"},
224 }
225 evt := event.GenericEvent{
226 Object: p,
227 }
228
229 ins := source.Channel(
230 ch,
231 handler.Funcs{
232 GenericFunc: func(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
233 defer GinkgoRecover()
234 close(processed)
235 },
236 },
237 )
238
239
240 ch <- evt
241
242 ctrl.startWatches = []source.Source{ins}
243
244 go func() {
245 defer GinkgoRecover()
246 Expect(ctrl.Start(ctx)).To(Succeed())
247 }()
248 <-processed
249 })
250
251 It("should error when channel source is not specified", func() {
252 ctx, cancel := context.WithCancel(context.Background())
253 defer cancel()
254
255 ins := source.Channel[string](nil, nil)
256 ctrl.startWatches = []source.Source{ins}
257
258 e := ctrl.Start(ctx)
259 Expect(e).To(HaveOccurred())
260 Expect(e.Error()).To(ContainSubstring("must specify Channel.Source"))
261 })
262
263 It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {
264 started := false
265 src := source.Func(func(ctx context.Context, q workqueue.RateLimitingInterface) error {
266 defer GinkgoRecover()
267 Expect(q).To(Equal(ctrl.Queue))
268
269 started = true
270 return nil
271 })
272 Expect(ctrl.Watch(src)).NotTo(HaveOccurred())
273
274
275 ctx, cancel := context.WithCancel(context.Background())
276 cancel()
277 Expect(ctrl.Start(ctx)).To(Succeed())
278 Expect(started).To(BeTrue())
279 })
280
281 It("should return an error if there is an error starting sources", func() {
282 err := fmt.Errorf("Expected Error: could not start source")
283 src := source.Func(func(context.Context,
284 workqueue.RateLimitingInterface,
285 ) error {
286 defer GinkgoRecover()
287 return err
288 })
289 Expect(ctrl.Watch(src)).To(Succeed())
290
291 ctx, cancel := context.WithCancel(context.Background())
292 defer cancel()
293 Expect(ctrl.Start(ctx)).To(Equal(err))
294 })
295
296 It("should return an error if it gets started more than once", func() {
297
298 ctx, cancel := context.WithCancel(context.Background())
299 cancel()
300 Expect(ctrl.Start(ctx)).To(Succeed())
301 err := ctrl.Start(ctx)
302 Expect(err).To(HaveOccurred())
303 Expect(err.Error()).To(Equal("controller was started more than once. This is likely to be caused by being added to a manager multiple times"))
304 })
305
306 })
307
308 Describe("Processing queue items from a Controller", func() {
309 It("should call Reconciler if an item is enqueued", func() {
310 ctx, cancel := context.WithCancel(context.Background())
311 defer cancel()
312 go func() {
313 defer GinkgoRecover()
314 Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
315 }()
316 queue.Add(request)
317
318 By("Invoking Reconciler")
319 fakeReconcile.AddResult(reconcile.Result{}, nil)
320 Expect(<-reconciled).To(Equal(request))
321
322 By("Removing the item from the queue")
323 Eventually(queue.Len).Should(Equal(0))
324 Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0))
325 })
326
327 It("should continue to process additional queue items after the first", func() {
328 ctrl.Do = reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
329 defer GinkgoRecover()
330 Fail("Reconciler should not have been called")
331 return reconcile.Result{}, nil
332 })
333 ctx, cancel := context.WithCancel(context.Background())
334 defer cancel()
335 go func() {
336 defer GinkgoRecover()
337 Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
338 }()
339
340 By("adding two bad items to the queue")
341 queue.Add("foo/bar1")
342 queue.Add("foo/bar2")
343
344 By("expecting both of them to be skipped")
345 Eventually(queue.Len).Should(Equal(0))
346 Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0))
347 })
348
PIt("should forget an item if it is not a Request and continue processing items", func() {
	// Pending spec: no body has been written yet.
})
352
353 It("should requeue a Request if there is an error and continue processing items", func() {
354 ctx, cancel := context.WithCancel(context.Background())
355 defer cancel()
356 go func() {
357 defer GinkgoRecover()
358 Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
359 }()
360
361 queue.Add(request)
362
363 By("Invoking Reconciler which will give an error")
364 fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile"))
365 Expect(<-reconciled).To(Equal(request))
366 queue.AddedRateLimitedLock.Lock()
367 Expect(queue.AddedRatelimited).To(Equal([]any{request}))
368 queue.AddedRateLimitedLock.Unlock()
369
370 By("Invoking Reconciler a second time without error")
371 fakeReconcile.AddResult(reconcile.Result{}, nil)
372 Expect(<-reconciled).To(Equal(request))
373
374 By("Removing the item from the queue")
375 Eventually(queue.Len).Should(Equal(0))
376 Eventually(func() int { return queue.NumRequeues(request) }, 1.0).Should(Equal(0))
377 })
378
379 It("should not requeue a Request if there is a terminal error", func() {
380 ctx, cancel := context.WithCancel(context.Background())
381 defer cancel()
382 go func() {
383 defer GinkgoRecover()
384 Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
385 }()
386
387 queue.Add(request)
388
389 By("Invoking Reconciler which will give an error")
390 fakeReconcile.AddResult(reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("expected error: reconcile")))
391 Expect(<-reconciled).To(Equal(request))
392
393 queue.AddedRateLimitedLock.Lock()
394 Expect(queue.AddedRatelimited).To(BeEmpty())
395 queue.AddedRateLimitedLock.Unlock()
396
397 Expect(queue.Len()).Should(Equal(0))
398 })
399
400
401
402 It("should not reset backoff until there's a non-error result", func() {
403 dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
404 ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }
405
406 ctx, cancel := context.WithCancel(context.Background())
407 defer cancel()
408 go func() {
409 defer GinkgoRecover()
410 Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
411 }()
412
413 dq.Add(request)
414 Expect(dq.getCounts()).To(Equal(countInfo{Trying: 1}))
415
416 By("Invoking Reconciler which returns an error")
417 fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("something's wrong"))
418 Expect(<-reconciled).To(Equal(request))
419 Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 1, AddRateLimited: 1}))
420
421 By("Invoking Reconciler a second time with an error")
422 fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("another thing's wrong"))
423 Expect(<-reconciled).To(Equal(request))
424
425 Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 1, AddRateLimited: 2}))
426
427 By("Invoking Reconciler a third time, where it finally does not return an error")
428 fakeReconcile.AddResult(reconcile.Result{}, nil)
429 Expect(<-reconciled).To(Equal(request))
430
431 Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 0, AddRateLimited: 2}))
432
433 By("Removing the item from the queue")
434 Eventually(dq.Len).Should(Equal(0))
435 Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0))
436 })
437
438 It("should requeue a Request with rate limiting if the Result sets Requeue:true and continue processing items", func() {
439 dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
440 ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }
441
442 ctx, cancel := context.WithCancel(context.Background())
443 defer cancel()
444 go func() {
445 defer GinkgoRecover()
446 Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
447 }()
448
449 dq.Add(request)
450 Expect(dq.getCounts()).To(Equal(countInfo{Trying: 1}))
451
452 By("Invoking Reconciler which will ask for requeue")
453 fakeReconcile.AddResult(reconcile.Result{Requeue: true}, nil)
454 Expect(<-reconciled).To(Equal(request))
455 Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 1, AddRateLimited: 1}))
456
457 By("Invoking Reconciler a second time without asking for requeue")
458 fakeReconcile.AddResult(reconcile.Result{Requeue: false}, nil)
459 Expect(<-reconciled).To(Equal(request))
460
461 Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 0, AddRateLimited: 1}))
462
463 By("Removing the item from the queue")
464 Eventually(dq.Len).Should(Equal(0))
465 Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0))
466 })
467
468 It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func() {
469 dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
470 ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }
471
472 ctx, cancel := context.WithCancel(context.Background())
473 defer cancel()
474 go func() {
475 defer GinkgoRecover()
476 Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
477 }()
478
479 dq.Add(request)
480 Expect(dq.getCounts()).To(Equal(countInfo{Trying: 1}))
481
482 By("Invoking Reconciler which will ask for requeue & requeueafter")
483 fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100, Requeue: true}, nil)
484 Expect(<-reconciled).To(Equal(request))
485 Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 0, AddAfter: 1}))
486
487 By("Invoking Reconciler a second time asking for a requeueafter only")
488 fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100}, nil)
489 Expect(<-reconciled).To(Equal(request))
490
491 Eventually(dq.getCounts).Should(Equal(countInfo{Trying: -1 , AddAfter: 2}))
492
493 By("Removing the item from the queue")
494 Eventually(dq.Len).Should(Equal(0))
495 Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0))
496 })
497
498 It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() {
499 dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
500 ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }
501
502 ctx, cancel := context.WithCancel(context.Background())
503 defer cancel()
504 go func() {
505 defer GinkgoRecover()
506 Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
507 }()
508
509 dq.Add(request)
510 Expect(dq.getCounts()).To(Equal(countInfo{Trying: 1}))
511
512 By("Invoking Reconciler which will ask for requeueafter with an error")
513 fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100}, fmt.Errorf("expected error: reconcile"))
514 Expect(<-reconciled).To(Equal(request))
515 Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 1, AddRateLimited: 1}))
516
517 By("Invoking Reconciler a second time asking for requeueafter without errors")
518 fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100}, nil)
519 Expect(<-reconciled).To(Equal(request))
520 Eventually(dq.getCounts).Should(Equal(countInfo{AddAfter: 1, AddRateLimited: 1}))
521
522 By("Removing the item from the queue")
523 Eventually(dq.Len).Should(Equal(0))
524 Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0))
525 })
526
PIt("should return if the queue is shutdown", func() {
	// Pending spec: no body has been written yet.
})
530
PIt("should wait for informers to be synced before processing items", func() {
	// Pending spec: no body has been written yet.
})
534
PIt("should create a new go routine for MaxConcurrentReconciles", func() {
	// Pending spec: no body has been written yet.
})
538
539 Context("prometheus metric reconcile_total", func() {
540 var reconcileTotal dto.Metric
541
542 BeforeEach(func() {
543 ctrlmetrics.ReconcileTotal.Reset()
544 reconcileTotal.Reset()
545 })
546
547 It("should get updated on successful reconciliation", func() {
548 Expect(func() error {
549 Expect(ctrlmetrics.ReconcileTotal.WithLabelValues(ctrl.Name, "success").Write(&reconcileTotal)).To(Succeed())
550 if reconcileTotal.GetCounter().GetValue() != 0.0 {
551 return fmt.Errorf("metric reconcile total not reset")
552 }
553 return nil
554 }()).Should(Succeed())
555
556 ctx, cancel := context.WithCancel(context.Background())
557 defer cancel()
558 go func() {
559 defer GinkgoRecover()
560 Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
561 }()
562 By("Invoking Reconciler which will succeed")
563 queue.Add(request)
564
565 fakeReconcile.AddResult(reconcile.Result{}, nil)
566 Expect(<-reconciled).To(Equal(request))
567 Eventually(func() error {
568 Expect(ctrlmetrics.ReconcileTotal.WithLabelValues(ctrl.Name, "success").Write(&reconcileTotal)).To(Succeed())
569 if actual := reconcileTotal.GetCounter().GetValue(); actual != 1.0 {
570 return fmt.Errorf("metric reconcile total expected: %v and got: %v", 1.0, actual)
571 }
572 return nil
573 }, 2.0).Should(Succeed())
574 })
575
576 It("should get updated on reconcile errors", func() {
577 Expect(func() error {
578 Expect(ctrlmetrics.ReconcileTotal.WithLabelValues(ctrl.Name, "error").Write(&reconcileTotal)).To(Succeed())
579 if reconcileTotal.GetCounter().GetValue() != 0.0 {
580 return fmt.Errorf("metric reconcile total not reset")
581 }
582 return nil
583 }()).Should(Succeed())
584
585 ctx, cancel := context.WithCancel(context.Background())
586 defer cancel()
587 go func() {
588 defer GinkgoRecover()
589 Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
590 }()
591 By("Invoking Reconciler which will give an error")
592 queue.Add(request)
593
594 fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile"))
595 Expect(<-reconciled).To(Equal(request))
596 Eventually(func() error {
597 Expect(ctrlmetrics.ReconcileTotal.WithLabelValues(ctrl.Name, "error").Write(&reconcileTotal)).To(Succeed())
598 if actual := reconcileTotal.GetCounter().GetValue(); actual != 1.0 {
599 return fmt.Errorf("metric reconcile total expected: %v and got: %v", 1.0, actual)
600 }
601 return nil
602 }, 2.0).Should(Succeed())
603 })
604
605 It("should get updated when reconcile returns with retry enabled", func() {
606 Expect(func() error {
607 Expect(ctrlmetrics.ReconcileTotal.WithLabelValues(ctrl.Name, "retry").Write(&reconcileTotal)).To(Succeed())
608 if reconcileTotal.GetCounter().GetValue() != 0.0 {
609 return fmt.Errorf("metric reconcile total not reset")
610 }
611 return nil
612 }()).Should(Succeed())
613
614 ctx, cancel := context.WithCancel(context.Background())
615 defer cancel()
616 go func() {
617 defer GinkgoRecover()
618 Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
619 }()
620
621 By("Invoking Reconciler which will return result with Requeue enabled")
622 queue.Add(request)
623
624 fakeReconcile.AddResult(reconcile.Result{Requeue: true}, nil)
625 Expect(<-reconciled).To(Equal(request))
626 Eventually(func() error {
627 Expect(ctrlmetrics.ReconcileTotal.WithLabelValues(ctrl.Name, "requeue").Write(&reconcileTotal)).To(Succeed())
628 if actual := reconcileTotal.GetCounter().GetValue(); actual != 1.0 {
629 return fmt.Errorf("metric reconcile total expected: %v and got: %v", 1.0, actual)
630 }
631 return nil
632 }, 2.0).Should(Succeed())
633 })
634
635 It("should get updated when reconcile returns with retryAfter enabled", func() {
636 Expect(func() error {
637 Expect(ctrlmetrics.ReconcileTotal.WithLabelValues(ctrl.Name, "retry_after").Write(&reconcileTotal)).To(Succeed())
638 if reconcileTotal.GetCounter().GetValue() != 0.0 {
639 return fmt.Errorf("metric reconcile total not reset")
640 }
641 return nil
642 }()).Should(Succeed())
643
644 ctx, cancel := context.WithCancel(context.Background())
645 defer cancel()
646 go func() {
647 defer GinkgoRecover()
648 Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
649 }()
650 By("Invoking Reconciler which will return result with requeueAfter enabled")
651 queue.Add(request)
652
653 fakeReconcile.AddResult(reconcile.Result{RequeueAfter: 5 * time.Hour}, nil)
654 Expect(<-reconciled).To(Equal(request))
655 Eventually(func() error {
656 Expect(ctrlmetrics.ReconcileTotal.WithLabelValues(ctrl.Name, "requeue_after").Write(&reconcileTotal)).To(Succeed())
657 if actual := reconcileTotal.GetCounter().GetValue(); actual != 1.0 {
658 return fmt.Errorf("metric reconcile total expected: %v and got: %v", 1.0, actual)
659 }
660 return nil
661 }, 2.0).Should(Succeed())
662 })
663 })
664
665 Context("should update prometheus metrics", func() {
666 It("should requeue a Request if there is an error and continue processing items", func() {
667 var reconcileErrs dto.Metric
668 ctrlmetrics.ReconcileErrors.Reset()
669 Expect(func() error {
670 Expect(ctrlmetrics.ReconcileErrors.WithLabelValues(ctrl.Name).Write(&reconcileErrs)).To(Succeed())
671 if reconcileErrs.GetCounter().GetValue() != 0.0 {
672 return fmt.Errorf("metric reconcile errors not reset")
673 }
674 return nil
675 }()).Should(Succeed())
676
677 ctx, cancel := context.WithCancel(context.Background())
678 defer cancel()
679 go func() {
680 defer GinkgoRecover()
681 Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
682 }()
683 queue.Add(request)
684
685 By("Invoking Reconciler which will give an error")
686 fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile"))
687 Expect(<-reconciled).To(Equal(request))
688 Eventually(func() error {
689 Expect(ctrlmetrics.ReconcileErrors.WithLabelValues(ctrl.Name).Write(&reconcileErrs)).To(Succeed())
690 if reconcileErrs.GetCounter().GetValue() != 1.0 {
691 return fmt.Errorf("metrics not updated")
692 }
693 return nil
694 }, 2.0).Should(Succeed())
695
696 By("Invoking Reconciler a second time without error")
697 fakeReconcile.AddResult(reconcile.Result{}, nil)
698 Expect(<-reconciled).To(Equal(request))
699
700 By("Removing the item from the queue")
701 Eventually(queue.Len).Should(Equal(0))
702 Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0))
703 })
704
705 It("should add a reconcile time to the reconcile time histogram", func() {
706 var reconcileTime dto.Metric
707 ctrlmetrics.ReconcileTime.Reset()
708
709 Expect(func() error {
710 histObserver := ctrlmetrics.ReconcileTime.WithLabelValues(ctrl.Name)
711 hist := histObserver.(prometheus.Histogram)
712 Expect(hist.Write(&reconcileTime)).To(Succeed())
713 if reconcileTime.GetHistogram().GetSampleCount() != uint64(0) {
714 return fmt.Errorf("metrics not reset")
715 }
716 return nil
717 }()).Should(Succeed())
718
719 ctx, cancel := context.WithCancel(context.Background())
720 defer cancel()
721 go func() {
722 defer GinkgoRecover()
723 Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
724 }()
725 queue.Add(request)
726
727 By("Invoking Reconciler")
728 fakeReconcile.AddResult(reconcile.Result{}, nil)
729 Expect(<-reconciled).To(Equal(request))
730
731 By("Removing the item from the queue")
732 Eventually(queue.Len).Should(Equal(0))
733 Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0))
734
735 Eventually(func() error {
736 histObserver := ctrlmetrics.ReconcileTime.WithLabelValues(ctrl.Name)
737 hist := histObserver.(prometheus.Histogram)
738 Expect(hist.Write(&reconcileTime)).To(Succeed())
739 if reconcileTime.GetHistogram().GetSampleCount() == uint64(0) {
740 return fmt.Errorf("metrics not updated")
741 }
742 return nil
743 }, 2.0).Should(Succeed())
744 })
745 })
746 })
747 })
748
749 var _ = Describe("ReconcileIDFromContext function", func() {
750 It("should return an empty string if there is nothing in the context", func() {
751 ctx := context.Background()
752 reconcileID := ReconcileIDFromContext(ctx)
753
754 Expect(reconcileID).To(Equal(types.UID("")))
755 })
756
757 It("should return the correct reconcileID from context", func() {
758 const expectedReconcileID = types.UID("uuid")
759 ctx := addReconcileID(context.Background(), expectedReconcileID)
760 reconcileID := ReconcileIDFromContext(ctx)
761
762 Expect(reconcileID).To(Equal(expectedReconcileID))
763 })
764 })
765
766 type DelegatingQueue struct {
767 workqueue.RateLimitingInterface
768 mu sync.Mutex
769
770 countAddRateLimited int
771 countAdd int
772 countAddAfter int
773 }
774
775 func (q *DelegatingQueue) AddRateLimited(item interface{}) {
776 q.mu.Lock()
777 defer q.mu.Unlock()
778
779 q.countAddRateLimited++
780 q.RateLimitingInterface.AddRateLimited(item)
781 }
782
783 func (q *DelegatingQueue) AddAfter(item interface{}, d time.Duration) {
784 q.mu.Lock()
785 defer q.mu.Unlock()
786
787 q.countAddAfter++
788 q.RateLimitingInterface.AddAfter(item, d)
789 }
790
791 func (q *DelegatingQueue) Add(item interface{}) {
792 q.mu.Lock()
793 defer q.mu.Unlock()
794 q.countAdd++
795
796 q.RateLimitingInterface.Add(item)
797 }
798
799 func (q *DelegatingQueue) Forget(item interface{}) {
800 q.mu.Lock()
801 defer q.mu.Unlock()
802 q.countAdd--
803
804 q.RateLimitingInterface.Forget(item)
805 }
806
// countInfo is a snapshot of a DelegatingQueue's counters.
type countInfo struct {
	// Trying is the Add count minus the Forget count.
	Trying int
	// AddAfter is the number of AddAfter calls.
	AddAfter int
	// AddRateLimited is the number of AddRateLimited calls.
	AddRateLimited int
}
810
811 func (q *DelegatingQueue) getCounts() countInfo {
812 q.mu.Lock()
813 defer q.mu.Unlock()
814
815 return countInfo{
816 Trying: q.countAdd,
817 AddAfter: q.countAddAfter,
818 AddRateLimited: q.countAddRateLimited,
819 }
820 }
821
822 type fakeReconcileResultPair struct {
823 Result reconcile.Result
824 Err error
825 }
826
827 type fakeReconciler struct {
828 Requests chan reconcile.Request
829 results chan fakeReconcileResultPair
830 }
831
832 func (f *fakeReconciler) AddResult(res reconcile.Result, err error) {
833 f.results <- fakeReconcileResultPair{Result: res, Err: err}
834 }
835
836 func (f *fakeReconciler) Reconcile(_ context.Context, r reconcile.Request) (reconcile.Result, error) {
837 res := <-f.results
838 if f.Requests != nil {
839 f.Requests <- r
840 }
841 return res.Result, res.Err
842 }
843
844 type singnallingSourceWrapper struct {
845 cacheSyncDone chan struct{}
846 source.SyncingSource
847 }
848
849 func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error {
850 defer func() {
851 close(s.cacheSyncDone)
852 }()
853 return s.SyncingSource.WaitForSync(ctx)
854 }
855
856 var _ cache.Cache = &cacheWithIndefinitelyBlockingGetInformer{}
857
858
859
860
861
862
863
864 type cacheWithIndefinitelyBlockingGetInformer struct {
865 cache.Cache
866 }
867
868 func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Context, obj client.Object, opts ...cache.InformerGetOption) (cache.Informer, error) {
869 <-ctx.Done()
870 return nil, errors.New("GetInformer timed out")
871 }
872
View as plain text