package manager

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-logr/logr"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	kerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/utils/ptr"

	"edge-infra.dev/pkg/lib/logging"
	"edge-infra.dev/pkg/lib/runtime/healthz"
	"edge-infra.dev/pkg/lib/runtime/metrics"
)

const (
	defaultGracefulShutdownPeriod = 30 * time.Second

	defaultReadinessEndpoint      = "/readyz"
	defaultLivenessEndpoint       = "/livez"
	defaultHealthProbeBindAddress = ":8000"

	defaultMetricsEndpoint = "/metrics"

	listenerFailed = "server failed to listen. You may want to disable the " +
		"metrics server (pass \"0\" as the bind address) or use another port if it is " +
		"due to conflicts"
)

// New constructs a manager with default abilities.
func New(options Options) (Manager, error) {
	// Set default values for options fields.
	options = setOptionsDefaults(options)
	log := *options.Logger

	// Create the metrics listener. This returns an error if the metrics bind
	// address is invalid or already in use.
	metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
	if err != nil {
		log.Error(err, listenerFailed, "kind", "metrics")
		return nil, err
	}

	// By default we have no extra endpoints to expose on the metrics http server.
	metricsExtraHandlers := make(map[string]http.Handler)

	// Create the health probes listener. This returns an error if the bind
	// address is invalid or already in use.
	healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
	if err != nil {
		log.Error(err, listenerFailed, "kind", "health probe")
		return nil, err
	}

	errChan := make(chan error)
	runnables := newRunnables(options.BaseContext, errChan)

	return &defaultManager{
		stopProcedureEngaged:    ptr.To(int64(0)),
		logger:                  *options.Logger,
		runnables:               runnables,
		errChan:                 errChan,
		healthProbeListener:     healthProbeListener,
		metricsListener:         metricsListener,
		metricsExtraHandlers:    metricsExtraHandlers,
		readinessEndpointName:   options.ReadinessEndpointName,
		livenessEndpointName:    options.LivenessEndpointName,
		gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
		internalProceduresStop:  make(chan struct{}),
	}, nil
}

// setOptionsDefaults sets default values for Options fields.
func setOptionsDefaults(options Options) Options {
	if options.Logger == nil {
		options.Logger = &logging.NewLogger().Logger
	}

	if options.Logger.GetSink() == nil {
		options.Logger = &logging.NewLogger().Logger
	}

	if options.newMetricsListener == nil {
		options.newMetricsListener = metrics.NewListener
	}

	if options.ReadinessEndpointName == "" {
		options.ReadinessEndpointName = defaultReadinessEndpoint
	}

	if options.LivenessEndpointName == "" {
		options.LivenessEndpointName = defaultLivenessEndpoint
	}

	if options.HealthProbeBindAddress == "" {
		options.HealthProbeBindAddress = defaultHealthProbeBindAddress
	}

	if options.newHealthProbeListener == nil {
		options.newHealthProbeListener = defaultHealthProbeListener
	}

	if options.GracefulShutdownTimeout == nil {
		gracefulShutdownTimeout := defaultGracefulShutdownPeriod
		options.GracefulShutdownTimeout = &gracefulShutdownTimeout
	}

	if options.BaseContext == nil {
		options.BaseContext = defaultBaseContext
	}

	return options
}
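// Construction sketch (illustrative only, not part of the original file): with a
// mostly zero-value Options, setOptionsDefaults wires in the defaults above, so the
// manager serves health probes on ":8000" at "/readyz" and "/livez" and uses a 30s
// graceful shutdown period. The metrics address shown here is an assumption; per
// the listenerFailed message, passing "0" as the bind address disables that server.
//
//	mgr, err := manager.New(manager.Options{
//		MetricsBindAddress: ":8080", // pass "0" to disable the metrics server
//	})
//	if err != nil {
//		return err
//	}
//	// Start blocks until the context is cancelled or a runnable fails.
//	return mgr.Start(ctx)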
// defaultHealthProbeListener creates the default health probes listener bound to the given address.
func defaultHealthProbeListener(addr string) (net.Listener, error) {
	if addr == "" || addr == "0" {
		return nil, nil
	}

	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, fmt.Errorf("error listening on %s: %w", addr, err)
	}
	return ln, nil
}

// defaultBaseContext is used as the BaseContext value in Options if one
// has not already been set.
func defaultBaseContext() context.Context {
	return context.Background()
}

// Assert that the Runnable interface is implemented.
var _ Runnable = &defaultManager{}

type defaultManager struct {
	sync.Mutex
	started              bool
	stopProcedureEngaged *int64
	errChan              chan error
	runnables            *runnables

	// logger is the logger that should be used by this manager.
	// If none is set, it defaults to a logging.EdgeLogger backed logr.Logger.
	logger logr.Logger

	// metricsListener is used to serve prometheus metrics
	metricsListener net.Listener

	// metricsExtraHandlers contains extra handlers to register on the http server that serves metrics.
	metricsExtraHandlers map[string]http.Handler

	// healthProbeListener is used to serve the liveness and readiness probes
	healthProbeListener net.Listener

	// Readiness probe endpoint name
	readinessEndpointName string

	// Liveness probe endpoint name
	livenessEndpointName string

	// Readyz probe handler
	readyzHandler *healthz.Handler

	// Healthz probe handler
	healthzHandler *healthz.Handler

	// gracefulShutdownTimeout is the duration given to runnables to stop
	// before the manager actually returns on stop.
	gracefulShutdownTimeout time.Duration

	// shutdownCtx is the context that can be used during shutdown. It will be cancelled
	// after the gracefulShutdownTimeout has ended. It must not be accessed before internalStop
	// is closed because it will be nil.
	shutdownCtx context.Context

	internalCtx    context.Context
	internalCancel context.CancelFunc

	// internalProceduresStop channel is used internally by the manager when coordinating
	// the proper shutdown of servers. This channel is also used for dependency injection.
	internalProceduresStop chan struct{}
}

// Add adds the given Runnable to the list of Runnables to start.
func (dm *defaultManager) Add(r Runnable) error {
	dm.Lock()
	defer dm.Unlock()
	return dm.add(r)
}

func (dm *defaultManager) add(r Runnable) error {
	return dm.runnables.Add(r)
}

// AddMetricsExtraHandler adds an extra handler served on path to the http server that serves metrics.
func (dm *defaultManager) AddMetricsExtraHandler(path string, handler http.Handler) error {
	dm.Lock()
	defer dm.Unlock()

	if dm.started {
		return fmt.Errorf("unable to add new metrics handler because metrics endpoint has already been created")
	}

	if path == defaultMetricsEndpoint {
		return fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint)
	}

	if _, found := dm.metricsExtraHandlers[path]; found {
		return fmt.Errorf("can't register extra handler by duplicate path %q on metrics http server", path)
	}

	dm.metricsExtraHandlers[path] = handler
	dm.logger.V(2).Info("Registering metrics http server extra handler", "path", path)
	return nil
}

// AddLivezCheck allows you to add a livez checker.
func (dm *defaultManager) AddLivezCheck(name string, check healthz.Checker) error {
	dm.Lock()
	defer dm.Unlock()

	if dm.started {
		return fmt.Errorf("unable to add new checker because livez endpoint has already been created")
	}

	if dm.healthzHandler == nil {
		dm.healthzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}, Log: dm.logger}
	}

	dm.healthzHandler.Checks[name] = check
	return nil
}
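// Registration sketch (illustrative only): extra metrics handlers and health
// checks must be added before the manager is marked started, since the methods
// above refuse additions afterwards. The pprof path and check name below are
// assumptions (and presume net/http/pprof is imported as pprof); the checker
// literal assumes a controller-runtime-style healthz.Checker signature of
// func(*http.Request) error.
//
//	// Expose pprof next to /metrics on the metrics server.
//	if err := mgr.AddMetricsExtraHandler("/debug/pprof/", http.HandlerFunc(pprof.Index)); err != nil {
//		return err
//	}
//	// Report liveness as long as the process can answer HTTP requests.
//	if err := mgr.AddLivezCheck("ping", func(_ *http.Request) error { return nil }); err != nil {
//		return err
//	}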
// AddReadyzCheck allows you to add a readyz checker.
func (dm *defaultManager) AddReadyzCheck(name string, check healthz.Checker) error {
	dm.Lock()
	defer dm.Unlock()

	if dm.started {
		return fmt.Errorf("unable to add new checker because readyz endpoint has already been created")
	}

	if dm.readyzHandler == nil {
		dm.readyzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}, Log: dm.logger}
	}

	dm.readyzHandler.Checks[name] = check
	return nil
}

func (dm *defaultManager) GetLogger() logr.Logger {
	return dm.logger
}

func (dm *defaultManager) serveMetrics() {
	handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
		ErrorHandling: promhttp.HTTPErrorOnError,
	})

	mux := http.NewServeMux()
	mux.Handle(defaultMetricsEndpoint, handler)
	for path, extraHandler := range dm.metricsExtraHandlers {
		mux.Handle(path, extraHandler)
	}

	server := NewServer(mux)
	go dm.httpServe("metrics", dm.logger.WithValues("path", defaultMetricsEndpoint), server, dm.metricsListener)
}

func (dm *defaultManager) serveHealthProbes() {
	mux := http.NewServeMux()
	server := NewServer(mux)

	if dm.readyzHandler != nil {
		mux.Handle(dm.readinessEndpointName, http.StripPrefix(dm.readinessEndpointName, dm.readyzHandler))
		// Append '/' suffix to handle subpaths
		mux.Handle(dm.readinessEndpointName+"/", http.StripPrefix(dm.readinessEndpointName, dm.readyzHandler))
	}
	if dm.healthzHandler != nil {
		mux.Handle(dm.livenessEndpointName, http.StripPrefix(dm.livenessEndpointName, dm.healthzHandler))
		// Append '/' suffix to handle subpaths
		mux.Handle(dm.livenessEndpointName+"/", http.StripPrefix(dm.livenessEndpointName, dm.healthzHandler))
	}

	go dm.httpServe("health probe", dm.logger, server, dm.healthProbeListener)
}

func (dm *defaultManager) httpServe(kind string, log logr.Logger, server *http.Server, ln net.Listener) {
	log = log.WithValues("kind", kind, "addr", ln.Addr())

	go func() {
		log.Info("starting server")
		if err := server.Serve(ln); err != nil {
			if errors.Is(err, http.ErrServerClosed) {
				return
			}
			if atomic.LoadInt64(dm.stopProcedureEngaged) > 0 {
				// There might be cases where connections are still open while we try to shut down,
				// and not having enough time to close them causes an error in Serve.
				//
				// In that case we want to avoid returning an error to the main error channel.
				log.Error(err, "error on Serve after stop has been engaged")
				return
			}
			dm.errChan <- err
		}
	}()

	// Shutdown the server when stop is closed.
	<-dm.internalProceduresStop
	if err := server.Shutdown(dm.shutdownCtx); err != nil {
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			// Avoid logging context related errors.
			return
		}
		if atomic.LoadInt64(dm.stopProcedureEngaged) > 0 {
			dm.logger.Error(err, "error on Shutdown after stop has been engaged")
			return
		}
		dm.errChan <- err
	}
}

// Start starts the manager and waits indefinitely.
// There are only two ways to have Start return:
// an error has occurred in one of the internal operations,
// such as leader election, cache start, webhooks, and so on;
// or the context is cancelled.
func (dm *defaultManager) Start(ctx context.Context) (err error) {
	dm.Lock()
	if dm.started {
		dm.Unlock()
		return errors.New("manager already started")
	}
	var ready bool
	defer func() {
		// Only unlock the manager if we haven't reached
		// the internal readiness condition.
		if !ready {
			dm.Unlock()
		}
	}()

	// Initialize the internal context.
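	// internalCtx is derived from the caller's context and is the context handed to
	// every runnable; engageStopProcedure cancels it (via internalCancel) once shutdown begins.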
	dm.internalCtx, dm.internalCancel = context.WithCancel(ctx)

	// This chan indicates that stop is complete, in other words all runnables have returned or timed out on the stop request.
	stopComplete := make(chan struct{})
	defer close(stopComplete)
	// This must be deferred after closing stopComplete, otherwise we deadlock.
	defer func() {
		// https://hips.hearstapps.com/hmg-prod.s3.amazonaws.com/images/gettyimages-459889618-1533579787.jpg
		stopErr := dm.engageStopProcedure(stopComplete)
		if stopErr != nil {
			if err != nil {
				// kerrors.NewAggregate allows using errors.Is for all contained errors,
				// whereas fmt.Errorf allows wrapping at most one error, which means the
				// other one could not be found anymore.
				err = kerrors.NewAggregate([]error{err, stopErr})
			} else {
				err = stopErr
			}
		}
	}()

	// Have a default healthz probe if none was added. This functionality differs from controller-runtime.
	if dm.healthzHandler == nil {
		dm.healthzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}, Log: dm.logger}
	}
	// if dm.readyzHandler == nil {
	// 	dm.readyzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}}
	// }

	// Metrics should be served whether the controller is leader or not.
	// (If we don't serve metrics for non-leaders, prometheus will still scrape
	// the pod but will get a connection refused).
	if dm.metricsListener != nil {
		dm.serveMetrics()
	}

	// Serve health probes.
	if dm.healthProbeListener != nil {
		dm.serveHealthProbes()
	}

	// // First start any webhook servers, which includes conversion, validation, and defaulting
	// // webhooks that are registered.
	// //
	// // WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition
	// // between conversion webhooks and the cache sync (usually initial list) which causes the webhooks
	// // to never start because no cache can be populated.
	// if err := dm.runnables.Webhooks.Start(dm.internalCtx); err != nil {
	// 	if !errors.Is(err, wait.ErrWaitTimeout) {
	// 		return err
	// 	}
	// }

	// Start the non-leaderelection Runnables after the cache has synced.
	if err := dm.runnables.Others.Start(dm.internalCtx); err != nil {
		if !wait.Interrupted(err) {
			return err
		}
	}

	ready = true
	dm.Unlock()
	select {
	case <-ctx.Done():
		// We are done
		return nil
	case err := <-dm.errChan:
		// Error starting or running a runnable
		return err
	}
}

// engageStopProcedure signals all runnables to stop, reads potential errors
// from the errChan and waits for them to end. It must not be called more than once.
func (dm *defaultManager) engageStopProcedure(stopComplete <-chan struct{}) error {
	if !atomic.CompareAndSwapInt64(dm.stopProcedureEngaged, 0, 1) {
		return errors.New("stop procedure already engaged")
	}

	// Populate the shutdown context, this operation MUST be done before
	// closing the internalProceduresStop channel.
	//
	// The shutdown context immediately expires if the gracefulShutdownTimeout is not set.
	var shutdownCancel context.CancelFunc
	dm.shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), dm.gracefulShutdownTimeout)
	defer shutdownCancel()

	// Start draining the errors before acquiring the lock to make sure we don't deadlock
	// if something that has the lock is blocked on trying to write into the unbuffered
	// channel after something else already wrote into it.
	var closeOnce sync.Once
	go func() {
		for {
			// Closing in the for loop is required to avoid race conditions between
			// the closure of all internal procedures and making sure to have a reader off the error channel.
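			// Only the first iteration performs the actual close/cancel; later
			// iterations simply keep draining errChan until stopComplete is closed.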
			closeOnce.Do(func() {
				// Cancel the internal stop channel and wait for the procedures to stop and complete.
				close(dm.internalProceduresStop)
				dm.internalCancel()
			})
			select {
			case err, ok := <-dm.errChan:
				if ok {
					dm.logger.Error(err, "error received after stop sequence was engaged")
				}
			case <-stopComplete:
				return
			}
		}
	}()

	go func() {
		dm.logger.Info("stopping and waiting for runnables")
		dm.runnables.Others.StopAndWait(dm.shutdownCtx)

		// // Webhooks should come last, as they might be still serving some requests.
		// dm.logger.Info("Stopping and waiting for webhooks")
		// dm.runnables.Webhooks.StopAndWait(dm.shutdownCtx)

		// Proceed to close the manager and overall shutdown context.
		dm.logger.Info("wait completed, proceeding to shutdown the manager")
		shutdownCancel()
	}()

	<-dm.shutdownCtx.Done()
	if err := dm.shutdownCtx.Err(); err != nil && !errors.Is(err, context.Canceled) {
		if errors.Is(err, context.DeadlineExceeded) {
			if dm.gracefulShutdownTimeout > 0 {
				return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", dm.gracefulShutdownTimeout, err)
			}
			return nil
		}
		// For any other error, return the error.
		return err
	}

	return nil
}
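// Shutdown-cooperation sketch (illustrative only): engageStopProcedure cancels the
// internal context and then waits up to gracefulShutdownTimeout for runnables to
// return, so a well-behaved Runnable should exit promptly once its context is done.
// The RunnableFunc-style literal below is an assumption about how Runnables are
// typically written against this package; adapt it to the actual Runnable interface.
//
//	_ = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
//		ticker := time.NewTicker(time.Second)
//		defer ticker.Stop()
//		for {
//			select {
//			case <-ctx.Done():
//				// Returning here lets StopAndWait finish inside the grace period.
//				return nil
//			case <-ticker.C:
//				// periodic work
//			}
//		}
//	}))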