...

Source file src/edge-infra.dev/pkg/sds/lib/process/processmanager/processgroup.go

Documentation: edge-infra.dev/pkg/sds/lib/process/processmanager

     1  package processmanager
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  
     8  	"github.com/go-logr/logr"
     9  )
    10  
    11  // ProcessGroup handles the running of a group of long-running processes.
    12  //
    13  // Starting the group will start each process in the order
    14  // they were added to the group.
    15  //
    16  // Calling stop stops each process in the reverse-order that
    17  // they were added to the group.
    18  //
    19  // If any of the processes exit, all other processes will be
    20  // stopped and the result will be sent to the channel returned
    21  // by Result().
    22  type ProcessGroup interface {
    23  	// The PID of the processes in the group (nil if process is not running).
    24  	PIDs() map[string]*int
    25  
    26  	// The names of the processes in the group.
    27  	Processes() []string
    28  
    29  	ProcessManager
    30  }
    31  
    32  type processGroup struct {
    33  	processManager
    34  
    35  	// The Process instances in the group.
    36  	procs []Process
    37  }
    38  
    39  // Creates a new process group for the given processes.
    40  //
    41  // The processes in the group must be long-running, i.e.
    42  // configured with WithExpectNoExit().
    43  func NewProcessGroup(name string, procs ...Process) (ProcessGroup, error) {
    44  	if err := validateGroupProcesses(procs); err != nil {
    45  		return nil, err
    46  	}
    47  
    48  	// disable context handler for processes to let
    49  	// ProcessGroup handle their shutdown
    50  	for _, proc := range procs {
    51  		proc.WithNoContextHandler()
    52  	}
    53  
    54  	return &processGroup{
    55  		processManager: processManager{
    56  			name:       name,
    57  			resultChan: make(chan error, len(procs)),
    58  			log:        logr.Discard(),
    59  			vlog:       logr.Discard(),
    60  		},
    61  		procs: procs,
    62  	}, nil
    63  }
    64  
    65  func validateGroupProcesses(procs []Process) error {
    66  	for _, proc := range procs {
    67  		if proc.ExpectsExit() {
    68  			return fmt.Errorf("processes in group should be long-running %s is not", proc.Name())
    69  		}
    70  	}
    71  	return nil
    72  }
    73  
    74  func (pg *processGroup) Start(ctx context.Context) (err error) {
    75  	pg.Mutex.Lock()
    76  	defer pg.Mutex.Unlock()
    77  
    78  	if pg.isRunning {
    79  		return nil
    80  	}
    81  
    82  	// cleanup processes and long-running threads if we fail
    83  	defer func() {
    84  		err = pg.cleanupOnFailure(ctx, err)
    85  	}()
    86  
    87  	pg.log.Info("starting process group")
    88  
    89  	// create child context so we can cancel long-running threads
    90  	// when Stop() is called
    91  	procCtx, cancel := context.WithCancel(ctx)
    92  	pg.cancel = cancel
    93  
    94  	// start context handler to call Stop() if ctx is cancelled
    95  	pg.startContextHandler(procCtx, ctx)
    96  
    97  	if err := pg.executeHooks(procCtx, preStart); err != nil {
    98  		return err
    99  	}
   100  
   101  	if err := pg.startProcesses(ctx); err != nil {
   102  		return fmt.Errorf("failed to start processes: %w", err)
   103  	}
   104  
   105  	pg.vlog.Info("process group is running", "PIDs", pg.PIDs())
   106  
   107  	// start exit-handler to handle any processes exiting unexpectedly
   108  	pg.startExitHandler(procCtx)
   109  
   110  	if err := pg.executeHooks(procCtx, postStart); err != nil {
   111  		return err
   112  	}
   113  
   114  	if err := pg.waitUntilReadyWithTimeout(ctx); err != nil {
   115  		return fmt.Errorf("%s process group is not ready: %w", pg.Name(), err)
   116  	}
   117  
   118  	pg.vlog.Info("process group is ready", "PIDs", pg.PIDs())
   119  	pg.isRunning = true
   120  
   121  	return nil
   122  }
   123  
   124  func (pg *processGroup) cleanupOnFailure(ctx context.Context, err error) error {
   125  	if err == nil {
   126  		return nil
   127  	}
   128  	pg.vlog.Info("starting process group failed, cleaning up")
   129  	return errors.Join(err, pg.stop(ctx))
   130  }
   131  
   132  func (pg *processGroup) startContextHandler(ctx, startCtx context.Context) {
   133  	if pg.skipContextHandling {
   134  		return
   135  	}
   136  	go func() {
   137  		go func() {
   138  			if err := contextHandler(ctx, startCtx, pg, pg.log); err != nil {
   139  				pg.log.Error(err, "failed to shutdown")
   140  			}
   141  		}()
   142  	}()
   143  }
   144  
   145  func (pg *processGroup) startProcesses(ctx context.Context) error {
   146  	for _, proc := range pg.procs {
   147  		if err := proc.Start(ctx); err != nil {
   148  			return err
   149  		}
   150  	}
   151  	return nil
   152  }
   153  
   154  func (pg *processGroup) startExitHandler(ctx context.Context) {
   155  	for _, proc := range pg.procs {
   156  		go func() {
   157  			select {
   158  			case result := <-proc.Result():
   159  				pg.vlog.Info("process in group has exited, stopping remaining processes", "process", proc.Name(), "PID", proc.PID())
   160  				pg.resultChan <- errors.Join(
   161  					resultError(pg.Name(), result, true),
   162  					pg.Stop(ctx),
   163  				)
   164  			case <-ctx.Done():
   165  				return
   166  			}
   167  		}()
   168  	}
   169  }
   170  
   171  func (pg *processGroup) Stop(ctx context.Context) error {
   172  	pg.Mutex.Lock()
   173  	defer pg.Mutex.Unlock()
   174  
   175  	if !pg.isRunning {
   176  		return nil
   177  	}
   178  
   179  	return pg.stop(ctx)
   180  }
   181  
   182  func (pg *processGroup) stop(ctx context.Context) error {
   183  	pg.log.Info("stopping process group", "PIDs", pg.PIDs())
   184  
   185  	// cancel exit-handler and any hooks that are still running
   186  	pg.cancel()
   187  
   188  	if err := pg.executeHooks(ctx, preStop); err != nil {
   189  		return err
   190  	}
   191  
   192  	if err := pg.stopProcesses(ctx); err != nil {
   193  		return fmt.Errorf("unable to stop process group: %w", err)
   194  	}
   195  
   196  	if err := pg.executeHooks(ctx, postStop); err != nil {
   197  		return err
   198  	}
   199  
   200  	pg.vlog.Info("process group has stopped", "PIDs", pg.PIDs())
   201  	pg.isRunning = false
   202  
   203  	return nil
   204  }
   205  
   206  func (pg *processGroup) stopProcesses(ctx context.Context) error {
   207  	for idx := len(pg.procs) - 1; idx >= 0; idx-- {
   208  		proc := pg.procs[idx]
   209  		if err := proc.Stop(ctx); err != nil {
   210  			return err
   211  		}
   212  	}
   213  	return nil
   214  }
   215  
   216  func (pg *processGroup) Restart(ctx context.Context) error {
   217  	if err := pg.Stop(ctx); err != nil {
   218  		return err
   219  	}
   220  	return pg.Start(ctx)
   221  }
   222  
   223  func (pg *processGroup) WithLogger(log logr.Logger, verbose bool) {
   224  	for _, proc := range pg.procs {
   225  		proc.WithLogger(log, verbose)
   226  	}
   227  	pg.log = log.WithName(fmt.Sprintf("%s-processgroup", pg.Name())).WithValues("processgroup", pg.Name(), "processes", pg.Processes())
   228  	if verbose {
   229  		pg.vlog = pg.log
   230  	}
   231  }
   232  
   233  func (pg *processGroup) Processes() []string {
   234  	procNames := []string{}
   235  	for _, proc := range pg.procs {
   236  		procNames = append(procNames, proc.Name())
   237  	}
   238  	return procNames
   239  }
   240  
   241  func (pg *processGroup) PIDs() map[string]*int {
   242  	pids := map[string]*int{}
   243  	for _, proc := range pg.procs {
   244  		pids[proc.Name()] = proc.PID()
   245  	}
   246  	return pids
   247  }
   248  

View as plain text