1
16
17 package process
18
19 import (
20 "crypto/tls"
21 "fmt"
22 "io"
23 "net"
24 "net/http"
25 "net/url"
26 "os"
27 "os/exec"
28 "path"
29 "regexp"
30 "sync"
31 "syscall"
32 "time"
33 )
34
35
// ListenAddr is the host and port that a process listens on.
type ListenAddr struct {
	Address, Port string
}

// URL returns a new URL with the given scheme and path whose host component
// is this listen address, as formatted by HostPort.
func (l *ListenAddr) URL(scheme string, path string) *url.URL {
	u := new(url.URL)
	u.Scheme = scheme
	u.Host = l.HostPort()
	u.Path = path
	return u
}

// HostPort combines Address and Port into a "host:port" string using
// net.JoinHostPort, which wraps IPv6 literals in square brackets.
func (l *ListenAddr) HostPort() string {
	hostPort := net.JoinHostPort(l.Address, l.Port)
	return hostPort
}
54
55
56
// HealthCheck describes how a process is probed for readiness. The embedded
// URL is fetched with HTTP GET until a request answers 200 OK.
type HealthCheck struct {
	// URL is the endpoint to probe; its fields and methods are promoted.
	url.URL

	// PollInterval is the pause between probes. A zero or negative value is
	// replaced with a default (100ms) by the poller.
	PollInterval time.Duration
}
66
67
68 type State struct {
69 Cmd *exec.Cmd
70
71
72
73
74
75 HealthCheck HealthCheck
76
77 Args []string
78
79 StopTimeout time.Duration
80 StartTimeout time.Duration
81
82 Dir string
83 DirNeedsCleaning bool
84 Path string
85
86
87
88 ready bool
89
90
91
92 waitDone chan struct{}
93 errMu sync.Mutex
94 exitErr error
95 exited bool
96 }
97
98
99
100
101
102 func (ps *State) Init(name string) error {
103 if ps.Path == "" {
104 if name == "" {
105 return fmt.Errorf("must have at least one of name or path")
106 }
107 ps.Path = BinPathFinder(name, "")
108 }
109
110 if ps.Dir == "" {
111 newDir, err := os.MkdirTemp("", "k8s_test_framework_")
112 if err != nil {
113 return err
114 }
115 ps.Dir = newDir
116 ps.DirNeedsCleaning = true
117 }
118
119 if ps.StartTimeout == 0 {
120 ps.StartTimeout = 20 * time.Second
121 }
122
123 if ps.StopTimeout == 0 {
124 ps.StopTimeout = 20 * time.Second
125 }
126 return nil
127 }
128
// stopChannel is closed to ask a background goroutine, such as the
// health-check poller, to stop. No values are ever sent on it.
type (
	stopChannel chan struct{}
)
130
131
132
133
134 func (ps *State) CheckFlag(flag string) (bool, error) {
135 cmd := exec.Command(ps.Path, "--help")
136 outContents, err := cmd.CombinedOutput()
137 if err != nil {
138 return false, fmt.Errorf("unable to run command %q to check for flag %q: %w", ps.Path, flag, err)
139 }
140 pat := `(?m)^\s*--` + flag + `\b`
141 matched, err := regexp.Match(pat, outContents)
142 if err != nil {
143 return false, fmt.Errorf("unable to check command %q for flag %q in help output: %w", ps.Path, flag, err)
144 }
145 return matched, nil
146 }
147
148
149
150 func (ps *State) Start(stdout, stderr io.Writer) (err error) {
151 if ps.ready {
152 return nil
153 }
154
155 ps.Cmd = exec.Command(ps.Path, ps.Args...)
156 ps.Cmd.Stdout = stdout
157 ps.Cmd.Stderr = stderr
158 ps.Cmd.SysProcAttr = GetSysProcAttr()
159
160 ready := make(chan bool)
161 timedOut := time.After(ps.StartTimeout)
162 pollerStopCh := make(stopChannel)
163 go pollURLUntilOK(ps.HealthCheck.URL, ps.HealthCheck.PollInterval, ready, pollerStopCh)
164
165 ps.waitDone = make(chan struct{})
166
167 if err := ps.Cmd.Start(); err != nil {
168 ps.errMu.Lock()
169 defer ps.errMu.Unlock()
170 ps.exited = true
171 return err
172 }
173 go func() {
174 defer close(ps.waitDone)
175 err := ps.Cmd.Wait()
176
177 ps.errMu.Lock()
178 defer ps.errMu.Unlock()
179 ps.exitErr = err
180 ps.exited = true
181 }()
182
183 select {
184 case <-ready:
185 ps.ready = true
186 return nil
187 case <-ps.waitDone:
188 close(pollerStopCh)
189 return fmt.Errorf("timeout waiting for process %s to start successfully "+
190 "(it may have failed to start, or stopped unexpectedly before becoming ready)",
191 path.Base(ps.Path))
192 case <-timedOut:
193 close(pollerStopCh)
194 if ps.Cmd != nil {
195
196 ps.Cmd.Process.Signal(syscall.SIGTERM)
197 }
198 return fmt.Errorf("timeout waiting for process %s to start", path.Base(ps.Path))
199 }
200 }
201
202
203
204
205 func (ps *State) Exited() (bool, error) {
206 ps.errMu.Lock()
207 defer ps.errMu.Unlock()
208 return ps.exited, ps.exitErr
209 }
210
211 func pollURLUntilOK(url url.URL, interval time.Duration, ready chan bool, stopCh stopChannel) {
212 client := &http.Client{
213 Transport: &http.Transport{
214 TLSClientConfig: &tls.Config{
215
216
217
218 InsecureSkipVerify: true,
219 },
220 },
221 }
222 if interval <= 0 {
223 interval = 100 * time.Millisecond
224 }
225 for {
226 res, err := client.Get(url.String())
227 if err == nil {
228 res.Body.Close()
229 if res.StatusCode == http.StatusOK {
230 ready <- true
231 return
232 }
233 }
234
235 select {
236 case <-stopCh:
237 return
238 default:
239 time.Sleep(interval)
240 }
241 }
242 }
243
244
245
246 func (ps *State) Stop() error {
247
248 defer func() {
249 if ps.DirNeedsCleaning {
250 _ = os.RemoveAll(ps.Dir)
251 }
252 }()
253 if ps.Cmd == nil {
254 return nil
255 }
256 if done, _ := ps.Exited(); done {
257 return nil
258 }
259 if err := ps.Cmd.Process.Signal(syscall.SIGTERM); err != nil {
260 return fmt.Errorf("unable to signal for process %s to stop: %w", ps.Path, err)
261 }
262
263 timedOut := time.After(ps.StopTimeout)
264
265 select {
266 case <-ps.waitDone:
267 break
268 case <-timedOut:
269 if err := ps.Cmd.Process.Signal(syscall.SIGKILL); err != nil {
270 return fmt.Errorf("unable to kill process %s: %w", ps.Path, err)
271 }
272 return fmt.Errorf("timeout waiting for process %s to stop", path.Base(ps.Path))
273 }
274 ps.ready = false
275 return nil
276 }
277
View as plain text