...

Source file src/sigs.k8s.io/controller-runtime/pkg/manager/manager_test.go

Documentation: sigs.k8s.io/controller-runtime/pkg/manager

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package manager
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"io"
    24  	"net"
    25  	"net/http"
    26  	"path"
    27  	"sync"
    28  	"sync/atomic"
    29  	"time"
    30  
    31  	"github.com/go-logr/logr"
    32  	. "github.com/onsi/ginkgo/v2"
    33  	. "github.com/onsi/gomega"
    34  	"github.com/prometheus/client_golang/prometheus"
    35  	"go.uber.org/goleak"
    36  	coordinationv1 "k8s.io/api/coordination/v1"
    37  	corev1 "k8s.io/api/core/v1"
    38  	"k8s.io/apimachinery/pkg/api/meta"
    39  	"k8s.io/apimachinery/pkg/runtime"
    40  	"k8s.io/apimachinery/pkg/runtime/schema"
    41  	"k8s.io/client-go/rest"
    42  	"k8s.io/client-go/tools/leaderelection/resourcelock"
    43  	"sigs.k8s.io/controller-runtime/pkg/cache"
    44  	"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
    45  	"sigs.k8s.io/controller-runtime/pkg/client"
    46  	"sigs.k8s.io/controller-runtime/pkg/cluster"
    47  	intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
    48  	"sigs.k8s.io/controller-runtime/pkg/leaderelection"
    49  	fakeleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection/fake"
    50  	"sigs.k8s.io/controller-runtime/pkg/metrics"
    51  	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
    52  	"sigs.k8s.io/controller-runtime/pkg/recorder"
    53  	"sigs.k8s.io/controller-runtime/pkg/webhook"
    54  )
    55  
    56  var _ = Describe("manger.Manager", func() {
    57  	Describe("New", func() {
    58  		It("should return an error if there is no Config", func() {
    59  			m, err := New(nil, Options{})
    60  			Expect(m).To(BeNil())
    61  			Expect(err.Error()).To(ContainSubstring("must specify Config"))
    62  
    63  		})
    64  
    65  		It("should return an error if it can't create a RestMapper", func() {
    66  			expected := fmt.Errorf("expected error: RestMapper")
    67  			m, err := New(cfg, Options{
    68  				MapperProvider: func(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error) { return nil, expected },
    69  			})
    70  			Expect(m).To(BeNil())
    71  			Expect(err).To(Equal(expected))
    72  
    73  		})
    74  
    75  		It("should return an error it can't create a client.Client", func() {
    76  			m, err := New(cfg, Options{
    77  				NewClient: func(config *rest.Config, options client.Options) (client.Client, error) {
    78  					return nil, errors.New("expected error")
    79  				},
    80  			})
    81  			Expect(m).To(BeNil())
    82  			Expect(err).To(HaveOccurred())
    83  			Expect(err.Error()).To(ContainSubstring("expected error"))
    84  		})
    85  
    86  		It("should return an error it can't create a cache.Cache", func() {
    87  			m, err := New(cfg, Options{
    88  				NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
    89  					return nil, fmt.Errorf("expected error")
    90  				},
    91  			})
    92  			Expect(m).To(BeNil())
    93  			Expect(err).To(HaveOccurred())
    94  			Expect(err.Error()).To(ContainSubstring("expected error"))
    95  		})
    96  
    97  		It("should create a client defined in by the new client function", func() {
    98  			m, err := New(cfg, Options{
    99  				NewClient: func(config *rest.Config, options client.Options) (client.Client, error) {
   100  					return nil, nil
   101  				},
   102  			})
   103  			Expect(m).ToNot(BeNil())
   104  			Expect(err).ToNot(HaveOccurred())
   105  			Expect(m.GetClient()).To(BeNil())
   106  		})
   107  
   108  		It("should return an error it can't create a recorder.Provider", func() {
   109  			m, err := New(cfg, Options{
   110  				newRecorderProvider: func(_ *rest.Config, _ *http.Client, _ *runtime.Scheme, _ logr.Logger, _ intrec.EventBroadcasterProducer) (*intrec.Provider, error) {
   111  					return nil, fmt.Errorf("expected error")
   112  				},
   113  			})
   114  			Expect(m).To(BeNil())
   115  			Expect(err).To(HaveOccurred())
   116  			Expect(err.Error()).To(ContainSubstring("expected error"))
   117  		})
   118  
   119  		It("should lazily initialize a webhook server if needed", func() {
   120  			By("creating a manager with options")
   121  			m, err := New(cfg, Options{WebhookServer: webhook.NewServer(webhook.Options{Port: 9440, Host: "foo.com"})})
   122  			Expect(err).NotTo(HaveOccurred())
   123  			Expect(m).NotTo(BeNil())
   124  
   125  			By("checking options are passed to the webhook server")
   126  			svr := m.GetWebhookServer()
   127  			Expect(svr).NotTo(BeNil())
   128  			Expect(svr.(*webhook.DefaultServer).Options.Port).To(Equal(9440))
   129  			Expect(svr.(*webhook.DefaultServer).Options.Host).To(Equal("foo.com"))
   130  		})
   131  
   132  		It("should not initialize a webhook server if Options.WebhookServer is set", func() {
   133  			By("creating a manager with options")
   134  			srv := webhook.NewServer(webhook.Options{Port: 9440})
   135  			m, err := New(cfg, Options{WebhookServer: srv})
   136  			Expect(err).NotTo(HaveOccurred())
   137  			Expect(m).NotTo(BeNil())
   138  
   139  			By("checking the server contains the Port set on the webhook server and not passed to Options")
   140  			svr := m.GetWebhookServer()
   141  			Expect(svr).NotTo(BeNil())
   142  			Expect(svr).To(Equal(srv))
   143  			Expect(svr.(*webhook.DefaultServer).Options.Port).To(Equal(9440))
   144  		})
   145  
   146  		It("should allow passing a custom webhook.Server implementation", func() {
   147  			type customWebhook struct {
   148  				webhook.Server
   149  			}
   150  			m, err := New(cfg, Options{WebhookServer: customWebhook{}})
   151  			Expect(err).NotTo(HaveOccurred())
   152  			Expect(m).NotTo(BeNil())
   153  
   154  			svr := m.GetWebhookServer()
   155  			Expect(svr).NotTo(BeNil())
   156  
   157  			_, isCustomWebhook := svr.(customWebhook)
   158  			Expect(isCustomWebhook).To(BeTrue())
   159  		})
   160  
   161  		Context("with leader election enabled", func() {
   162  			It("should only cancel the leader election after all runnables are done", func() {
   163  				m, err := New(cfg, Options{
   164  					LeaderElection:          true,
   165  					LeaderElectionNamespace: "default",
   166  					LeaderElectionID:        "test-leader-election-id-2",
   167  					HealthProbeBindAddress:  "0",
   168  					Metrics:                 metricsserver.Options{BindAddress: "0"},
   169  					PprofBindAddress:        "0",
   170  				})
   171  				Expect(err).ToNot(HaveOccurred())
   172  				gvkcorev1 := schema.GroupVersionKind{Group: corev1.SchemeGroupVersion.Group, Version: corev1.SchemeGroupVersion.Version, Kind: "ConfigMap"}
   173  				gvkcoordinationv1 := schema.GroupVersionKind{Group: coordinationv1.SchemeGroupVersion.Group, Version: coordinationv1.SchemeGroupVersion.Version, Kind: "Lease"}
   174  				Expect(m.GetScheme().Recognizes(gvkcorev1)).To(BeTrue())
   175  				Expect(m.GetScheme().Recognizes(gvkcoordinationv1)).To(BeTrue())
   176  				runnableDone := make(chan struct{})
   177  				slowRunnable := RunnableFunc(func(ctx context.Context) error {
   178  					<-ctx.Done()
   179  					time.Sleep(100 * time.Millisecond)
   180  					close(runnableDone)
   181  					return nil
   182  				})
   183  				Expect(m.Add(slowRunnable)).To(Succeed())
   184  
   185  				cm := m.(*controllerManager)
   186  				cm.gracefulShutdownTimeout = time.Second
   187  				leaderElectionDone := make(chan struct{})
   188  				cm.onStoppedLeading = func() {
   189  					close(leaderElectionDone)
   190  				}
   191  
   192  				ctx, cancel := context.WithCancel(context.Background())
   193  				mgrDone := make(chan struct{})
   194  				go func() {
   195  					defer GinkgoRecover()
   196  					Expect(m.Start(ctx)).To(Succeed())
   197  					close(mgrDone)
   198  				}()
   199  				<-cm.Elected()
   200  				cancel()
   201  				select {
   202  				case <-leaderElectionDone:
   203  					Expect(errors.New("leader election was cancelled before runnables were done")).ToNot(HaveOccurred())
   204  				case <-runnableDone:
   205  					// Success
   206  				}
   207  				// Don't leak routines
   208  				<-mgrDone
   209  
   210  			})
   211  			It("should disable gracefulShutdown when stopping to lead", func() {
   212  				m, err := New(cfg, Options{
   213  					LeaderElection:          true,
   214  					LeaderElectionNamespace: "default",
   215  					LeaderElectionID:        "test-leader-election-id-3",
   216  					HealthProbeBindAddress:  "0",
   217  					Metrics:                 metricsserver.Options{BindAddress: "0"},
   218  					PprofBindAddress:        "0",
   219  				})
   220  				Expect(err).ToNot(HaveOccurred())
   221  
   222  				ctx, cancel := context.WithCancel(context.Background())
   223  				defer cancel()
   224  				mgrDone := make(chan struct{})
   225  				go func() {
   226  					defer GinkgoRecover()
   227  					err := m.Start(ctx)
   228  					Expect(err).To(HaveOccurred())
   229  					Expect(err.Error()).To(ContainSubstring("leader election lost"))
   230  					close(mgrDone)
   231  				}()
   232  				cm := m.(*controllerManager)
   233  				<-cm.elected
   234  
   235  				cm.leaderElectionCancel()
   236  				<-mgrDone
   237  
   238  				Expect(cm.gracefulShutdownTimeout.Nanoseconds()).To(Equal(int64(0)))
   239  			})
   240  
   241  			It("should prevent leader election when shutting down a non-elected manager", func() {
   242  				var rl resourcelock.Interface
   243  				m1, err := New(cfg, Options{
   244  					LeaderElection:          true,
   245  					LeaderElectionNamespace: "default",
   246  					LeaderElectionID:        "test-leader-election-id",
   247  					newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) {
   248  						var err error
   249  						rl, err = leaderelection.NewResourceLock(config, recorderProvider, options)
   250  						return rl, err
   251  					},
   252  					HealthProbeBindAddress: "0",
   253  					Metrics:                metricsserver.Options{BindAddress: "0"},
   254  					PprofBindAddress:       "0",
   255  				})
   256  				Expect(err).ToNot(HaveOccurred())
   257  				Expect(m1).ToNot(BeNil())
   258  				Expect(rl.Describe()).To(Equal("default/test-leader-election-id"))
   259  
   260  				m1cm, ok := m1.(*controllerManager)
   261  				Expect(ok).To(BeTrue())
   262  				m1cm.onStoppedLeading = func() {}
   263  
   264  				m2, err := New(cfg, Options{
   265  					LeaderElection:          true,
   266  					LeaderElectionNamespace: "default",
   267  					LeaderElectionID:        "test-leader-election-id",
   268  					newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) {
   269  						var err error
   270  						rl, err = leaderelection.NewResourceLock(config, recorderProvider, options)
   271  						return rl, err
   272  					},
   273  					HealthProbeBindAddress: "0",
   274  					Metrics:                metricsserver.Options{BindAddress: "0"},
   275  					PprofBindAddress:       "0",
   276  				})
   277  				Expect(err).ToNot(HaveOccurred())
   278  				Expect(m2).ToNot(BeNil())
   279  				Expect(rl.Describe()).To(Equal("default/test-leader-election-id"))
   280  
   281  				m1done := make(chan struct{})
   282  				Expect(m1.Add(RunnableFunc(func(ctx context.Context) error {
   283  					defer GinkgoRecover()
   284  					close(m1done)
   285  					return nil
   286  				}))).To(Succeed())
   287  
   288  				ctx1, cancel1 := context.WithCancel(context.Background())
   289  				defer cancel1()
   290  				go func() {
   291  					defer GinkgoRecover()
   292  					Expect(m1.Elected()).ShouldNot(BeClosed())
   293  					Expect(m1.Start(ctx1)).NotTo(HaveOccurred())
   294  				}()
   295  				<-m1.Elected()
   296  				<-m1done
   297  
   298  				electionRunnable := &needElection{make(chan struct{})}
   299  
   300  				Expect(m2.Add(electionRunnable)).To(Succeed())
   301  
   302  				ctx2, cancel2 := context.WithCancel(context.Background())
   303  				m2done := make(chan struct{})
   304  				go func() {
   305  					defer GinkgoRecover()
   306  					Expect(m2.Start(ctx2)).NotTo(HaveOccurred())
   307  					close(m2done)
   308  				}()
   309  				Consistently(m2.Elected()).ShouldNot(Receive())
   310  
   311  				go func() {
   312  					defer GinkgoRecover()
   313  					Consistently(electionRunnable.ch).ShouldNot(Receive())
   314  				}()
   315  				cancel2()
   316  				<-m2done
   317  			})
   318  
   319  			It("should default ID to controller-runtime if ID is not set", func() {
   320  				var rl resourcelock.Interface
   321  				m1, err := New(cfg, Options{
   322  					LeaderElection:          true,
   323  					LeaderElectionNamespace: "default",
   324  					LeaderElectionID:        "test-leader-election-id",
   325  					newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) {
   326  						var err error
   327  						rl, err = leaderelection.NewResourceLock(config, recorderProvider, options)
   328  						return rl, err
   329  					},
   330  					HealthProbeBindAddress: "0",
   331  					Metrics:                metricsserver.Options{BindAddress: "0"},
   332  					PprofBindAddress:       "0",
   333  				})
   334  				Expect(err).ToNot(HaveOccurred())
   335  				Expect(m1).ToNot(BeNil())
   336  				Expect(rl.Describe()).To(Equal("default/test-leader-election-id"))
   337  
   338  				m1cm, ok := m1.(*controllerManager)
   339  				Expect(ok).To(BeTrue())
   340  				m1cm.onStoppedLeading = func() {}
   341  
   342  				m2, err := New(cfg, Options{
   343  					LeaderElection:          true,
   344  					LeaderElectionNamespace: "default",
   345  					LeaderElectionID:        "test-leader-election-id",
   346  					newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) {
   347  						var err error
   348  						rl, err = leaderelection.NewResourceLock(config, recorderProvider, options)
   349  						return rl, err
   350  					},
   351  					HealthProbeBindAddress: "0",
   352  					Metrics:                metricsserver.Options{BindAddress: "0"},
   353  					PprofBindAddress:       "0",
   354  				})
   355  				Expect(err).ToNot(HaveOccurred())
   356  				Expect(m2).ToNot(BeNil())
   357  				Expect(rl.Describe()).To(Equal("default/test-leader-election-id"))
   358  
   359  				m2cm, ok := m2.(*controllerManager)
   360  				Expect(ok).To(BeTrue())
   361  				m2cm.onStoppedLeading = func() {}
   362  
   363  				c1 := make(chan struct{})
   364  				Expect(m1.Add(RunnableFunc(func(ctx context.Context) error {
   365  					defer GinkgoRecover()
   366  					close(c1)
   367  					return nil
   368  				}))).To(Succeed())
   369  
   370  				ctx1, cancel1 := context.WithCancel(context.Background())
   371  				defer cancel1()
   372  				go func() {
   373  					defer GinkgoRecover()
   374  					Expect(m1.Elected()).ShouldNot(BeClosed())
   375  					Expect(m1.Start(ctx1)).NotTo(HaveOccurred())
   376  				}()
   377  				<-m1.Elected()
   378  				<-c1
   379  
   380  				c2 := make(chan struct{})
   381  				Expect(m2.Add(RunnableFunc(func(context.Context) error {
   382  					defer GinkgoRecover()
   383  					close(c2)
   384  					return nil
   385  				}))).To(Succeed())
   386  
   387  				ctx2, cancel := context.WithCancel(context.Background())
   388  				m2done := make(chan struct{})
   389  				go func() {
   390  					defer GinkgoRecover()
   391  					Expect(m2.Start(ctx2)).NotTo(HaveOccurred())
   392  					close(m2done)
   393  				}()
   394  				Consistently(m2.Elected()).ShouldNot(Receive())
   395  
   396  				Consistently(c2).ShouldNot(Receive())
   397  				cancel()
   398  				<-m2done
   399  			})
   400  
   401  			It("should return an error if it can't create a ResourceLock", func() {
   402  				m, err := New(cfg, Options{
   403  					newResourceLock: func(_ *rest.Config, _ recorder.Provider, _ leaderelection.Options) (resourcelock.Interface, error) {
   404  						return nil, fmt.Errorf("expected error")
   405  					},
   406  				})
   407  				Expect(m).To(BeNil())
   408  				Expect(err).To(MatchError(ContainSubstring("expected error")))
   409  			})
   410  
   411  			It("should return an error if namespace not set and not running in cluster", func() {
   412  				m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime"})
   413  				Expect(m).To(BeNil())
   414  				Expect(err).To(HaveOccurred())
   415  				Expect(err.Error()).To(ContainSubstring("unable to find leader election namespace: not running in-cluster, please specify LeaderElectionNamespace"))
   416  			})
   417  
   418  			// We must keep this default until we are sure all controller-runtime users have upgraded from the original default
   419  			// ConfigMap lock to a controller-runtime version that has this new default. Many users of controller-runtime skip
   420  			// versions, so we should be extremely conservative here.
   421  			It("should default to LeasesResourceLock", func() {
   422  				m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime", LeaderElectionNamespace: "my-ns"})
   423  				Expect(m).ToNot(BeNil())
   424  				Expect(err).ToNot(HaveOccurred())
   425  				cm, ok := m.(*controllerManager)
   426  				Expect(ok).To(BeTrue())
   427  				_, isLeaseLock := cm.resourceLock.(*resourcelock.LeaseLock)
   428  				Expect(isLeaseLock).To(BeTrue())
   429  
   430  			})
   431  			It("should use the specified ResourceLock", func() {
   432  				m, err := New(cfg, Options{
   433  					LeaderElection:             true,
   434  					LeaderElectionResourceLock: resourcelock.LeasesResourceLock,
   435  					LeaderElectionID:           "controller-runtime",
   436  					LeaderElectionNamespace:    "my-ns",
   437  				})
   438  				Expect(m).ToNot(BeNil())
   439  				Expect(err).ToNot(HaveOccurred())
   440  				cm, ok := m.(*controllerManager)
   441  				Expect(ok).To(BeTrue())
   442  				_, isLeaseLock := cm.resourceLock.(*resourcelock.LeaseLock)
   443  				Expect(isLeaseLock).To(BeTrue())
   444  			})
   445  			It("should release lease if ElectionReleaseOnCancel is true", func() {
   446  				var rl resourcelock.Interface
   447  				m, err := New(cfg, Options{
   448  					LeaderElection:                true,
   449  					LeaderElectionResourceLock:    resourcelock.LeasesResourceLock,
   450  					LeaderElectionID:              "controller-runtime",
   451  					LeaderElectionNamespace:       "my-ns",
   452  					LeaderElectionReleaseOnCancel: true,
   453  					newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) {
   454  						var err error
   455  						rl, err = fakeleaderelection.NewResourceLock(config, recorderProvider, options)
   456  						return rl, err
   457  					},
   458  				})
   459  				Expect(err).ToNot(HaveOccurred())
   460  
   461  				ctx, cancel := context.WithCancel(context.Background())
   462  				doneCh := make(chan struct{})
   463  				go func() {
   464  					defer GinkgoRecover()
   465  					defer close(doneCh)
   466  					Expect(m.Start(ctx)).NotTo(HaveOccurred())
   467  				}()
   468  				<-m.(*controllerManager).elected
   469  				cancel()
   470  				<-doneCh
   471  
   472  				ctx, cancel = context.WithCancel(context.Background())
   473  				defer cancel()
   474  				record, _, err := rl.Get(ctx)
   475  				Expect(err).ToNot(HaveOccurred())
   476  				Expect(record.HolderIdentity).To(BeEmpty())
   477  			})
   478  			When("using a custom LeaderElectionResourceLockInterface", func() {
   479  				It("should use the custom LeaderElectionResourceLockInterface", func() {
   480  					rl, err := fakeleaderelection.NewResourceLock(nil, nil, leaderelection.Options{})
   481  					Expect(err).NotTo(HaveOccurred())
   482  
   483  					m, err := New(cfg, Options{
   484  						LeaderElection:                      true,
   485  						LeaderElectionResourceLockInterface: rl,
   486  						newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) {
   487  							return nil, fmt.Errorf("this should not be called")
   488  						},
   489  					})
   490  					Expect(m).ToNot(BeNil())
   491  					Expect(err).ToNot(HaveOccurred())
   492  					cm, ok := m.(*controllerManager)
   493  					Expect(ok).To(BeTrue())
   494  					Expect(cm.resourceLock).To(Equal(rl))
   495  				})
   496  			})
   497  		})
   498  
   499  		It("should create a metrics server if a valid address is provided", func() {
   500  			var srv metricsserver.Server
   501  			m, err := New(cfg, Options{
   502  				Metrics: metricsserver.Options{BindAddress: ":0"},
   503  				newMetricsServer: func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error) {
   504  					var err error
   505  					srv, err = metricsserver.NewServer(options, config, httpClient)
   506  					return srv, err
   507  				},
   508  			})
   509  			Expect(m).ToNot(BeNil())
   510  			Expect(err).ToNot(HaveOccurred())
   511  			Expect(srv).ToNot(BeNil())
   512  
   513  			// Triggering the metric server start here manually to test if it works.
   514  			// Usually this happens later during manager.Start().
   515  			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
   516  			Expect(srv.Start(ctx)).To(Succeed())
   517  			cancel()
   518  		})
   519  
   520  		It("should create a metrics server if a valid address is provided and secure serving is enabled", func() {
   521  			var srv metricsserver.Server
   522  			m, err := New(cfg, Options{
   523  				Metrics: metricsserver.Options{BindAddress: ":0", SecureServing: true},
   524  				newMetricsServer: func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error) {
   525  					var err error
   526  					srv, err = metricsserver.NewServer(options, config, httpClient)
   527  					return srv, err
   528  				},
   529  			})
   530  			Expect(m).ToNot(BeNil())
   531  			Expect(err).ToNot(HaveOccurred())
   532  			Expect(srv).ToNot(BeNil())
   533  
   534  			// Triggering the metric server start here manually to test if it works.
   535  			// Usually this happens later during manager.Start().
   536  			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
   537  			Expect(srv.Start(ctx)).To(Succeed())
   538  			cancel()
   539  		})
   540  
   541  		It("should be able to create a manager with a cache that fails on missing informer", func() {
   542  			m, err := New(cfg, Options{
   543  				Cache: cache.Options{
   544  					ReaderFailOnMissingInformer: true,
   545  				},
   546  			})
   547  			Expect(m).ToNot(BeNil())
   548  			Expect(err).ToNot(HaveOccurred())
   549  		})
   550  
   551  		It("should return an error if the metrics bind address is already in use", func() {
   552  			ln, err := net.Listen("tcp", ":0") //nolint:gosec
   553  			Expect(err).ShouldNot(HaveOccurred())
   554  
   555  			var srv metricsserver.Server
   556  			m, err := New(cfg, Options{
   557  				Metrics: metricsserver.Options{
   558  					BindAddress: ln.Addr().String(),
   559  				},
   560  				newMetricsServer: func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error) {
   561  					var err error
   562  					srv, err = metricsserver.NewServer(options, config, httpClient)
   563  					return srv, err
   564  				},
   565  			})
   566  			Expect(m).ToNot(BeNil())
   567  			Expect(err).ToNot(HaveOccurred())
   568  
   569  			// Triggering the metric server start here manually to test if it works.
   570  			// Usually this happens later during manager.Start().
   571  			Expect(srv.Start(context.Background())).ToNot(Succeed())
   572  
   573  			Expect(ln.Close()).To(Succeed())
   574  		})
   575  
   576  		It("should return an error if the metrics bind address is already in use and secure serving enabled", func() {
   577  			ln, err := net.Listen("tcp", ":0") //nolint:gosec
   578  			Expect(err).ShouldNot(HaveOccurred())
   579  
   580  			var srv metricsserver.Server
   581  			m, err := New(cfg, Options{
   582  				Metrics: metricsserver.Options{
   583  					BindAddress:   ln.Addr().String(),
   584  					SecureServing: true,
   585  				},
   586  				newMetricsServer: func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error) {
   587  					var err error
   588  					srv, err = metricsserver.NewServer(options, config, httpClient)
   589  					return srv, err
   590  				},
   591  			})
   592  			Expect(m).ToNot(BeNil())
   593  			Expect(err).ToNot(HaveOccurred())
   594  
   595  			// Triggering the metric server start here manually to test if it works.
   596  			// Usually this happens later during manager.Start().
   597  			Expect(srv.Start(context.Background())).ToNot(Succeed())
   598  
   599  			Expect(ln.Close()).To(Succeed())
   600  		})
   601  
   602  		It("should create a listener for the health probes if a valid address is provided", func() {
   603  			var listener net.Listener
   604  			m, err := New(cfg, Options{
   605  				HealthProbeBindAddress: ":0",
   606  				newHealthProbeListener: func(addr string) (net.Listener, error) {
   607  					var err error
   608  					listener, err = defaultHealthProbeListener(addr)
   609  					return listener, err
   610  				},
   611  			})
   612  			Expect(m).ToNot(BeNil())
   613  			Expect(err).ToNot(HaveOccurred())
   614  			Expect(listener).ToNot(BeNil())
   615  			Expect(listener.Close()).ToNot(HaveOccurred())
   616  		})
   617  
   618  		It("should return an error if the health probes bind address is already in use", func() {
   619  			ln, err := defaultHealthProbeListener(":0")
   620  			Expect(err).ShouldNot(HaveOccurred())
   621  
   622  			var listener net.Listener
   623  			m, err := New(cfg, Options{
   624  				HealthProbeBindAddress: ln.Addr().String(),
   625  				newHealthProbeListener: func(addr string) (net.Listener, error) {
   626  					var err error
   627  					listener, err = defaultHealthProbeListener(addr)
   628  					return listener, err
   629  				},
   630  			})
   631  			Expect(m).To(BeNil())
   632  			Expect(err).To(HaveOccurred())
   633  			Expect(listener).To(BeNil())
   634  
   635  			Expect(ln.Close()).ToNot(HaveOccurred())
   636  		})
   637  	})
   638  
   639  	Describe("Start", func() {
   640  		var startSuite = func(options Options, callbacks ...func(Manager)) {
   641  			It("should Start each Component", func() {
   642  				m, err := New(cfg, options)
   643  				Expect(err).NotTo(HaveOccurred())
   644  				for _, cb := range callbacks {
   645  					cb(m)
   646  				}
   647  				var wgRunnableStarted sync.WaitGroup
   648  				wgRunnableStarted.Add(2)
   649  				Expect(m.Add(RunnableFunc(func(context.Context) error {
   650  					defer GinkgoRecover()
   651  					wgRunnableStarted.Done()
   652  					return nil
   653  				}))).To(Succeed())
   654  
   655  				Expect(m.Add(RunnableFunc(func(context.Context) error {
   656  					defer GinkgoRecover()
   657  					wgRunnableStarted.Done()
   658  					return nil
   659  				}))).To(Succeed())
   660  
   661  				ctx, cancel := context.WithCancel(context.Background())
   662  				defer cancel()
   663  				go func() {
   664  					defer GinkgoRecover()
   665  					Expect(m.Elected()).ShouldNot(BeClosed())
   666  					Expect(m.Start(ctx)).NotTo(HaveOccurred())
   667  				}()
   668  
   669  				<-m.Elected()
   670  				wgRunnableStarted.Wait()
   671  			})
   672  
   673  			It("should not manipulate the provided config", func() {
   674  				// strip WrapTransport, cause func values are PartialEq, not Eq --
   675  				// specifically, for reflect.DeepEqual, for all functions F,
   676  				// F != nil implies F != F, which means no full equivalence relation.
   677  				cfg := rest.CopyConfig(cfg)
   678  				cfg.WrapTransport = nil
   679  				originalCfg := rest.CopyConfig(cfg)
   680  				// The options object is shared by multiple tests, copy it
   681  				// into our scope so we manipulate it for this testcase only
   682  				options := options
   683  				options.newResourceLock = nil
   684  				m, err := New(cfg, options)
   685  				Expect(err).NotTo(HaveOccurred())
   686  				for _, cb := range callbacks {
   687  					cb(m)
   688  				}
   689  				Expect(m.GetConfig()).To(Equal(originalCfg))
   690  			})
   691  
   692  			It("should stop when context is cancelled", func() {
   693  				m, err := New(cfg, options)
   694  				Expect(err).NotTo(HaveOccurred())
   695  				for _, cb := range callbacks {
   696  					cb(m)
   697  				}
   698  				ctx, cancel := context.WithCancel(context.Background())
   699  				cancel()
   700  				Expect(m.Start(ctx)).NotTo(HaveOccurred())
   701  			})
   702  
   703  			It("should return an error if it can't start the cache", func() {
   704  				m, err := New(cfg, options)
   705  				Expect(err).NotTo(HaveOccurred())
   706  				for _, cb := range callbacks {
   707  					cb(m)
   708  				}
   709  				mgr, ok := m.(*controllerManager)
   710  				Expect(ok).To(BeTrue())
   711  				Expect(mgr.Add(
   712  					&cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}},
   713  				)).To(Succeed())
   714  
   715  				ctx, cancel := context.WithCancel(context.Background())
   716  				defer cancel()
   717  				Expect(m.Start(ctx)).To(MatchError(ContainSubstring("expected error")))
   718  			})
   719  
   720  			It("should start the cache before starting anything else", func() {
   721  				fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
   722  				options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
   723  					return fakeCache, nil
   724  				}
   725  				m, err := New(cfg, options)
   726  				Expect(err).NotTo(HaveOccurred())
   727  				for _, cb := range callbacks {
   728  					cb(m)
   729  				}
   730  
   731  				runnableWasStarted := make(chan struct{})
   732  				runnable := RunnableFunc(func(ctx context.Context) error {
   733  					defer GinkgoRecover()
   734  					if !fakeCache.wasSynced {
   735  						return errors.New("runnable got started before cache was synced")
   736  					}
   737  					close(runnableWasStarted)
   738  					return nil
   739  				})
   740  				Expect(m.Add(runnable)).To(Succeed())
   741  
   742  				ctx, cancel := context.WithCancel(context.Background())
   743  				defer cancel()
   744  				go func() {
   745  					defer GinkgoRecover()
   746  					Expect(m.Start(ctx)).ToNot(HaveOccurred())
   747  				}()
   748  
   749  				<-runnableWasStarted
   750  			})
   751  
   752  			It("should start additional clusters before anything else", func() {
   753  				fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
   754  				options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
   755  					return fakeCache, nil
   756  				}
   757  				m, err := New(cfg, options)
   758  				Expect(err).NotTo(HaveOccurred())
   759  				for _, cb := range callbacks {
   760  					cb(m)
   761  				}
   762  
   763  				additionalClusterCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
   764  				additionalCluster, err := cluster.New(cfg, func(o *cluster.Options) {
   765  					o.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
   766  						return additionalClusterCache, nil
   767  					}
   768  				})
   769  				Expect(err).NotTo(HaveOccurred())
   770  				Expect(m.Add(additionalCluster)).NotTo(HaveOccurred())
   771  
   772  				runnableWasStarted := make(chan struct{})
   773  				Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
   774  					defer GinkgoRecover()
   775  					if !fakeCache.wasSynced {
   776  						return errors.New("WaitForCacheSyncCalled wasn't called before Runnable got started")
   777  					}
   778  					if !additionalClusterCache.wasSynced {
   779  						return errors.New("the additional clusters WaitForCacheSync wasn't called before Runnable got started")
   780  					}
   781  					close(runnableWasStarted)
   782  					return nil
   783  				}))).To(Succeed())
   784  
   785  				ctx, cancel := context.WithCancel(context.Background())
   786  				defer cancel()
   787  				go func() {
   788  					defer GinkgoRecover()
   789  					Expect(m.Start(ctx)).ToNot(HaveOccurred())
   790  				}()
   791  
   792  				<-runnableWasStarted
   793  			})
   794  
   795  			It("should return an error if any Components fail to Start", func() {
   796  				m, err := New(cfg, options)
   797  				Expect(err).NotTo(HaveOccurred())
   798  				for _, cb := range callbacks {
   799  					cb(m)
   800  				}
   801  
   802  				Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
   803  					defer GinkgoRecover()
   804  					<-ctx.Done()
   805  					return nil
   806  				}))).To(Succeed())
   807  
   808  				Expect(m.Add(RunnableFunc(func(context.Context) error {
   809  					defer GinkgoRecover()
   810  					return fmt.Errorf("expected error")
   811  				}))).To(Succeed())
   812  
   813  				Expect(m.Add(RunnableFunc(func(context.Context) error {
   814  					defer GinkgoRecover()
   815  					return nil
   816  				}))).To(Succeed())
   817  
   818  				defer GinkgoRecover()
   819  				ctx, cancel := context.WithCancel(context.Background())
   820  				defer cancel()
   821  				err = m.Start(ctx)
   822  				Expect(err).To(HaveOccurred())
   823  				Expect(err.Error()).To(Equal("expected error"))
   824  			})
   825  
   826  			It("should start caches added after Manager has started", func() {
   827  				fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
   828  				options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
   829  					return fakeCache, nil
   830  				}
   831  				m, err := New(cfg, options)
   832  				Expect(err).NotTo(HaveOccurred())
   833  				for _, cb := range callbacks {
   834  					cb(m)
   835  				}
   836  
   837  				runnableWasStarted := make(chan struct{})
   838  				Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
   839  					defer GinkgoRecover()
   840  					if !fakeCache.wasSynced {
   841  						return errors.New("WaitForCacheSyncCalled wasn't called before Runnable got started")
   842  					}
   843  					close(runnableWasStarted)
   844  					return nil
   845  				}))).To(Succeed())
   846  
   847  				ctx, cancel := context.WithCancel(context.Background())
   848  				defer cancel()
   849  				go func() {
   850  					defer GinkgoRecover()
   851  					Expect(m.Start(ctx)).ToNot(HaveOccurred())
   852  				}()
   853  
   854  				<-runnableWasStarted
   855  
   856  				additionalClusterCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
   857  				fakeCluster := &startClusterAfterManager{informer: additionalClusterCache}
   858  
   859  				Expect(err).NotTo(HaveOccurred())
   860  				Expect(m.Add(fakeCluster)).NotTo(HaveOccurred())
   861  
   862  				Eventually(func() bool {
   863  					fakeCluster.informer.mu.Lock()
   864  					defer fakeCluster.informer.mu.Unlock()
   865  					return fakeCluster.informer.wasStarted && fakeCluster.informer.wasSynced
   866  				}).Should(BeTrue())
   867  			})
   868  
   869  			It("should wait for runnables to stop", func() {
   870  				m, err := New(cfg, options)
   871  				Expect(err).NotTo(HaveOccurred())
   872  				for _, cb := range callbacks {
   873  					cb(m)
   874  				}
   875  
   876  				var lock sync.Mutex
   877  				var runnableDoneCount int64
   878  				runnableDoneFunc := func() {
   879  					lock.Lock()
   880  					defer lock.Unlock()
   881  					atomic.AddInt64(&runnableDoneCount, 1)
   882  				}
   883  				var wgRunnableRunning sync.WaitGroup
   884  				wgRunnableRunning.Add(2)
   885  				Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
   886  					wgRunnableRunning.Done()
   887  					defer GinkgoRecover()
   888  					defer runnableDoneFunc()
   889  					<-ctx.Done()
   890  					return nil
   891  				}))).To(Succeed())
   892  
   893  				Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
   894  					wgRunnableRunning.Done()
   895  					defer GinkgoRecover()
   896  					defer runnableDoneFunc()
   897  					<-ctx.Done()
   898  					time.Sleep(300 * time.Millisecond) // slow closure simulation
   899  					return nil
   900  				}))).To(Succeed())
   901  
   902  				defer GinkgoRecover()
   903  				ctx, cancel := context.WithCancel(context.Background())
   904  
   905  				var wgManagerRunning sync.WaitGroup
   906  				wgManagerRunning.Add(1)
   907  				go func() {
   908  					defer GinkgoRecover()
   909  					defer wgManagerRunning.Done()
   910  					Expect(m.Start(ctx)).NotTo(HaveOccurred())
   911  					Eventually(func() int64 {
   912  						return atomic.LoadInt64(&runnableDoneCount)
   913  					}).Should(BeEquivalentTo(2))
   914  				}()
   915  				wgRunnableRunning.Wait()
   916  				cancel()
   917  
   918  				wgManagerRunning.Wait()
   919  			})
   920  
   921  			It("should return an error if any Components fail to Start and wait for runnables to stop", func() {
   922  				m, err := New(cfg, options)
   923  				Expect(err).NotTo(HaveOccurred())
   924  				for _, cb := range callbacks {
   925  					cb(m)
   926  				}
   927  				defer GinkgoRecover()
   928  				var lock sync.Mutex
   929  				runnableDoneCount := 0
   930  				runnableDoneFunc := func() {
   931  					lock.Lock()
   932  					defer lock.Unlock()
   933  					runnableDoneCount++
   934  				}
   935  
   936  				Expect(m.Add(RunnableFunc(func(context.Context) error {
   937  					defer GinkgoRecover()
   938  					defer runnableDoneFunc()
   939  					return fmt.Errorf("expected error")
   940  				}))).To(Succeed())
   941  
   942  				Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
   943  					defer GinkgoRecover()
   944  					defer runnableDoneFunc()
   945  					<-ctx.Done()
   946  					return nil
   947  				}))).To(Succeed())
   948  
   949  				ctx, cancel := context.WithCancel(context.Background())
   950  				defer cancel()
   951  				Expect(m.Start(ctx)).To(HaveOccurred())
   952  				Expect(runnableDoneCount).To(Equal(2))
   953  			})
   954  
   955  			It("should refuse to add runnable if stop procedure is already engaged", func() {
   956  				m, err := New(cfg, options)
   957  				Expect(err).NotTo(HaveOccurred())
   958  				for _, cb := range callbacks {
   959  					cb(m)
   960  				}
   961  				defer GinkgoRecover()
   962  
   963  				var wgRunnableRunning sync.WaitGroup
   964  				wgRunnableRunning.Add(1)
   965  				Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
   966  					wgRunnableRunning.Done()
   967  					defer GinkgoRecover()
   968  					<-ctx.Done()
   969  					return nil
   970  				}))).To(Succeed())
   971  
   972  				ctx, cancel := context.WithCancel(context.Background())
   973  				go func() {
   974  					Expect(m.Start(ctx)).NotTo(HaveOccurred())
   975  				}()
   976  				wgRunnableRunning.Wait()
   977  				cancel()
   978  				time.Sleep(100 * time.Millisecond) // give some time for the stop chan closure to be caught by the manager
   979  				Expect(m.Add(RunnableFunc(func(context.Context) error {
   980  					defer GinkgoRecover()
   981  					return nil
   982  				}))).NotTo(Succeed())
   983  			})
   984  
   985  			It("should return both runnables and stop errors when both error", func() {
   986  				m, err := New(cfg, options)
   987  				Expect(err).NotTo(HaveOccurred())
   988  				for _, cb := range callbacks {
   989  					cb(m)
   990  				}
   991  				m.(*controllerManager).gracefulShutdownTimeout = 1 * time.Nanosecond
   992  				Expect(m.Add(RunnableFunc(func(context.Context) error {
   993  					return runnableError{}
   994  				}))).To(Succeed())
   995  				testDone := make(chan struct{})
   996  				defer close(testDone)
   997  				Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
   998  					<-ctx.Done()
   999  					timer := time.NewTimer(30 * time.Second)
  1000  					defer timer.Stop()
  1001  					select {
  1002  					case <-testDone:
  1003  						return nil
  1004  					case <-timer.C:
  1005  						return nil
  1006  					}
  1007  				}))).To(Succeed())
  1008  				ctx, cancel := context.WithCancel(context.Background())
  1009  				defer cancel()
  1010  				err = m.Start(ctx)
  1011  				Expect(err).To(HaveOccurred())
  1012  				eMsg := "[not feeling like that, failed waiting for all runnables to end within grace period of 1ns: context deadline exceeded]"
  1013  				Expect(err.Error()).To(Equal(eMsg))
  1014  				Expect(errors.Is(err, context.DeadlineExceeded)).To(BeTrue())
  1015  				Expect(errors.Is(err, runnableError{})).To(BeTrue())
  1016  			})
  1017  
  1018  			It("should return only stop errors if runnables dont error", func() {
  1019  				m, err := New(cfg, options)
  1020  				Expect(err).NotTo(HaveOccurred())
  1021  				for _, cb := range callbacks {
  1022  					cb(m)
  1023  				}
  1024  				m.(*controllerManager).gracefulShutdownTimeout = 1 * time.Nanosecond
  1025  				Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
  1026  					<-ctx.Done()
  1027  					return nil
  1028  				}))).To(Succeed())
  1029  				testDone := make(chan struct{})
  1030  				defer close(testDone)
  1031  				Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
  1032  					<-ctx.Done()
  1033  					timer := time.NewTimer(30 * time.Second)
  1034  					defer timer.Stop()
  1035  					select {
  1036  					case <-testDone:
  1037  						return nil
  1038  					case <-timer.C:
  1039  						return nil
  1040  					}
  1041  				}))).To(Succeed())
  1042  				ctx, cancel := context.WithCancel(context.Background())
  1043  				managerStopDone := make(chan struct{})
  1044  				go func() { err = m.Start(ctx); close(managerStopDone) }()
  1045  				// Use the 'elected' channel to find out if startup was done, otherwise we stop
  1046  				// before we started the Runnable and see flakes, mostly in low-CPU envs like CI
  1047  				<-m.(*controllerManager).elected
  1048  				cancel()
  1049  				<-managerStopDone
  1050  				Expect(err).To(HaveOccurred())
  1051  				Expect(err.Error()).To(Equal("failed waiting for all runnables to end within grace period of 1ns: context deadline exceeded"))
  1052  				Expect(errors.Is(err, context.DeadlineExceeded)).To(BeTrue())
  1053  				Expect(errors.Is(err, runnableError{})).ToNot(BeTrue())
  1054  			})
  1055  
  1056  			It("should return only runnables error if stop doesn't error", func() {
  1057  				m, err := New(cfg, options)
  1058  				Expect(err).NotTo(HaveOccurred())
  1059  				for _, cb := range callbacks {
  1060  					cb(m)
  1061  				}
  1062  				Expect(m.Add(RunnableFunc(func(context.Context) error {
  1063  					return runnableError{}
  1064  				}))).To(Succeed())
  1065  				ctx, cancel := context.WithCancel(context.Background())
  1066  				defer cancel()
  1067  				err = m.Start(ctx)
  1068  				Expect(err).To(HaveOccurred())
  1069  				Expect(err.Error()).To(Equal("not feeling like that"))
  1070  				Expect(errors.Is(err, context.DeadlineExceeded)).ToNot(BeTrue())
  1071  				Expect(errors.Is(err, runnableError{})).To(BeTrue())
  1072  			})
  1073  
  1074  			It("should not wait for runnables if gracefulShutdownTimeout is 0", func() {
  1075  				m, err := New(cfg, options)
  1076  				Expect(err).NotTo(HaveOccurred())
  1077  				for _, cb := range callbacks {
  1078  					cb(m)
  1079  				}
  1080  				m.(*controllerManager).gracefulShutdownTimeout = time.Duration(0)
  1081  
  1082  				runnableStopped := make(chan struct{})
  1083  				Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
  1084  					<-ctx.Done()
  1085  					time.Sleep(100 * time.Millisecond)
  1086  					close(runnableStopped)
  1087  					return nil
  1088  				}))).ToNot(HaveOccurred())
  1089  
  1090  				ctx, cancel := context.WithCancel(context.Background())
  1091  				managerStopDone := make(chan struct{})
  1092  				go func() {
  1093  					defer GinkgoRecover()
  1094  					Expect(m.Start(ctx)).NotTo(HaveOccurred())
  1095  					close(managerStopDone)
  1096  				}()
  1097  				<-m.Elected()
  1098  				cancel()
  1099  
  1100  				<-managerStopDone
  1101  				<-runnableStopped
  1102  			})
  1103  
  1104  			It("should wait forever for runnables if gracefulShutdownTimeout is <0 (-1)", func() {
  1105  				m, err := New(cfg, options)
  1106  				Expect(err).NotTo(HaveOccurred())
  1107  				for _, cb := range callbacks {
  1108  					cb(m)
  1109  				}
  1110  				m.(*controllerManager).gracefulShutdownTimeout = time.Duration(-1)
  1111  
  1112  				Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
  1113  					<-ctx.Done()
  1114  					time.Sleep(100 * time.Millisecond)
  1115  					return nil
  1116  				}))).ToNot(HaveOccurred())
  1117  				Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
  1118  					<-ctx.Done()
  1119  					time.Sleep(200 * time.Millisecond)
  1120  					return nil
  1121  				}))).ToNot(HaveOccurred())
  1122  				Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
  1123  					<-ctx.Done()
  1124  					time.Sleep(500 * time.Millisecond)
  1125  					return nil
  1126  				}))).ToNot(HaveOccurred())
  1127  				Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
  1128  					<-ctx.Done()
  1129  					time.Sleep(1500 * time.Millisecond)
  1130  					return nil
  1131  				}))).ToNot(HaveOccurred())
  1132  
  1133  				ctx, cancel := context.WithCancel(context.Background())
  1134  				managerStopDone := make(chan struct{})
  1135  				go func() {
  1136  					defer GinkgoRecover()
  1137  					Expect(m.Start(ctx)).NotTo(HaveOccurred())
  1138  					close(managerStopDone)
  1139  				}()
  1140  				<-m.Elected()
  1141  				cancel()
  1142  
  1143  				beforeDone := time.Now()
  1144  				<-managerStopDone
  1145  				Expect(time.Since(beforeDone)).To(BeNumerically(">=", 1500*time.Millisecond))
  1146  			})
  1147  
  1148  		}
  1149  
  1150  		Context("with defaults", func() {
  1151  			startSuite(Options{})
  1152  		})
  1153  
  1154  		Context("with leaderelection enabled", func() {
  1155  			startSuite(
  1156  				Options{
  1157  					LeaderElection:          true,
  1158  					LeaderElectionID:        "controller-runtime",
  1159  					LeaderElectionNamespace: "default",
  1160  					newResourceLock:         fakeleaderelection.NewResourceLock,
  1161  				},
  1162  				func(m Manager) {
  1163  					cm, ok := m.(*controllerManager)
  1164  					Expect(ok).To(BeTrue())
  1165  					cm.onStoppedLeading = func() {}
  1166  				},
  1167  			)
  1168  		})
  1169  
  1170  		Context("should start serving metrics", func() {
  1171  			var srv metricsserver.Server
  1172  			var defaultServer metricsDefaultServer
  1173  			var opts Options
  1174  
  1175  			BeforeEach(func() {
  1176  				srv = nil
  1177  				opts = Options{
  1178  					Metrics: metricsserver.Options{
  1179  						BindAddress: ":0",
  1180  					},
  1181  					newMetricsServer: func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error) {
  1182  						var err error
  1183  						srv, err = metricsserver.NewServer(options, config, httpClient)
  1184  						if srv != nil {
  1185  							defaultServer = srv.(metricsDefaultServer)
  1186  						}
  1187  						return srv, err
  1188  					},
  1189  				}
  1190  			})
  1191  
  1192  			It("should stop serving metrics when stop is called", func() {
  1193  				m, err := New(cfg, opts)
  1194  				Expect(err).NotTo(HaveOccurred())
  1195  
  1196  				ctx, cancel := context.WithCancel(context.Background())
  1197  				go func() {
  1198  					defer GinkgoRecover()
  1199  					Expect(m.Start(ctx)).NotTo(HaveOccurred())
  1200  				}()
  1201  				<-m.Elected()
  1202  				// Note: Wait until metrics server has been started. A finished leader election
  1203  				// doesn't guarantee that the metrics server is up.
  1204  				Eventually(func() string { return defaultServer.GetBindAddr() }, 10*time.Second).ShouldNot(BeEmpty())
  1205  
  1206  				// Check the metrics started
  1207  				endpoint := fmt.Sprintf("http://%s/metrics", defaultServer.GetBindAddr())
  1208  				_, err = http.Get(endpoint)
  1209  				Expect(err).NotTo(HaveOccurred())
  1210  
  1211  				// Shutdown the server
  1212  				cancel()
  1213  
  1214  				// Expect the metrics server to shutdown
  1215  				Eventually(func() error {
  1216  					_, err = http.Get(endpoint)
  1217  					return err
  1218  				}, 10*time.Second).ShouldNot(Succeed())
  1219  			})
  1220  
  1221  			It("should serve metrics endpoint", func() {
  1222  				m, err := New(cfg, opts)
  1223  				Expect(err).NotTo(HaveOccurred())
  1224  
  1225  				ctx, cancel := context.WithCancel(context.Background())
  1226  				defer cancel()
  1227  				go func() {
  1228  					defer GinkgoRecover()
  1229  					Expect(m.Start(ctx)).NotTo(HaveOccurred())
  1230  				}()
  1231  				<-m.Elected()
  1232  				// Note: Wait until metrics server has been started. A finished leader election
  1233  				// doesn't guarantee that the metrics server is up.
  1234  				Eventually(func() string { return defaultServer.GetBindAddr() }, 10*time.Second).ShouldNot(BeEmpty())
  1235  
  1236  				metricsEndpoint := fmt.Sprintf("http://%s/metrics", defaultServer.GetBindAddr())
  1237  				resp, err := http.Get(metricsEndpoint)
  1238  				Expect(err).NotTo(HaveOccurred())
  1239  				Expect(resp.StatusCode).To(Equal(200))
  1240  			})
  1241  
  1242  			It("should not serve anything other than metrics endpoint by default", func() {
  1243  				m, err := New(cfg, opts)
  1244  				Expect(err).NotTo(HaveOccurred())
  1245  
  1246  				ctx, cancel := context.WithCancel(context.Background())
  1247  				defer cancel()
  1248  				go func() {
  1249  					defer GinkgoRecover()
  1250  					Expect(m.Start(ctx)).NotTo(HaveOccurred())
  1251  				}()
  1252  				<-m.Elected()
  1253  				// Note: Wait until metrics server has been started. A finished leader election
  1254  				// doesn't guarantee that the metrics server is up.
  1255  				Eventually(func() string { return defaultServer.GetBindAddr() }, 10*time.Second).ShouldNot(BeEmpty())
  1256  
  1257  				endpoint := fmt.Sprintf("http://%s/should-not-exist", defaultServer.GetBindAddr())
  1258  				resp, err := http.Get(endpoint)
  1259  				Expect(err).NotTo(HaveOccurred())
  1260  				defer resp.Body.Close()
  1261  				Expect(resp.StatusCode).To(Equal(404))
  1262  			})
  1263  
  1264  			It("should serve metrics in its registry", func() {
  1265  				one := prometheus.NewCounter(prometheus.CounterOpts{
  1266  					Name: "test_one",
  1267  					Help: "test metric for testing",
  1268  				})
  1269  				one.Inc()
  1270  				err := metrics.Registry.Register(one)
  1271  				Expect(err).NotTo(HaveOccurred())
  1272  
  1273  				m, err := New(cfg, opts)
  1274  				Expect(err).NotTo(HaveOccurred())
  1275  
  1276  				ctx, cancel := context.WithCancel(context.Background())
  1277  				defer cancel()
  1278  				go func() {
  1279  					defer GinkgoRecover()
  1280  					Expect(m.Start(ctx)).NotTo(HaveOccurred())
  1281  				}()
  1282  				<-m.Elected()
  1283  				// Note: Wait until metrics server has been started. A finished leader election
  1284  				// doesn't guarantee that the metrics server is up.
  1285  				Eventually(func() string { return defaultServer.GetBindAddr() }, 10*time.Second).ShouldNot(BeEmpty())
  1286  
  1287  				metricsEndpoint := fmt.Sprintf("http://%s/metrics", defaultServer.GetBindAddr())
  1288  				resp, err := http.Get(metricsEndpoint)
  1289  				Expect(err).NotTo(HaveOccurred())
  1290  				defer resp.Body.Close()
  1291  				Expect(resp.StatusCode).To(Equal(200))
  1292  
  1293  				data, err := io.ReadAll(resp.Body)
  1294  				Expect(err).NotTo(HaveOccurred())
  1295  				Expect(string(data)).To(ContainSubstring("%s\n%s\n%s\n",
  1296  					`# HELP test_one test metric for testing`,
  1297  					`# TYPE test_one counter`,
  1298  					`test_one 1`,
  1299  				))
  1300  
  1301  				// Unregister will return false if the metric was never registered
  1302  				ok := metrics.Registry.Unregister(one)
  1303  				Expect(ok).To(BeTrue())
  1304  			})
  1305  
  1306  			It("should serve extra endpoints", func() {
  1307  				opts.Metrics.ExtraHandlers = map[string]http.Handler{
  1308  					"/debug": http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
  1309  						_, _ = w.Write([]byte("Some debug info"))
  1310  					}),
  1311  				}
  1312  				m, err := New(cfg, opts)
  1313  				Expect(err).NotTo(HaveOccurred())
  1314  
  1315  				ctx, cancel := context.WithCancel(context.Background())
  1316  				defer cancel()
  1317  				go func() {
  1318  					defer GinkgoRecover()
  1319  					Expect(m.Start(ctx)).NotTo(HaveOccurred())
  1320  				}()
  1321  				<-m.Elected()
  1322  				// Note: Wait until metrics server has been started. A finished leader election
  1323  				// doesn't guarantee that the metrics server is up.
  1324  				Eventually(func() string { return defaultServer.GetBindAddr() }, 10*time.Second).ShouldNot(BeEmpty())
  1325  
  1326  				endpoint := fmt.Sprintf("http://%s/debug", defaultServer.GetBindAddr())
  1327  				resp, err := http.Get(endpoint)
  1328  				Expect(err).NotTo(HaveOccurred())
  1329  				defer resp.Body.Close()
  1330  				Expect(resp.StatusCode).To(Equal(http.StatusOK))
  1331  
  1332  				body, err := io.ReadAll(resp.Body)
  1333  				Expect(err).NotTo(HaveOccurred())
  1334  				Expect(string(body)).To(Equal("Some debug info"))
  1335  			})
  1336  		})
  1337  	})
  1338  
  1339  	Context("should start serving health probes", func() {
  1340  		var listener net.Listener
  1341  		var opts Options
  1342  
  1343  		BeforeEach(func() {
  1344  			listener = nil
  1345  			opts = Options{
  1346  				newHealthProbeListener: func(addr string) (net.Listener, error) {
  1347  					var err error
  1348  					listener, err = defaultHealthProbeListener(addr)
  1349  					return listener, err
  1350  				},
  1351  			}
  1352  		})
  1353  
  1354  		AfterEach(func() {
  1355  			if listener != nil {
  1356  				listener.Close()
  1357  			}
  1358  		})
  1359  
  1360  		It("should stop serving health probes when stop is called", func() {
  1361  			opts.HealthProbeBindAddress = ":0"
  1362  			m, err := New(cfg, opts)
  1363  			Expect(err).NotTo(HaveOccurred())
  1364  
  1365  			ctx, cancel := context.WithCancel(context.Background())
  1366  			go func() {
  1367  				defer GinkgoRecover()
  1368  				Expect(m.Start(ctx)).NotTo(HaveOccurred())
  1369  			}()
  1370  			<-m.Elected()
  1371  
  1372  			// Check the health probes started
  1373  			endpoint := fmt.Sprintf("http://%s", listener.Addr().String())
  1374  			_, err = http.Get(endpoint)
  1375  			Expect(err).NotTo(HaveOccurred())
  1376  
  1377  			// Shutdown the server
  1378  			cancel()
  1379  
  1380  			// Expect the health probes server to shutdown
  1381  			Eventually(func() error {
  1382  				_, err = http.Get(endpoint)
  1383  				return err
  1384  			}, 10*time.Second).ShouldNot(Succeed())
  1385  		})
  1386  
  1387  		It("should serve readiness endpoint", func() {
  1388  			opts.HealthProbeBindAddress = ":0"
  1389  			m, err := New(cfg, opts)
  1390  			Expect(err).NotTo(HaveOccurred())
  1391  
  1392  			res := fmt.Errorf("not ready yet")
  1393  			namedCheck := "check"
  1394  			err = m.AddReadyzCheck(namedCheck, func(_ *http.Request) error { return res })
  1395  			Expect(err).NotTo(HaveOccurred())
  1396  
  1397  			ctx, cancel := context.WithCancel(context.Background())
  1398  			defer cancel()
  1399  			go func() {
  1400  				defer GinkgoRecover()
  1401  				Expect(m.Start(ctx)).NotTo(HaveOccurred())
  1402  			}()
  1403  			<-m.Elected()
  1404  
  1405  			readinessEndpoint := fmt.Sprint("http://", listener.Addr().String(), defaultReadinessEndpoint)
  1406  
  1407  			// Controller is not ready
  1408  			resp, err := http.Get(readinessEndpoint)
  1409  			Expect(err).NotTo(HaveOccurred())
  1410  			defer resp.Body.Close()
  1411  			Expect(resp.StatusCode).To(Equal(http.StatusInternalServerError))
  1412  
  1413  			// Controller is ready
  1414  			res = nil
  1415  			resp, err = http.Get(readinessEndpoint)
  1416  			Expect(err).NotTo(HaveOccurred())
  1417  			defer resp.Body.Close()
  1418  			Expect(resp.StatusCode).To(Equal(http.StatusOK))
  1419  
  1420  			// Check readiness path without trailing slash without redirect
  1421  			readinessEndpoint = fmt.Sprint("http://", listener.Addr().String(), defaultReadinessEndpoint)
  1422  			res = nil
  1423  			httpClient := http.Client{
  1424  				CheckRedirect: func(req *http.Request, via []*http.Request) error {
  1425  					return http.ErrUseLastResponse // Do not follow redirect
  1426  				},
  1427  			}
  1428  			resp, err = httpClient.Get(readinessEndpoint)
  1429  			Expect(err).NotTo(HaveOccurred())
  1430  			defer resp.Body.Close()
  1431  			Expect(resp.StatusCode).To(Equal(http.StatusOK))
  1432  
  1433  			// Check readiness path for individual check
  1434  			readinessEndpoint = fmt.Sprint("http://", listener.Addr().String(), path.Join(defaultReadinessEndpoint, namedCheck))
  1435  			res = nil
  1436  			resp, err = http.Get(readinessEndpoint)
  1437  			Expect(err).NotTo(HaveOccurred())
  1438  			defer resp.Body.Close()
  1439  			Expect(resp.StatusCode).To(Equal(http.StatusOK))
  1440  		})
  1441  
  1442  		It("should serve liveness endpoint", func() {
  1443  			opts.HealthProbeBindAddress = ":0"
  1444  			m, err := New(cfg, opts)
  1445  			Expect(err).NotTo(HaveOccurred())
  1446  
  1447  			res := fmt.Errorf("not alive")
  1448  			namedCheck := "check"
  1449  			err = m.AddHealthzCheck(namedCheck, func(_ *http.Request) error { return res })
  1450  			Expect(err).NotTo(HaveOccurred())
  1451  
  1452  			ctx, cancel := context.WithCancel(context.Background())
  1453  			defer cancel()
  1454  			go func() {
  1455  				defer GinkgoRecover()
  1456  				Expect(m.Start(ctx)).NotTo(HaveOccurred())
  1457  			}()
  1458  			<-m.Elected()
  1459  
  1460  			livenessEndpoint := fmt.Sprint("http://", listener.Addr().String(), defaultLivenessEndpoint)
  1461  
  1462  			// Controller is not ready
  1463  			resp, err := http.Get(livenessEndpoint)
  1464  			Expect(err).NotTo(HaveOccurred())
  1465  			defer resp.Body.Close()
  1466  			Expect(resp.StatusCode).To(Equal(http.StatusInternalServerError))
  1467  
  1468  			// Controller is ready
  1469  			res = nil
  1470  			resp, err = http.Get(livenessEndpoint)
  1471  			Expect(err).NotTo(HaveOccurred())
  1472  			defer resp.Body.Close()
  1473  			Expect(resp.StatusCode).To(Equal(http.StatusOK))
  1474  
  1475  			// Check liveness path without trailing slash without redirect
  1476  			livenessEndpoint = fmt.Sprint("http://", listener.Addr().String(), defaultLivenessEndpoint)
  1477  			res = nil
  1478  			httpClient := http.Client{
  1479  				CheckRedirect: func(req *http.Request, via []*http.Request) error {
  1480  					return http.ErrUseLastResponse // Do not follow redirect
  1481  				},
  1482  			}
  1483  			resp, err = httpClient.Get(livenessEndpoint)
  1484  			Expect(err).NotTo(HaveOccurred())
  1485  			defer resp.Body.Close()
  1486  			Expect(resp.StatusCode).To(Equal(http.StatusOK))
  1487  
  1488  			// Check readiness path for individual check
  1489  			livenessEndpoint = fmt.Sprint("http://", listener.Addr().String(), path.Join(defaultLivenessEndpoint, namedCheck))
  1490  			res = nil
  1491  			resp, err = http.Get(livenessEndpoint)
  1492  			Expect(err).NotTo(HaveOccurred())
  1493  			defer resp.Body.Close()
  1494  			Expect(resp.StatusCode).To(Equal(http.StatusOK))
  1495  		})
  1496  	})
  1497  
  1498  	Context("should start serving pprof", func() {
  1499  		var listener net.Listener
  1500  		var opts Options
  1501  
  1502  		BeforeEach(func() {
  1503  			listener = nil
  1504  			opts = Options{
  1505  				newPprofListener: func(addr string) (net.Listener, error) {
  1506  					var err error
  1507  					listener, err = defaultPprofListener(addr)
  1508  					return listener, err
  1509  				},
  1510  			}
  1511  		})
  1512  
  1513  		AfterEach(func() {
  1514  			if listener != nil {
  1515  				listener.Close()
  1516  			}
  1517  		})
  1518  
  1519  		It("should stop serving pprof when stop is called", func() {
  1520  			opts.PprofBindAddress = ":0"
  1521  			m, err := New(cfg, opts)
  1522  			Expect(err).NotTo(HaveOccurred())
  1523  
  1524  			ctx, cancel := context.WithCancel(context.Background())
  1525  			go func() {
  1526  				defer GinkgoRecover()
  1527  				Expect(m.Start(ctx)).NotTo(HaveOccurred())
  1528  			}()
  1529  			<-m.Elected()
  1530  
  1531  			// Check the pprof started
  1532  			endpoint := fmt.Sprintf("http://%s", listener.Addr().String())
  1533  			_, err = http.Get(endpoint)
  1534  			Expect(err).NotTo(HaveOccurred())
  1535  
  1536  			// Shutdown the server
  1537  			cancel()
  1538  
  1539  			// Expect the pprof server to shutdown
  1540  			Eventually(func() error {
  1541  				_, err = http.Get(endpoint)
  1542  				return err
  1543  			}, 10*time.Second).ShouldNot(Succeed())
  1544  		})
  1545  
  1546  		It("should serve pprof endpoints", func() {
  1547  			opts.PprofBindAddress = ":0"
  1548  			m, err := New(cfg, opts)
  1549  			Expect(err).NotTo(HaveOccurred())
  1550  
  1551  			ctx, cancel := context.WithCancel(context.Background())
  1552  			defer cancel()
  1553  			go func() {
  1554  				defer GinkgoRecover()
  1555  				Expect(m.Start(ctx)).NotTo(HaveOccurred())
  1556  			}()
  1557  			<-m.Elected()
  1558  
  1559  			pprofIndexEndpoint := fmt.Sprintf("http://%s/debug/pprof/", listener.Addr().String())
  1560  			resp, err := http.Get(pprofIndexEndpoint)
  1561  			Expect(err).NotTo(HaveOccurred())
  1562  			defer resp.Body.Close()
  1563  			Expect(resp.StatusCode).To(Equal(http.StatusOK))
  1564  
  1565  			pprofCmdlineEndpoint := fmt.Sprintf("http://%s/debug/pprof/cmdline", listener.Addr().String())
  1566  			resp, err = http.Get(pprofCmdlineEndpoint)
  1567  			Expect(err).NotTo(HaveOccurred())
  1568  			defer resp.Body.Close()
  1569  			Expect(resp.StatusCode).To(Equal(http.StatusOK))
  1570  
  1571  			pprofProfileEndpoint := fmt.Sprintf("http://%s/debug/pprof/profile", listener.Addr().String())
  1572  			resp, err = http.Get(pprofProfileEndpoint)
  1573  			Expect(err).NotTo(HaveOccurred())
  1574  			defer resp.Body.Close()
  1575  			Expect(resp.StatusCode).To(Equal(http.StatusOK))
  1576  
  1577  			pprofSymbolEndpoint := fmt.Sprintf("http://%s/debug/pprof/symbol", listener.Addr().String())
  1578  			resp, err = http.Get(pprofSymbolEndpoint)
  1579  			Expect(err).NotTo(HaveOccurred())
  1580  			defer resp.Body.Close()
  1581  			Expect(resp.StatusCode).To(Equal(http.StatusOK))
  1582  
  1583  			pprofTraceEndpoint := fmt.Sprintf("http://%s/debug/pprof/trace", listener.Addr().String())
  1584  			resp, err = http.Get(pprofTraceEndpoint)
  1585  			Expect(err).NotTo(HaveOccurred())
  1586  			defer resp.Body.Close()
  1587  			Expect(resp.StatusCode).To(Equal(http.StatusOK))
  1588  		})
  1589  	})
  1590  
  1591  	Describe("Add", func() {
  1592  		It("should immediately start the Component if the Manager has already Started another Component",
  1593  			func() {
  1594  				m, err := New(cfg, Options{})
  1595  				Expect(err).NotTo(HaveOccurred())
  1596  				mgr, ok := m.(*controllerManager)
  1597  				Expect(ok).To(BeTrue())
  1598  
  1599  				// Add one component before starting
  1600  				c1 := make(chan struct{})
  1601  				Expect(m.Add(RunnableFunc(func(context.Context) error {
  1602  					defer GinkgoRecover()
  1603  					close(c1)
  1604  					return nil
  1605  				}))).To(Succeed())
  1606  
  1607  				ctx, cancel := context.WithCancel(context.Background())
  1608  				defer cancel()
  1609  				go func() {
  1610  					defer GinkgoRecover()
  1611  					Expect(m.Start(ctx)).NotTo(HaveOccurred())
  1612  				}()
  1613  				<-m.Elected()
  1614  
  1615  				// Wait for the Manager to start
  1616  				Eventually(func() bool {
  1617  					return mgr.runnables.Caches.Started()
  1618  				}).Should(BeTrue())
  1619  
  1620  				// Add another component after starting
  1621  				c2 := make(chan struct{})
  1622  				Expect(m.Add(RunnableFunc(func(context.Context) error {
  1623  					defer GinkgoRecover()
  1624  					close(c2)
  1625  					return nil
  1626  				}))).To(Succeed())
  1627  				<-c1
  1628  				<-c2
  1629  			})
  1630  
  1631  		It("should immediately start the Component if the Manager has already Started", func() {
  1632  			m, err := New(cfg, Options{})
  1633  			Expect(err).NotTo(HaveOccurred())
  1634  			mgr, ok := m.(*controllerManager)
  1635  			Expect(ok).To(BeTrue())
  1636  
  1637  			ctx, cancel := context.WithCancel(context.Background())
  1638  			defer cancel()
  1639  			go func() {
  1640  				defer GinkgoRecover()
  1641  				Expect(m.Start(ctx)).NotTo(HaveOccurred())
  1642  			}()
  1643  
  1644  			// Wait for the Manager to start
  1645  			Eventually(func() bool {
  1646  				return mgr.runnables.Caches.Started()
  1647  			}).Should(BeTrue())
  1648  
  1649  			c1 := make(chan struct{})
  1650  			Expect(m.Add(RunnableFunc(func(context.Context) error {
  1651  				defer GinkgoRecover()
  1652  				close(c1)
  1653  				return nil
  1654  			}))).To(Succeed())
  1655  			<-c1
  1656  		})
  1657  
  1658  		It("should fail if attempted to start a second time", func() {
  1659  			m, err := New(cfg, Options{})
  1660  			Expect(err).NotTo(HaveOccurred())
  1661  
  1662  			ctx, cancel := context.WithCancel(context.Background())
  1663  			defer cancel()
  1664  			go func() {
  1665  				defer GinkgoRecover()
  1666  				Expect(m.Start(ctx)).NotTo(HaveOccurred())
  1667  			}()
  1668  			// Wait for the Manager to start
  1669  			Eventually(func() bool {
  1670  				mgr, ok := m.(*controllerManager)
  1671  				Expect(ok).To(BeTrue())
  1672  				return mgr.runnables.Caches.Started()
  1673  			}).Should(BeTrue())
  1674  
  1675  			err = m.Start(ctx)
  1676  			Expect(err).To(HaveOccurred())
  1677  			Expect(err.Error()).To(Equal("manager already started"))
  1678  
  1679  		})
  1680  	})
  1681  
  1682  	It("should not leak goroutines when stopped", func() {
  1683  		currentGRs := goleak.IgnoreCurrent()
  1684  
  1685  		m, err := New(cfg, Options{})
  1686  		Expect(err).NotTo(HaveOccurred())
  1687  
  1688  		ctx, cancel := context.WithCancel(context.Background())
  1689  		cancel()
  1690  		Expect(m.Start(ctx)).NotTo(HaveOccurred())
  1691  
  1692  		// force-close keep-alive connections.  These'll time anyway (after
  1693  		// like 30s or so) but force it to speed up the tests.
  1694  		clientTransport.CloseIdleConnections()
  1695  		Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
  1696  	})
  1697  
  1698  	It("should not leak goroutines if the default event broadcaster is used & events are emitted", func() {
  1699  		currentGRs := goleak.IgnoreCurrent()
  1700  
  1701  		m, err := New(cfg, Options{ /* implicit: default setting for EventBroadcaster */ })
  1702  		Expect(err).NotTo(HaveOccurred())
  1703  
  1704  		By("adding a runnable that emits an event")
  1705  		ns := corev1.Namespace{}
  1706  		ns.Name = "default"
  1707  
  1708  		recorder := m.GetEventRecorderFor("rock-and-roll")
  1709  		Expect(m.Add(RunnableFunc(func(_ context.Context) error {
  1710  			recorder.Event(&ns, "Warning", "BallroomBlitz", "yeah, yeah, yeah-yeah-yeah")
  1711  			return nil
  1712  		}))).To(Succeed())
  1713  
  1714  		By("starting the manager & waiting till we've sent our event")
  1715  		ctx, cancel := context.WithCancel(context.Background())
  1716  		doneCh := make(chan struct{})
  1717  		go func() {
  1718  			defer GinkgoRecover()
  1719  			defer close(doneCh)
  1720  			Expect(m.Start(ctx)).To(Succeed())
  1721  		}()
  1722  		<-m.Elected()
  1723  
  1724  		Eventually(func() *corev1.Event {
  1725  			evts, err := clientset.CoreV1().Events("").Search(m.GetScheme(), &ns)
  1726  			Expect(err).NotTo(HaveOccurred())
  1727  
  1728  			for i, evt := range evts.Items {
  1729  				if evt.Reason == "BallroomBlitz" {
  1730  					return &evts.Items[i]
  1731  				}
  1732  			}
  1733  			return nil
  1734  		}).ShouldNot(BeNil())
  1735  
  1736  		By("making sure there's no extra go routines still running after we stop")
  1737  		cancel()
  1738  		<-doneCh
  1739  
  1740  		// force-close keep-alive connections.  These'll time anyway (after
  1741  		// like 30s or so) but force it to speed up the tests.
  1742  		clientTransport.CloseIdleConnections()
  1743  		Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
  1744  	})
  1745  
  1746  	It("should provide a function to get the Config", func() {
  1747  		m, err := New(cfg, Options{})
  1748  		Expect(err).NotTo(HaveOccurred())
  1749  		mgr, ok := m.(*controllerManager)
  1750  		Expect(ok).To(BeTrue())
  1751  		Expect(m.GetConfig()).To(Equal(mgr.cluster.GetConfig()))
  1752  	})
  1753  
  1754  	It("should provide a function to get the Client", func() {
  1755  		m, err := New(cfg, Options{})
  1756  		Expect(err).NotTo(HaveOccurred())
  1757  		mgr, ok := m.(*controllerManager)
  1758  		Expect(ok).To(BeTrue())
  1759  		Expect(m.GetClient()).To(Equal(mgr.cluster.GetClient()))
  1760  	})
  1761  
  1762  	It("should provide a function to get the Scheme", func() {
  1763  		m, err := New(cfg, Options{})
  1764  		Expect(err).NotTo(HaveOccurred())
  1765  		mgr, ok := m.(*controllerManager)
  1766  		Expect(ok).To(BeTrue())
  1767  		Expect(m.GetScheme()).To(Equal(mgr.cluster.GetScheme()))
  1768  	})
  1769  
  1770  	It("should provide a function to get the FieldIndexer", func() {
  1771  		m, err := New(cfg, Options{})
  1772  		Expect(err).NotTo(HaveOccurred())
  1773  		mgr, ok := m.(*controllerManager)
  1774  		Expect(ok).To(BeTrue())
  1775  		Expect(m.GetFieldIndexer()).To(Equal(mgr.cluster.GetFieldIndexer()))
  1776  	})
  1777  
  1778  	It("should provide a function to get the EventRecorder", func() {
  1779  		m, err := New(cfg, Options{})
  1780  		Expect(err).NotTo(HaveOccurred())
  1781  		Expect(m.GetEventRecorderFor("test")).NotTo(BeNil())
  1782  	})
  1783  	It("should provide a function to get the APIReader", func() {
  1784  		m, err := New(cfg, Options{})
  1785  		Expect(err).NotTo(HaveOccurred())
  1786  		Expect(m.GetAPIReader()).NotTo(BeNil())
  1787  	})
  1788  })
  1789  
  1790  type runnableError struct {
  1791  }
  1792  
  1793  func (runnableError) Error() string {
  1794  	return "not feeling like that"
  1795  }
  1796  
  1797  var _ Runnable = &cacheProvider{}
  1798  
  1799  type cacheProvider struct {
  1800  	cache cache.Cache
  1801  }
  1802  
  1803  func (c *cacheProvider) GetCache() cache.Cache {
  1804  	return c.cache
  1805  }
  1806  
  1807  func (c *cacheProvider) Start(ctx context.Context) error {
  1808  	return c.cache.Start(ctx)
  1809  }
  1810  
  1811  type startSignalingInformer struct {
  1812  	mu sync.Mutex
  1813  
  1814  	// The manager calls Start and WaitForCacheSync in
  1815  	// parallel, so we have to protect wasStarted with a Mutex
  1816  	// and block in WaitForCacheSync until it is true.
  1817  	wasStarted bool
  1818  	// was synced will be true once Start was called and
  1819  	// WaitForCacheSync returned, just like a real cache.
  1820  	wasSynced bool
  1821  	cache.Cache
  1822  }
  1823  
  1824  func (c *startSignalingInformer) Start(ctx context.Context) error {
  1825  	c.mu.Lock()
  1826  	c.wasStarted = true
  1827  	c.mu.Unlock()
  1828  	return c.Cache.Start(ctx)
  1829  }
  1830  
  1831  func (c *startSignalingInformer) WaitForCacheSync(ctx context.Context) bool {
  1832  	defer func() {
  1833  		c.mu.Lock()
  1834  		c.wasSynced = true
  1835  		c.mu.Unlock()
  1836  	}()
  1837  	return c.Cache.WaitForCacheSync(ctx)
  1838  }
  1839  
  1840  type startClusterAfterManager struct {
  1841  	informer *startSignalingInformer
  1842  }
  1843  
  1844  func (c *startClusterAfterManager) Start(ctx context.Context) error {
  1845  	return c.informer.Start(ctx)
  1846  }
  1847  
  1848  func (c *startClusterAfterManager) GetCache() cache.Cache {
  1849  	return c.informer
  1850  }
  1851  
// metricsDefaultServer is used to type check the default metrics server implementation
// so we can retrieve the bind addr without having to make GetBindAddr a function on the
// metricsserver.Server interface or resort to reflection.
type metricsDefaultServer interface {
	// GetBindAddr returns the metrics server's bind address
	// (format presumably host:port — confirm against the implementation).
	GetBindAddr() string
}
  1858  
  1859  type needElection struct {
  1860  	ch chan struct{}
  1861  }
  1862  
  1863  func (n *needElection) Start(_ context.Context) error {
  1864  	n.ch <- struct{}{}
  1865  	return nil
  1866  }
  1867  
  1868  func (n *needElection) NeedLeaderElection() bool {
  1869  	return true
  1870  }
  1871  

View as plain text