1 package processmanager
2
3 import (
4 "context"
5 "fmt"
6 "sync"
7 "time"
8
9 "github.com/go-logr/logr"
10 "k8s.io/apimachinery/pkg/util/wait"
11 )
12
const (
	// startupTimeout is a 30-second limit.
	// NOTE(review): judging by its name it bounds process startup, but it is
	// not referenced in this part of the file — confirm at its use site.
	startupTimeout = time.Second * 30
	// exitTimeout is the 10-second deadline contextHandler gives Stop when it
	// shuts the process down after a context cancellation.
	exitTimeout = time.Second * 10
)
17
// hookType names the lifecycle phase a list of hooks belongs to. It selects
// the hook list in executeHooks and is included in hook error messages.
type hookType string
19
const (
	// preStart selects the hooks registered with WithPreStartHooks.
	preStart hookType = "pre-start"
	// postStart selects the hooks registered with WithPostStartHooks.
	postStart hookType = "post-start"
	// preStop selects the hooks registered with WithPreStopHooks.
	preStop hookType = "pre-stop"
	// postStop selects the hooks registered with WithPostStopHooks.
	postStop hookType = "post-stop"
)
26
27
28
29
30
31
32
33
34
35
36
// HookFunc is a callback run as part of one lifecycle phase of a process. It
// receives the context handed to the hook runner. Returning a non-nil error
// prevents the remaining hooks of that phase from running; the error is
// returned to the caller wrapped with the phase, hook position and process
// name.
type HookFunc func(ctx context.Context) error
38
39
// ProcessManager describes the operations available on a single named
// process: starting, stopping and restarting it, registering hooks around
// those transitions, and checking or waiting for readiness.
type ProcessManager interface {
	// Name returns the name of the managed process.
	Name() string

	// Start starts the process.
	// NOTE(review): the implementation is outside this section; how the
	// pre-/post-start hooks and the optional readiness wait are sequenced
	// around the actual start should be confirmed there.
	Start(context.Context) error

	// Stop stops the process. contextHandler calls it with a context limited
	// to exitTimeout once the start context has been cancelled.
	Stop(context.Context) error

	// Restart restarts the process.
	// NOTE(review): presumably a Stop followed by a Start — implementation
	// not shown in this section.
	Restart(context.Context) error

	// Result returns a receive-only channel of errors.
	// NOTE(review): presumably it delivers the outcome of the process once it
	// exits; confirm against the code that sends on the channel.
	Result() <-chan error

	// WithLogger sets the logger to use.
	// NOTE(review): verbose presumably turns on more detailed logging; the
	// implementation is not visible in this section.
	WithLogger(log logr.Logger, verbose bool)

	// WithPreStartHooks sets the hooks of the pre-start phase. In
	// processManager the given hooks replace any set earlier, and they are
	// run in the order given, stopping at the first one that fails.
	WithPreStartHooks(...HookFunc)

	// WithPostStartHooks sets the hooks of the post-start phase, with the
	// same replace and ordering semantics as WithPreStartHooks.
	WithPostStartHooks(...HookFunc)

	// WithPreStopHooks sets the hooks of the pre-stop phase, with the same
	// replace and ordering semantics as WithPreStartHooks.
	WithPreStopHooks(...HookFunc)

	// WithPostStopHooks sets the hooks of the post-stop phase, with the same
	// replace and ordering semantics as WithPreStartHooks.
	WithPostStopHooks(...HookFunc)

	// WithNoContextHandler opts out of context handling; processManager
	// records this in its skipContextHandling flag.
	// NOTE(review): presumably this disables the stop-on-cancellation done by
	// contextHandler — confirm where the flag is read.
	WithNoContextHandler()

	// IsReady reports whether the process is ready. processManager delegates
	// to the configured ready check when there is one; otherwise it reports
	// ready while the process is marked running and returns an error when it
	// is not.
	IsReady(context.Context) (bool, error)

	// WithReadyCheck sets the function IsReady uses to determine readiness.
	WithReadyCheck(ReadyCheckFunc)

	// WithWaitUntilReady enables waiting for readiness, bounded by timeout.
	// In processManager the wait is only performed when a ready check has
	// also been configured.
	// NOTE(review): presumably the wait happens as part of Start — confirm.
	WithWaitUntilReady(timeout time.Duration)

	// WaitUntilReady blocks until IsReady reports ready or the context is
	// done.
	WaitUntilReady(context.Context) error

	// WaitUntilStopped blocks until IsReady no longer reports ready. It
	// returns an error when IsReady fails or the context is done first.
	WaitUntilStopped(context.Context) error
}
104
// processManager carries the configuration and state used by the methods in
// this file.
// NOTE(review): presumably the concrete ProcessManager implementation; Start,
// Stop, Restart and WithLogger are defined outside this section.
type processManager struct {
	// name is the process name returned by Name and used in error messages.
	name string

	// preStartHooks are the hooks run for the pre-start phase.
	preStartHooks []HookFunc

	// postStartHooks are the hooks run for the post-start phase.
	postStartHooks []HookFunc

	// preStopHooks are the hooks run for the pre-stop phase.
	preStopHooks []HookFunc

	// postStopHooks are the hooks run for the post-stop phase.
	postStopHooks []HookFunc

	// log is the logger.
	// NOTE(review): presumably installed by WithLogger; not used in this
	// section.
	log logr.Logger

	// vlog is a second logger.
	// NOTE(review): presumably the verbose variant selected by WithLogger's
	// verbose flag — confirm.
	vlog logr.Logger

	// resultChan is the channel exposed receive-only through Result.
	resultChan chan error

	// cancel is a context cancel function.
	// NOTE(review): where it is assigned and invoked is outside this section.
	cancel context.CancelFunc

	// skipContextHandling is set to true by WithNoContextHandler.
	skipContextHandling bool

	// readyCheck, when non-nil, is the function IsReady delegates to.
	readyCheck ReadyCheckFunc

	// waitUntilReady is set by WithWaitUntilReady and enables the bounded
	// readiness wait in waitUntilReadyWithTimeout.
	waitUntilReady bool

	// waitUntilReadyTimeout is the time limit applied to that wait.
	waitUntilReadyTimeout time.Duration

	// isRunning is the fallback readiness signal IsReady uses when no
	// readyCheck is configured. Its writers are outside this section.
	isRunning bool

	// Mutex is embedded, so Lock and Unlock are available on the struct.
	// NOTE(review): none of the methods in this section lock it; presumably
	// it guards state changes in Start/Stop — confirm.
	sync.Mutex
}
147
// Name returns the name of the managed process.
func (pm *processManager) Name() string {
	return pm.name
}
151
// Result exposes the manager's result channel as receive-only.
// NOTE(review): what is sent on the channel, and when, is decided by code
// outside this section.
func (pm *processManager) Result() <-chan error {
	return pm.resultChan
}
155
// WithPreStartHooks sets the pre-start hooks, replacing any set earlier.
// Calling it without arguments clears the list.
func (pm *processManager) WithPreStartHooks(hooks ...HookFunc) {
	pm.preStartHooks = hooks
}
159
// WithPostStartHooks sets the post-start hooks, replacing any set earlier.
// Calling it without arguments clears the list.
func (pm *processManager) WithPostStartHooks(hooks ...HookFunc) {
	pm.postStartHooks = hooks
}
163
// WithPreStopHooks sets the pre-stop hooks, replacing any set earlier.
// Calling it without arguments clears the list.
func (pm *processManager) WithPreStopHooks(hooks ...HookFunc) {
	pm.preStopHooks = hooks
}
167
// WithPostStopHooks sets the post-stop hooks, replacing any set earlier.
// Calling it without arguments clears the list.
func (pm *processManager) WithPostStopHooks(hooks ...HookFunc) {
	pm.postStopHooks = hooks
}
171
172
173 func (pm *processManager) executeHooks(ctx context.Context, typ hookType) error {
174 switch typ {
175 case preStart:
176 return executeHookFuncs(ctx, pm.Name(), preStart, pm.preStartHooks)
177 case postStart:
178 return executeHookFuncs(ctx, pm.Name(), postStart, pm.postStartHooks)
179 case preStop:
180 return executeHookFuncs(ctx, pm.Name(), preStop, pm.preStopHooks)
181 case postStop:
182 return executeHookFuncs(ctx, pm.Name(), postStop, pm.postStopHooks)
183 }
184 return nil
185 }
186
187 func executeHookFuncs(ctx context.Context, name string, typ hookType, hooks []HookFunc) error {
188 for idx, hook := range hooks {
189 if err := hook(ctx); err != nil {
190 return fmt.Errorf("failed to execute %s hook %d for %s: %w", typ, idx+1, name, err)
191 }
192 }
193 return nil
194 }
195
// WithNoContextHandler sets the skipContextHandling flag.
// NOTE(review): the flag is read outside this section; presumably it keeps
// contextHandler from being started for this process — confirm.
func (pm *processManager) WithNoContextHandler() {
	pm.skipContextHandling = true
}
199
200 func (pm *processManager) IsReady(ctx context.Context) (bool, error) {
201 if pm.readyCheck != nil {
202 return pm.readyCheck(ctx)
203 }
204
205 if !pm.isRunning {
206 return false, fmt.Errorf("%s process is not running", pm.Name())
207 }
208
209 return true, nil
210 }
211
// WithReadyCheck sets the function IsReady delegates to. Passing nil makes
// IsReady fall back to the isRunning flag.
func (pm *processManager) WithReadyCheck(readyCheck ReadyCheckFunc) {
	pm.readyCheck = readyCheck
}
215
// WithWaitUntilReady enables the readiness wait performed by
// waitUntilReadyWithTimeout and sets its time limit. The wait is skipped
// unless a ready check has also been configured.
func (pm *processManager) WithWaitUntilReady(timeout time.Duration) {
	pm.waitUntilReady = true
	pm.waitUntilReadyTimeout = timeout
}
220
221 func (pm *processManager) waitUntilReadyWithTimeout(ctx context.Context) error {
222 if !pm.waitUntilReady || pm.readyCheck == nil {
223 return nil
224 }
225
226
227 waitCtx, cancel := context.WithTimeout(ctx, pm.waitUntilReadyTimeout)
228 defer cancel()
229
230 return pm.WaitUntilReady(waitCtx)
231 }
232
233 func (pm *processManager) WaitUntilReady(ctx context.Context) (err error) {
234 pollErr := wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (ready bool, pollErr error) {
235 if ready, err = pm.IsReady(ctx); ready {
236 return true, nil
237 }
238 return false, nil
239 })
240
241 if pollErr != nil {
242 return err
243 }
244
245 return nil
246 }
247
248 func (pm *processManager) WaitUntilStopped(ctx context.Context) error {
249 if err := wait.PollUntilContextCancel(ctx, time.Second, true, func(context.Context) (bool, error) {
250 ready, err := pm.IsReady(ctx)
251 return !ready, err
252 }); err != nil {
253 return fmt.Errorf("%s did not stop running: %w", pm.Name(), err)
254 }
255 return nil
256 }
257
258
259 func contextHandler(ctx, startCtx context.Context, pm ProcessManager, log logr.Logger) error {
260
261
262 <-ctx.Done()
263
264 select {
265 case <-startCtx.Done():
266
267
268 log.Info("context cancelled: shutting down")
269 stopCtx, cancel := context.WithTimeout(context.Background(), exitTimeout)
270 defer cancel()
271 return pm.Stop(stopCtx)
272 default:
273
274
275 return nil
276 }
277 }
278
// resultError turns the outcome of a finished process into the error to
// report for it. A non-nil result is wrapped with the process name. A nil
// result yields an "exited unexpectedly" error when expectNoExit is set, and
// nil otherwise.
func resultError(name string, result error, expectNoExit bool) error {
	switch {
	case result != nil:
		return fmt.Errorf("%s exited with error: %w", name, result)
	case expectNoExit:
		// A clean exit is still a failure for a process that was not
		// supposed to exit.
		return fmt.Errorf("%s exited unexpectedly", name)
	default:
		return nil
	}
}
288
View as plain text