package processmanager

import (
	"context"
	"errors"
	"fmt"

	"github.com/go-logr/logr"
)

// ProcessGroup handles the running of a group of long-running processes.
//
// Starting the group will start each process in the order
// they were added to the group.
//
// Calling stop stops each process in the reverse-order that
// they were added to the group.
//
// If any of the processes exit, all other processes will be
// stopped and the result will be sent to the channel returned
// by Result().
type ProcessGroup interface {
	// PIDs returns the PID of each process in the group, keyed by
	// process name (nil if the process is not running).
	PIDs() map[string]*int

	// Processes returns the names of the processes in the group.
	Processes() []string

	ProcessManager
}

// processGroup is the ProcessGroup implementation. Shared state (name,
// loggers, result channel, cancel func, running flag) lives in the
// embedded processManager.
type processGroup struct {
	processManager

	// The Process instances in the group, in start order.
	procs []Process
}

// NewProcessGroup creates a new process group for the given processes.
//
// The processes in the group must be long-running, i.e.
// configured with WithExpectNoExit(). An error is returned if any
// process reports ExpectsExit().
func NewProcessGroup(name string, procs ...Process) (ProcessGroup, error) {
	if err := validateGroupProcesses(procs); err != nil {
		return nil, err
	}

	// disable context handler for processes to let
	// ProcessGroup handle their shutdown
	for _, proc := range procs {
		proc.WithNoContextHandler()
	}

	return &processGroup{
		processManager: processManager{
			name: name,
			// one buffered slot per process in the group
			resultChan: make(chan error, len(procs)),
			log:        logr.Discard(),
			vlog:       logr.Discard(),
		},
		procs: procs,
	}, nil
}

// validateGroupProcesses returns an error naming the first process that
// expects to exit; it returns nil when no process does.
func validateGroupProcesses(procs []Process) error {
	for _, proc := range procs {
		if proc.ExpectsExit() {
			return fmt.Errorf("processes in group should be long-running %s is not", proc.Name())
		}
	}
	return nil
}

// Start starts every process in the group in the order they were added,
// runs the preStart/postStart hooks and waits until the group is ready.
//
// It is a no-op when the group is already running. If any step fails,
// the group is stopped again and the stop error (if any) is joined with
// the original error.
func (pg *processGroup) Start(ctx context.Context) (err error) {
	pg.Mutex.Lock()
	defer pg.Mutex.Unlock()

	if pg.isRunning {
		return nil
	}

	// cleanup processes and long-running threads if we fail; this relies
	// on the named result so it observes the error of every return below
	defer func() {
		err = pg.cleanupOnFailure(ctx, err)
	}()

	pg.log.Info("starting process group")

	// create child context so we can cancel long-running threads
	// when Stop() is called
	procCtx, cancel := context.WithCancel(ctx)
	pg.cancel = cancel

	// start context handler to call Stop() if ctx is cancelled
	pg.startContextHandler(procCtx, ctx)

	if err := pg.executeHooks(procCtx, preStart); err != nil {
		return err
	}

	if err := pg.startProcesses(ctx); err != nil {
		return fmt.Errorf("failed to start processes: %w", err)
	}
	pg.vlog.Info("process group is running", "PIDs", pg.PIDs())

	// start exit-handler to handle any processes exiting unexpectedly
	pg.startExitHandler(procCtx)

	if err := pg.executeHooks(procCtx, postStart); err != nil {
		return err
	}

	if err := pg.waitUntilReadyWithTimeout(ctx); err != nil {
		return fmt.Errorf("%s process group is not ready: %w", pg.Name(), err)
	}
	pg.vlog.Info("process group is ready", "PIDs", pg.PIDs())

	pg.isRunning = true
	return nil
}

// cleanupOnFailure stops the group when err is non-nil and returns err
// joined with any error from stopping. A nil err is returned unchanged.
func (pg *processGroup) cleanupOnFailure(ctx context.Context, err error) error {
	if err == nil {
		return nil
	}
	pg.vlog.Info("starting process group failed, cleaning up")
	return errors.Join(err, pg.stop(ctx))
}

// startContextHandler runs contextHandler in a background goroutine unless
// context handling has been disabled for this group. Any error it returns
// is logged.
func (pg *processGroup) startContextHandler(ctx, startCtx context.Context) {
	if pg.skipContextHandling {
		return
	}
	go func() {
		if err := contextHandler(ctx, startCtx, pg, pg.log); err != nil {
			pg.log.Error(err, "failed to shutdown")
		}
	}()
}

// startProcesses starts each process in the order it was added to the
// group, returning the first error encountered.
func (pg *processGroup) startProcesses(ctx context.Context) error {
	for _, proc := range pg.procs {
		if err := proc.Start(ctx); err != nil {
			return err
		}
	}
	return nil
}

// startExitHandler starts one goroutine per process that waits for either
// the process result or cancellation of ctx. When a process exits, the
// whole group is stopped and the joined result is sent on pg.resultChan.
func (pg *processGroup) startExitHandler(ctx context.Context) {
	for _, proc := range pg.procs {
		// Pass proc as an argument so each goroutine watches its own
		// process regardless of the loop-variable semantics in effect
		// (the variable is shared across iterations before Go 1.22).
		go func(proc Process) {
			select {
			case result := <-proc.Result():
				pg.vlog.Info("process in group has exited, stopping remaining processes", "process", proc.Name(), "PID", proc.PID())
				// NOTE(review): ctx here is the context that stop()
				// cancels via pg.cancel() before running hooks and
				// stopping processes — confirm that hooks and
				// Process.Stop tolerate an already-cancelled context.
				pg.resultChan <- errors.Join(
					resultError(pg.Name(), result, true),
					pg.Stop(ctx),
				)
			case <-ctx.Done():
				return
			}
		}(proc)
	}
}

// Stop stops the processes in the group in the reverse order that they
// were added. It is a no-op when the group is not running.
func (pg *processGroup) Stop(ctx context.Context) error {
	pg.Mutex.Lock()
	defer pg.Mutex.Unlock()

	if !pg.isRunning {
		return nil
	}
	return pg.stop(ctx)
}

// stop performs the actual shutdown: it cancels the group's child context,
// runs the preStop hooks, stops the processes and runs the postStop hooks.
// The first failing step aborts the sequence and leaves isRunning
// untouched.
//
// Both call sites in this file (Stop and Start's failure cleanup) invoke
// it with pg.Mutex held.
func (pg *processGroup) stop(ctx context.Context) error {
	pg.log.Info("stopping process group", "PIDs", pg.PIDs())

	// cancel exit-handler and any hooks that are still running
	pg.cancel()

	if err := pg.executeHooks(ctx, preStop); err != nil {
		return err
	}

	if err := pg.stopProcesses(ctx); err != nil {
		return fmt.Errorf("unable to stop process group: %w", err)
	}

	if err := pg.executeHooks(ctx, postStop); err != nil {
		return err
	}
	pg.vlog.Info("process group has stopped", "PIDs", pg.PIDs())

	pg.isRunning = false
	return nil
}

// stopProcesses stops each process in the reverse order that it was added
// to the group, returning the first error encountered.
func (pg *processGroup) stopProcesses(ctx context.Context) error {
	for idx := len(pg.procs) - 1; idx >= 0; idx-- {
		proc := pg.procs[idx]
		if err := proc.Stop(ctx); err != nil {
			return err
		}
	}
	return nil
}

// Restart stops the group and then starts it again. Start is not
// attempted if Stop fails. Stop and Start each take the lock separately,
// so it is not held across the whole restart.
func (pg *processGroup) Restart(ctx context.Context) error {
	if err := pg.Stop(ctx); err != nil {
		return err
	}
	return pg.Start(ctx)
}

// WithLogger sets the logger on every process in the group and on the
// group itself. The verbose logger is pointed at the same logger when
// verbose is true; it is left unchanged otherwise.
func (pg *processGroup) WithLogger(log logr.Logger, verbose bool) {
	for _, proc := range pg.procs {
		proc.WithLogger(log, verbose)
	}

	pg.log = log.WithName(fmt.Sprintf("%s-processgroup", pg.Name())).WithValues("processgroup", pg.Name(), "processes", pg.Processes())
	if verbose {
		pg.vlog = pg.log
	}
}

// Processes returns the names of the processes in the group, in the
// order they were added. The result is never nil.
func (pg *processGroup) Processes() []string {
	procNames := make([]string, 0, len(pg.procs))
	for _, proc := range pg.procs {
		procNames = append(procNames, proc.Name())
	}
	return procNames
}

// PIDs returns the PID reported by each process in the group, keyed by
// process name. The result is never nil.
func (pg *processGroup) PIDs() map[string]*int {
	pids := make(map[string]*int, len(pg.procs))
	for _, proc := range pg.procs {
		pids[proc.Name()] = proc.PID()
	}
	return pids
}