1
16
17 package manager
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "io"
24 "net"
25 "net/http"
26 "path"
27 "sync"
28 "sync/atomic"
29 "time"
30
31 "github.com/go-logr/logr"
32 . "github.com/onsi/ginkgo/v2"
33 . "github.com/onsi/gomega"
34 "github.com/prometheus/client_golang/prometheus"
35 "go.uber.org/goleak"
36 coordinationv1 "k8s.io/api/coordination/v1"
37 corev1 "k8s.io/api/core/v1"
38 "k8s.io/apimachinery/pkg/api/meta"
39 "k8s.io/apimachinery/pkg/runtime"
40 "k8s.io/apimachinery/pkg/runtime/schema"
41 "k8s.io/client-go/rest"
42 "k8s.io/client-go/tools/leaderelection/resourcelock"
43 "sigs.k8s.io/controller-runtime/pkg/cache"
44 "sigs.k8s.io/controller-runtime/pkg/cache/informertest"
45 "sigs.k8s.io/controller-runtime/pkg/client"
46 "sigs.k8s.io/controller-runtime/pkg/cluster"
47 intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
48 "sigs.k8s.io/controller-runtime/pkg/leaderelection"
49 fakeleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection/fake"
50 "sigs.k8s.io/controller-runtime/pkg/metrics"
51 metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
52 "sigs.k8s.io/controller-runtime/pkg/recorder"
53 "sigs.k8s.io/controller-runtime/pkg/webhook"
54 )
55
56 var _ = Describe("manger.Manager", func() {
57 Describe("New", func() {
58 It("should return an error if there is no Config", func() {
59 m, err := New(nil, Options{})
60 Expect(m).To(BeNil())
61 Expect(err.Error()).To(ContainSubstring("must specify Config"))
62
63 })
64
65 It("should return an error if it can't create a RestMapper", func() {
66 expected := fmt.Errorf("expected error: RestMapper")
67 m, err := New(cfg, Options{
68 MapperProvider: func(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error) { return nil, expected },
69 })
70 Expect(m).To(BeNil())
71 Expect(err).To(Equal(expected))
72
73 })
74
75 It("should return an error it can't create a client.Client", func() {
76 m, err := New(cfg, Options{
77 NewClient: func(config *rest.Config, options client.Options) (client.Client, error) {
78 return nil, errors.New("expected error")
79 },
80 })
81 Expect(m).To(BeNil())
82 Expect(err).To(HaveOccurred())
83 Expect(err.Error()).To(ContainSubstring("expected error"))
84 })
85
86 It("should return an error it can't create a cache.Cache", func() {
87 m, err := New(cfg, Options{
88 NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
89 return nil, fmt.Errorf("expected error")
90 },
91 })
92 Expect(m).To(BeNil())
93 Expect(err).To(HaveOccurred())
94 Expect(err.Error()).To(ContainSubstring("expected error"))
95 })
96
97 It("should create a client defined in by the new client function", func() {
98 m, err := New(cfg, Options{
99 NewClient: func(config *rest.Config, options client.Options) (client.Client, error) {
100 return nil, nil
101 },
102 })
103 Expect(m).ToNot(BeNil())
104 Expect(err).ToNot(HaveOccurred())
105 Expect(m.GetClient()).To(BeNil())
106 })
107
108 It("should return an error it can't create a recorder.Provider", func() {
109 m, err := New(cfg, Options{
110 newRecorderProvider: func(_ *rest.Config, _ *http.Client, _ *runtime.Scheme, _ logr.Logger, _ intrec.EventBroadcasterProducer) (*intrec.Provider, error) {
111 return nil, fmt.Errorf("expected error")
112 },
113 })
114 Expect(m).To(BeNil())
115 Expect(err).To(HaveOccurred())
116 Expect(err.Error()).To(ContainSubstring("expected error"))
117 })
118
119 It("should lazily initialize a webhook server if needed", func() {
120 By("creating a manager with options")
121 m, err := New(cfg, Options{WebhookServer: webhook.NewServer(webhook.Options{Port: 9440, Host: "foo.com"})})
122 Expect(err).NotTo(HaveOccurred())
123 Expect(m).NotTo(BeNil())
124
125 By("checking options are passed to the webhook server")
126 svr := m.GetWebhookServer()
127 Expect(svr).NotTo(BeNil())
128 Expect(svr.(*webhook.DefaultServer).Options.Port).To(Equal(9440))
129 Expect(svr.(*webhook.DefaultServer).Options.Host).To(Equal("foo.com"))
130 })
131
132 It("should not initialize a webhook server if Options.WebhookServer is set", func() {
133 By("creating a manager with options")
134 srv := webhook.NewServer(webhook.Options{Port: 9440})
135 m, err := New(cfg, Options{WebhookServer: srv})
136 Expect(err).NotTo(HaveOccurred())
137 Expect(m).NotTo(BeNil())
138
139 By("checking the server contains the Port set on the webhook server and not passed to Options")
140 svr := m.GetWebhookServer()
141 Expect(svr).NotTo(BeNil())
142 Expect(svr).To(Equal(srv))
143 Expect(svr.(*webhook.DefaultServer).Options.Port).To(Equal(9440))
144 })
145
146 It("should allow passing a custom webhook.Server implementation", func() {
147 type customWebhook struct {
148 webhook.Server
149 }
150 m, err := New(cfg, Options{WebhookServer: customWebhook{}})
151 Expect(err).NotTo(HaveOccurred())
152 Expect(m).NotTo(BeNil())
153
154 svr := m.GetWebhookServer()
155 Expect(svr).NotTo(BeNil())
156
157 _, isCustomWebhook := svr.(customWebhook)
158 Expect(isCustomWebhook).To(BeTrue())
159 })
160
161 Context("with leader election enabled", func() {
162 It("should only cancel the leader election after all runnables are done", func() {
163 m, err := New(cfg, Options{
164 LeaderElection: true,
165 LeaderElectionNamespace: "default",
166 LeaderElectionID: "test-leader-election-id-2",
167 HealthProbeBindAddress: "0",
168 Metrics: metricsserver.Options{BindAddress: "0"},
169 PprofBindAddress: "0",
170 })
171 Expect(err).ToNot(HaveOccurred())
172 gvkcorev1 := schema.GroupVersionKind{Group: corev1.SchemeGroupVersion.Group, Version: corev1.SchemeGroupVersion.Version, Kind: "ConfigMap"}
173 gvkcoordinationv1 := schema.GroupVersionKind{Group: coordinationv1.SchemeGroupVersion.Group, Version: coordinationv1.SchemeGroupVersion.Version, Kind: "Lease"}
174 Expect(m.GetScheme().Recognizes(gvkcorev1)).To(BeTrue())
175 Expect(m.GetScheme().Recognizes(gvkcoordinationv1)).To(BeTrue())
176 runnableDone := make(chan struct{})
177 slowRunnable := RunnableFunc(func(ctx context.Context) error {
178 <-ctx.Done()
179 time.Sleep(100 * time.Millisecond)
180 close(runnableDone)
181 return nil
182 })
183 Expect(m.Add(slowRunnable)).To(Succeed())
184
185 cm := m.(*controllerManager)
186 cm.gracefulShutdownTimeout = time.Second
187 leaderElectionDone := make(chan struct{})
188 cm.onStoppedLeading = func() {
189 close(leaderElectionDone)
190 }
191
192 ctx, cancel := context.WithCancel(context.Background())
193 mgrDone := make(chan struct{})
194 go func() {
195 defer GinkgoRecover()
196 Expect(m.Start(ctx)).To(Succeed())
197 close(mgrDone)
198 }()
199 <-cm.Elected()
200 cancel()
201 select {
202 case <-leaderElectionDone:
203 Expect(errors.New("leader election was cancelled before runnables were done")).ToNot(HaveOccurred())
204 case <-runnableDone:
205
206 }
207
208 <-mgrDone
209
210 })
211 It("should disable gracefulShutdown when stopping to lead", func() {
212 m, err := New(cfg, Options{
213 LeaderElection: true,
214 LeaderElectionNamespace: "default",
215 LeaderElectionID: "test-leader-election-id-3",
216 HealthProbeBindAddress: "0",
217 Metrics: metricsserver.Options{BindAddress: "0"},
218 PprofBindAddress: "0",
219 })
220 Expect(err).ToNot(HaveOccurred())
221
222 ctx, cancel := context.WithCancel(context.Background())
223 defer cancel()
224 mgrDone := make(chan struct{})
225 go func() {
226 defer GinkgoRecover()
227 err := m.Start(ctx)
228 Expect(err).To(HaveOccurred())
229 Expect(err.Error()).To(ContainSubstring("leader election lost"))
230 close(mgrDone)
231 }()
232 cm := m.(*controllerManager)
233 <-cm.elected
234
235 cm.leaderElectionCancel()
236 <-mgrDone
237
238 Expect(cm.gracefulShutdownTimeout.Nanoseconds()).To(Equal(int64(0)))
239 })
240
241 It("should prevent leader election when shutting down a non-elected manager", func() {
242 var rl resourcelock.Interface
243 m1, err := New(cfg, Options{
244 LeaderElection: true,
245 LeaderElectionNamespace: "default",
246 LeaderElectionID: "test-leader-election-id",
247 newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) {
248 var err error
249 rl, err = leaderelection.NewResourceLock(config, recorderProvider, options)
250 return rl, err
251 },
252 HealthProbeBindAddress: "0",
253 Metrics: metricsserver.Options{BindAddress: "0"},
254 PprofBindAddress: "0",
255 })
256 Expect(err).ToNot(HaveOccurred())
257 Expect(m1).ToNot(BeNil())
258 Expect(rl.Describe()).To(Equal("default/test-leader-election-id"))
259
260 m1cm, ok := m1.(*controllerManager)
261 Expect(ok).To(BeTrue())
262 m1cm.onStoppedLeading = func() {}
263
264 m2, err := New(cfg, Options{
265 LeaderElection: true,
266 LeaderElectionNamespace: "default",
267 LeaderElectionID: "test-leader-election-id",
268 newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) {
269 var err error
270 rl, err = leaderelection.NewResourceLock(config, recorderProvider, options)
271 return rl, err
272 },
273 HealthProbeBindAddress: "0",
274 Metrics: metricsserver.Options{BindAddress: "0"},
275 PprofBindAddress: "0",
276 })
277 Expect(err).ToNot(HaveOccurred())
278 Expect(m2).ToNot(BeNil())
279 Expect(rl.Describe()).To(Equal("default/test-leader-election-id"))
280
281 m1done := make(chan struct{})
282 Expect(m1.Add(RunnableFunc(func(ctx context.Context) error {
283 defer GinkgoRecover()
284 close(m1done)
285 return nil
286 }))).To(Succeed())
287
288 ctx1, cancel1 := context.WithCancel(context.Background())
289 defer cancel1()
290 go func() {
291 defer GinkgoRecover()
292 Expect(m1.Elected()).ShouldNot(BeClosed())
293 Expect(m1.Start(ctx1)).NotTo(HaveOccurred())
294 }()
295 <-m1.Elected()
296 <-m1done
297
298 electionRunnable := &needElection{make(chan struct{})}
299
300 Expect(m2.Add(electionRunnable)).To(Succeed())
301
302 ctx2, cancel2 := context.WithCancel(context.Background())
303 m2done := make(chan struct{})
304 go func() {
305 defer GinkgoRecover()
306 Expect(m2.Start(ctx2)).NotTo(HaveOccurred())
307 close(m2done)
308 }()
309 Consistently(m2.Elected()).ShouldNot(Receive())
310
311 go func() {
312 defer GinkgoRecover()
313 Consistently(electionRunnable.ch).ShouldNot(Receive())
314 }()
315 cancel2()
316 <-m2done
317 })
318
319 It("should default ID to controller-runtime if ID is not set", func() {
320 var rl resourcelock.Interface
321 m1, err := New(cfg, Options{
322 LeaderElection: true,
323 LeaderElectionNamespace: "default",
324 LeaderElectionID: "test-leader-election-id",
325 newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) {
326 var err error
327 rl, err = leaderelection.NewResourceLock(config, recorderProvider, options)
328 return rl, err
329 },
330 HealthProbeBindAddress: "0",
331 Metrics: metricsserver.Options{BindAddress: "0"},
332 PprofBindAddress: "0",
333 })
334 Expect(err).ToNot(HaveOccurred())
335 Expect(m1).ToNot(BeNil())
336 Expect(rl.Describe()).To(Equal("default/test-leader-election-id"))
337
338 m1cm, ok := m1.(*controllerManager)
339 Expect(ok).To(BeTrue())
340 m1cm.onStoppedLeading = func() {}
341
342 m2, err := New(cfg, Options{
343 LeaderElection: true,
344 LeaderElectionNamespace: "default",
345 LeaderElectionID: "test-leader-election-id",
346 newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) {
347 var err error
348 rl, err = leaderelection.NewResourceLock(config, recorderProvider, options)
349 return rl, err
350 },
351 HealthProbeBindAddress: "0",
352 Metrics: metricsserver.Options{BindAddress: "0"},
353 PprofBindAddress: "0",
354 })
355 Expect(err).ToNot(HaveOccurred())
356 Expect(m2).ToNot(BeNil())
357 Expect(rl.Describe()).To(Equal("default/test-leader-election-id"))
358
359 m2cm, ok := m2.(*controllerManager)
360 Expect(ok).To(BeTrue())
361 m2cm.onStoppedLeading = func() {}
362
363 c1 := make(chan struct{})
364 Expect(m1.Add(RunnableFunc(func(ctx context.Context) error {
365 defer GinkgoRecover()
366 close(c1)
367 return nil
368 }))).To(Succeed())
369
370 ctx1, cancel1 := context.WithCancel(context.Background())
371 defer cancel1()
372 go func() {
373 defer GinkgoRecover()
374 Expect(m1.Elected()).ShouldNot(BeClosed())
375 Expect(m1.Start(ctx1)).NotTo(HaveOccurred())
376 }()
377 <-m1.Elected()
378 <-c1
379
380 c2 := make(chan struct{})
381 Expect(m2.Add(RunnableFunc(func(context.Context) error {
382 defer GinkgoRecover()
383 close(c2)
384 return nil
385 }))).To(Succeed())
386
387 ctx2, cancel := context.WithCancel(context.Background())
388 m2done := make(chan struct{})
389 go func() {
390 defer GinkgoRecover()
391 Expect(m2.Start(ctx2)).NotTo(HaveOccurred())
392 close(m2done)
393 }()
394 Consistently(m2.Elected()).ShouldNot(Receive())
395
396 Consistently(c2).ShouldNot(Receive())
397 cancel()
398 <-m2done
399 })
400
401 It("should return an error if it can't create a ResourceLock", func() {
402 m, err := New(cfg, Options{
403 newResourceLock: func(_ *rest.Config, _ recorder.Provider, _ leaderelection.Options) (resourcelock.Interface, error) {
404 return nil, fmt.Errorf("expected error")
405 },
406 })
407 Expect(m).To(BeNil())
408 Expect(err).To(MatchError(ContainSubstring("expected error")))
409 })
410
411 It("should return an error if namespace not set and not running in cluster", func() {
412 m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime"})
413 Expect(m).To(BeNil())
414 Expect(err).To(HaveOccurred())
415 Expect(err.Error()).To(ContainSubstring("unable to find leader election namespace: not running in-cluster, please specify LeaderElectionNamespace"))
416 })
417
418
419
420
421 It("should default to LeasesResourceLock", func() {
422 m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime", LeaderElectionNamespace: "my-ns"})
423 Expect(m).ToNot(BeNil())
424 Expect(err).ToNot(HaveOccurred())
425 cm, ok := m.(*controllerManager)
426 Expect(ok).To(BeTrue())
427 _, isLeaseLock := cm.resourceLock.(*resourcelock.LeaseLock)
428 Expect(isLeaseLock).To(BeTrue())
429
430 })
431 It("should use the specified ResourceLock", func() {
432 m, err := New(cfg, Options{
433 LeaderElection: true,
434 LeaderElectionResourceLock: resourcelock.LeasesResourceLock,
435 LeaderElectionID: "controller-runtime",
436 LeaderElectionNamespace: "my-ns",
437 })
438 Expect(m).ToNot(BeNil())
439 Expect(err).ToNot(HaveOccurred())
440 cm, ok := m.(*controllerManager)
441 Expect(ok).To(BeTrue())
442 _, isLeaseLock := cm.resourceLock.(*resourcelock.LeaseLock)
443 Expect(isLeaseLock).To(BeTrue())
444 })
445 It("should release lease if ElectionReleaseOnCancel is true", func() {
446 var rl resourcelock.Interface
447 m, err := New(cfg, Options{
448 LeaderElection: true,
449 LeaderElectionResourceLock: resourcelock.LeasesResourceLock,
450 LeaderElectionID: "controller-runtime",
451 LeaderElectionNamespace: "my-ns",
452 LeaderElectionReleaseOnCancel: true,
453 newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) {
454 var err error
455 rl, err = fakeleaderelection.NewResourceLock(config, recorderProvider, options)
456 return rl, err
457 },
458 })
459 Expect(err).ToNot(HaveOccurred())
460
461 ctx, cancel := context.WithCancel(context.Background())
462 doneCh := make(chan struct{})
463 go func() {
464 defer GinkgoRecover()
465 defer close(doneCh)
466 Expect(m.Start(ctx)).NotTo(HaveOccurred())
467 }()
468 <-m.(*controllerManager).elected
469 cancel()
470 <-doneCh
471
472 ctx, cancel = context.WithCancel(context.Background())
473 defer cancel()
474 record, _, err := rl.Get(ctx)
475 Expect(err).ToNot(HaveOccurred())
476 Expect(record.HolderIdentity).To(BeEmpty())
477 })
478 When("using a custom LeaderElectionResourceLockInterface", func() {
479 It("should use the custom LeaderElectionResourceLockInterface", func() {
480 rl, err := fakeleaderelection.NewResourceLock(nil, nil, leaderelection.Options{})
481 Expect(err).NotTo(HaveOccurred())
482
483 m, err := New(cfg, Options{
484 LeaderElection: true,
485 LeaderElectionResourceLockInterface: rl,
486 newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) {
487 return nil, fmt.Errorf("this should not be called")
488 },
489 })
490 Expect(m).ToNot(BeNil())
491 Expect(err).ToNot(HaveOccurred())
492 cm, ok := m.(*controllerManager)
493 Expect(ok).To(BeTrue())
494 Expect(cm.resourceLock).To(Equal(rl))
495 })
496 })
497 })
498
499 It("should create a metrics server if a valid address is provided", func() {
500 var srv metricsserver.Server
501 m, err := New(cfg, Options{
502 Metrics: metricsserver.Options{BindAddress: ":0"},
503 newMetricsServer: func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error) {
504 var err error
505 srv, err = metricsserver.NewServer(options, config, httpClient)
506 return srv, err
507 },
508 })
509 Expect(m).ToNot(BeNil())
510 Expect(err).ToNot(HaveOccurred())
511 Expect(srv).ToNot(BeNil())
512
513
514
515 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
516 Expect(srv.Start(ctx)).To(Succeed())
517 cancel()
518 })
519
520 It("should create a metrics server if a valid address is provided and secure serving is enabled", func() {
521 var srv metricsserver.Server
522 m, err := New(cfg, Options{
523 Metrics: metricsserver.Options{BindAddress: ":0", SecureServing: true},
524 newMetricsServer: func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error) {
525 var err error
526 srv, err = metricsserver.NewServer(options, config, httpClient)
527 return srv, err
528 },
529 })
530 Expect(m).ToNot(BeNil())
531 Expect(err).ToNot(HaveOccurred())
532 Expect(srv).ToNot(BeNil())
533
534
535
536 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
537 Expect(srv.Start(ctx)).To(Succeed())
538 cancel()
539 })
540
541 It("should be able to create a manager with a cache that fails on missing informer", func() {
542 m, err := New(cfg, Options{
543 Cache: cache.Options{
544 ReaderFailOnMissingInformer: true,
545 },
546 })
547 Expect(m).ToNot(BeNil())
548 Expect(err).ToNot(HaveOccurred())
549 })
550
551 It("should return an error if the metrics bind address is already in use", func() {
552 ln, err := net.Listen("tcp", ":0")
553 Expect(err).ShouldNot(HaveOccurred())
554
555 var srv metricsserver.Server
556 m, err := New(cfg, Options{
557 Metrics: metricsserver.Options{
558 BindAddress: ln.Addr().String(),
559 },
560 newMetricsServer: func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error) {
561 var err error
562 srv, err = metricsserver.NewServer(options, config, httpClient)
563 return srv, err
564 },
565 })
566 Expect(m).ToNot(BeNil())
567 Expect(err).ToNot(HaveOccurred())
568
569
570
571 Expect(srv.Start(context.Background())).ToNot(Succeed())
572
573 Expect(ln.Close()).To(Succeed())
574 })
575
576 It("should return an error if the metrics bind address is already in use and secure serving enabled", func() {
577 ln, err := net.Listen("tcp", ":0")
578 Expect(err).ShouldNot(HaveOccurred())
579
580 var srv metricsserver.Server
581 m, err := New(cfg, Options{
582 Metrics: metricsserver.Options{
583 BindAddress: ln.Addr().String(),
584 SecureServing: true,
585 },
586 newMetricsServer: func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error) {
587 var err error
588 srv, err = metricsserver.NewServer(options, config, httpClient)
589 return srv, err
590 },
591 })
592 Expect(m).ToNot(BeNil())
593 Expect(err).ToNot(HaveOccurred())
594
595
596
597 Expect(srv.Start(context.Background())).ToNot(Succeed())
598
599 Expect(ln.Close()).To(Succeed())
600 })
601
602 It("should create a listener for the health probes if a valid address is provided", func() {
603 var listener net.Listener
604 m, err := New(cfg, Options{
605 HealthProbeBindAddress: ":0",
606 newHealthProbeListener: func(addr string) (net.Listener, error) {
607 var err error
608 listener, err = defaultHealthProbeListener(addr)
609 return listener, err
610 },
611 })
612 Expect(m).ToNot(BeNil())
613 Expect(err).ToNot(HaveOccurred())
614 Expect(listener).ToNot(BeNil())
615 Expect(listener.Close()).ToNot(HaveOccurred())
616 })
617
618 It("should return an error if the health probes bind address is already in use", func() {
619 ln, err := defaultHealthProbeListener(":0")
620 Expect(err).ShouldNot(HaveOccurred())
621
622 var listener net.Listener
623 m, err := New(cfg, Options{
624 HealthProbeBindAddress: ln.Addr().String(),
625 newHealthProbeListener: func(addr string) (net.Listener, error) {
626 var err error
627 listener, err = defaultHealthProbeListener(addr)
628 return listener, err
629 },
630 })
631 Expect(m).To(BeNil())
632 Expect(err).To(HaveOccurred())
633 Expect(listener).To(BeNil())
634
635 Expect(ln.Close()).ToNot(HaveOccurred())
636 })
637 })
638
639 Describe("Start", func() {
640 var startSuite = func(options Options, callbacks ...func(Manager)) {
641 It("should Start each Component", func() {
642 m, err := New(cfg, options)
643 Expect(err).NotTo(HaveOccurred())
644 for _, cb := range callbacks {
645 cb(m)
646 }
647 var wgRunnableStarted sync.WaitGroup
648 wgRunnableStarted.Add(2)
649 Expect(m.Add(RunnableFunc(func(context.Context) error {
650 defer GinkgoRecover()
651 wgRunnableStarted.Done()
652 return nil
653 }))).To(Succeed())
654
655 Expect(m.Add(RunnableFunc(func(context.Context) error {
656 defer GinkgoRecover()
657 wgRunnableStarted.Done()
658 return nil
659 }))).To(Succeed())
660
661 ctx, cancel := context.WithCancel(context.Background())
662 defer cancel()
663 go func() {
664 defer GinkgoRecover()
665 Expect(m.Elected()).ShouldNot(BeClosed())
666 Expect(m.Start(ctx)).NotTo(HaveOccurred())
667 }()
668
669 <-m.Elected()
670 wgRunnableStarted.Wait()
671 })
672
673 It("should not manipulate the provided config", func() {
674
675
676
677 cfg := rest.CopyConfig(cfg)
678 cfg.WrapTransport = nil
679 originalCfg := rest.CopyConfig(cfg)
680
681
682 options := options
683 options.newResourceLock = nil
684 m, err := New(cfg, options)
685 Expect(err).NotTo(HaveOccurred())
686 for _, cb := range callbacks {
687 cb(m)
688 }
689 Expect(m.GetConfig()).To(Equal(originalCfg))
690 })
691
692 It("should stop when context is cancelled", func() {
693 m, err := New(cfg, options)
694 Expect(err).NotTo(HaveOccurred())
695 for _, cb := range callbacks {
696 cb(m)
697 }
698 ctx, cancel := context.WithCancel(context.Background())
699 cancel()
700 Expect(m.Start(ctx)).NotTo(HaveOccurred())
701 })
702
703 It("should return an error if it can't start the cache", func() {
704 m, err := New(cfg, options)
705 Expect(err).NotTo(HaveOccurred())
706 for _, cb := range callbacks {
707 cb(m)
708 }
709 mgr, ok := m.(*controllerManager)
710 Expect(ok).To(BeTrue())
711 Expect(mgr.Add(
712 &cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}},
713 )).To(Succeed())
714
715 ctx, cancel := context.WithCancel(context.Background())
716 defer cancel()
717 Expect(m.Start(ctx)).To(MatchError(ContainSubstring("expected error")))
718 })
719
720 It("should start the cache before starting anything else", func() {
721 fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
722 options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
723 return fakeCache, nil
724 }
725 m, err := New(cfg, options)
726 Expect(err).NotTo(HaveOccurred())
727 for _, cb := range callbacks {
728 cb(m)
729 }
730
731 runnableWasStarted := make(chan struct{})
732 runnable := RunnableFunc(func(ctx context.Context) error {
733 defer GinkgoRecover()
734 if !fakeCache.wasSynced {
735 return errors.New("runnable got started before cache was synced")
736 }
737 close(runnableWasStarted)
738 return nil
739 })
740 Expect(m.Add(runnable)).To(Succeed())
741
742 ctx, cancel := context.WithCancel(context.Background())
743 defer cancel()
744 go func() {
745 defer GinkgoRecover()
746 Expect(m.Start(ctx)).ToNot(HaveOccurred())
747 }()
748
749 <-runnableWasStarted
750 })
751
752 It("should start additional clusters before anything else", func() {
753 fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
754 options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
755 return fakeCache, nil
756 }
757 m, err := New(cfg, options)
758 Expect(err).NotTo(HaveOccurred())
759 for _, cb := range callbacks {
760 cb(m)
761 }
762
763 additionalClusterCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
764 additionalCluster, err := cluster.New(cfg, func(o *cluster.Options) {
765 o.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
766 return additionalClusterCache, nil
767 }
768 })
769 Expect(err).NotTo(HaveOccurred())
770 Expect(m.Add(additionalCluster)).NotTo(HaveOccurred())
771
772 runnableWasStarted := make(chan struct{})
773 Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
774 defer GinkgoRecover()
775 if !fakeCache.wasSynced {
776 return errors.New("WaitForCacheSyncCalled wasn't called before Runnable got started")
777 }
778 if !additionalClusterCache.wasSynced {
779 return errors.New("the additional clusters WaitForCacheSync wasn't called before Runnable got started")
780 }
781 close(runnableWasStarted)
782 return nil
783 }))).To(Succeed())
784
785 ctx, cancel := context.WithCancel(context.Background())
786 defer cancel()
787 go func() {
788 defer GinkgoRecover()
789 Expect(m.Start(ctx)).ToNot(HaveOccurred())
790 }()
791
792 <-runnableWasStarted
793 })
794
795 It("should return an error if any Components fail to Start", func() {
796 m, err := New(cfg, options)
797 Expect(err).NotTo(HaveOccurred())
798 for _, cb := range callbacks {
799 cb(m)
800 }
801
802 Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
803 defer GinkgoRecover()
804 <-ctx.Done()
805 return nil
806 }))).To(Succeed())
807
808 Expect(m.Add(RunnableFunc(func(context.Context) error {
809 defer GinkgoRecover()
810 return fmt.Errorf("expected error")
811 }))).To(Succeed())
812
813 Expect(m.Add(RunnableFunc(func(context.Context) error {
814 defer GinkgoRecover()
815 return nil
816 }))).To(Succeed())
817
818 defer GinkgoRecover()
819 ctx, cancel := context.WithCancel(context.Background())
820 defer cancel()
821 err = m.Start(ctx)
822 Expect(err).To(HaveOccurred())
823 Expect(err.Error()).To(Equal("expected error"))
824 })
825
826 It("should start caches added after Manager has started", func() {
827 fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
828 options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
829 return fakeCache, nil
830 }
831 m, err := New(cfg, options)
832 Expect(err).NotTo(HaveOccurred())
833 for _, cb := range callbacks {
834 cb(m)
835 }
836
837 runnableWasStarted := make(chan struct{})
838 Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
839 defer GinkgoRecover()
840 if !fakeCache.wasSynced {
841 return errors.New("WaitForCacheSyncCalled wasn't called before Runnable got started")
842 }
843 close(runnableWasStarted)
844 return nil
845 }))).To(Succeed())
846
847 ctx, cancel := context.WithCancel(context.Background())
848 defer cancel()
849 go func() {
850 defer GinkgoRecover()
851 Expect(m.Start(ctx)).ToNot(HaveOccurred())
852 }()
853
854 <-runnableWasStarted
855
856 additionalClusterCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
857 fakeCluster := &startClusterAfterManager{informer: additionalClusterCache}
858
859 Expect(err).NotTo(HaveOccurred())
860 Expect(m.Add(fakeCluster)).NotTo(HaveOccurred())
861
862 Eventually(func() bool {
863 fakeCluster.informer.mu.Lock()
864 defer fakeCluster.informer.mu.Unlock()
865 return fakeCluster.informer.wasStarted && fakeCluster.informer.wasSynced
866 }).Should(BeTrue())
867 })
868
869 It("should wait for runnables to stop", func() {
870 m, err := New(cfg, options)
871 Expect(err).NotTo(HaveOccurred())
872 for _, cb := range callbacks {
873 cb(m)
874 }
875
876 var lock sync.Mutex
877 var runnableDoneCount int64
878 runnableDoneFunc := func() {
879 lock.Lock()
880 defer lock.Unlock()
881 atomic.AddInt64(&runnableDoneCount, 1)
882 }
883 var wgRunnableRunning sync.WaitGroup
884 wgRunnableRunning.Add(2)
885 Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
886 wgRunnableRunning.Done()
887 defer GinkgoRecover()
888 defer runnableDoneFunc()
889 <-ctx.Done()
890 return nil
891 }))).To(Succeed())
892
893 Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
894 wgRunnableRunning.Done()
895 defer GinkgoRecover()
896 defer runnableDoneFunc()
897 <-ctx.Done()
898 time.Sleep(300 * time.Millisecond)
899 return nil
900 }))).To(Succeed())
901
902 defer GinkgoRecover()
903 ctx, cancel := context.WithCancel(context.Background())
904
905 var wgManagerRunning sync.WaitGroup
906 wgManagerRunning.Add(1)
907 go func() {
908 defer GinkgoRecover()
909 defer wgManagerRunning.Done()
910 Expect(m.Start(ctx)).NotTo(HaveOccurred())
911 Eventually(func() int64 {
912 return atomic.LoadInt64(&runnableDoneCount)
913 }).Should(BeEquivalentTo(2))
914 }()
915 wgRunnableRunning.Wait()
916 cancel()
917
918 wgManagerRunning.Wait()
919 })
920
921 It("should return an error if any Components fail to Start and wait for runnables to stop", func() {
922 m, err := New(cfg, options)
923 Expect(err).NotTo(HaveOccurred())
924 for _, cb := range callbacks {
925 cb(m)
926 }
927 defer GinkgoRecover()
928 var lock sync.Mutex
929 runnableDoneCount := 0
930 runnableDoneFunc := func() {
931 lock.Lock()
932 defer lock.Unlock()
933 runnableDoneCount++
934 }
935
936 Expect(m.Add(RunnableFunc(func(context.Context) error {
937 defer GinkgoRecover()
938 defer runnableDoneFunc()
939 return fmt.Errorf("expected error")
940 }))).To(Succeed())
941
942 Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
943 defer GinkgoRecover()
944 defer runnableDoneFunc()
945 <-ctx.Done()
946 return nil
947 }))).To(Succeed())
948
949 ctx, cancel := context.WithCancel(context.Background())
950 defer cancel()
951 Expect(m.Start(ctx)).To(HaveOccurred())
952 Expect(runnableDoneCount).To(Equal(2))
953 })
954
955 It("should refuse to add runnable if stop procedure is already engaged", func() {
956 m, err := New(cfg, options)
957 Expect(err).NotTo(HaveOccurred())
958 for _, cb := range callbacks {
959 cb(m)
960 }
961 defer GinkgoRecover()
962
963 var wgRunnableRunning sync.WaitGroup
964 wgRunnableRunning.Add(1)
965 Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
966 wgRunnableRunning.Done()
967 defer GinkgoRecover()
968 <-ctx.Done()
969 return nil
970 }))).To(Succeed())
971
972 ctx, cancel := context.WithCancel(context.Background())
973 go func() {
974 Expect(m.Start(ctx)).NotTo(HaveOccurred())
975 }()
976 wgRunnableRunning.Wait()
977 cancel()
978 time.Sleep(100 * time.Millisecond)
979 Expect(m.Add(RunnableFunc(func(context.Context) error {
980 defer GinkgoRecover()
981 return nil
982 }))).NotTo(Succeed())
983 })
984
It("should return both runnables and stop errors when both error", func() {
	m, err := New(cfg, options)
	Expect(err).NotTo(HaveOccurred())
	for _, cb := range callbacks {
		cb(m)
	}
	// A 1ns grace period makes the shutdown wait time out while the slow
	// runnable below is still running.
	m.(*controllerManager).gracefulShutdownTimeout = 1 * time.Nanosecond
	// This runnable fails immediately; its error must surface from Start.
	Expect(m.Add(RunnableFunc(func(context.Context) error {
		return runnableError{}
	}))).To(Succeed())
	// testDone releases the slow runnable once the spec returns, so it does
	// not linger for the full 30s timer.
	testDone := make(chan struct{})
	defer close(testDone)
	// After cancellation this runnable keeps running until testDone is closed
	// (or 30s pass), which forces the grace period to expire.
	Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
		<-ctx.Done()
		timer := time.NewTimer(30 * time.Second)
		defer timer.Stop()
		select {
		case <-testDone:
			return nil
		case <-timer.C:
			return nil
		}
	}))).To(Succeed())
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// ctx is never cancelled before Start returns; Start is expected to
	// return because the first runnable errored.
	err = m.Start(ctx)
	Expect(err).To(HaveOccurred())
	// Both the runnable error and the shutdown-timeout error are expected in
	// one aggregated error, and each stays matchable via errors.Is.
	eMsg := "[not feeling like that, failed waiting for all runnables to end within grace period of 1ns: context deadline exceeded]"
	Expect(err.Error()).To(Equal(eMsg))
	Expect(errors.Is(err, context.DeadlineExceeded)).To(BeTrue())
	Expect(errors.Is(err, runnableError{})).To(BeTrue())
})
1017
It("should return only stop errors if runnables dont error", func() {
	m, err := New(cfg, options)
	Expect(err).NotTo(HaveOccurred())
	for _, cb := range callbacks {
		cb(m)
	}
	// A 1ns grace period makes the shutdown wait time out while the slow
	// runnable below is still running.
	m.(*controllerManager).gracefulShutdownTimeout = 1 * time.Nanosecond
	// This runnable exits cleanly as soon as it is asked to stop.
	Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
		<-ctx.Done()
		return nil
	}))).To(Succeed())
	// testDone releases the slow runnable once the spec returns, so it does
	// not linger for the full 30s timer.
	testDone := make(chan struct{})
	defer close(testDone)
	// After cancellation this runnable keeps running until testDone is closed
	// (or 30s pass); it never returns an error of its own.
	Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
		<-ctx.Done()
		timer := time.NewTimer(30 * time.Second)
		defer timer.Stop()
		select {
		case <-testDone:
			return nil
		case <-timer.C:
			return nil
		}
	}))).To(Succeed())
	ctx, cancel := context.WithCancel(context.Background())
	managerStopDone := make(chan struct{})
	// err is written by this goroutine and only read after managerStopDone is
	// closed, which orders the write before the read.
	go func() { err = m.Start(ctx); close(managerStopDone) }()

	// Wait on the manager's internal elected channel before cancelling.
	// NOTE(review): presumably this ensures startup finished, so the spec
	// doesn't stop the manager before the runnables were started.
	<-m.(*controllerManager).elected
	cancel()
	<-managerStopDone
	// Only the shutdown timeout is reported, since no runnable failed.
	Expect(err).To(HaveOccurred())
	Expect(err.Error()).To(Equal("failed waiting for all runnables to end within grace period of 1ns: context deadline exceeded"))
	Expect(errors.Is(err, context.DeadlineExceeded)).To(BeTrue())
	Expect(errors.Is(err, runnableError{})).ToNot(BeTrue())
})
1055
1056 It("should return only runnables error if stop doesn't error", func() {
1057 m, err := New(cfg, options)
1058 Expect(err).NotTo(HaveOccurred())
1059 for _, cb := range callbacks {
1060 cb(m)
1061 }
1062 Expect(m.Add(RunnableFunc(func(context.Context) error {
1063 return runnableError{}
1064 }))).To(Succeed())
1065 ctx, cancel := context.WithCancel(context.Background())
1066 defer cancel()
1067 err = m.Start(ctx)
1068 Expect(err).To(HaveOccurred())
1069 Expect(err.Error()).To(Equal("not feeling like that"))
1070 Expect(errors.Is(err, context.DeadlineExceeded)).ToNot(BeTrue())
1071 Expect(errors.Is(err, runnableError{})).To(BeTrue())
1072 })
1073
1074 It("should not wait for runnables if gracefulShutdownTimeout is 0", func() {
1075 m, err := New(cfg, options)
1076 Expect(err).NotTo(HaveOccurred())
1077 for _, cb := range callbacks {
1078 cb(m)
1079 }
1080 m.(*controllerManager).gracefulShutdownTimeout = time.Duration(0)
1081
1082 runnableStopped := make(chan struct{})
1083 Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1084 <-ctx.Done()
1085 time.Sleep(100 * time.Millisecond)
1086 close(runnableStopped)
1087 return nil
1088 }))).ToNot(HaveOccurred())
1089
1090 ctx, cancel := context.WithCancel(context.Background())
1091 managerStopDone := make(chan struct{})
1092 go func() {
1093 defer GinkgoRecover()
1094 Expect(m.Start(ctx)).NotTo(HaveOccurred())
1095 close(managerStopDone)
1096 }()
1097 <-m.Elected()
1098 cancel()
1099
1100 <-managerStopDone
1101 <-runnableStopped
1102 })
1103
1104 It("should wait forever for runnables if gracefulShutdownTimeout is <0 (-1)", func() {
1105 m, err := New(cfg, options)
1106 Expect(err).NotTo(HaveOccurred())
1107 for _, cb := range callbacks {
1108 cb(m)
1109 }
1110 m.(*controllerManager).gracefulShutdownTimeout = time.Duration(-1)
1111
1112 Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1113 <-ctx.Done()
1114 time.Sleep(100 * time.Millisecond)
1115 return nil
1116 }))).ToNot(HaveOccurred())
1117 Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1118 <-ctx.Done()
1119 time.Sleep(200 * time.Millisecond)
1120 return nil
1121 }))).ToNot(HaveOccurred())
1122 Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1123 <-ctx.Done()
1124 time.Sleep(500 * time.Millisecond)
1125 return nil
1126 }))).ToNot(HaveOccurred())
1127 Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1128 <-ctx.Done()
1129 time.Sleep(1500 * time.Millisecond)
1130 return nil
1131 }))).ToNot(HaveOccurred())
1132
1133 ctx, cancel := context.WithCancel(context.Background())
1134 managerStopDone := make(chan struct{})
1135 go func() {
1136 defer GinkgoRecover()
1137 Expect(m.Start(ctx)).NotTo(HaveOccurred())
1138 close(managerStopDone)
1139 }()
1140 <-m.Elected()
1141 cancel()
1142
1143 beforeDone := time.Now()
1144 <-managerStopDone
1145 Expect(time.Since(beforeDone)).To(BeNumerically(">=", 1500*time.Millisecond))
1146 })
1147
1148 }
1149
1150 Context("with defaults", func() {
1151 startSuite(Options{})
1152 })
1153
1154 Context("with leaderelection enabled", func() {
1155 startSuite(
1156 Options{
1157 LeaderElection: true,
1158 LeaderElectionID: "controller-runtime",
1159 LeaderElectionNamespace: "default",
1160 newResourceLock: fakeleaderelection.NewResourceLock,
1161 },
1162 func(m Manager) {
1163 cm, ok := m.(*controllerManager)
1164 Expect(ok).To(BeTrue())
1165 cm.onStoppedLeading = func() {}
1166 },
1167 )
1168 })
1169
1170 Context("should start serving metrics", func() {
1171 var srv metricsserver.Server
1172 var defaultServer metricsDefaultServer
1173 var opts Options
1174
1175 BeforeEach(func() {
1176 srv = nil
1177 opts = Options{
1178 Metrics: metricsserver.Options{
1179 BindAddress: ":0",
1180 },
1181 newMetricsServer: func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error) {
1182 var err error
1183 srv, err = metricsserver.NewServer(options, config, httpClient)
1184 if srv != nil {
1185 defaultServer = srv.(metricsDefaultServer)
1186 }
1187 return srv, err
1188 },
1189 }
1190 })
1191
1192 It("should stop serving metrics when stop is called", func() {
1193 m, err := New(cfg, opts)
1194 Expect(err).NotTo(HaveOccurred())
1195
1196 ctx, cancel := context.WithCancel(context.Background())
1197 go func() {
1198 defer GinkgoRecover()
1199 Expect(m.Start(ctx)).NotTo(HaveOccurred())
1200 }()
1201 <-m.Elected()
1202
1203
1204 Eventually(func() string { return defaultServer.GetBindAddr() }, 10*time.Second).ShouldNot(BeEmpty())
1205
1206
1207 endpoint := fmt.Sprintf("http://%s/metrics", defaultServer.GetBindAddr())
1208 _, err = http.Get(endpoint)
1209 Expect(err).NotTo(HaveOccurred())
1210
1211
1212 cancel()
1213
1214
1215 Eventually(func() error {
1216 _, err = http.Get(endpoint)
1217 return err
1218 }, 10*time.Second).ShouldNot(Succeed())
1219 })
1220
1221 It("should serve metrics endpoint", func() {
1222 m, err := New(cfg, opts)
1223 Expect(err).NotTo(HaveOccurred())
1224
1225 ctx, cancel := context.WithCancel(context.Background())
1226 defer cancel()
1227 go func() {
1228 defer GinkgoRecover()
1229 Expect(m.Start(ctx)).NotTo(HaveOccurred())
1230 }()
1231 <-m.Elected()
1232
1233
1234 Eventually(func() string { return defaultServer.GetBindAddr() }, 10*time.Second).ShouldNot(BeEmpty())
1235
1236 metricsEndpoint := fmt.Sprintf("http://%s/metrics", defaultServer.GetBindAddr())
1237 resp, err := http.Get(metricsEndpoint)
1238 Expect(err).NotTo(HaveOccurred())
1239 Expect(resp.StatusCode).To(Equal(200))
1240 })
1241
1242 It("should not serve anything other than metrics endpoint by default", func() {
1243 m, err := New(cfg, opts)
1244 Expect(err).NotTo(HaveOccurred())
1245
1246 ctx, cancel := context.WithCancel(context.Background())
1247 defer cancel()
1248 go func() {
1249 defer GinkgoRecover()
1250 Expect(m.Start(ctx)).NotTo(HaveOccurred())
1251 }()
1252 <-m.Elected()
1253
1254
1255 Eventually(func() string { return defaultServer.GetBindAddr() }, 10*time.Second).ShouldNot(BeEmpty())
1256
1257 endpoint := fmt.Sprintf("http://%s/should-not-exist", defaultServer.GetBindAddr())
1258 resp, err := http.Get(endpoint)
1259 Expect(err).NotTo(HaveOccurred())
1260 defer resp.Body.Close()
1261 Expect(resp.StatusCode).To(Equal(404))
1262 })
1263
1264 It("should serve metrics in its registry", func() {
1265 one := prometheus.NewCounter(prometheus.CounterOpts{
1266 Name: "test_one",
1267 Help: "test metric for testing",
1268 })
1269 one.Inc()
1270 err := metrics.Registry.Register(one)
1271 Expect(err).NotTo(HaveOccurred())
1272
1273 m, err := New(cfg, opts)
1274 Expect(err).NotTo(HaveOccurred())
1275
1276 ctx, cancel := context.WithCancel(context.Background())
1277 defer cancel()
1278 go func() {
1279 defer GinkgoRecover()
1280 Expect(m.Start(ctx)).NotTo(HaveOccurred())
1281 }()
1282 <-m.Elected()
1283
1284
1285 Eventually(func() string { return defaultServer.GetBindAddr() }, 10*time.Second).ShouldNot(BeEmpty())
1286
1287 metricsEndpoint := fmt.Sprintf("http://%s/metrics", defaultServer.GetBindAddr())
1288 resp, err := http.Get(metricsEndpoint)
1289 Expect(err).NotTo(HaveOccurred())
1290 defer resp.Body.Close()
1291 Expect(resp.StatusCode).To(Equal(200))
1292
1293 data, err := io.ReadAll(resp.Body)
1294 Expect(err).NotTo(HaveOccurred())
1295 Expect(string(data)).To(ContainSubstring("%s\n%s\n%s\n",
1296 `# HELP test_one test metric for testing`,
1297 `# TYPE test_one counter`,
1298 `test_one 1`,
1299 ))
1300
1301
1302 ok := metrics.Registry.Unregister(one)
1303 Expect(ok).To(BeTrue())
1304 })
1305
1306 It("should serve extra endpoints", func() {
1307 opts.Metrics.ExtraHandlers = map[string]http.Handler{
1308 "/debug": http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
1309 _, _ = w.Write([]byte("Some debug info"))
1310 }),
1311 }
1312 m, err := New(cfg, opts)
1313 Expect(err).NotTo(HaveOccurred())
1314
1315 ctx, cancel := context.WithCancel(context.Background())
1316 defer cancel()
1317 go func() {
1318 defer GinkgoRecover()
1319 Expect(m.Start(ctx)).NotTo(HaveOccurred())
1320 }()
1321 <-m.Elected()
1322
1323
1324 Eventually(func() string { return defaultServer.GetBindAddr() }, 10*time.Second).ShouldNot(BeEmpty())
1325
1326 endpoint := fmt.Sprintf("http://%s/debug", defaultServer.GetBindAddr())
1327 resp, err := http.Get(endpoint)
1328 Expect(err).NotTo(HaveOccurred())
1329 defer resp.Body.Close()
1330 Expect(resp.StatusCode).To(Equal(http.StatusOK))
1331
1332 body, err := io.ReadAll(resp.Body)
1333 Expect(err).NotTo(HaveOccurred())
1334 Expect(string(body)).To(Equal("Some debug info"))
1335 })
1336 })
1337 })
1338
1339 Context("should start serving health probes", func() {
1340 var listener net.Listener
1341 var opts Options
1342
1343 BeforeEach(func() {
1344 listener = nil
1345 opts = Options{
1346 newHealthProbeListener: func(addr string) (net.Listener, error) {
1347 var err error
1348 listener, err = defaultHealthProbeListener(addr)
1349 return listener, err
1350 },
1351 }
1352 })
1353
1354 AfterEach(func() {
1355 if listener != nil {
1356 listener.Close()
1357 }
1358 })
1359
1360 It("should stop serving health probes when stop is called", func() {
1361 opts.HealthProbeBindAddress = ":0"
1362 m, err := New(cfg, opts)
1363 Expect(err).NotTo(HaveOccurred())
1364
1365 ctx, cancel := context.WithCancel(context.Background())
1366 go func() {
1367 defer GinkgoRecover()
1368 Expect(m.Start(ctx)).NotTo(HaveOccurred())
1369 }()
1370 <-m.Elected()
1371
1372
1373 endpoint := fmt.Sprintf("http://%s", listener.Addr().String())
1374 _, err = http.Get(endpoint)
1375 Expect(err).NotTo(HaveOccurred())
1376
1377
1378 cancel()
1379
1380
1381 Eventually(func() error {
1382 _, err = http.Get(endpoint)
1383 return err
1384 }, 10*time.Second).ShouldNot(Succeed())
1385 })
1386
It("should serve readiness endpoint", func() {
	opts.HealthProbeBindAddress = ":0"
	m, err := New(cfg, opts)
	Expect(err).NotTo(HaveOccurred())

	// The named check returns res: non-nil makes it fail, nil makes it pass.
	res := fmt.Errorf("not ready yet")
	namedCheck := "check"
	err = m.AddReadyzCheck(namedCheck, func(_ *http.Request) error { return res })
	Expect(err).NotTo(HaveOccurred())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		defer GinkgoRecover()
		Expect(m.Start(ctx)).NotTo(HaveOccurred())
	}()
	<-m.Elected()

	// listener was captured by the newHealthProbeListener hook set in BeforeEach.
	readinessEndpoint := fmt.Sprint("http://", listener.Addr().String(), defaultReadinessEndpoint)

	// The check fails, so the aggregate endpoint reports 500.
	resp, err := http.Get(readinessEndpoint)
	Expect(err).NotTo(HaveOccurred())
	defer resp.Body.Close()
	Expect(resp.StatusCode).To(Equal(http.StatusInternalServerError))

	// Make the check pass; the endpoint now reports 200.
	res = nil
	resp, err = http.Get(readinessEndpoint)
	Expect(err).NotTo(HaveOccurred())
	defer resp.Body.Close()
	Expect(resp.StatusCode).To(Equal(http.StatusOK))

	// Query the same path with a client that returns redirect responses
	// instead of following them, so a 200 shows the path is answered directly.
	readinessEndpoint = fmt.Sprint("http://", listener.Addr().String(), defaultReadinessEndpoint)
	res = nil
	httpClient := http.Client{
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			return http.ErrUseLastResponse
		},
	}
	resp, err = httpClient.Get(readinessEndpoint)
	Expect(err).NotTo(HaveOccurred())
	defer resp.Body.Close()
	Expect(resp.StatusCode).To(Equal(http.StatusOK))

	// The individual check is also served under <endpoint>/<check name>.
	readinessEndpoint = fmt.Sprint("http://", listener.Addr().String(), path.Join(defaultReadinessEndpoint, namedCheck))
	res = nil
	resp, err = http.Get(readinessEndpoint)
	Expect(err).NotTo(HaveOccurred())
	defer resp.Body.Close()
	Expect(resp.StatusCode).To(Equal(http.StatusOK))
})
1441
It("should serve liveness endpoint", func() {
	opts.HealthProbeBindAddress = ":0"
	m, err := New(cfg, opts)
	Expect(err).NotTo(HaveOccurred())

	// The named check returns res: non-nil makes it fail, nil makes it pass.
	res := fmt.Errorf("not alive")
	namedCheck := "check"
	err = m.AddHealthzCheck(namedCheck, func(_ *http.Request) error { return res })
	Expect(err).NotTo(HaveOccurred())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		defer GinkgoRecover()
		Expect(m.Start(ctx)).NotTo(HaveOccurred())
	}()
	<-m.Elected()

	// listener was captured by the newHealthProbeListener hook set in BeforeEach.
	livenessEndpoint := fmt.Sprint("http://", listener.Addr().String(), defaultLivenessEndpoint)

	// The check fails, so the aggregate endpoint reports 500.
	resp, err := http.Get(livenessEndpoint)
	Expect(err).NotTo(HaveOccurred())
	defer resp.Body.Close()
	Expect(resp.StatusCode).To(Equal(http.StatusInternalServerError))

	// Make the check pass; the endpoint now reports 200.
	res = nil
	resp, err = http.Get(livenessEndpoint)
	Expect(err).NotTo(HaveOccurred())
	defer resp.Body.Close()
	Expect(resp.StatusCode).To(Equal(http.StatusOK))

	// Query the same path with a client that returns redirect responses
	// instead of following them, so a 200 shows the path is answered directly.
	livenessEndpoint = fmt.Sprint("http://", listener.Addr().String(), defaultLivenessEndpoint)
	res = nil
	httpClient := http.Client{
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			return http.ErrUseLastResponse
		},
	}
	resp, err = httpClient.Get(livenessEndpoint)
	Expect(err).NotTo(HaveOccurred())
	defer resp.Body.Close()
	Expect(resp.StatusCode).To(Equal(http.StatusOK))

	// The individual check is also served under <endpoint>/<check name>.
	livenessEndpoint = fmt.Sprint("http://", listener.Addr().String(), path.Join(defaultLivenessEndpoint, namedCheck))
	res = nil
	resp, err = http.Get(livenessEndpoint)
	Expect(err).NotTo(HaveOccurred())
	defer resp.Body.Close()
	Expect(resp.StatusCode).To(Equal(http.StatusOK))
})
1496 })
1497
1498 Context("should start serving pprof", func() {
1499 var listener net.Listener
1500 var opts Options
1501
1502 BeforeEach(func() {
1503 listener = nil
1504 opts = Options{
1505 newPprofListener: func(addr string) (net.Listener, error) {
1506 var err error
1507 listener, err = defaultPprofListener(addr)
1508 return listener, err
1509 },
1510 }
1511 })
1512
1513 AfterEach(func() {
1514 if listener != nil {
1515 listener.Close()
1516 }
1517 })
1518
1519 It("should stop serving pprof when stop is called", func() {
1520 opts.PprofBindAddress = ":0"
1521 m, err := New(cfg, opts)
1522 Expect(err).NotTo(HaveOccurred())
1523
1524 ctx, cancel := context.WithCancel(context.Background())
1525 go func() {
1526 defer GinkgoRecover()
1527 Expect(m.Start(ctx)).NotTo(HaveOccurred())
1528 }()
1529 <-m.Elected()
1530
1531
1532 endpoint := fmt.Sprintf("http://%s", listener.Addr().String())
1533 _, err = http.Get(endpoint)
1534 Expect(err).NotTo(HaveOccurred())
1535
1536
1537 cancel()
1538
1539
1540 Eventually(func() error {
1541 _, err = http.Get(endpoint)
1542 return err
1543 }, 10*time.Second).ShouldNot(Succeed())
1544 })
1545
1546 It("should serve pprof endpoints", func() {
1547 opts.PprofBindAddress = ":0"
1548 m, err := New(cfg, opts)
1549 Expect(err).NotTo(HaveOccurred())
1550
1551 ctx, cancel := context.WithCancel(context.Background())
1552 defer cancel()
1553 go func() {
1554 defer GinkgoRecover()
1555 Expect(m.Start(ctx)).NotTo(HaveOccurred())
1556 }()
1557 <-m.Elected()
1558
1559 pprofIndexEndpoint := fmt.Sprintf("http://%s/debug/pprof/", listener.Addr().String())
1560 resp, err := http.Get(pprofIndexEndpoint)
1561 Expect(err).NotTo(HaveOccurred())
1562 defer resp.Body.Close()
1563 Expect(resp.StatusCode).To(Equal(http.StatusOK))
1564
1565 pprofCmdlineEndpoint := fmt.Sprintf("http://%s/debug/pprof/cmdline", listener.Addr().String())
1566 resp, err = http.Get(pprofCmdlineEndpoint)
1567 Expect(err).NotTo(HaveOccurred())
1568 defer resp.Body.Close()
1569 Expect(resp.StatusCode).To(Equal(http.StatusOK))
1570
1571 pprofProfileEndpoint := fmt.Sprintf("http://%s/debug/pprof/profile", listener.Addr().String())
1572 resp, err = http.Get(pprofProfileEndpoint)
1573 Expect(err).NotTo(HaveOccurred())
1574 defer resp.Body.Close()
1575 Expect(resp.StatusCode).To(Equal(http.StatusOK))
1576
1577 pprofSymbolEndpoint := fmt.Sprintf("http://%s/debug/pprof/symbol", listener.Addr().String())
1578 resp, err = http.Get(pprofSymbolEndpoint)
1579 Expect(err).NotTo(HaveOccurred())
1580 defer resp.Body.Close()
1581 Expect(resp.StatusCode).To(Equal(http.StatusOK))
1582
1583 pprofTraceEndpoint := fmt.Sprintf("http://%s/debug/pprof/trace", listener.Addr().String())
1584 resp, err = http.Get(pprofTraceEndpoint)
1585 Expect(err).NotTo(HaveOccurred())
1586 defer resp.Body.Close()
1587 Expect(resp.StatusCode).To(Equal(http.StatusOK))
1588 })
1589 })
1590
1591 Describe("Add", func() {
It("should immediately start the Component if the Manager has already Started another Component",
	func() {
		m, err := New(cfg, Options{})
		Expect(err).NotTo(HaveOccurred())
		mgr, ok := m.(*controllerManager)
		Expect(ok).To(BeTrue())

		// Add one component before starting; it closes c1 when it runs.
		c1 := make(chan struct{})
		Expect(m.Add(RunnableFunc(func(context.Context) error {
			defer GinkgoRecover()
			close(c1)
			return nil
		}))).To(Succeed())

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		go func() {
			defer GinkgoRecover()
			Expect(m.Start(ctx)).NotTo(HaveOccurred())
		}()
		<-m.Elected()

		// Wait until the manager reports its caches as started.
		Eventually(func() bool {
			return mgr.runnables.Caches.Started()
		}).Should(BeTrue())

		// Add another component after starting; it is expected to be started
		// without further action and closes c2 when it runs.
		c2 := make(chan struct{})
		Expect(m.Add(RunnableFunc(func(context.Context) error {
			defer GinkgoRecover()
			close(c2)
			return nil
		}))).To(Succeed())
		// Both components must have run.
		<-c1
		<-c2
	})
1630
It("should immediately start the Component if the Manager has already Started", func() {
	m, err := New(cfg, Options{})
	Expect(err).NotTo(HaveOccurred())
	mgr, ok := m.(*controllerManager)
	Expect(ok).To(BeTrue())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		defer GinkgoRecover()
		Expect(m.Start(ctx)).NotTo(HaveOccurred())
	}()

	// Wait until the manager reports its caches as started (this spec does
	// not wait on Elected()).
	Eventually(func() bool {
		return mgr.runnables.Caches.Started()
	}).Should(BeTrue())

	// A component added to the running manager is expected to be started
	// without further action; it closes c1 when it runs.
	c1 := make(chan struct{})
	Expect(m.Add(RunnableFunc(func(context.Context) error {
		defer GinkgoRecover()
		close(c1)
		return nil
	}))).To(Succeed())
	<-c1
})
1657
1658 It("should fail if attempted to start a second time", func() {
1659 m, err := New(cfg, Options{})
1660 Expect(err).NotTo(HaveOccurred())
1661
1662 ctx, cancel := context.WithCancel(context.Background())
1663 defer cancel()
1664 go func() {
1665 defer GinkgoRecover()
1666 Expect(m.Start(ctx)).NotTo(HaveOccurred())
1667 }()
1668
1669 Eventually(func() bool {
1670 mgr, ok := m.(*controllerManager)
1671 Expect(ok).To(BeTrue())
1672 return mgr.runnables.Caches.Started()
1673 }).Should(BeTrue())
1674
1675 err = m.Start(ctx)
1676 Expect(err).To(HaveOccurred())
1677 Expect(err.Error()).To(Equal("manager already started"))
1678
1679 })
1680 })
1681
It("should not leak goroutines when stopped", func() {
	// Snapshot the goroutines that already exist, so only goroutines created
	// from here on count as leaks.
	currentGRs := goleak.IgnoreCurrent()

	m, err := New(cfg, Options{})
	Expect(err).NotTo(HaveOccurred())

	// Start with an already-cancelled context; Start is expected to return
	// without error.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	Expect(m.Start(ctx)).NotTo(HaveOccurred())

	// Force-close idle keep-alive connections on clientTransport (defined
	// elsewhere in the suite). NOTE(review): presumably their connection
	// goroutines would otherwise show up as leaks until they time out.
	clientTransport.CloseIdleConnections()
	Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
})
1697
1698 It("should not leak goroutines if the default event broadcaster is used & events are emitted", func() {
1699 currentGRs := goleak.IgnoreCurrent()
1700
1701 m, err := New(cfg, Options{ })
1702 Expect(err).NotTo(HaveOccurred())
1703
1704 By("adding a runnable that emits an event")
1705 ns := corev1.Namespace{}
1706 ns.Name = "default"
1707
1708 recorder := m.GetEventRecorderFor("rock-and-roll")
1709 Expect(m.Add(RunnableFunc(func(_ context.Context) error {
1710 recorder.Event(&ns, "Warning", "BallroomBlitz", "yeah, yeah, yeah-yeah-yeah")
1711 return nil
1712 }))).To(Succeed())
1713
1714 By("starting the manager & waiting till we've sent our event")
1715 ctx, cancel := context.WithCancel(context.Background())
1716 doneCh := make(chan struct{})
1717 go func() {
1718 defer GinkgoRecover()
1719 defer close(doneCh)
1720 Expect(m.Start(ctx)).To(Succeed())
1721 }()
1722 <-m.Elected()
1723
1724 Eventually(func() *corev1.Event {
1725 evts, err := clientset.CoreV1().Events("").Search(m.GetScheme(), &ns)
1726 Expect(err).NotTo(HaveOccurred())
1727
1728 for i, evt := range evts.Items {
1729 if evt.Reason == "BallroomBlitz" {
1730 return &evts.Items[i]
1731 }
1732 }
1733 return nil
1734 }).ShouldNot(BeNil())
1735
1736 By("making sure there's no extra go routines still running after we stop")
1737 cancel()
1738 <-doneCh
1739
1740
1741
1742 clientTransport.CloseIdleConnections()
1743 Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed())
1744 })
1745
1746 It("should provide a function to get the Config", func() {
1747 m, err := New(cfg, Options{})
1748 Expect(err).NotTo(HaveOccurred())
1749 mgr, ok := m.(*controllerManager)
1750 Expect(ok).To(BeTrue())
1751 Expect(m.GetConfig()).To(Equal(mgr.cluster.GetConfig()))
1752 })
1753
1754 It("should provide a function to get the Client", func() {
1755 m, err := New(cfg, Options{})
1756 Expect(err).NotTo(HaveOccurred())
1757 mgr, ok := m.(*controllerManager)
1758 Expect(ok).To(BeTrue())
1759 Expect(m.GetClient()).To(Equal(mgr.cluster.GetClient()))
1760 })
1761
1762 It("should provide a function to get the Scheme", func() {
1763 m, err := New(cfg, Options{})
1764 Expect(err).NotTo(HaveOccurred())
1765 mgr, ok := m.(*controllerManager)
1766 Expect(ok).To(BeTrue())
1767 Expect(m.GetScheme()).To(Equal(mgr.cluster.GetScheme()))
1768 })
1769
1770 It("should provide a function to get the FieldIndexer", func() {
1771 m, err := New(cfg, Options{})
1772 Expect(err).NotTo(HaveOccurred())
1773 mgr, ok := m.(*controllerManager)
1774 Expect(ok).To(BeTrue())
1775 Expect(m.GetFieldIndexer()).To(Equal(mgr.cluster.GetFieldIndexer()))
1776 })
1777
1778 It("should provide a function to get the EventRecorder", func() {
1779 m, err := New(cfg, Options{})
1780 Expect(err).NotTo(HaveOccurred())
1781 Expect(m.GetEventRecorderFor("test")).NotTo(BeNil())
1782 })
1783 It("should provide a function to get the APIReader", func() {
1784 m, err := New(cfg, Options{})
1785 Expect(err).NotTo(HaveOccurred())
1786 Expect(m.GetAPIReader()).NotTo(BeNil())
1787 })
1788 })
1789
// runnableError is the fixed error value returned by failing test runnables.
// It is a comparable empty struct, so errors.Is can match it by value.
type runnableError struct{}

// Error implements the error interface with a constant message.
func (e runnableError) Error() string {
	const msg = "not feeling like that"
	return msg
}
1796
1797 var _ Runnable = &cacheProvider{}
1798
1799 type cacheProvider struct {
1800 cache cache.Cache
1801 }
1802
1803 func (c *cacheProvider) GetCache() cache.Cache {
1804 return c.cache
1805 }
1806
1807 func (c *cacheProvider) Start(ctx context.Context) error {
1808 return c.cache.Start(ctx)
1809 }
1810
1811 type startSignalingInformer struct {
1812 mu sync.Mutex
1813
1814
1815
1816
1817 wasStarted bool
1818
1819
1820 wasSynced bool
1821 cache.Cache
1822 }
1823
1824 func (c *startSignalingInformer) Start(ctx context.Context) error {
1825 c.mu.Lock()
1826 c.wasStarted = true
1827 c.mu.Unlock()
1828 return c.Cache.Start(ctx)
1829 }
1830
1831 func (c *startSignalingInformer) WaitForCacheSync(ctx context.Context) bool {
1832 defer func() {
1833 c.mu.Lock()
1834 c.wasSynced = true
1835 c.mu.Unlock()
1836 }()
1837 return c.Cache.WaitForCacheSync(ctx)
1838 }
1839
1840 type startClusterAfterManager struct {
1841 informer *startSignalingInformer
1842 }
1843
1844 func (c *startClusterAfterManager) Start(ctx context.Context) error {
1845 return c.informer.Start(ctx)
1846 }
1847
1848 func (c *startClusterAfterManager) GetCache() cache.Cache {
1849 return c.informer
1850 }
1851
1852
1853
1854
// metricsDefaultServer is used to type-assert the metrics server created by
// the manager, so specs can read its bound address without GetBindAddr
// being part of the metricsserver.Server interface.
type metricsDefaultServer interface {
	// GetBindAddr returns the address the server is listening on; specs
	// poll it until it is non-empty before issuing requests.
	GetBindAddr() string
}
1858
// needElection is a test runnable that opts in to leader election and sends
// a single signal on ch when started.
type needElection struct {
	ch chan struct{}
}

// Start sends one value on n.ch (blocking until it is received or buffered)
// and returns nil. The context is ignored.
func (n *needElection) Start(context.Context) error {
	var signal struct{}
	n.ch <- signal
	return nil
}

// NeedLeaderElection always reports true.
func (*needElection) NeedLeaderElection() bool { return true }
1871
View as plain text