...

Source file src/edge-infra.dev/pkg/sds/lib/process/processmanager/process.go

Documentation: edge-infra.dev/pkg/sds/lib/process/processmanager

     1  package processmanager
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"os"
     8  	"os/exec"
     9  	"strings"
    10  	"time"
    11  
    12  	"github.com/go-logr/logr"
    13  	"golang.org/x/sys/unix"
    14  )
    15  
// ReadyCheckFunc is a callback that receives a context and returns a
// boolean together with an error.
//
// NOTE(review): presumably the boolean reports whether the managed process
// is ready and the callback is polled during startup — confirm against the
// readiness-check code, which is not part of this file.
type ReadyCheckFunc func(context.Context) (bool, error)
    17  
// Process handles the running of a process.
//
// Starting the process with Start() runs it asynchronously. Calling
// Stop() will send a SIGTERM to the process.
//
// If the process exits unexpectedly (e.g. it reaches an error state),
// the result will be sent to the channel returned by Result().
type Process interface {
	// PID returns the PID of the process (nil if the process is not running).
	PID() *int

	// WithArgs sets the command arguments. Has no effect until
	// the process is restarted.
	WithArgs(args ...string)

	// WithExpectNoExit tells the process manager that its process
	// is not expected to stop. If the process does stop, an error
	// should be returned, even if the process returned no error.
	WithExpectNoExit()
	// ExpectsExit reports whether the process is expected to exit.
	// See WithExpectNoExit().
	ExpectsExit() bool

	// ProcessManager is embedded; its methods are declared elsewhere
	// in this package.
	ProcessManager
}
    42  
// process is the implementation of Process returned by NewProcess. It runs
// a single binary through os/exec.
type process struct {
	// Embedded base state shared with other managers (name, loggers,
	// result channel, mutex, running flag, ...); declared elsewhere in
	// this package.
	processManager

	// Path to the process binary.
	path string
	// Arguments the process will be run with.
	args []string
	// The os/exec instance of the running command.
	cmd *exec.Cmd

	// Used internally to receive the result of the os/exec command:
	// the goroutine started in startProcess sends the result of cmd.Wait()
	// here, and either the exit handler or the stop path receives it.
	procExitChan chan error
	// Whether the process is expected to never exit. If true, an error
	// will be returned to the result channel on exit, even if the
	// process exited cleanly.
	expectNoExit bool
}
    60  
    61  // Create a new Process, given arguments and path to the process.
    62  func NewProcess(name string, path string, args ...string) (Process, error) {
    63  	if _, err := os.ReadFile(path); err != nil {
    64  		return nil, fmt.Errorf("unable to find command %s: %w", path, err)
    65  	}
    66  
    67  	return &process{
    68  		processManager: processManager{
    69  			name:       name,
    70  			resultChan: make(chan error, 1),
    71  			log:        logr.Discard(),
    72  			vlog:       logr.Discard(),
    73  		},
    74  		path:         path,
    75  		args:         args,
    76  		procExitChan: make(chan error),
    77  	}, nil
    78  }
    79  
    80  func (proc *process) Start(ctx context.Context) (err error) {
    81  	proc.Mutex.Lock()
    82  	defer proc.Mutex.Unlock()
    83  
    84  	if proc.isRunning {
    85  		return nil
    86  	}
    87  
    88  	// cleanup process and long-running threads if we fail
    89  	defer func() {
    90  		err = proc.cleanupOnFailure(ctx, err)
    91  	}()
    92  
    93  	args := strings.Join(proc.args, " ")
    94  	proc.log.Info("running process", "args", args)
    95  
    96  	// create child context so we can cancel long-running threads
    97  	// when Stop() is called
    98  	procCtx, cancel := context.WithCancel(ctx)
    99  	proc.cancel = cancel
   100  
   101  	// start context handler to call Stop() if ctx is cancelled
   102  	proc.startContextHandler(procCtx, ctx)
   103  
   104  	if err := proc.executeHooks(procCtx, preStart); err != nil {
   105  		return err
   106  	}
   107  
   108  	if err := proc.startProcess(); err != nil {
   109  		return fmt.Errorf("unable to start %s process: %w", proc.Name(), err)
   110  	}
   111  
   112  	proc.vlog.Info("process is running", "PID", proc.PID(), "args", args)
   113  
   114  	// start exit-handler to handle process exiting unexpectedly
   115  	proc.startExitHandler(procCtx)
   116  
   117  	if err := proc.executeHooks(procCtx, postStart); err != nil {
   118  		return err
   119  	}
   120  
   121  	if err := proc.waitUntilReadyWithTimeout(ctx); err != nil {
   122  		return fmt.Errorf("%s process is not ready: %w", proc.Name(), err)
   123  	}
   124  
   125  	proc.vlog.Info("process is ready", "PID", proc.PID(), "args", args)
   126  	proc.isRunning = true
   127  
   128  	return nil
   129  }
   130  
   131  func (proc *process) cleanupOnFailure(ctx context.Context, err error) error {
   132  	if err == nil {
   133  		return nil
   134  	}
   135  	proc.vlog.Info("starting process failed, cleaning up")
   136  	return errors.Join(err, proc.stop(ctx))
   137  }
   138  
   139  func (proc *process) startContextHandler(ctx, startCtx context.Context) {
   140  	if proc.skipContextHandling {
   141  		return
   142  	}
   143  	go func() {
   144  		if err := contextHandler(ctx, startCtx, proc, proc.log); err != nil {
   145  			proc.log.Error(err, "failed to shutdown")
   146  		}
   147  	}()
   148  }
   149  
   150  // Starts the process and sets stdout and stderr. Sends the result of
   151  // the process to the exit channel once it completes.
   152  func (proc *process) startProcess() error {
   153  	proc.cmd = exec.Command(proc.path, proc.args...) //#nosec G204
   154  
   155  	proc.cmd.Stdout = os.Stdout
   156  	proc.cmd.Stderr = os.Stderr
   157  
   158  	if err := proc.cmd.Start(); err != nil {
   159  		return err
   160  	}
   161  
   162  	// send the result of the command to the exit channel once it exits
   163  	go func() { proc.procExitChan <- proc.cmd.Wait() }()
   164  
   165  	return nil
   166  }
   167  
   168  // If an exit signal is received from the process, perform cleanup and
   169  // return the result to the result channel for consumers to evaluate.
   170  //
   171  // If the context is cancelled, Stop() has been called, so the exit-handler
   172  // can stop running.
   173  func (proc *process) startExitHandler(ctx context.Context) {
   174  	go func() {
   175  		select {
   176  		case result := <-proc.procExitChan:
   177  			proc.vlog.Info("process has exited", "PID", proc.PID())
   178  			proc.resultChan <- errors.Join(
   179  				resultError(proc.Name(), result, proc.expectNoExit),
   180  				proc.Stop(ctx),
   181  			)
   182  		case <-ctx.Done():
   183  			return
   184  		}
   185  	}()
   186  }
   187  
   188  func (proc *process) Stop(ctx context.Context) error {
   189  	proc.Mutex.Lock()
   190  	defer proc.Mutex.Unlock()
   191  
   192  	if !proc.isRunning {
   193  		return nil
   194  	}
   195  
   196  	return proc.stop(ctx)
   197  }
   198  
   199  func (proc *process) stop(ctx context.Context) error {
   200  	pid := proc.PID()
   201  	proc.log.Info("stopping process", "PID", pid)
   202  
   203  	// cancel long-running threads
   204  	proc.cancel()
   205  
   206  	if err := proc.executeHooks(ctx, preStop); err != nil {
   207  		return err
   208  	}
   209  
   210  	if err := proc.stopProcess(); err != nil {
   211  		return fmt.Errorf("unable to stop process: %w", err)
   212  	}
   213  
   214  	if err := proc.executeHooks(ctx, postStop); err != nil {
   215  		return err
   216  	}
   217  
   218  	proc.vlog.Info("process has stopped", "PID", pid)
   219  	proc.isRunning = false
   220  
   221  	return nil
   222  }
   223  
   224  // Sends a SIGTERM signal to the process, with a 10 second wait for
   225  // the process to complete.
   226  func (proc *process) stopProcess() error {
   227  	pid := proc.PID()
   228  	if pid == nil {
   229  		return nil
   230  	}
   231  
   232  	// If the process is already complete, we are done.
   233  	if err := proc.cmd.Process.Signal(unix.SIGTERM); err == os.ErrProcessDone {
   234  		return nil
   235  	} else if err != nil {
   236  		return fmt.Errorf("failed to send SIGTERM to process %s with PID=%d: %w", proc.path, pid, err)
   237  	}
   238  
   239  	if err := proc.waitForProcessExit(); err != nil {
   240  		return fmt.Errorf("process with PID=%d did not exit: %w", pid, err)
   241  	}
   242  
   243  	proc.cmd = nil
   244  	return nil
   245  }
   246  
   247  // Waits for 10 seconds for the process to exit, ignoring any exit errors.
   248  func (proc *process) waitForProcessExit() error {
   249  	select {
   250  	case err := <-proc.procExitChan:
   251  		if _, ok := err.(*exec.ExitError); ok || err == nil {
   252  			return nil
   253  		}
   254  		return fmt.Errorf("error received whilst exiting: %w", err)
   255  	case <-time.After(exitTimeout):
   256  		return fmt.Errorf("timeout reached")
   257  	}
   258  }
   259  
   260  func (proc *process) Restart(ctx context.Context) error {
   261  	if err := proc.Stop(ctx); err != nil {
   262  		return err
   263  	}
   264  	return proc.Start(ctx)
   265  }
   266  
   267  func (proc *process) WithLogger(log logr.Logger, verbose bool) {
   268  	proc.log = log.WithName(fmt.Sprintf("%s-process", proc.Name())).WithValues("process", proc.Name(), "path", proc.path)
   269  	if verbose {
   270  		proc.vlog = proc.log
   271  	}
   272  }
   273  
   274  func (proc *process) PID() *int {
   275  	if proc.cmd != nil && proc.cmd.Process != nil {
   276  		return &proc.cmd.Process.Pid
   277  	}
   278  	return nil
   279  }
   280  
   281  func (proc *process) WithArgs(args ...string) {
   282  	proc.args = args
   283  }
   284  
   285  func (proc *process) WithExpectNoExit() {
   286  	proc.expectNoExit = true
   287  }
   288  
   289  func (proc *process) ExpectsExit() bool {
   290  	return !proc.expectNoExit
   291  }
   292  

View as plain text