// Package manager provides a lightweight runnable manager that serves
// metrics and health probe endpoints and coordinates graceful shutdown.
package manager

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-logr/logr"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	kerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/utils/ptr"

	"edge-infra.dev/pkg/lib/logging"
	"edge-infra.dev/pkg/lib/runtime/healthz"
	"edge-infra.dev/pkg/lib/runtime/metrics"
)

const (
	defaultGracefulShutdownPeriod = 30 * time.Second

	defaultReadinessEndpoint      = "/readyz"
	defaultLivenessEndpoint       = "/livez"
	defaultHealthProbeBindAddress = ":8000"
	defaultMetricsEndpoint        = "/metrics"

	listenerFailed = "server failed to listen. You may want to disable the " +
		"metrics server (pass \"0\" as the bind address) or use another port if it is " +
		"due to conflicts"
)
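
// For illustration, a minimal sketch of constructing and running a manager
// (myRunnable is a hypothetical Runnable; the other names are this package's
// own API):
//
//	mgr, err := New(Options{})
//	if err != nil {
//		// handle error
//	}
//	_ = mgr.Add(myRunnable)
//	_ = mgr.Start(context.Background())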

// New creates a Manager from the given Options, applying defaults for any
// unset fields and binding the metrics and health probe listeners.
func New(options Options) (Manager, error) {
	// Set default values for options fields that were left unset.
	options = setOptionsDefaults(options)
	log := *options.Logger

	// Create the metrics listener. This returns an error if the metrics bind
	// address is invalid or already in use.
	metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
	if err != nil {
		log.Error(err, listenerFailed, "kind", "metrics")
		return nil, err
	}

	// By default there are no extra endpoints to expose on the metrics server.
	metricsExtraHandlers := make(map[string]http.Handler)

	// Create the health probe listener. This returns an error if the bind
	// address is invalid or already in use.
	healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
	if err != nil {
		log.Error(err, listenerFailed, "kind", "health probe")
		return nil, err
	}

	errChan := make(chan error)
	runnables := newRunnables(options.BaseContext, errChan)

	return &defaultManager{
		stopProcedureEngaged:    ptr.To(int64(0)),
		logger:                  *options.Logger,
		runnables:               runnables,
		errChan:                 errChan,
		healthProbeListener:     healthProbeListener,
		metricsListener:         metricsListener,
		metricsExtraHandlers:    metricsExtraHandlers,
		readinessEndpointName:   options.ReadinessEndpointName,
		livenessEndpointName:    options.LivenessEndpointName,
		gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
		internalProceduresStop:  make(chan struct{}),
	}, nil
}

// setOptionsDefaults sets defaults on the Options for any values that were
// not set by the caller.
func setOptionsDefaults(options Options) Options {
	// Fall back to a default logger if none was provided, or if the provided
	// logger has no sink.
	if options.Logger == nil || options.Logger.GetSink() == nil {
		options.Logger = &logging.NewLogger().Logger
	}

	if options.newMetricsListener == nil {
		options.newMetricsListener = metrics.NewListener
	}

	if options.ReadinessEndpointName == "" {
		options.ReadinessEndpointName = defaultReadinessEndpoint
	}

	if options.LivenessEndpointName == "" {
		options.LivenessEndpointName = defaultLivenessEndpoint
	}

	if options.HealthProbeBindAddress == "" {
		options.HealthProbeBindAddress = defaultHealthProbeBindAddress
	}

	if options.newHealthProbeListener == nil {
		options.newHealthProbeListener = defaultHealthProbeListener
	}

	if options.GracefulShutdownTimeout == nil {
		gracefulShutdownTimeout := defaultGracefulShutdownPeriod
		options.GracefulShutdownTimeout = &gracefulShutdownTimeout
	}

	if options.BaseContext == nil {
		options.BaseContext = defaultBaseContext
	}

	return options
}

// defaultHealthProbeListener creates the default health probe listener bound
// to the given address. Passing "0" (or an empty address) disables the
// listener.
func defaultHealthProbeListener(addr string) (net.Listener, error) {
	if addr == "" || addr == "0" {
		return nil, nil
	}

	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, fmt.Errorf("error listening on %s: %w", addr, err)
	}
	return ln, nil
}
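
// For illustration, a sketch of a manager with health probes disabled and
// metrics on a custom port (both fields are part of the Options used above):
//
//	mgr, _ := New(Options{
//		HealthProbeBindAddress: "0",     // disables the health probe server
//		MetricsBindAddress:     ":9090", // metrics server on port 9090
//	})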

// defaultBaseContext is used as the BaseContext value in Options if one is
// not set.
func defaultBaseContext() context.Context {
	return context.Background()
}

var _ Runnable = &defaultManager{}

type defaultManager struct {
	sync.Mutex
	started bool

	stopProcedureEngaged *int64
	errChan              chan error
	runnables            *runnables

	// logger is the logger that should be used by this manager.
	logger logr.Logger

	// metricsListener is used to serve prometheus metrics.
	metricsListener net.Listener

	// metricsExtraHandlers contains extra handlers to register on the HTTP
	// server that serves metrics.
	metricsExtraHandlers map[string]http.Handler

	// healthProbeListener is used to serve the liveness and readiness probes.
	healthProbeListener net.Listener

	// readinessEndpointName is the path of the readiness endpoint.
	readinessEndpointName string

	// livenessEndpointName is the path of the liveness endpoint.
	livenessEndpointName string

	// readyzHandler is the handler that serves the readiness checks.
	readyzHandler *healthz.Handler

	// healthzHandler is the handler that serves the liveness checks.
	healthzHandler *healthz.Handler

	// gracefulShutdownTimeout is the duration given to each runnable to stop
	// before the manager actually returns on stop.
	gracefulShutdownTimeout time.Duration

	// shutdownCtx is the context that can be used during shutdown of the
	// manager. It is cancelled after the graceful shutdown timeout has
	// elapsed or, if shutdown finishes earlier, once all runnables stopped.
	shutdownCtx context.Context

	internalCtx    context.Context
	internalCancel context.CancelFunc

	// internalProceduresStop is used internally by the manager to coordinate
	// shutdown of the internal HTTP servers.
	internalProceduresStop chan struct{}
}

// Add adds a Runnable to the manager so that it is started when the manager
// starts.
func (dm *defaultManager) Add(r Runnable) error {
	dm.Lock()
	defer dm.Unlock()
	return dm.add(r)
}

func (dm *defaultManager) add(r Runnable) error {
	return dm.runnables.Add(r)
}

// AddMetricsExtraHandler adds an extra handler served on path to the HTTP
// server that serves metrics.
func (dm *defaultManager) AddMetricsExtraHandler(path string, handler http.Handler) error {
	dm.Lock()
	defer dm.Unlock()

	if dm.started {
		return errors.New("unable to add new metrics handler because metrics endpoint has already been created")
	}

	if path == defaultMetricsEndpoint {
		return fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint)
	}

	if _, found := dm.metricsExtraHandlers[path]; found {
		return fmt.Errorf("can't register extra handler by duplicate path %q on metrics http server", path)
	}

	dm.metricsExtraHandlers[path] = handler
	dm.logger.V(2).Info("Registering metrics http server extra handler", "path", path)
	return nil
}
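
// For illustration, an extra debug endpoint could be registered before the
// manager starts (expvar is stdlib; the path is an example):
//
//	_ = mgr.AddMetricsExtraHandler("/debug/vars", expvar.Handler())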

// AddLivezCheck allows you to add a liveness check.
func (dm *defaultManager) AddLivezCheck(name string, check healthz.Checker) error {
	dm.Lock()
	defer dm.Unlock()

	if dm.started {
		return errors.New("unable to add new checker because healthz endpoint has already been created")
	}

	if dm.healthzHandler == nil {
		dm.healthzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}, Log: dm.logger}
	}

	dm.healthzHandler.Checks[name] = check
	return nil
}

// AddReadyzCheck allows you to add a readiness check.
func (dm *defaultManager) AddReadyzCheck(name string, check healthz.Checker) error {
	dm.Lock()
	defer dm.Unlock()

	if dm.started {
		return errors.New("unable to add new checker because readyz endpoint has already been created")
	}

	if dm.readyzHandler == nil {
		dm.readyzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}, Log: dm.logger}
	}

	dm.readyzHandler.Checks[name] = check
	return nil
}
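
// For illustration, a readiness check might be wired up like this (a sketch
// assuming healthz.Checker follows controller-runtime's func(*http.Request)
// error shape; "cache-sync" is an example name):
//
//	_ = mgr.AddReadyzCheck("cache-sync", func(_ *http.Request) error {
//		return nil // report readiness here
//	})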

// GetLogger returns this manager's logger.
func (dm *defaultManager) GetLogger() logr.Logger {
	return dm.logger
}

// serveMetrics serves the prometheus metrics registry, along with any extra
// handlers, over HTTP on the metrics listener.
func (dm *defaultManager) serveMetrics() {
	handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
		ErrorHandling: promhttp.HTTPErrorOnError,
	})
	mux := http.NewServeMux()
	mux.Handle(defaultMetricsEndpoint, handler)
	for path, extraHandler := range dm.metricsExtraHandlers {
		mux.Handle(path, extraHandler)
	}

	server := NewServer(mux)
	go dm.httpServe("metrics", dm.logger.WithValues("path", defaultMetricsEndpoint), server, dm.metricsListener)
}

// serveHealthProbes serves the readiness and liveness endpoints over HTTP on
// the health probe listener.
func (dm *defaultManager) serveHealthProbes() {
	mux := http.NewServeMux()
	server := NewServer(mux)

	if dm.readyzHandler != nil {
		mux.Handle(dm.readinessEndpointName, http.StripPrefix(dm.readinessEndpointName, dm.readyzHandler))
		// Append '/' suffix to handle subpaths of the readiness endpoint.
		mux.Handle(dm.readinessEndpointName+"/", http.StripPrefix(dm.readinessEndpointName, dm.readyzHandler))
	}
	if dm.healthzHandler != nil {
		mux.Handle(dm.livenessEndpointName, http.StripPrefix(dm.livenessEndpointName, dm.healthzHandler))
		// Append '/' suffix to handle subpaths of the liveness endpoint.
		mux.Handle(dm.livenessEndpointName+"/", http.StripPrefix(dm.livenessEndpointName, dm.healthzHandler))
	}

	go dm.httpServe("health probe", dm.logger, server, dm.healthProbeListener)
}
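
// For illustration, with the default bind address and endpoint names defined
// above, the probes are reachable at:
//
//	curl http://localhost:8000/readyz
//	curl http://localhost:8000/livez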

func (dm *defaultManager) httpServe(kind string, log logr.Logger, server *http.Server, ln net.Listener) {
	log = log.WithValues("kind", kind, "addr", ln.Addr())

	go func() {
		log.Info("starting server")
		if err := server.Serve(ln); err != nil {
			if errors.Is(err, http.ErrServerClosed) {
				return
			}
			if atomic.LoadInt64(dm.stopProcedureEngaged) > 0 {
				// There might be cases where connections are still open and
				// we try to shut down, but the graceful shutdown has not
				// finished yet. In that case, do not report the error, only
				// log it.
				log.Error(err, "error on Serve after stop has been engaged")
				return
			}
			dm.errChan <- err
		}
	}()

	// Shut down the server when the stop channel is closed.
	<-dm.internalProceduresStop
	if err := server.Shutdown(dm.shutdownCtx); err != nil {
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			// Avoid reporting canceled or deadline-exceeded errors on shutdown.
			return
		}
		if atomic.LoadInt64(dm.stopProcedureEngaged) > 0 {
			dm.logger.Error(err, "error on Shutdown after stop has been engaged")
			return
		}
		dm.errChan <- err
	}
}

// Start starts the metrics and health probe servers (if their listeners are
// configured) and all registered runnables, then blocks until the context is
// cancelled or a runnable fails. On the way out it engages the graceful stop
// procedure and folds any shutdown error into the returned error.
func (dm *defaultManager) Start(ctx context.Context) (err error) {
	dm.Lock()
	if dm.started {
		dm.Unlock()
		return errors.New("manager already started")
	}
	dm.started = true
	var ready bool
	defer func() {
		// Only unlock the manager if we haven't reached the internal
		// readiness point below, where the lock is released explicitly.
		if !ready {
			dm.Unlock()
		}
	}()

	// internalCtx is derived from the caller's context and is cancelled once
	// the stop procedure is engaged.
	dm.internalCtx, dm.internalCancel = context.WithCancel(ctx)

	// This channel indicates that stop is complete, in other words all
	// runnables have returned or the graceful shutdown timeout expired.
	stopComplete := make(chan struct{})
	defer close(stopComplete)

	defer func() {
		// Engage the stop procedure on the way out and fold any shutdown
		// error into the returned error.
		stopErr := dm.engageStopProcedure(stopComplete)
		if stopErr != nil {
			if err != nil {
				// Utilerrors.Aggregate allows using errors.Is for all
				// contained errors, whereas fmt.Errorf allows wrapping at
				// most one error, which means the first error would not be
				// available for further inspection.
				err = kerrors.NewAggregate([]error{err, stopErr})
			} else {
				err = stopErr
			}
		}
	}()

	// Create an empty liveness handler if none was registered, so the livez
	// endpoint still responds.
	if dm.healthzHandler == nil {
		dm.healthzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}, Log: dm.logger}
	}

	// Serve metrics if a listener was configured.
	if dm.metricsListener != nil {
		dm.serveMetrics()
	}

	// Serve health probes if a listener was configured.
	if dm.healthProbeListener != nil {
		dm.serveHealthProbes()
	}

	// Start the registered runnables.
	if err := dm.runnables.Others.Start(dm.internalCtx); err != nil {
		if !wait.Interrupted(err) {
			return err
		}
	}

	ready = true
	dm.Unlock()
	select {
	case <-ctx.Done():
		// We are done.
		return nil
	case err := <-dm.errChan:
		// Error starting or running a runnable.
		return err
	}
}
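
// For illustration, a typical entry point runs the manager until an
// interrupt is received (signal.NotifyContext is stdlib; mgr is a Manager
// built with New):
//
//	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
//	defer stop()
//	if err := mgr.Start(ctx); err != nil {
//		log.Fatal(err)
//	}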

// engageStopProcedure signals all runnables to stop, drains potential errors
// from errChan, and waits for the runnables to end. It must not be called
// more than once.
func (dm *defaultManager) engageStopProcedure(stopComplete <-chan struct{}) error {
	if !atomic.CompareAndSwapInt64(dm.stopProcedureEngaged, 0, 1) {
		return errors.New("stop procedure already engaged")
	}

	// Populate the shutdown context. This MUST be done before closing
	// internalProceduresStop, because closing that channel unblocks the HTTP
	// servers, which rely on the shutdown context being set.
	var shutdownCancel context.CancelFunc
	dm.shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), dm.gracefulShutdownTimeout)
	defer shutdownCancel()

	// Start draining errors before stopping the runnables, so nothing
	// holding the lock deadlocks trying to write into the unbuffered errChan
	// after something else has already written into it.
	var closeOnce sync.Once
	go func() {
		for {
			// Closing inside the for loop is required to avoid a race between
			// this goroutine and the main stop procedure.
			closeOnce.Do(func() {
				// Cancel the internal stop channel and wait for the
				// procedures to stop and complete.
				close(dm.internalProceduresStop)
				dm.internalCancel()
			})
			select {
			case err, ok := <-dm.errChan:
				if ok {
					dm.logger.Error(err, "error received after stop sequence was engaged")
				}
			case <-stopComplete:
				return
			}
		}
	}()

	go func() {
		dm.logger.Info("stopping and waiting for runnables")
		dm.runnables.Others.StopAndWait(dm.shutdownCtx)

		// Once all runnables have stopped, cancel the shutdown context early
		// rather than waiting for the graceful shutdown timeout to elapse.
		dm.logger.Info("wait completed, proceeding to shutdown the manager")
		shutdownCancel()
	}()

	<-dm.shutdownCtx.Done()
	if err := dm.shutdownCtx.Err(); err != nil && !errors.Is(err, context.Canceled) {
		if errors.Is(err, context.DeadlineExceeded) {
			if dm.gracefulShutdownTimeout > 0 {
				return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", dm.gracefulShutdownTimeout, err)
			}
			return nil
		}
		// For any other error, return the error.
		return err
	}

	return nil
}