1 package processmanager
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7
8 "github.com/go-logr/logr"
9 )
10
11
12
13
14
15
16
17
18
19
20
21
22 type ProcessGroup interface {
23
24 PIDs() map[string]*int
25
26
27 Processes() []string
28
29 ProcessManager
30 }
31
// processGroup is the ProcessGroup implementation returned by NewProcessGroup.
// It embeds processManager for the shared manager state and adds the list of
// grouped processes.
type processGroup struct {
	processManager

	// procs holds the grouped processes. They are started in slice order and
	// stopped in reverse slice order.
	procs []Process
}
38
39
40
41
42
43 func NewProcessGroup(name string, procs ...Process) (ProcessGroup, error) {
44 if err := validateGroupProcesses(procs); err != nil {
45 return nil, err
46 }
47
48
49
50 for _, proc := range procs {
51 proc.WithNoContextHandler()
52 }
53
54 return &processGroup{
55 processManager: processManager{
56 name: name,
57 resultChan: make(chan error, len(procs)),
58 log: logr.Discard(),
59 vlog: logr.Discard(),
60 },
61 procs: procs,
62 }, nil
63 }
64
65 func validateGroupProcesses(procs []Process) error {
66 for _, proc := range procs {
67 if proc.ExpectsExit() {
68 return fmt.Errorf("processes in group should be long-running %s is not", proc.Name())
69 }
70 }
71 return nil
72 }
73
74 func (pg *processGroup) Start(ctx context.Context) (err error) {
75 pg.Mutex.Lock()
76 defer pg.Mutex.Unlock()
77
78 if pg.isRunning {
79 return nil
80 }
81
82
83 defer func() {
84 err = pg.cleanupOnFailure(ctx, err)
85 }()
86
87 pg.log.Info("starting process group")
88
89
90
91 procCtx, cancel := context.WithCancel(ctx)
92 pg.cancel = cancel
93
94
95 pg.startContextHandler(procCtx, ctx)
96
97 if err := pg.executeHooks(procCtx, preStart); err != nil {
98 return err
99 }
100
101 if err := pg.startProcesses(ctx); err != nil {
102 return fmt.Errorf("failed to start processes: %w", err)
103 }
104
105 pg.vlog.Info("process group is running", "PIDs", pg.PIDs())
106
107
108 pg.startExitHandler(procCtx)
109
110 if err := pg.executeHooks(procCtx, postStart); err != nil {
111 return err
112 }
113
114 if err := pg.waitUntilReadyWithTimeout(ctx); err != nil {
115 return fmt.Errorf("%s process group is not ready: %w", pg.Name(), err)
116 }
117
118 pg.vlog.Info("process group is ready", "PIDs", pg.PIDs())
119 pg.isRunning = true
120
121 return nil
122 }
123
124 func (pg *processGroup) cleanupOnFailure(ctx context.Context, err error) error {
125 if err == nil {
126 return nil
127 }
128 pg.vlog.Info("starting process group failed, cleaning up")
129 return errors.Join(err, pg.stop(ctx))
130 }
131
132 func (pg *processGroup) startContextHandler(ctx, startCtx context.Context) {
133 if pg.skipContextHandling {
134 return
135 }
136 go func() {
137 go func() {
138 if err := contextHandler(ctx, startCtx, pg, pg.log); err != nil {
139 pg.log.Error(err, "failed to shutdown")
140 }
141 }()
142 }()
143 }
144
145 func (pg *processGroup) startProcesses(ctx context.Context) error {
146 for _, proc := range pg.procs {
147 if err := proc.Start(ctx); err != nil {
148 return err
149 }
150 }
151 return nil
152 }
153
154 func (pg *processGroup) startExitHandler(ctx context.Context) {
155 for _, proc := range pg.procs {
156 go func() {
157 select {
158 case result := <-proc.Result():
159 pg.vlog.Info("process in group has exited, stopping remaining processes", "process", proc.Name(), "PID", proc.PID())
160 pg.resultChan <- errors.Join(
161 resultError(pg.Name(), result, true),
162 pg.Stop(ctx),
163 )
164 case <-ctx.Done():
165 return
166 }
167 }()
168 }
169 }
170
171 func (pg *processGroup) Stop(ctx context.Context) error {
172 pg.Mutex.Lock()
173 defer pg.Mutex.Unlock()
174
175 if !pg.isRunning {
176 return nil
177 }
178
179 return pg.stop(ctx)
180 }
181
182 func (pg *processGroup) stop(ctx context.Context) error {
183 pg.log.Info("stopping process group", "PIDs", pg.PIDs())
184
185
186 pg.cancel()
187
188 if err := pg.executeHooks(ctx, preStop); err != nil {
189 return err
190 }
191
192 if err := pg.stopProcesses(ctx); err != nil {
193 return fmt.Errorf("unable to stop process group: %w", err)
194 }
195
196 if err := pg.executeHooks(ctx, postStop); err != nil {
197 return err
198 }
199
200 pg.vlog.Info("process group has stopped", "PIDs", pg.PIDs())
201 pg.isRunning = false
202
203 return nil
204 }
205
206 func (pg *processGroup) stopProcesses(ctx context.Context) error {
207 for idx := len(pg.procs) - 1; idx >= 0; idx-- {
208 proc := pg.procs[idx]
209 if err := proc.Stop(ctx); err != nil {
210 return err
211 }
212 }
213 return nil
214 }
215
216 func (pg *processGroup) Restart(ctx context.Context) error {
217 if err := pg.Stop(ctx); err != nil {
218 return err
219 }
220 return pg.Start(ctx)
221 }
222
223 func (pg *processGroup) WithLogger(log logr.Logger, verbose bool) {
224 for _, proc := range pg.procs {
225 proc.WithLogger(log, verbose)
226 }
227 pg.log = log.WithName(fmt.Sprintf("%s-processgroup", pg.Name())).WithValues("processgroup", pg.Name(), "processes", pg.Processes())
228 if verbose {
229 pg.vlog = pg.log
230 }
231 }
232
233 func (pg *processGroup) Processes() []string {
234 procNames := []string{}
235 for _, proc := range pg.procs {
236 procNames = append(procNames, proc.Name())
237 }
238 return procNames
239 }
240
241 func (pg *processGroup) PIDs() map[string]*int {
242 pids := map[string]*int{}
243 for _, proc := range pg.procs {
244 pids[proc.Name()] = proc.PID()
245 }
246 return pids
247 }
248
View as plain text