...

Source file src/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller_test.go

Documentation: sigs.k8s.io/controller-runtime/pkg/internal/controller

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package controller
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"sync"
    24  	"time"
    25  
    26  	"github.com/go-logr/logr"
    27  	. "github.com/onsi/ginkgo/v2"
    28  	. "github.com/onsi/gomega"
    29  	"github.com/prometheus/client_golang/prometheus"
    30  	dto "github.com/prometheus/client_model/go"
    31  	appsv1 "k8s.io/api/apps/v1"
    32  	corev1 "k8s.io/api/core/v1"
    33  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    34  	"k8s.io/apimachinery/pkg/types"
    35  	"k8s.io/client-go/util/workqueue"
    36  	"k8s.io/utils/ptr"
    37  	"sigs.k8s.io/controller-runtime/pkg/cache"
    38  	"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
    39  	"sigs.k8s.io/controller-runtime/pkg/client"
    40  	"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
    41  	"sigs.k8s.io/controller-runtime/pkg/event"
    42  	"sigs.k8s.io/controller-runtime/pkg/handler"
    43  	ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
    44  	"sigs.k8s.io/controller-runtime/pkg/internal/log"
    45  	"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
    46  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    47  	"sigs.k8s.io/controller-runtime/pkg/source"
    48  )
    49  
    50  var _ = Describe("controller", func() {
    51  	var fakeReconcile *fakeReconciler
    52  	var ctrl *Controller
    53  	var queue *controllertest.Queue
    54  	var reconciled chan reconcile.Request
    55  	var request = reconcile.Request{
    56  		NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"},
    57  	}
    58  
    59  	BeforeEach(func() {
    60  		reconciled = make(chan reconcile.Request)
    61  		fakeReconcile = &fakeReconciler{
    62  			Requests: reconciled,
    63  			results:  make(chan fakeReconcileResultPair, 10 /* chosen by the completely scientific approach of guessing */),
    64  		}
    65  		queue = &controllertest.Queue{
    66  			Interface: workqueue.New(),
    67  		}
    68  		ctrl = &Controller{
    69  			MaxConcurrentReconciles: 1,
    70  			Do:                      fakeReconcile,
    71  			NewQueue:                func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return queue },
    72  			LogConstructor: func(_ *reconcile.Request) logr.Logger {
    73  				return log.RuntimeLog.WithName("controller").WithName("test")
    74  			},
    75  		}
    76  	})
    77  
    78  	Describe("Reconciler", func() {
    79  		It("should call the Reconciler function", func() {
    80  			ctx, cancel := context.WithCancel(context.Background())
    81  			defer cancel()
    82  
    83  			ctrl.Do = reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
    84  				return reconcile.Result{Requeue: true}, nil
    85  			})
    86  			result, err := ctrl.Reconcile(ctx,
    87  				reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}})
    88  			Expect(err).NotTo(HaveOccurred())
    89  			Expect(result).To(Equal(reconcile.Result{Requeue: true}))
    90  		})
    91  
    92  		It("should not recover panic if RecoverPanic is false by default", func() {
    93  			ctx, cancel := context.WithCancel(context.Background())
    94  			defer cancel()
    95  
    96  			defer func() {
    97  				Expect(recover()).ShouldNot(BeNil())
    98  			}()
    99  			ctrl.Do = reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
   100  				var res *reconcile.Result
   101  				return *res, nil
   102  			})
   103  			_, _ = ctrl.Reconcile(ctx,
   104  				reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}})
   105  		})
   106  
   107  		It("should recover panic if RecoverPanic is true", func() {
   108  			ctx, cancel := context.WithCancel(context.Background())
   109  			defer cancel()
   110  
   111  			defer func() {
   112  				Expect(recover()).To(BeNil())
   113  			}()
   114  			ctrl.RecoverPanic = ptr.To(true)
   115  			ctrl.Do = reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
   116  				var res *reconcile.Result
   117  				return *res, nil
   118  			})
   119  			_, err := ctrl.Reconcile(ctx,
   120  				reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "foo", Name: "bar"}})
   121  			Expect(err).To(HaveOccurred())
   122  			Expect(err.Error()).To(ContainSubstring("[recovered]"))
   123  		})
   124  	})
   125  
   126  	Describe("Start", func() {
   127  		It("should return an error if there is an error waiting for the informers", func() {
   128  			f := false
   129  			ctrl.startWatches = []source.Source{
   130  				source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}),
   131  			}
   132  			ctrl.Name = "foo"
   133  			ctx, cancel := context.WithCancel(context.Background())
   134  			defer cancel()
   135  			err := ctrl.Start(ctx)
   136  			Expect(err).To(HaveOccurred())
   137  			Expect(err.Error()).To(ContainSubstring("failed to wait for foo caches to sync"))
   138  		})
   139  
   140  		It("should error when cache sync timeout occurs", func() {
   141  			ctrl.CacheSyncTimeout = 10 * time.Nanosecond
   142  
   143  			c, err := cache.New(cfg, cache.Options{})
   144  			Expect(err).NotTo(HaveOccurred())
   145  			c = &cacheWithIndefinitelyBlockingGetInformer{c}
   146  
   147  			ctrl.startWatches = []source.Source{
   148  				source.Kind(c, &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{}),
   149  			}
   150  			ctrl.Name = "testcontroller"
   151  
   152  			err = ctrl.Start(context.TODO())
   153  			Expect(err).To(HaveOccurred())
   154  			Expect(err.Error()).To(ContainSubstring("failed to wait for testcontroller caches to sync: timed out waiting for cache to be synced"))
   155  		})
   156  
   157  		It("should not error when context cancelled", func() {
   158  			ctrl.CacheSyncTimeout = 1 * time.Second
   159  
   160  			sourceSynced := make(chan struct{})
   161  			c, err := cache.New(cfg, cache.Options{})
   162  			Expect(err).NotTo(HaveOccurred())
   163  			c = &cacheWithIndefinitelyBlockingGetInformer{c}
   164  			ctrl.startWatches = []source.Source{
   165  				&singnallingSourceWrapper{
   166  					SyncingSource: source.Kind[client.Object](c, &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}),
   167  					cacheSyncDone: sourceSynced,
   168  				},
   169  			}
   170  			ctrl.Name = "testcontroller"
   171  
   172  			ctx, cancel := context.WithCancel(context.TODO())
   173  			go func() {
   174  				defer GinkgoRecover()
   175  				err = ctrl.Start(ctx)
   176  				Expect(err).To(Succeed())
   177  			}()
   178  
   179  			cancel()
   180  			<-sourceSynced
   181  		})
   182  
   183  		It("should not error when cache sync timeout is of sufficiently high", func() {
   184  			ctrl.CacheSyncTimeout = 1 * time.Second
   185  
   186  			ctx, cancel := context.WithCancel(context.Background())
   187  			defer cancel()
   188  
   189  			sourceSynced := make(chan struct{})
   190  			c, err := cache.New(cfg, cache.Options{})
   191  			Expect(err).NotTo(HaveOccurred())
   192  			ctrl.startWatches = []source.Source{
   193  				&singnallingSourceWrapper{
   194  					SyncingSource: source.Kind[client.Object](c, &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}),
   195  					cacheSyncDone: sourceSynced,
   196  				},
   197  			}
   198  
   199  			go func() {
   200  				defer GinkgoRecover()
   201  				Expect(c.Start(ctx)).To(Succeed())
   202  			}()
   203  
   204  			go func() {
   205  				defer GinkgoRecover()
   206  				Expect(ctrl.Start(ctx)).To(Succeed())
   207  			}()
   208  
   209  			<-sourceSynced
   210  		})
   211  
   212  		It("should process events from source.Channel", func() {
   213  			// channel to be closed when event is processed
   214  			processed := make(chan struct{})
   215  			// source channel
   216  			ch := make(chan event.GenericEvent, 1)
   217  
   218  			ctx, cancel := context.WithCancel(context.TODO())
   219  			defer cancel()
   220  
   221  			// event to be sent to the channel
   222  			p := &corev1.Pod{
   223  				ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"},
   224  			}
   225  			evt := event.GenericEvent{
   226  				Object: p,
   227  			}
   228  
   229  			ins := source.Channel(
   230  				ch,
   231  				handler.Funcs{
   232  					GenericFunc: func(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
   233  						defer GinkgoRecover()
   234  						close(processed)
   235  					},
   236  				},
   237  			)
   238  
   239  			// send the event to the channel
   240  			ch <- evt
   241  
   242  			ctrl.startWatches = []source.Source{ins}
   243  
   244  			go func() {
   245  				defer GinkgoRecover()
   246  				Expect(ctrl.Start(ctx)).To(Succeed())
   247  			}()
   248  			<-processed
   249  		})
   250  
   251  		It("should error when channel source is not specified", func() {
   252  			ctx, cancel := context.WithCancel(context.Background())
   253  			defer cancel()
   254  
   255  			ins := source.Channel[string](nil, nil)
   256  			ctrl.startWatches = []source.Source{ins}
   257  
   258  			e := ctrl.Start(ctx)
   259  			Expect(e).To(HaveOccurred())
   260  			Expect(e.Error()).To(ContainSubstring("must specify Channel.Source"))
   261  		})
   262  
   263  		It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {
   264  			started := false
   265  			src := source.Func(func(ctx context.Context, q workqueue.RateLimitingInterface) error {
   266  				defer GinkgoRecover()
   267  				Expect(q).To(Equal(ctrl.Queue))
   268  
   269  				started = true
   270  				return nil
   271  			})
   272  			Expect(ctrl.Watch(src)).NotTo(HaveOccurred())
   273  
   274  			// Use a cancelled context so Start doesn't block
   275  			ctx, cancel := context.WithCancel(context.Background())
   276  			cancel()
   277  			Expect(ctrl.Start(ctx)).To(Succeed())
   278  			Expect(started).To(BeTrue())
   279  		})
   280  
   281  		It("should return an error if there is an error starting sources", func() {
   282  			err := fmt.Errorf("Expected Error: could not start source")
   283  			src := source.Func(func(context.Context,
   284  				workqueue.RateLimitingInterface,
   285  			) error {
   286  				defer GinkgoRecover()
   287  				return err
   288  			})
   289  			Expect(ctrl.Watch(src)).To(Succeed())
   290  
   291  			ctx, cancel := context.WithCancel(context.Background())
   292  			defer cancel()
   293  			Expect(ctrl.Start(ctx)).To(Equal(err))
   294  		})
   295  
   296  		It("should return an error if it gets started more than once", func() {
   297  			// Use a cancelled context so Start doesn't block
   298  			ctx, cancel := context.WithCancel(context.Background())
   299  			cancel()
   300  			Expect(ctrl.Start(ctx)).To(Succeed())
   301  			err := ctrl.Start(ctx)
   302  			Expect(err).To(HaveOccurred())
   303  			Expect(err.Error()).To(Equal("controller was started more than once. This is likely to be caused by being added to a manager multiple times"))
   304  		})
   305  
   306  	})
   307  
   308  	Describe("Processing queue items from a Controller", func() {
   309  		It("should call Reconciler if an item is enqueued", func() {
   310  			ctx, cancel := context.WithCancel(context.Background())
   311  			defer cancel()
   312  			go func() {
   313  				defer GinkgoRecover()
   314  				Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
   315  			}()
   316  			queue.Add(request)
   317  
   318  			By("Invoking Reconciler")
   319  			fakeReconcile.AddResult(reconcile.Result{}, nil)
   320  			Expect(<-reconciled).To(Equal(request))
   321  
   322  			By("Removing the item from the queue")
   323  			Eventually(queue.Len).Should(Equal(0))
   324  			Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0))
   325  		})
   326  
   327  		It("should continue to process additional queue items after the first", func() {
   328  			ctrl.Do = reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
   329  				defer GinkgoRecover()
   330  				Fail("Reconciler should not have been called")
   331  				return reconcile.Result{}, nil
   332  			})
   333  			ctx, cancel := context.WithCancel(context.Background())
   334  			defer cancel()
   335  			go func() {
   336  				defer GinkgoRecover()
   337  				Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
   338  			}()
   339  
   340  			By("adding two bad items to the queue")
   341  			queue.Add("foo/bar1")
   342  			queue.Add("foo/bar2")
   343  
   344  			By("expecting both of them to be skipped")
   345  			Eventually(queue.Len).Should(Equal(0))
   346  			Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0))
   347  		})
   348  
   349  		PIt("should forget an item if it is not a Request and continue processing items", func() {
   350  			// TODO(community): write this test
   351  		})
   352  
   353  		It("should requeue a Request if there is an error and continue processing items", func() {
   354  			ctx, cancel := context.WithCancel(context.Background())
   355  			defer cancel()
   356  			go func() {
   357  				defer GinkgoRecover()
   358  				Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
   359  			}()
   360  
   361  			queue.Add(request)
   362  
   363  			By("Invoking Reconciler which will give an error")
   364  			fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile"))
   365  			Expect(<-reconciled).To(Equal(request))
   366  			queue.AddedRateLimitedLock.Lock()
   367  			Expect(queue.AddedRatelimited).To(Equal([]any{request}))
   368  			queue.AddedRateLimitedLock.Unlock()
   369  
   370  			By("Invoking Reconciler a second time without error")
   371  			fakeReconcile.AddResult(reconcile.Result{}, nil)
   372  			Expect(<-reconciled).To(Equal(request))
   373  
   374  			By("Removing the item from the queue")
   375  			Eventually(queue.Len).Should(Equal(0))
   376  			Eventually(func() int { return queue.NumRequeues(request) }, 1.0).Should(Equal(0))
   377  		})
   378  
   379  		It("should not requeue a Request if there is a terminal error", func() {
   380  			ctx, cancel := context.WithCancel(context.Background())
   381  			defer cancel()
   382  			go func() {
   383  				defer GinkgoRecover()
   384  				Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
   385  			}()
   386  
   387  			queue.Add(request)
   388  
   389  			By("Invoking Reconciler which will give an error")
   390  			fakeReconcile.AddResult(reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("expected error: reconcile")))
   391  			Expect(<-reconciled).To(Equal(request))
   392  
   393  			queue.AddedRateLimitedLock.Lock()
   394  			Expect(queue.AddedRatelimited).To(BeEmpty())
   395  			queue.AddedRateLimitedLock.Unlock()
   396  
   397  			Expect(queue.Len()).Should(Equal(0))
   398  		})
   399  
   400  		// TODO(directxman12): we should ensure that backoff occurrs with error requeue
   401  
   402  		It("should not reset backoff until there's a non-error result", func() {
   403  			dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
   404  			ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }
   405  
   406  			ctx, cancel := context.WithCancel(context.Background())
   407  			defer cancel()
   408  			go func() {
   409  				defer GinkgoRecover()
   410  				Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
   411  			}()
   412  
   413  			dq.Add(request)
   414  			Expect(dq.getCounts()).To(Equal(countInfo{Trying: 1}))
   415  
   416  			By("Invoking Reconciler which returns an error")
   417  			fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("something's wrong"))
   418  			Expect(<-reconciled).To(Equal(request))
   419  			Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 1, AddRateLimited: 1}))
   420  
   421  			By("Invoking Reconciler a second time with an error")
   422  			fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("another thing's wrong"))
   423  			Expect(<-reconciled).To(Equal(request))
   424  
   425  			Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 1, AddRateLimited: 2}))
   426  
   427  			By("Invoking Reconciler a third time, where it finally does not return an error")
   428  			fakeReconcile.AddResult(reconcile.Result{}, nil)
   429  			Expect(<-reconciled).To(Equal(request))
   430  
   431  			Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 0, AddRateLimited: 2}))
   432  
   433  			By("Removing the item from the queue")
   434  			Eventually(dq.Len).Should(Equal(0))
   435  			Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0))
   436  		})
   437  
   438  		It("should requeue a Request with rate limiting if the Result sets Requeue:true and continue processing items", func() {
   439  			dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
   440  			ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }
   441  
   442  			ctx, cancel := context.WithCancel(context.Background())
   443  			defer cancel()
   444  			go func() {
   445  				defer GinkgoRecover()
   446  				Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
   447  			}()
   448  
   449  			dq.Add(request)
   450  			Expect(dq.getCounts()).To(Equal(countInfo{Trying: 1}))
   451  
   452  			By("Invoking Reconciler which will ask for requeue")
   453  			fakeReconcile.AddResult(reconcile.Result{Requeue: true}, nil)
   454  			Expect(<-reconciled).To(Equal(request))
   455  			Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 1, AddRateLimited: 1}))
   456  
   457  			By("Invoking Reconciler a second time without asking for requeue")
   458  			fakeReconcile.AddResult(reconcile.Result{Requeue: false}, nil)
   459  			Expect(<-reconciled).To(Equal(request))
   460  
   461  			Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 0, AddRateLimited: 1}))
   462  
   463  			By("Removing the item from the queue")
   464  			Eventually(dq.Len).Should(Equal(0))
   465  			Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0))
   466  		})
   467  
   468  		It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func() {
   469  			dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
   470  			ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }
   471  
   472  			ctx, cancel := context.WithCancel(context.Background())
   473  			defer cancel()
   474  			go func() {
   475  				defer GinkgoRecover()
   476  				Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
   477  			}()
   478  
   479  			dq.Add(request)
   480  			Expect(dq.getCounts()).To(Equal(countInfo{Trying: 1}))
   481  
   482  			By("Invoking Reconciler which will ask for requeue & requeueafter")
   483  			fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100, Requeue: true}, nil)
   484  			Expect(<-reconciled).To(Equal(request))
   485  			Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 0, AddAfter: 1}))
   486  
   487  			By("Invoking Reconciler a second time asking for a requeueafter only")
   488  			fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100}, nil)
   489  			Expect(<-reconciled).To(Equal(request))
   490  
   491  			Eventually(dq.getCounts).Should(Equal(countInfo{Trying: -1 /* we don't increment the count in addafter */, AddAfter: 2}))
   492  
   493  			By("Removing the item from the queue")
   494  			Eventually(dq.Len).Should(Equal(0))
   495  			Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0))
   496  		})
   497  
   498  		It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() {
   499  			dq := &DelegatingQueue{RateLimitingInterface: ctrl.NewQueue("controller1", nil)}
   500  			ctrl.NewQueue = func(string, ratelimiter.RateLimiter) workqueue.RateLimitingInterface { return dq }
   501  
   502  			ctx, cancel := context.WithCancel(context.Background())
   503  			defer cancel()
   504  			go func() {
   505  				defer GinkgoRecover()
   506  				Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
   507  			}()
   508  
   509  			dq.Add(request)
   510  			Expect(dq.getCounts()).To(Equal(countInfo{Trying: 1}))
   511  
   512  			By("Invoking Reconciler which will ask for requeueafter with an error")
   513  			fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100}, fmt.Errorf("expected error: reconcile"))
   514  			Expect(<-reconciled).To(Equal(request))
   515  			Eventually(dq.getCounts).Should(Equal(countInfo{Trying: 1, AddRateLimited: 1}))
   516  
   517  			By("Invoking Reconciler a second time asking for requeueafter without errors")
   518  			fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100}, nil)
   519  			Expect(<-reconciled).To(Equal(request))
   520  			Eventually(dq.getCounts).Should(Equal(countInfo{AddAfter: 1, AddRateLimited: 1}))
   521  
   522  			By("Removing the item from the queue")
   523  			Eventually(dq.Len).Should(Equal(0))
   524  			Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0))
   525  		})
   526  
   527  		PIt("should return if the queue is shutdown", func() {
   528  			// TODO(community): write this test
   529  		})
   530  
   531  		PIt("should wait for informers to be synced before processing items", func() {
   532  			// TODO(community): write this test
   533  		})
   534  
   535  		PIt("should create a new go routine for MaxConcurrentReconciles", func() {
   536  			// TODO(community): write this test
   537  		})
   538  
   539  		Context("prometheus metric reconcile_total", func() {
   540  			var reconcileTotal dto.Metric
   541  
   542  			BeforeEach(func() {
   543  				ctrlmetrics.ReconcileTotal.Reset()
   544  				reconcileTotal.Reset()
   545  			})
   546  
   547  			It("should get updated on successful reconciliation", func() {
   548  				Expect(func() error {
   549  					Expect(ctrlmetrics.ReconcileTotal.WithLabelValues(ctrl.Name, "success").Write(&reconcileTotal)).To(Succeed())
   550  					if reconcileTotal.GetCounter().GetValue() != 0.0 {
   551  						return fmt.Errorf("metric reconcile total not reset")
   552  					}
   553  					return nil
   554  				}()).Should(Succeed())
   555  
   556  				ctx, cancel := context.WithCancel(context.Background())
   557  				defer cancel()
   558  				go func() {
   559  					defer GinkgoRecover()
   560  					Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
   561  				}()
   562  				By("Invoking Reconciler which will succeed")
   563  				queue.Add(request)
   564  
   565  				fakeReconcile.AddResult(reconcile.Result{}, nil)
   566  				Expect(<-reconciled).To(Equal(request))
   567  				Eventually(func() error {
   568  					Expect(ctrlmetrics.ReconcileTotal.WithLabelValues(ctrl.Name, "success").Write(&reconcileTotal)).To(Succeed())
   569  					if actual := reconcileTotal.GetCounter().GetValue(); actual != 1.0 {
   570  						return fmt.Errorf("metric reconcile total expected: %v and got: %v", 1.0, actual)
   571  					}
   572  					return nil
   573  				}, 2.0).Should(Succeed())
   574  			})
   575  
   576  			It("should get updated on reconcile errors", func() {
   577  				Expect(func() error {
   578  					Expect(ctrlmetrics.ReconcileTotal.WithLabelValues(ctrl.Name, "error").Write(&reconcileTotal)).To(Succeed())
   579  					if reconcileTotal.GetCounter().GetValue() != 0.0 {
   580  						return fmt.Errorf("metric reconcile total not reset")
   581  					}
   582  					return nil
   583  				}()).Should(Succeed())
   584  
   585  				ctx, cancel := context.WithCancel(context.Background())
   586  				defer cancel()
   587  				go func() {
   588  					defer GinkgoRecover()
   589  					Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
   590  				}()
   591  				By("Invoking Reconciler which will give an error")
   592  				queue.Add(request)
   593  
   594  				fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile"))
   595  				Expect(<-reconciled).To(Equal(request))
   596  				Eventually(func() error {
   597  					Expect(ctrlmetrics.ReconcileTotal.WithLabelValues(ctrl.Name, "error").Write(&reconcileTotal)).To(Succeed())
   598  					if actual := reconcileTotal.GetCounter().GetValue(); actual != 1.0 {
   599  						return fmt.Errorf("metric reconcile total expected: %v and got: %v", 1.0, actual)
   600  					}
   601  					return nil
   602  				}, 2.0).Should(Succeed())
   603  			})
   604  
   605  			It("should get updated when reconcile returns with retry enabled", func() {
   606  				Expect(func() error {
   607  					Expect(ctrlmetrics.ReconcileTotal.WithLabelValues(ctrl.Name, "retry").Write(&reconcileTotal)).To(Succeed())
   608  					if reconcileTotal.GetCounter().GetValue() != 0.0 {
   609  						return fmt.Errorf("metric reconcile total not reset")
   610  					}
   611  					return nil
   612  				}()).Should(Succeed())
   613  
   614  				ctx, cancel := context.WithCancel(context.Background())
   615  				defer cancel()
   616  				go func() {
   617  					defer GinkgoRecover()
   618  					Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
   619  				}()
   620  
   621  				By("Invoking Reconciler which will return result with Requeue enabled")
   622  				queue.Add(request)
   623  
   624  				fakeReconcile.AddResult(reconcile.Result{Requeue: true}, nil)
   625  				Expect(<-reconciled).To(Equal(request))
   626  				Eventually(func() error {
   627  					Expect(ctrlmetrics.ReconcileTotal.WithLabelValues(ctrl.Name, "requeue").Write(&reconcileTotal)).To(Succeed())
   628  					if actual := reconcileTotal.GetCounter().GetValue(); actual != 1.0 {
   629  						return fmt.Errorf("metric reconcile total expected: %v and got: %v", 1.0, actual)
   630  					}
   631  					return nil
   632  				}, 2.0).Should(Succeed())
   633  			})
   634  
   635  			It("should get updated when reconcile returns with retryAfter enabled", func() {
   636  				Expect(func() error {
   637  					Expect(ctrlmetrics.ReconcileTotal.WithLabelValues(ctrl.Name, "retry_after").Write(&reconcileTotal)).To(Succeed())
   638  					if reconcileTotal.GetCounter().GetValue() != 0.0 {
   639  						return fmt.Errorf("metric reconcile total not reset")
   640  					}
   641  					return nil
   642  				}()).Should(Succeed())
   643  
   644  				ctx, cancel := context.WithCancel(context.Background())
   645  				defer cancel()
   646  				go func() {
   647  					defer GinkgoRecover()
   648  					Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
   649  				}()
   650  				By("Invoking Reconciler which will return result with requeueAfter enabled")
   651  				queue.Add(request)
   652  
   653  				fakeReconcile.AddResult(reconcile.Result{RequeueAfter: 5 * time.Hour}, nil)
   654  				Expect(<-reconciled).To(Equal(request))
   655  				Eventually(func() error {
   656  					Expect(ctrlmetrics.ReconcileTotal.WithLabelValues(ctrl.Name, "requeue_after").Write(&reconcileTotal)).To(Succeed())
   657  					if actual := reconcileTotal.GetCounter().GetValue(); actual != 1.0 {
   658  						return fmt.Errorf("metric reconcile total expected: %v and got: %v", 1.0, actual)
   659  					}
   660  					return nil
   661  				}, 2.0).Should(Succeed())
   662  			})
   663  		})
   664  
   665  		Context("should update prometheus metrics", func() {
   666  			It("should requeue a Request if there is an error and continue processing items", func() {
   667  				var reconcileErrs dto.Metric
   668  				ctrlmetrics.ReconcileErrors.Reset()
   669  				Expect(func() error {
   670  					Expect(ctrlmetrics.ReconcileErrors.WithLabelValues(ctrl.Name).Write(&reconcileErrs)).To(Succeed())
   671  					if reconcileErrs.GetCounter().GetValue() != 0.0 {
   672  						return fmt.Errorf("metric reconcile errors not reset")
   673  					}
   674  					return nil
   675  				}()).Should(Succeed())
   676  
   677  				ctx, cancel := context.WithCancel(context.Background())
   678  				defer cancel()
   679  				go func() {
   680  					defer GinkgoRecover()
   681  					Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
   682  				}()
   683  				queue.Add(request)
   684  
   685  				By("Invoking Reconciler which will give an error")
   686  				fakeReconcile.AddResult(reconcile.Result{}, fmt.Errorf("expected error: reconcile"))
   687  				Expect(<-reconciled).To(Equal(request))
   688  				Eventually(func() error {
   689  					Expect(ctrlmetrics.ReconcileErrors.WithLabelValues(ctrl.Name).Write(&reconcileErrs)).To(Succeed())
   690  					if reconcileErrs.GetCounter().GetValue() != 1.0 {
   691  						return fmt.Errorf("metrics not updated")
   692  					}
   693  					return nil
   694  				}, 2.0).Should(Succeed())
   695  
   696  				By("Invoking Reconciler a second time without error")
   697  				fakeReconcile.AddResult(reconcile.Result{}, nil)
   698  				Expect(<-reconciled).To(Equal(request))
   699  
   700  				By("Removing the item from the queue")
   701  				Eventually(queue.Len).Should(Equal(0))
   702  				Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0))
   703  			})
   704  
   705  			It("should add a reconcile time to the reconcile time histogram", func() {
   706  				var reconcileTime dto.Metric
   707  				ctrlmetrics.ReconcileTime.Reset()
   708  
   709  				Expect(func() error {
   710  					histObserver := ctrlmetrics.ReconcileTime.WithLabelValues(ctrl.Name)
   711  					hist := histObserver.(prometheus.Histogram)
   712  					Expect(hist.Write(&reconcileTime)).To(Succeed())
   713  					if reconcileTime.GetHistogram().GetSampleCount() != uint64(0) {
   714  						return fmt.Errorf("metrics not reset")
   715  					}
   716  					return nil
   717  				}()).Should(Succeed())
   718  
   719  				ctx, cancel := context.WithCancel(context.Background())
   720  				defer cancel()
   721  				go func() {
   722  					defer GinkgoRecover()
   723  					Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
   724  				}()
   725  				queue.Add(request)
   726  
   727  				By("Invoking Reconciler")
   728  				fakeReconcile.AddResult(reconcile.Result{}, nil)
   729  				Expect(<-reconciled).To(Equal(request))
   730  
   731  				By("Removing the item from the queue")
   732  				Eventually(queue.Len).Should(Equal(0))
   733  				Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0))
   734  
   735  				Eventually(func() error {
   736  					histObserver := ctrlmetrics.ReconcileTime.WithLabelValues(ctrl.Name)
   737  					hist := histObserver.(prometheus.Histogram)
   738  					Expect(hist.Write(&reconcileTime)).To(Succeed())
   739  					if reconcileTime.GetHistogram().GetSampleCount() == uint64(0) {
   740  						return fmt.Errorf("metrics not updated")
   741  					}
   742  					return nil
   743  				}, 2.0).Should(Succeed())
   744  			})
   745  		})
   746  	})
   747  })
   748  
   749  var _ = Describe("ReconcileIDFromContext function", func() {
   750  	It("should return an empty string if there is nothing in the context", func() {
   751  		ctx := context.Background()
   752  		reconcileID := ReconcileIDFromContext(ctx)
   753  
   754  		Expect(reconcileID).To(Equal(types.UID("")))
   755  	})
   756  
   757  	It("should return the correct reconcileID from context", func() {
   758  		const expectedReconcileID = types.UID("uuid")
   759  		ctx := addReconcileID(context.Background(), expectedReconcileID)
   760  		reconcileID := ReconcileIDFromContext(ctx)
   761  
   762  		Expect(reconcileID).To(Equal(expectedReconcileID))
   763  	})
   764  })
   765  
   766  type DelegatingQueue struct {
   767  	workqueue.RateLimitingInterface
   768  	mu sync.Mutex
   769  
   770  	countAddRateLimited int
   771  	countAdd            int
   772  	countAddAfter       int
   773  }
   774  
   775  func (q *DelegatingQueue) AddRateLimited(item interface{}) {
   776  	q.mu.Lock()
   777  	defer q.mu.Unlock()
   778  
   779  	q.countAddRateLimited++
   780  	q.RateLimitingInterface.AddRateLimited(item)
   781  }
   782  
   783  func (q *DelegatingQueue) AddAfter(item interface{}, d time.Duration) {
   784  	q.mu.Lock()
   785  	defer q.mu.Unlock()
   786  
   787  	q.countAddAfter++
   788  	q.RateLimitingInterface.AddAfter(item, d)
   789  }
   790  
   791  func (q *DelegatingQueue) Add(item interface{}) {
   792  	q.mu.Lock()
   793  	defer q.mu.Unlock()
   794  	q.countAdd++
   795  
   796  	q.RateLimitingInterface.Add(item)
   797  }
   798  
   799  func (q *DelegatingQueue) Forget(item interface{}) {
   800  	q.mu.Lock()
   801  	defer q.mu.Unlock()
   802  	q.countAdd--
   803  
   804  	q.RateLimitingInterface.Forget(item)
   805  }
   806  
   807  type countInfo struct {
   808  	Trying, AddAfter, AddRateLimited int
   809  }
   810  
   811  func (q *DelegatingQueue) getCounts() countInfo {
   812  	q.mu.Lock()
   813  	defer q.mu.Unlock()
   814  
   815  	return countInfo{
   816  		Trying:         q.countAdd,
   817  		AddAfter:       q.countAddAfter,
   818  		AddRateLimited: q.countAddRateLimited,
   819  	}
   820  }
   821  
   822  type fakeReconcileResultPair struct {
   823  	Result reconcile.Result
   824  	Err    error
   825  }
   826  
   827  type fakeReconciler struct {
   828  	Requests chan reconcile.Request
   829  	results  chan fakeReconcileResultPair
   830  }
   831  
   832  func (f *fakeReconciler) AddResult(res reconcile.Result, err error) {
   833  	f.results <- fakeReconcileResultPair{Result: res, Err: err}
   834  }
   835  
   836  func (f *fakeReconciler) Reconcile(_ context.Context, r reconcile.Request) (reconcile.Result, error) {
   837  	res := <-f.results
   838  	if f.Requests != nil {
   839  		f.Requests <- r
   840  	}
   841  	return res.Result, res.Err
   842  }
   843  
   844  type singnallingSourceWrapper struct {
   845  	cacheSyncDone chan struct{}
   846  	source.SyncingSource
   847  }
   848  
   849  func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error {
   850  	defer func() {
   851  		close(s.cacheSyncDone)
   852  	}()
   853  	return s.SyncingSource.WaitForSync(ctx)
   854  }
   855  
   856  var _ cache.Cache = &cacheWithIndefinitelyBlockingGetInformer{}
   857  
   858  // cacheWithIndefinitelyBlockingGetInformer has a GetInformer implementation that blocks indefinitely or until its
   859  // context is cancelled.
   860  // We need it as a workaround for testenvs lack of support for a secure apiserver, because the insecure port always
   861  // implies the allow all authorizer, so we can not simulate rbac issues with it. They are the usual cause of the real
   862  // caches GetInformer blocking showing this behavior.
   863  // TODO: Remove this once envtest supports a secure apiserver.
   864  type cacheWithIndefinitelyBlockingGetInformer struct {
   865  	cache.Cache
   866  }
   867  
   868  func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Context, obj client.Object, opts ...cache.InformerGetOption) (cache.Informer, error) {
   869  	<-ctx.Done()
   870  	return nil, errors.New("GetInformer timed out")
   871  }
   872  

View as plain text