...

Source file src/edge-infra.dev/pkg/lib/runtime/manager/default_manager.go

Documentation: edge-infra.dev/pkg/lib/runtime/manager

     1  package manager
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"net"
     8  	"net/http"
     9  	"sync"
    10  	"sync/atomic"
    11  	"time"
    12  
    13  	"github.com/go-logr/logr"
    14  	"github.com/prometheus/client_golang/prometheus/promhttp"
    15  	kerrors "k8s.io/apimachinery/pkg/util/errors"
    16  	"k8s.io/apimachinery/pkg/util/wait"
    17  	"k8s.io/utils/ptr"
    18  
    19  	"edge-infra.dev/pkg/lib/logging"
    20  	"edge-infra.dev/pkg/lib/runtime/healthz"
    21  	"edge-infra.dev/pkg/lib/runtime/metrics"
    22  )
    23  
// Defaults applied by setOptionsDefaults, plus the message New logs when a
// listener cannot be created.
const (
	// defaultGracefulShutdownPeriod is the value setOptionsDefaults assigns to
	// Options.GracefulShutdownTimeout when none is provided.
	defaultGracefulShutdownPeriod = 30 * time.Second

	// Default probe endpoint paths, default health probe bind address and the
	// path the prometheus handler is mounted on. The metrics path is reserved:
	// AddMetricsExtraHandler refuses to register another handler on it.
	defaultReadinessEndpoint      = "/readyz"
	defaultLivenessEndpoint       = "/livez"
	defaultHealthProbeBindAddress = ":8000"
	defaultMetricsEndpoint        = "/metrics"

	// listenerFailed is the message New logs when creating either the metrics
	// or the health probe listener fails; the "kind" log value tells the two
	// apart.
	// NOTE(review): the hint text only talks about the metrics server even
	// when it is logged for a health probe failure.
	listenerFailed = "server failed to listen. You may want to disable the " +
		"metrics server (pass \"0\" as the bind address) or use another port if it is " +
		"due to conflicts"
)
    36  
    37  // New constructs a manager with default abilities.
    38  func New(options Options) (Manager, error) {
    39  	// Set default values for options fields
    40  	options = setOptionsDefaults(options)
    41  	log := *options.Logger
    42  
    43  	// Create the metrics listener. This will throw an error if the metrics bind
    44  	// address is invalid or already in use.
    45  	metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
    46  	if err != nil {
    47  		log.Error(err, listenerFailed, "kind", "metrics")
    48  		return nil, err
    49  	}
    50  
    51  	// By default we have no extra endpoints to expose on metrics http server.
    52  	metricsExtraHandlers := make(map[string]http.Handler)
    53  
    54  	// Create health probes listener. This will throw an error if the bind
    55  	// address is invalid or already in use.
    56  	healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
    57  	if err != nil {
    58  		log.Error(err, listenerFailed, "kind", "health probe")
    59  		return nil, err
    60  	}
    61  
    62  	errChan := make(chan error)
    63  	runnables := newRunnables(options.BaseContext, errChan)
    64  
    65  	return &defaultManager{
    66  		stopProcedureEngaged:   ptr.To(int64(0)),
    67  		logger:                 *options.Logger,
    68  		runnables:              runnables,
    69  		errChan:                errChan,
    70  		healthProbeListener:    healthProbeListener,
    71  		metricsListener:        metricsListener,
    72  		metricsExtraHandlers:   metricsExtraHandlers,
    73  		readinessEndpointName:  options.ReadinessEndpointName,
    74  		livenessEndpointName:   options.LivenessEndpointName,
    75  		internalProceduresStop: make(chan struct{}),
    76  	}, nil
    77  }
    78  
    79  // setOptionsDefaults set default values for Options fields.
    80  func setOptionsDefaults(options Options) Options {
    81  	if options.Logger == nil {
    82  		options.Logger = &logging.NewLogger().Logger
    83  	}
    84  
    85  	if options.Logger.GetSink() == nil {
    86  		options.Logger = &logging.NewLogger().Logger
    87  	}
    88  
    89  	if options.newMetricsListener == nil {
    90  		options.newMetricsListener = metrics.NewListener
    91  	}
    92  
    93  	if options.ReadinessEndpointName == "" {
    94  		options.ReadinessEndpointName = defaultReadinessEndpoint
    95  	}
    96  
    97  	if options.LivenessEndpointName == "" {
    98  		options.LivenessEndpointName = defaultLivenessEndpoint
    99  	}
   100  
   101  	if options.HealthProbeBindAddress == "" {
   102  		options.HealthProbeBindAddress = defaultHealthProbeBindAddress
   103  	}
   104  
   105  	if options.newHealthProbeListener == nil {
   106  		options.newHealthProbeListener = defaultHealthProbeListener
   107  	}
   108  
   109  	if options.GracefulShutdownTimeout == nil {
   110  		gracefulShutdownTimeout := defaultGracefulShutdownPeriod
   111  		options.GracefulShutdownTimeout = &gracefulShutdownTimeout
   112  	}
   113  
   114  	if options.BaseContext == nil {
   115  		options.BaseContext = defaultBaseContext
   116  	}
   117  
   118  	return options
   119  }
   120  
   121  // defaultHealthProbeListener creates the default health probes listener bound to the given address.
   122  func defaultHealthProbeListener(addr string) (net.Listener, error) {
   123  	if addr == "" || addr == "0" {
   124  		return nil, nil
   125  	}
   126  
   127  	ln, err := net.Listen("tcp", addr)
   128  	if err != nil {
   129  		return nil, fmt.Errorf("error listening on %s: %w", addr, err)
   130  	}
   131  	return ln, nil
   132  }
   133  
   134  // defaultBaseContext is used as the BaseContext value in Options if one
   135  // has not already been set.
   136  func defaultBaseContext() context.Context {
   137  	return context.Background()
   138  }
   139  
// Assert that the Runnable interface is implemented
var _ Runnable = &defaultManager{}

// defaultManager is the Manager implementation returned by New. It owns the
// metrics and health probe servers and the group of runnables registered
// through Add, and coordinates their startup and shutdown.
type defaultManager struct {
	// The embedded mutex is taken by Add, the Add*Check and
	// AddMetricsExtraHandler methods, and by Start until the runnables have
	// been started.
	sync.Mutex
	// started is consulted by Start and by the handler/check registration
	// methods to reject calls made once the manager has been started.
	started bool

	// stopProcedureEngaged is accessed atomically; engageStopProcedure flips it
	// from 0 to 1 and httpServe reads it to decide whether a server error is
	// only logged. errChan is the unbuffered channel on which errors are
	// reported to Start; it is also handed to newRunnables. runnables holds
	// the Runnables registered through Add.
	stopProcedureEngaged *int64
	errChan              chan error
	runnables            *runnables

	// logger is the logger that should be used by this manager.
	// New copies it from Options.Logger, which setOptionsDefaults fills with
	// logging.NewLogger() when it is unset or has no sink.
	logger logr.Logger

	// metricsListener is used to serve prometheus metrics
	metricsListener net.Listener

	// metricsExtraHandlers contains extra handlers to register on http server that serves metrics.
	metricsExtraHandlers map[string]http.Handler

	// healthProbeListener is used to serve the readiness and liveness probes
	healthProbeListener net.Listener

	// Readiness probe endpoint name
	readinessEndpointName string

	// Liveness probe endpoint name
	livenessEndpointName string

	// Readyz probe handler; nil until AddReadyzCheck is called, in which case
	// no readiness endpoint is registered.
	readyzHandler *healthz.Handler

	// Healthz (liveness) probe handler; Start creates an empty one if no
	// check was added.
	healthzHandler *healthz.Handler

	// gracefulShutdownTimeout is the duration given to runnable to stop
	// before the manager actually returns on stop.
	gracefulShutdownTimeout time.Duration

	// shutdownCtx is the context that can be used during shutdown. It will be cancelled
	// after the gracefulShutdownTimeout ended. It must not be accessed before
	// internalProceduresStop is closed because it will be nil.
	shutdownCtx context.Context

	// internalCtx is derived from the context passed to Start and is the
	// context the runnables are started with; internalCancel cancels it and is
	// called by engageStopProcedure.
	internalCtx    context.Context
	internalCancel context.CancelFunc

	// internalProceduresStop channel is used internally to the manager when coordinating
	// the proper shutdown of servers. This channel is also used for dependency injection.
	internalProceduresStop chan struct{}
}
   192  
   193  // Add sets dependencies on i, and adds it to the list of Runnables to start.
   194  func (dm *defaultManager) Add(r Runnable) error {
   195  	dm.Lock()
   196  	defer dm.Unlock()
   197  	return dm.add(r)
   198  }
   199  
   200  func (dm *defaultManager) add(r Runnable) error {
   201  	return dm.runnables.Add(r)
   202  }
   203  
   204  // AddMetricsExtraHandler adds extra handler served on path to the http server that serves metrics.
   205  func (dm *defaultManager) AddMetricsExtraHandler(path string, handler http.Handler) error {
   206  	dm.Lock()
   207  	defer dm.Unlock()
   208  
   209  	if dm.started {
   210  		return fmt.Errorf("unable to add new metrics handler because metrics endpoint has already been created")
   211  	}
   212  
   213  	if path == defaultMetricsEndpoint {
   214  		return fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint)
   215  	}
   216  
   217  	if _, found := dm.metricsExtraHandlers[path]; found {
   218  		return fmt.Errorf("can't register extra handler by duplicate path %q on metrics http server", path)
   219  	}
   220  
   221  	dm.metricsExtraHandlers[path] = handler
   222  	dm.logger.V(2).Info("Registering metrics http server extra handler", "path", path)
   223  	return nil
   224  }
   225  
   226  // AddLivezCheck allows you to add Healthz checker.
   227  func (dm *defaultManager) AddLivezCheck(name string, check healthz.Checker) error {
   228  	dm.Lock()
   229  	defer dm.Unlock()
   230  
   231  	if dm.started {
   232  		return fmt.Errorf("unable to add new checker because healthz endpoint has already been created")
   233  	}
   234  
   235  	if dm.healthzHandler == nil {
   236  		dm.healthzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}, Log: dm.logger}
   237  	}
   238  
   239  	dm.healthzHandler.Checks[name] = check
   240  	return nil
   241  }
   242  
   243  // AddReadyzCheck allows you to add Readyz checker.
   244  func (dm *defaultManager) AddReadyzCheck(name string, check healthz.Checker) error {
   245  	dm.Lock()
   246  	defer dm.Unlock()
   247  
   248  	if dm.started {
   249  		return fmt.Errorf("unable to add new checker because healthz endpoint has already been created")
   250  	}
   251  
   252  	if dm.readyzHandler == nil {
   253  		dm.readyzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}, Log: dm.logger}
   254  	}
   255  
   256  	dm.readyzHandler.Checks[name] = check
   257  	return nil
   258  }
   259  
   260  func (dm *defaultManager) GetLogger() logr.Logger {
   261  	return dm.logger
   262  }
   263  
   264  func (dm *defaultManager) serveMetrics() {
   265  	handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
   266  		ErrorHandling: promhttp.HTTPErrorOnError,
   267  	})
   268  	mux := http.NewServeMux()
   269  	mux.Handle(defaultMetricsEndpoint, handler)
   270  	for path, extraHandler := range dm.metricsExtraHandlers {
   271  		mux.Handle(path, extraHandler)
   272  	}
   273  
   274  	server := NewServer(mux)
   275  	go dm.httpServe("metrics", dm.logger.WithValues("path", defaultMetricsEndpoint), server, dm.metricsListener)
   276  }
   277  
   278  func (dm *defaultManager) serveHealthProbes() {
   279  	mux := http.NewServeMux()
   280  	server := NewServer(mux)
   281  
   282  	if dm.readyzHandler != nil {
   283  		mux.Handle(dm.readinessEndpointName, http.StripPrefix(dm.readinessEndpointName, dm.readyzHandler))
   284  		// Append '/' suffix to handle subpaths
   285  		mux.Handle(dm.readinessEndpointName+"/", http.StripPrefix(dm.readinessEndpointName, dm.readyzHandler))
   286  	}
   287  	if dm.healthzHandler != nil {
   288  		mux.Handle(dm.livenessEndpointName, http.StripPrefix(dm.livenessEndpointName, dm.healthzHandler))
   289  		// Append '/' suffix to handle subpaths
   290  		mux.Handle(dm.livenessEndpointName+"/", http.StripPrefix(dm.livenessEndpointName, dm.healthzHandler))
   291  	}
   292  
   293  	go dm.httpServe("health probe", dm.logger, server, dm.healthProbeListener)
   294  }
   295  
   296  func (dm *defaultManager) httpServe(kind string, log logr.Logger, server *http.Server, ln net.Listener) {
   297  	log = log.WithValues("kind", kind, "addr", ln.Addr())
   298  
   299  	go func() {
   300  		log.Info("starting server")
   301  		if err := server.Serve(ln); err != nil {
   302  			if errors.Is(err, http.ErrServerClosed) {
   303  				return
   304  			}
   305  			if atomic.LoadInt64(dm.stopProcedureEngaged) > 0 {
   306  				// There might be cases where connections are still open and we try to shutdown
   307  				// but not having enough time to close the connection causes an error in Serve
   308  				//
   309  				// In that case we want to avoid returning an error to the main error channel.
   310  				log.Error(err, "error on Serve after stop has been engaged")
   311  				return
   312  			}
   313  			dm.errChan <- err
   314  		}
   315  	}()
   316  
   317  	// Shutdown the server when stop is closed.
   318  	<-dm.internalProceduresStop
   319  	if err := server.Shutdown(dm.shutdownCtx); err != nil {
   320  		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
   321  			// Avoid logging context related errors.
   322  			return
   323  		}
   324  		if atomic.LoadInt64(dm.stopProcedureEngaged) > 0 {
   325  			dm.logger.Error(err, "error on Shutdown after stop has been engaged")
   326  			return
   327  		}
   328  		dm.errChan <- err
   329  	}
   330  }
   331  
   332  // Start starts the manager and waits indefinitely.
   333  // There is only two ways to have start return:
   334  // An error has occurred during in one of the internal operations,
   335  // such as leader election, cache start, webhooks, and so on.
   336  // Or, the context is cancelled.
   337  func (dm *defaultManager) Start(ctx context.Context) (err error) {
   338  	dm.Lock()
   339  	if dm.started {
   340  		dm.Unlock()
   341  		return errors.New("manager already started")
   342  	}
   343  	var ready bool
   344  	defer func() {
   345  		// Only unlock the manager if we haven't reached
   346  		// the internal readiness condition.
   347  		if !ready {
   348  			dm.Unlock()
   349  		}
   350  	}()
   351  
   352  	// Initialize the internal context.
   353  	dm.internalCtx, dm.internalCancel = context.WithCancel(ctx)
   354  
   355  	// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
   356  	stopComplete := make(chan struct{})
   357  	defer close(stopComplete)
   358  	// This must be deferred after closing stopComplete, otherwise we deadlock.
   359  	defer func() {
   360  		// https://hips.hearstapps.com/hmg-prod.s3.amazonaws.com/images/gettyimages-459889618-1533579787.jpg
   361  		stopErr := dm.engageStopProcedure(stopComplete)
   362  		if stopErr != nil {
   363  			if err != nil {
   364  				// Utilerrors.Aggregate allows to use errors.Is for all contained errors
   365  				// whereas fmt.Errorf allows wrapping at most one error which means the
   366  				// other one can not be found anymore.
   367  				err = kerrors.NewAggregate([]error{err, stopErr})
   368  			} else {
   369  				err = stopErr
   370  			}
   371  		}
   372  	}()
   373  
   374  	// Have default healthz probe if none added. This functionality differs from controller-runtime.
   375  	if dm.healthzHandler == nil {
   376  		dm.healthzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}, Log: dm.logger}
   377  	}
   378  	// if dm.readyzHandler == nil {
   379  	// 	dm.readyzHandler = &healthz.Handler{Checks: map[string]healthz.Checker{}}
   380  	// }
   381  
   382  	// Metrics should be served whether the controller is leader or not.
   383  	// (If we don't serve metrics for non-leaders, prometheus will still scrape
   384  	// the pod but will get a connection refused).
   385  	if dm.metricsListener != nil {
   386  		dm.serveMetrics()
   387  	}
   388  
   389  	// Serve health probes.
   390  	if dm.healthProbeListener != nil {
   391  		dm.serveHealthProbes()
   392  	}
   393  
   394  	// // First start any webhook servers, which includes conversion, validation, and defaulting
   395  	// // webhooks that are registered.
   396  	// //
   397  	// // WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition
   398  	// // between conversion webhooks and the cache sync (usually initial list) which causes the webhooks
   399  	// // to never start because no cache can be populated.
   400  	// if err := dm.runnables.Webhooks.Start(dm.internalCtx); err != nil {
   401  	// 	if !errors.Is(err, wait.ErrWaitTimeout) {
   402  	// 		return err
   403  	// 	}
   404  	// }
   405  
   406  	// Start the non-leaderelection Runnables after the cache has synced.
   407  	if err := dm.runnables.Others.Start(dm.internalCtx); err != nil {
   408  		if !wait.Interrupted(err) {
   409  			return err
   410  		}
   411  	}
   412  
   413  	ready = true
   414  	dm.Unlock()
   415  	select {
   416  	case <-ctx.Done():
   417  		// We are done
   418  		return nil
   419  	case err := <-dm.errChan:
   420  		// Error starting or running a runnable
   421  		return err
   422  	}
   423  }
   424  
// engageStopProcedure signals all runnables to stop, reads potential errors
// from the errChan and waits for them to end. It must not be called more than once.
//
// stopComplete is closed by the caller once engageStopProcedure has returned;
// it tells the goroutine draining errChan to exit.
//
// It returns an error if the stop procedure was already engaged, or if the
// runnables did not finish within a positive gracefulShutdownTimeout. With a
// timeout that is not positive the deadline expiry is not reported as an
// error.
func (dm *defaultManager) engageStopProcedure(stopComplete <-chan struct{}) error {
	// The compare-and-swap both records that the stop procedure is running
	// (httpServe reads this flag) and rejects a second invocation.
	if !atomic.CompareAndSwapInt64(dm.stopProcedureEngaged, 0, 1) {
		return errors.New("stop procedure already engaged")
	}

	// Populate the shutdown context, this operation MUST be done before
	// closing the internalProceduresStop channel.
	//
	// The shutdown context immediately expires if the gracefulShutdownTimeout is not set.
	var shutdownCancel context.CancelFunc
	dm.shutdownCtx, shutdownCancel = context.WithTimeout(context.Background(), dm.gracefulShutdownTimeout)
	defer shutdownCancel()

	// Start draining the errors before acquiring the lock to make sure we don't deadlock
	// if something that has the lock is blocked on trying to write into the unbuffered
	// channel after something else already wrote into it.
	var closeOnce sync.Once
	go func() {
		for {
			// Closing in the for loop is required to avoid race conditions between
			// the closure of all internal procedures and making sure to have a reader off the error channel.
			closeOnce.Do(func() {
				// Cancel the internal stop channel and wait for the procedures to stop and complete.
				close(dm.internalProceduresStop)
				dm.internalCancel()
			})
			select {
			case err, ok := <-dm.errChan:
				// Errors arriving now can no longer be returned from Start;
				// they are only logged.
				if ok {
					dm.logger.Error(err, "error received after stop sequence was engaged")
				}
			case <-stopComplete:
				return
			}
		}
	}()

	go func() {
		dm.logger.Info("stopping and waiting for runnables")
		dm.runnables.Others.StopAndWait(dm.shutdownCtx)

		// // Webhooks should come last, as they might be still serving some requests.
		// dm.logger.Info("Stopping and waiting for webhooks")
		// dm.runnables.Webhooks.StopAndWait(dm.shutdownCtx)

		// Proceed to close the manager and overall shutdown context.
		dm.logger.Info("wait completed, proceeding to shutdown the manager")
		shutdownCancel()
	}()

	// Wait until either the runnables have stopped (the goroutine above
	// cancels the context) or the graceful shutdown timeout has elapsed.
	<-dm.shutdownCtx.Done()
	if err := dm.shutdownCtx.Err(); err != nil && !errors.Is(err, context.Canceled) {
		if errors.Is(err, context.DeadlineExceeded) {
			if dm.gracefulShutdownTimeout > 0 {
				return fmt.Errorf("failed waiting for all runnables to end within grace period of %s: %w", dm.gracefulShutdownTimeout, err)
			}
			return nil
		}
		// For any other error, return the error.
		return err
	}

	return nil
}
   491  

View as plain text