...

Source file src/edge-infra.dev/pkg/sds/lib/process/processmanager/processmanager.go

Documentation: edge-infra.dev/pkg/sds/lib/process/processmanager

     1  package processmanager
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"sync"
     7  	"time"
     8  
     9  	"github.com/go-logr/logr"
    10  	"k8s.io/apimachinery/pkg/util/wait"
    11  )
    12  
// Timeouts used by the process manager.
//
//   - startupTimeout is 30 seconds. NOTE(review): presumably bounds
//     process startup; it is not referenced in this part of the file.
//   - exitTimeout is 10 seconds and bounds the Stop() call that
//     contextHandler makes once the start context has been cancelled.
const (
	startupTimeout = time.Second * 30
	exitTimeout    = time.Second * 10
)
    17  
// hookType identifies the point in the process lifecycle that a set of
// hooks belongs to. Its string value appears in the error returned when
// a hook fails.
type hookType string

// The lifecycle points that hooks can be registered for: before and
// after the process is started, and before and after it is stopped.
const (
	preStart  hookType = "pre-start"
	postStart hookType = "post-start"
	preStop   hookType = "pre-stop"
	postStop  hookType = "post-stop"
)
    26  
// HookFunc is a function that can be called during the process lifecycle.
// Returning a non-nil error marks the hook as failed.
//
// The following process manager methods can configure hooks:
//   - WithPreStartHooks(...HookFunc)
//   - WithPostStartHooks(...HookFunc)
//   - WithPreStopHooks(...HookFunc)
//   - WithPostStopHooks(...HookFunc)
//
// If the function starts goroutines, they should exit when the
// context is cancelled.
type HookFunc func(ctx context.Context) error
    38  
// ProcessManager handles the running of processes.
type ProcessManager interface {
	// Name returns the name of the process manager.
	Name() string

	// Start starts processes, watching stdout and stderr.
	// This does nothing if it is already running.
	//
	// The process manager will be stopped if the context is
	// cancelled, unless disabled via WithNoContextHandler().
	Start(context.Context) error
	// Stop sends SIGTERM to the processes and waits for them to stop.
	// This does nothing if it is already stopped.
	Stop(context.Context) error
	// Restart stops the process then starts it again.
	Restart(context.Context) error

	// Result returns the channel the process manager uses to report
	// the result of the process on exiting.
	Result() <-chan error

	// WithLogger sets the logger. If unset, there will be no logging
	// from the process manager. The processes will still output
	// to stdout.
	WithLogger(log logr.Logger, verbose bool)

	// WithPreStartHooks sets the start hooks to be run before the
	// process is started.
	//
	// Start hooks will be cancelled if the process manager is stopped.
	WithPreStartHooks(...HookFunc)
	// WithPostStartHooks sets the start hooks to be run after the
	// process is started.
	//
	// Start hooks will be cancelled if the process manager is stopped.
	WithPostStartHooks(...HookFunc)
	// WithPreStopHooks sets the stop hooks to be run before the
	// process is stopped.
	WithPreStopHooks(...HookFunc)
	// WithPostStopHooks sets the stop hooks to be run after the
	// process is stopped.
	WithPostStopHooks(...HookFunc)

	// WithNoContextHandler disables the process manager stopping when
	// the context that it was started with is cancelled.
	WithNoContextHandler()

	// IsReady checks whether the process is running and ready. If
	// WithReadyCheck() has not been called, then the process is ready
	// as soon as it starts running.
	IsReady(context.Context) (bool, error)
	// WithReadyCheck provides a function which returns whether the
	// process is ready. The process must be running to be ready.
	WithReadyCheck(ReadyCheckFunc)

	// WithWaitUntilReady will wait until IsReady() passes once the
	// process is started. If the timeout is reached, an error will
	// be returned.
	WithWaitUntilReady(timeout time.Duration)
	// WaitUntilReady blocks until IsReady() returns true, or the
	// context is cancelled.
	WaitUntilReady(context.Context) error
	// WaitUntilStopped blocks until the process is no longer running,
	// or the context is cancelled.
	WaitUntilStopped(context.Context) error
}
   104  
// processManager holds the state shared by the process manager methods
// in this file: its name, lifecycle hooks, loggers, result channel,
// readiness configuration and running flag.
type processManager struct {
	// Useful name for the process manager.
	name string

	// Hook functions to be called prior to starting the process manager.
	preStartHooks []HookFunc
	// Hook functions to be called after starting the process manager.
	postStartHooks []HookFunc
	// Hook functions to be called prior to stopping the process manager.
	preStopHooks []HookFunc
	// Hook functions to be called after stopping the process manager.
	postStopHooks []HookFunc

	// The process manager's logger.
	log logr.Logger
	// The process manager's verbose logger.
	vlog logr.Logger

	// Used to externally report the result of the process manager
	// process exiting via the Result() method.
	resultChan chan error
	// Used to cancel any asynchronous processes when the process
	// manager is stopped.
	cancel context.CancelFunc
	// Configures whether the process manager should skip calling stop
	// when the context it was started with is cancelled. Set by
	// WithNoContextHandler().
	skipContextHandling bool

	// A function to check if the process is running and ready.
	// When nil, IsReady() falls back to the isRunning flag.
	readyCheck ReadyCheckFunc
	// Whether we should wait for IsReady() to pass after the
	// process starts running.
	waitUntilReady bool
	// Timeout duration for ready check wait.
	waitUntilReadyTimeout time.Duration

	// Whether the process manager is currently running.
	isRunning bool

	// Ensures start and stop calls cannot be made concurrently.
	sync.Mutex
}
   147  
   148  func (pm *processManager) Name() string {
   149  	return pm.name
   150  }
   151  
   152  func (pm *processManager) Result() <-chan error {
   153  	return pm.resultChan
   154  }
   155  
   156  func (pm *processManager) WithPreStartHooks(hooks ...HookFunc) {
   157  	pm.preStartHooks = hooks
   158  }
   159  
   160  func (pm *processManager) WithPostStartHooks(hooks ...HookFunc) {
   161  	pm.postStartHooks = hooks
   162  }
   163  
   164  func (pm *processManager) WithPreStopHooks(hooks ...HookFunc) {
   165  	pm.preStopHooks = hooks
   166  }
   167  
   168  func (pm *processManager) WithPostStopHooks(hooks ...HookFunc) {
   169  	pm.postStopHooks = hooks
   170  }
   171  
   172  // Executes each hook in order, returning as soon as an error is received.
   173  func (pm *processManager) executeHooks(ctx context.Context, typ hookType) error {
   174  	switch typ {
   175  	case preStart:
   176  		return executeHookFuncs(ctx, pm.Name(), preStart, pm.preStartHooks)
   177  	case postStart:
   178  		return executeHookFuncs(ctx, pm.Name(), postStart, pm.postStartHooks)
   179  	case preStop:
   180  		return executeHookFuncs(ctx, pm.Name(), preStop, pm.preStopHooks)
   181  	case postStop:
   182  		return executeHookFuncs(ctx, pm.Name(), postStop, pm.postStopHooks)
   183  	}
   184  	return nil
   185  }
   186  
   187  func executeHookFuncs(ctx context.Context, name string, typ hookType, hooks []HookFunc) error {
   188  	for idx, hook := range hooks {
   189  		if err := hook(ctx); err != nil {
   190  			return fmt.Errorf("failed to execute %s hook %d for %s: %w", typ, idx+1, name, err)
   191  		}
   192  	}
   193  	return nil
   194  }
   195  
   196  func (pm *processManager) WithNoContextHandler() {
   197  	pm.skipContextHandling = true
   198  }
   199  
   200  func (pm *processManager) IsReady(ctx context.Context) (bool, error) {
   201  	if pm.readyCheck != nil {
   202  		return pm.readyCheck(ctx)
   203  	}
   204  
   205  	if !pm.isRunning {
   206  		return false, fmt.Errorf("%s process is not running", pm.Name())
   207  	}
   208  
   209  	return true, nil
   210  }
   211  
   212  func (pm *processManager) WithReadyCheck(readyCheck ReadyCheckFunc) {
   213  	pm.readyCheck = readyCheck
   214  }
   215  
   216  func (pm *processManager) WithWaitUntilReady(timeout time.Duration) {
   217  	pm.waitUntilReady = true
   218  	pm.waitUntilReadyTimeout = timeout
   219  }
   220  
   221  func (pm *processManager) waitUntilReadyWithTimeout(ctx context.Context) error {
   222  	if !pm.waitUntilReady || pm.readyCheck == nil {
   223  		return nil
   224  	}
   225  
   226  	// use timeout so we don't hang forever
   227  	waitCtx, cancel := context.WithTimeout(ctx, pm.waitUntilReadyTimeout)
   228  	defer cancel()
   229  
   230  	return pm.WaitUntilReady(waitCtx)
   231  }
   232  
   233  func (pm *processManager) WaitUntilReady(ctx context.Context) (err error) {
   234  	pollErr := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (ready bool, pollErr error) {
   235  		if ready, err = pm.IsReady(ctx); ready {
   236  			return true, nil
   237  		}
   238  		return false, nil
   239  	})
   240  
   241  	if pollErr != nil {
   242  		return err
   243  	}
   244  
   245  	return nil
   246  }
   247  
   248  func (pm *processManager) WaitUntilStopped(ctx context.Context) error {
   249  	if err := wait.PollUntilContextCancel(ctx, time.Second, true, func(context.Context) (bool, error) {
   250  		ready, err := pm.IsReady(ctx)
   251  		return !ready, err
   252  	}); err != nil {
   253  		return fmt.Errorf("%s did not stop running: %w", pm.Name(), err)
   254  	}
   255  	return nil
   256  }
   257  
   258  // If the context the process manager was started with is cancelled, stop it.
   259  func contextHandler(ctx, startCtx context.Context, pm ProcessManager, log logr.Logger) error {
   260  	// ctx is a child of startCtx, so this blocks until either
   261  	// context is cancelled
   262  	<-ctx.Done()
   263  
   264  	select {
   265  	case <-startCtx.Done():
   266  		// if the start context was cancelled, stop the process manager
   267  		// using a new (uncancelled) context with timeout
   268  		log.Info("context cancelled: shutting down")
   269  		stopCtx, cancel := context.WithTimeout(context.Background(), exitTimeout)
   270  		defer cancel()
   271  		return pm.Stop(stopCtx)
   272  	default:
   273  		// if ctx was cancelled, Stop() has been called so exit
   274  		// without stopping
   275  		return nil
   276  	}
   277  }
   278  
   279  func resultError(name string, result error, expectNoExit bool) error {
   280  	if result != nil {
   281  		return fmt.Errorf("%s exited with error: %w", name, result)
   282  	}
   283  	if expectNoExit {
   284  		return fmt.Errorf("%s exited unexpectedly", name)
   285  	}
   286  	return nil
   287  }
   288  

View as plain text