1
16
17 package manager
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "net"
24 "net/http"
25 "net/http/pprof"
26 "sync"
27 "sync/atomic"
28 "time"
29
30 "github.com/go-logr/logr"
31 "k8s.io/apimachinery/pkg/api/meta"
32 "k8s.io/apimachinery/pkg/runtime"
33 kerrors "k8s.io/apimachinery/pkg/util/errors"
34 "k8s.io/client-go/rest"
35 "k8s.io/client-go/tools/leaderelection"
36 "k8s.io/client-go/tools/leaderelection/resourcelock"
37 "k8s.io/client-go/tools/record"
38
39 "sigs.k8s.io/controller-runtime/pkg/cache"
40 "sigs.k8s.io/controller-runtime/pkg/client"
41 "sigs.k8s.io/controller-runtime/pkg/cluster"
42 "sigs.k8s.io/controller-runtime/pkg/config"
43 "sigs.k8s.io/controller-runtime/pkg/healthz"
44 "sigs.k8s.io/controller-runtime/pkg/internal/httpserver"
45 intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
46 metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
47 "sigs.k8s.io/controller-runtime/pkg/webhook"
48 )
49
const (
	// Default timings whose names pair them with the leaseDuration,
	// renewDeadline, retryPeriod and gracefulShutdownTimeout fields of
	// controllerManager. Where they are applied is not visible in this part of
	// the file (presumably the manager constructor — confirm).
	defaultLeaseDuration          = 15 * time.Second
	defaultRenewDeadline          = 10 * time.Second
	defaultRetryPeriod            = 2 * time.Second
	defaultGracefulShutdownPeriod = 30 * time.Second

	// Default URL paths for the readiness and liveness probe endpoints.
	defaultReadinessEndpoint = "/readyz"
	defaultLivenessEndpoint  = "/healthz"
)
60
61 var _ Runnable = &controllerManager{}
62
// controllerManager bundles the state needed to run a set of runnables
// together with their shared cluster dependencies, optional leader election,
// and the metrics, health-probe, pprof and webhook servers.
type controllerManager struct {
	// Mutex guards started and the registration paths (Add, AddHealthzCheck,
	// AddReadyzCheck); Start holds it for its whole setup phase.
	sync.Mutex
	// started is set by Start. Afterwards a second Start call and new
	// health/readiness checks are rejected.
	started bool

	// stopProcedureEngaged is flipped from 0 to 1 with an atomic
	// compare-and-swap so the stop procedure runs at most once.
	stopProcedureEngaged *int64
	// errChan carries errors from background goroutines (leader election and
	// its runnables) to Start, and to the drain loop once shutdown has begun.
	errChan chan error
	// runnables holds the runnable groups that Start launches and
	// engageStopProcedure stops (HTTPServers, Webhooks, Caches, Others,
	// LeaderElection).
	runnables *runnables

	// cluster backs the Get* accessors (config, client, cache, scheme, ...)
	// and is itself added as a runnable by Start.
	cluster cluster.Cluster

	// recorderProvider is stopped at the very end of the stop procedure.
	recorderProvider *intrec.Provider

	// resourceLock is the lock used for leader election. A nil value means
	// leader election is disabled and the leader-election runnables are
	// started unconditionally.
	resourceLock resourcelock.Interface

	// leaderElectionReleaseOnCancel is passed through as ReleaseOnCancel in
	// the leader election configuration.
	leaderElectionReleaseOnCancel bool

	// metricsServer, when non-nil, is added to the HTTP server group by Start.
	metricsServer metricsserver.Server

	// healthProbeListener, when non-nil, makes Start serve the health probes
	// on it.
	healthProbeListener net.Listener

	// readinessEndpointName is the URL path under which readyzHandler is
	// registered.
	readinessEndpointName string

	// livenessEndpointName is the URL path under which healthzHandler is
	// registered.
	livenessEndpointName string

	// readyzHandler holds the readiness checks; created lazily by
	// AddReadyzCheck.
	readyzHandler *healthz.Handler

	// healthzHandler holds the liveness checks; created lazily by
	// AddHealthzCheck.
	healthzHandler *healthz.Handler

	// pprofListener, when non-nil, makes Start serve the pprof handlers on it.
	pprofListener net.Listener

	// controllerConfig is returned verbatim by GetControllerOptions.
	controllerConfig config.Controller

	// logger is returned by GetLogger and used for shutdown progress and for
	// errors received after the stop procedure has begun.
	logger logr.Logger

	// leaderElectionStopped is closed by the leader elector goroutine once the
	// elector has returned and its context is done. The stop procedure waits
	// on it when leader election is enabled.
	leaderElectionStopped chan struct{}

	// leaderElectionCancel cancels the context the leader elector runs with.
	// It is assigned by Start right before the elector goroutine is launched
	// and invoked at the end of the stop procedure.
	leaderElectionCancel context.CancelFunc

	// elected is closed once the leader-election runnables have been started,
	// either because leadership was acquired or because leader election is
	// disabled. Exposed through Elected.
	elected chan struct{}

	// webhookServer is the server handed out by GetWebhookServer.
	webhookServer webhook.Server
	// webhookServerOnce makes sure webhookServer is added to the runnables
	// only on the first GetWebhookServer call.
	webhookServerOnce sync.Once

	// leaderElectionID is passed as Name to the leader elector.
	leaderElectionID string
	// leaseDuration is passed as LeaseDuration to the leader elector.
	leaseDuration time.Duration
	// renewDeadline is passed as RenewDeadline to the leader elector.
	renewDeadline time.Duration
	// retryPeriod is passed as RetryPeriod to the leader elector.
	retryPeriod time.Duration

	// gracefulShutdownTimeout bounds how long the stop procedure waits for
	// runnables. A negative value waits without a deadline; zero expires
	// immediately and is not reported as an error. It is forced to zero when
	// leadership is lost.
	gracefulShutdownTimeout time.Duration

	// onStoppedLeading, if set, is invoked when leadership is lost, before the
	// "leader election lost" error is reported.
	onStoppedLeading func()

	// shutdownCtx is created by the stop procedure and handed to every
	// StopAndWait call and to the recorder provider.
	shutdownCtx context.Context

	// internalCtx is derived from the context given to Start and is the
	// context the runnable groups are started with; internalCancel cancels it
	// when the stop procedure begins.
	internalCtx    context.Context
	internalCancel context.CancelFunc

	// internalProceduresStop is closed when the stop procedure begins.
	internalProceduresStop chan struct{}
}
165
// hasCache is a Runnable that also exposes a cache through GetCache.
// NOTE(review): no use of this interface is visible in this part of the file;
// presumably the runnable grouping code relies on it — confirm.
type hasCache interface {
	Runnable
	GetCache() cache.Cache
}
170
171
// Add registers r with the manager's runnables while holding the manager
// lock, and returns whatever error the registration reports.
func (cm *controllerManager) Add(r Runnable) error {
	cm.Lock()
	defer cm.Unlock()
	return cm.add(r)
}
177
// add registers r with the manager's runnables without taking the manager
// lock; Start and the server helpers call it while the lock is already held.
func (cm *controllerManager) add(r Runnable) error {
	return cm.runnables.Add(r)
}
181
182
183 func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error {
184 cm.Lock()
185 defer cm.Unlock()
186
187 if cm.started {
188 return fmt.Errorf("unable to add new checker because healthz endpoint has already been created")
189 }
190
191 if cm.healthzHandler == nil {
192 cm.healthzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}}
193 }
194
195 cm.healthzHandler.Checks[name] = check
196 return nil
197 }
198
199
200 func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) error {
201 cm.Lock()
202 defer cm.Unlock()
203
204 if cm.started {
205 return fmt.Errorf("unable to add new checker because healthz endpoint has already been created")
206 }
207
208 if cm.readyzHandler == nil {
209 cm.readyzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}}
210 }
211
212 cm.readyzHandler.Checks[name] = check
213 return nil
214 }
215
// GetHTTPClient returns the HTTP client of the underlying cluster.
func (cm *controllerManager) GetHTTPClient() *http.Client {
	return cm.cluster.GetHTTPClient()
}
219
// GetConfig returns the REST config of the underlying cluster.
func (cm *controllerManager) GetConfig() *rest.Config {
	return cm.cluster.GetConfig()
}
223
// GetClient returns the client of the underlying cluster.
func (cm *controllerManager) GetClient() client.Client {
	return cm.cluster.GetClient()
}
227
// GetScheme returns the runtime scheme of the underlying cluster.
func (cm *controllerManager) GetScheme() *runtime.Scheme {
	return cm.cluster.GetScheme()
}
231
// GetFieldIndexer returns the field indexer of the underlying cluster.
func (cm *controllerManager) GetFieldIndexer() client.FieldIndexer {
	return cm.cluster.GetFieldIndexer()
}
235
// GetCache returns the cache of the underlying cluster.
func (cm *controllerManager) GetCache() cache.Cache {
	return cm.cluster.GetCache()
}
239
// GetEventRecorderFor asks the underlying cluster for the event recorder
// associated with name and returns it.
func (cm *controllerManager) GetEventRecorderFor(name string) record.EventRecorder {
	return cm.cluster.GetEventRecorderFor(name)
}
243
// GetRESTMapper returns the REST mapper of the underlying cluster.
func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {
	return cm.cluster.GetRESTMapper()
}
247
// GetAPIReader returns the API reader of the underlying cluster.
func (cm *controllerManager) GetAPIReader() client.Reader {
	return cm.cluster.GetAPIReader()
}
251
252 func (cm *controllerManager) GetWebhookServer() webhook.Server {
253 cm.webhookServerOnce.Do(func() {
254 if cm.webhookServer == nil {
255 panic("webhook should not be nil")
256 }
257 if err := cm.Add(cm.webhookServer); err != nil {
258 panic(fmt.Sprintf("unable to add webhook server to the controller manager: %s", err))
259 }
260 })
261 return cm.webhookServer
262 }
263
// GetLogger returns the logger the manager was configured with.
func (cm *controllerManager) GetLogger() logr.Logger {
	return cm.logger
}
267
// GetControllerOptions returns the controller configuration stored on the
// manager.
func (cm *controllerManager) GetControllerOptions() config.Controller {
	return cm.controllerConfig
}
271
272 func (cm *controllerManager) addHealthProbeServer() error {
273 mux := http.NewServeMux()
274 srv := httpserver.New(mux)
275
276 if cm.readyzHandler != nil {
277 mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
278
279 mux.Handle(cm.readinessEndpointName+"/", http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
280 }
281 if cm.healthzHandler != nil {
282 mux.Handle(cm.livenessEndpointName, http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
283
284 mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
285 }
286
287 return cm.add(&Server{
288 Name: "health probe",
289 Server: srv,
290 Listener: cm.healthProbeListener,
291 })
292 }
293
294 func (cm *controllerManager) addPprofServer() error {
295 mux := http.NewServeMux()
296 srv := httpserver.New(mux)
297
298 mux.HandleFunc("/debug/pprof/", pprof.Index)
299 mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
300 mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
301 mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
302 mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
303
304 return cm.add(&Server{
305 Name: "pprof",
306 Server: srv,
307 Listener: cm.pprofListener,
308 })
309 }
310
311
312
313
314
315
316 func (cm *controllerManager) Start(ctx context.Context) (err error) {
317 cm.Lock()
318 if cm.started {
319 cm.Unlock()
320 return errors.New("manager already started")
321 }
322 cm.started = true
323
324 var ready bool
325 defer func() {
326
327
328 if !ready {
329 cm.Unlock()
330 }
331 }()
332
333
334 cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)
335
336
337 stopComplete := make(chan struct{})
338 defer close(stopComplete)
339
340 defer func() {
341
342 stopErr := cm.engageStopProcedure(stopComplete)
343 if stopErr != nil {
344 if err != nil {
345
346
347
348 err = kerrors.NewAggregate([]error{err, stopErr})
349 } else {
350 err = stopErr
351 }
352 }
353 }()
354
355
356 if err := cm.add(cm.cluster); err != nil {
357 return fmt.Errorf("failed to add cluster to runnables: %w", err)
358 }
359
360
361
362
363 if cm.metricsServer != nil {
364
365
366 if err := cm.runnables.HTTPServers.Add(cm.metricsServer, nil); err != nil {
367 return fmt.Errorf("failed to add metrics server: %w", err)
368 }
369 }
370
371
372 if cm.healthProbeListener != nil {
373 if err := cm.addHealthProbeServer(); err != nil {
374 return fmt.Errorf("failed to add health probe server: %w", err)
375 }
376 }
377
378
379 if cm.pprofListener != nil {
380 if err := cm.addPprofServer(); err != nil {
381 return fmt.Errorf("failed to add pprof server: %w", err)
382 }
383 }
384
385
386
387
388
389 logCtx := logr.NewContext(cm.internalCtx, cm.logger)
390 if err := cm.runnables.HTTPServers.Start(logCtx); err != nil {
391 return fmt.Errorf("failed to start HTTP servers: %w", err)
392 }
393
394
395
396
397
398
399
400 if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
401 return fmt.Errorf("failed to start webhooks: %w", err)
402 }
403
404
405 if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
406 return fmt.Errorf("failed to start caches: %w", err)
407 }
408
409
410 if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
411 return fmt.Errorf("failed to start other runnables: %w", err)
412 }
413
414
415 {
416 ctx, cancel := context.WithCancel(context.Background())
417 cm.leaderElectionCancel = cancel
418 go func() {
419 if cm.resourceLock != nil {
420 if err := cm.startLeaderElection(ctx); err != nil {
421 cm.errChan <- err
422 }
423 } else {
424
425 if err := cm.startLeaderElectionRunnables(); err != nil {
426 cm.errChan <- err
427 }
428 close(cm.elected)
429 }
430 }()
431 }
432
433 ready = true
434 cm.Unlock()
435 select {
436 case <-ctx.Done():
437
438 return nil
439 case err := <-cm.errChan:
440
441 return err
442 }
443 }
444
445
446
447 func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) error {
448 if !atomic.CompareAndSwapInt64(cm.stopProcedureEngaged, 0, 1) {
449 return errors.New("stop procedure already engaged")
450 }
451
452
453
454
455
456 var shutdownCancel context.CancelFunc
457 if cm.gracefulShutdownTimeout < 0 {
458
459 cm.shutdownCtx, shutdownCancel = context.WithCancel(context.Background())
460 } else {
461 cm.shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), cm.gracefulShutdownTimeout)
462 }
463 defer shutdownCancel()
464
465
466
467
468 var closeOnce sync.Once
469 go func() {
470 for {
471
472
473 closeOnce.Do(func() {
474
475 close(cm.internalProceduresStop)
476 cm.internalCancel()
477 })
478 select {
479 case err, ok := <-cm.errChan:
480 if ok {
481 cm.logger.Error(err, "error received after stop sequence was engaged")
482 }
483 case <-stopComplete:
484 return
485 }
486 }
487 }()
488
489
490
491
492 defer cm.recorderProvider.Stop(cm.shutdownCtx)
493 defer func() {
494
495 if cm.resourceLock != nil {
496
497
498
499
500 cm.leaderElectionCancel()
501 <-cm.leaderElectionStopped
502 }
503 }()
504
505 go func() {
506
507 cm.logger.Info("Stopping and waiting for non leader election runnables")
508 cm.runnables.Others.StopAndWait(cm.shutdownCtx)
509
510
511 cm.logger.Info("Stopping and waiting for leader election runnables")
512
513 cm.runnables.LeaderElection.startOnce.Do(func() {})
514 cm.runnables.LeaderElection.StopAndWait(cm.shutdownCtx)
515
516
517
518
519 cm.logger.Info("Stopping and waiting for caches")
520 cm.runnables.Caches.StopAndWait(cm.shutdownCtx)
521
522
523 cm.logger.Info("Stopping and waiting for webhooks")
524 cm.runnables.Webhooks.StopAndWait(cm.shutdownCtx)
525
526 cm.logger.Info("Stopping and waiting for HTTP servers")
527 cm.runnables.HTTPServers.StopAndWait(cm.shutdownCtx)
528
529
530 cm.logger.Info("Wait completed, proceeding to shutdown the manager")
531 shutdownCancel()
532 }()
533
534 <-cm.shutdownCtx.Done()
535 if err := cm.shutdownCtx.Err(); err != nil && !errors.Is(err, context.Canceled) {
536 if errors.Is(err, context.DeadlineExceeded) {
537 if cm.gracefulShutdownTimeout > 0 {
538 return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", cm.gracefulShutdownTimeout, err)
539 }
540 return nil
541 }
542
543 return err
544 }
545
546 return nil
547 }
548
// startLeaderElectionRunnables starts the leader-election runnable group with
// the manager's internal context and returns the group's start error, if any.
func (cm *controllerManager) startLeaderElectionRunnables() error {
	return cm.runnables.LeaderElection.Start(cm.internalCtx)
}
552
553 func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error) {
554 l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
555 Lock: cm.resourceLock,
556 LeaseDuration: cm.leaseDuration,
557 RenewDeadline: cm.renewDeadline,
558 RetryPeriod: cm.retryPeriod,
559 Callbacks: leaderelection.LeaderCallbacks{
560 OnStartedLeading: func(_ context.Context) {
561 if err := cm.startLeaderElectionRunnables(); err != nil {
562 cm.errChan <- err
563 return
564 }
565 close(cm.elected)
566 },
567 OnStoppedLeading: func() {
568 if cm.onStoppedLeading != nil {
569 cm.onStoppedLeading()
570 }
571
572
573 cm.gracefulShutdownTimeout = time.Duration(0)
574
575
576
577 cm.errChan <- errors.New("leader election lost")
578 },
579 },
580 ReleaseOnCancel: cm.leaderElectionReleaseOnCancel,
581 Name: cm.leaderElectionID,
582 })
583 if err != nil {
584 return err
585 }
586
587
588 go func() {
589 l.Run(ctx)
590 <-ctx.Done()
591 close(cm.leaderElectionStopped)
592 }()
593 return nil
594 }
595
// Elected returns a channel that is closed once the leader-election runnables
// have been started, either after winning the election or, when leader
// election is disabled, right after startup.
func (cm *controllerManager) Elected() <-chan struct{} {
	return cm.elected
}
599
View as plain text