1 package processmanager
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "os"
8 "os/exec"
9 "strings"
10 "time"
11
12 "github.com/go-logr/logr"
13 "golang.org/x/sys/unix"
14 )
15
// ReadyCheckFunc is a callback that receives a context and returns a boolean
// together with an error.
// NOTE(review): presumably it reports true once the managed process is ready
// to be used; its call site is outside this section — confirm.
type ReadyCheckFunc func(context.Context) (bool, error)
17
18
19
20
21
22
23
24
// Process describes an operating-system process that is managed through the
// embedded ProcessManager interface and configured through the additional
// methods declared here.
type Process interface {
	// PID returns a pointer to the PID of the underlying command, or nil
	// when no command has been started.
	PID() *int

	// WithArgs replaces the arguments that are passed to the command the
	// next time the process is started.
	WithArgs(args ...string)

	// WithExpectNoExit marks the process as one that is not expected to
	// exit on its own.
	WithExpectNoExit()

	// ExpectsExit reports whether the process is allowed to exit on its
	// own, i.e. WithExpectNoExit has not been called.
	ExpectsExit() bool

	ProcessManager
}
42
// process is the Process implementation backed by an exec.Cmd.
type process struct {
	processManager

	// path is the executable handed to exec.Command.
	path string

	// args are the command-line arguments handed to exec.Command.
	args []string

	// cmd is the command created by startProcess; stopProcess resets it to
	// nil once the process has exited.
	cmd *exec.Cmd

	// procExitChan carries the result of cmd.Wait() from the goroutine
	// spawned in startProcess to whoever is waiting for the process to exit.
	procExitChan chan error

	// expectNoExit is set by WithExpectNoExit and is passed to resultError
	// when the process exits.
	expectNoExit bool
}
60
61
62 func NewProcess(name string, path string, args ...string) (Process, error) {
63 if _, err := os.ReadFile(path); err != nil {
64 return nil, fmt.Errorf("unable to find command %s: %w", path, err)
65 }
66
67 return &process{
68 processManager: processManager{
69 name: name,
70 resultChan: make(chan error, 1),
71 log: logr.Discard(),
72 vlog: logr.Discard(),
73 },
74 path: path,
75 args: args,
76 procExitChan: make(chan error),
77 }, nil
78 }
79
80 func (proc *process) Start(ctx context.Context) (err error) {
81 proc.Mutex.Lock()
82 defer proc.Mutex.Unlock()
83
84 if proc.isRunning {
85 return nil
86 }
87
88
89 defer func() {
90 err = proc.cleanupOnFailure(ctx, err)
91 }()
92
93 args := strings.Join(proc.args, " ")
94 proc.log.Info("running process", "args", args)
95
96
97
98 procCtx, cancel := context.WithCancel(ctx)
99 proc.cancel = cancel
100
101
102 proc.startContextHandler(procCtx, ctx)
103
104 if err := proc.executeHooks(procCtx, preStart); err != nil {
105 return err
106 }
107
108 if err := proc.startProcess(); err != nil {
109 return fmt.Errorf("unable to start %s process: %w", proc.Name(), err)
110 }
111
112 proc.vlog.Info("process is running", "PID", proc.PID(), "args", args)
113
114
115 proc.startExitHandler(procCtx)
116
117 if err := proc.executeHooks(procCtx, postStart); err != nil {
118 return err
119 }
120
121 if err := proc.waitUntilReadyWithTimeout(ctx); err != nil {
122 return fmt.Errorf("%s process is not ready: %w", proc.Name(), err)
123 }
124
125 proc.vlog.Info("process is ready", "PID", proc.PID(), "args", args)
126 proc.isRunning = true
127
128 return nil
129 }
130
131 func (proc *process) cleanupOnFailure(ctx context.Context, err error) error {
132 if err == nil {
133 return nil
134 }
135 proc.vlog.Info("starting process failed, cleaning up")
136 return errors.Join(err, proc.stop(ctx))
137 }
138
139 func (proc *process) startContextHandler(ctx, startCtx context.Context) {
140 if proc.skipContextHandling {
141 return
142 }
143 go func() {
144 if err := contextHandler(ctx, startCtx, proc, proc.log); err != nil {
145 proc.log.Error(err, "failed to shutdown")
146 }
147 }()
148 }
149
150
151
152 func (proc *process) startProcess() error {
153 proc.cmd = exec.Command(proc.path, proc.args...)
154
155 proc.cmd.Stdout = os.Stdout
156 proc.cmd.Stderr = os.Stderr
157
158 if err := proc.cmd.Start(); err != nil {
159 return err
160 }
161
162
163 go func() { proc.procExitChan <- proc.cmd.Wait() }()
164
165 return nil
166 }
167
168
169
170
171
172
173 func (proc *process) startExitHandler(ctx context.Context) {
174 go func() {
175 select {
176 case result := <-proc.procExitChan:
177 proc.vlog.Info("process has exited", "PID", proc.PID())
178 proc.resultChan <- errors.Join(
179 resultError(proc.Name(), result, proc.expectNoExit),
180 proc.Stop(ctx),
181 )
182 case <-ctx.Done():
183 return
184 }
185 }()
186 }
187
188 func (proc *process) Stop(ctx context.Context) error {
189 proc.Mutex.Lock()
190 defer proc.Mutex.Unlock()
191
192 if !proc.isRunning {
193 return nil
194 }
195
196 return proc.stop(ctx)
197 }
198
199 func (proc *process) stop(ctx context.Context) error {
200 pid := proc.PID()
201 proc.log.Info("stopping process", "PID", pid)
202
203
204 proc.cancel()
205
206 if err := proc.executeHooks(ctx, preStop); err != nil {
207 return err
208 }
209
210 if err := proc.stopProcess(); err != nil {
211 return fmt.Errorf("unable to stop process: %w", err)
212 }
213
214 if err := proc.executeHooks(ctx, postStop); err != nil {
215 return err
216 }
217
218 proc.vlog.Info("process has stopped", "PID", pid)
219 proc.isRunning = false
220
221 return nil
222 }
223
224
225
226 func (proc *process) stopProcess() error {
227 pid := proc.PID()
228 if pid == nil {
229 return nil
230 }
231
232
233 if err := proc.cmd.Process.Signal(unix.SIGTERM); err == os.ErrProcessDone {
234 return nil
235 } else if err != nil {
236 return fmt.Errorf("failed to send SIGTERM to process %s with PID=%d: %w", proc.path, pid, err)
237 }
238
239 if err := proc.waitForProcessExit(); err != nil {
240 return fmt.Errorf("process with PID=%d did not exit: %w", pid, err)
241 }
242
243 proc.cmd = nil
244 return nil
245 }
246
247
248 func (proc *process) waitForProcessExit() error {
249 select {
250 case err := <-proc.procExitChan:
251 if _, ok := err.(*exec.ExitError); ok || err == nil {
252 return nil
253 }
254 return fmt.Errorf("error received whilst exiting: %w", err)
255 case <-time.After(exitTimeout):
256 return fmt.Errorf("timeout reached")
257 }
258 }
259
260 func (proc *process) Restart(ctx context.Context) error {
261 if err := proc.Stop(ctx); err != nil {
262 return err
263 }
264 return proc.Start(ctx)
265 }
266
267 func (proc *process) WithLogger(log logr.Logger, verbose bool) {
268 proc.log = log.WithName(fmt.Sprintf("%s-process", proc.Name())).WithValues("process", proc.Name(), "path", proc.path)
269 if verbose {
270 proc.vlog = proc.log
271 }
272 }
273
274 func (proc *process) PID() *int {
275 if proc.cmd != nil && proc.cmd.Process != nil {
276 return &proc.cmd.Process.Pid
277 }
278 return nil
279 }
280
// WithArgs replaces the stored command-line arguments with args. The slice is
// stored as given, not copied.
func (proc *process) WithArgs(args ...string) {
	proc.args = args
}
284
// WithExpectNoExit marks the process as one that is not expected to exit on
// its own by setting expectNoExit.
func (proc *process) WithExpectNoExit() {
	proc.expectNoExit = true
}
288
// ExpectsExit reports whether the process may exit on its own, i.e. it is the
// negation of expectNoExit.
func (proc *process) ExpectsExit() bool {
	return !proc.expectNoExit
}
292
View as plain text