1
2
3 package cmd
4
5 import (
6 "context"
7 "fmt"
8 "io"
9 "net"
10 "net/url"
11 "os/exec"
12 "path/filepath"
13 "strings"
14 "sync"
15 "time"
16
17 "github.com/Microsoft/go-winio"
18 "github.com/containerd/containerd/namespaces"
19 "github.com/pkg/errors"
20 "github.com/sirupsen/logrus"
21
22 "github.com/Microsoft/hcsshim/internal/log"
23 )
24
const (
	// binaryPipeFmt is the named-pipe path template. NewBinaryIO fills it with
	// the container ID and a stream name ("stdout", "stderr" or "wait").
	binaryPipeFmt = `\\.\pipe\binary-%s-%s`
	// binaryCmdWaitTimeout bounds how long binaryIO.Close waits for the logging
	// binary to exit before killing it.
	binaryCmdWaitTimeout = 10 * time.Second
	// binaryCmdStartTimeout bounds how long NewBinaryIO waits on the wait pipe
	// after starting the logging binary.
	binaryCmdStartTimeout = 10 * time.Second
)
30
31
32
33
34
35
36
37
38
39
40
41 func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ UpstreamIO, err error) {
42 ns, err := namespaces.NamespaceRequired(ctx)
43 if err != nil {
44 ns = namespaces.Default
45 }
46
47 var stdoutPipe, stderrPipe, waitPipe io.ReadWriteCloser
48
49 stdoutPipePath := fmt.Sprintf(binaryPipeFmt, id, "stdout")
50 stdoutPipe, err = openNPipe(stdoutPipePath)
51 if err != nil {
52 return nil, err
53 }
54
55 stderrPipePath := fmt.Sprintf(binaryPipeFmt, id, "stderr")
56 stderrPipe, err = openNPipe(stderrPipePath)
57 if err != nil {
58 return nil, err
59 }
60
61 waitPipePath := fmt.Sprintf(binaryPipeFmt, id, "wait")
62 waitPipe, err = openNPipe(waitPipePath)
63 if err != nil {
64 return nil, err
65 }
66 defer func() {
67 if err := waitPipe.Close(); err != nil {
68 log.G(ctx).WithError(err).Errorf("error closing wait pipe: %s", waitPipePath)
69 }
70 }()
71
72 envs := []string{
73 "CONTAINER_ID=" + id,
74 "CONTAINER_NAMESPACE=" + ns,
75 "CONTAINER_STDOUT=" + stdoutPipePath,
76 "CONTAINER_STDERR=" + stderrPipePath,
77 "CONTAINER_WAIT=" + waitPipePath,
78 }
79 cmd, err := newBinaryCmd(ctx, uri, envs)
80 if err != nil {
81 return nil, err
82 }
83
84 if err := cmd.Start(); err != nil {
85 return nil, err
86 }
87
88 errCh := make(chan error, 1)
89
90 go func() {
91 b := make([]byte, 1)
92 if _, err := waitPipe.Read(b); err != nil && err != io.EOF {
93 errCh <- err
94 return
95 }
96 errCh <- nil
97 }()
98
99 select {
100 case err = <-errCh:
101 if err != nil {
102 return nil, errors.Wrap(err, "failed to start binary logger")
103 }
104 case <-time.After(binaryCmdStartTimeout):
105 return nil, errors.New("failed to start binary logger: timeout")
106 }
107
108 log.G(ctx).WithFields(logrus.Fields{
109 "containerID": id,
110 "containerNamespace": ns,
111 "binaryCmd": cmd.String(),
112 "binaryProcessID": cmd.Process.Pid,
113 }).Debug("binary io process started")
114
115 return &binaryIO{
116 cmd: cmd,
117 stdout: stdoutPipePath,
118 sout: stdoutPipe,
119 stderr: stderrPipePath,
120 serr: stderrPipe,
121 }, nil
122 }
123
124
// sanitizePath returns the cleaned form of uri.Path. When the cleaned path
// contains `:\`, a single leading backslash (if present) is stripped.
func sanitizePath(uri *url.URL) string {
	cleaned := filepath.Clean(uri.Path)
	if !strings.Contains(cleaned, `:\`) {
		return cleaned
	}
	return strings.TrimPrefix(cleaned, `\`)
}
134
135 func newBinaryCmd(ctx context.Context, uri *url.URL, envs []string) (*exec.Cmd, error) {
136 if uri.Path == "" {
137 return nil, errors.New("no logging driver path provided")
138 }
139
140 var args []string
141 for k, vs := range uri.Query() {
142 args = append(args, k)
143 if len(vs) > 0 && vs[0] != "" {
144 args = append(args, vs[0])
145 }
146 }
147
148 execPath := sanitizePath(uri)
149
150 cmd := exec.CommandContext(ctx, execPath, args...)
151 cmd.Env = append(cmd.Env, envs...)
152
153 return cmd, nil
154 }
155
// Compile-time check that *binaryIO implements UpstreamIO.
var _ UpstreamIO = &binaryIO{}
157
158
// binaryIO is the UpstreamIO returned by NewBinaryIO. It holds the logging
// binary's command together with the stdout and stderr named pipes.
type binaryIO struct {
	// cmd is the logging binary command; Close waits on it and kills it on timeout.
	cmd *exec.Cmd

	// binaryCloser makes Close wait for (or kill) the binary at most once.
	binaryCloser sync.Once

	// stdout and stderr are the pipe paths returned by StdoutPath and StderrPath.
	stdout, stderr string

	// sout and serr are the pipe endpoints returned by Stdout and Stderr.
	sout, serr io.ReadWriteCloser
	// soutCloser makes Close close sout and serr at most once; despite its
	// name it guards both pipes.
	soutCloser sync.Once
}
169
170
171 func (b *binaryIO) Close(ctx context.Context) {
172 b.soutCloser.Do(func() {
173 if b.sout != nil {
174 err := b.sout.Close()
175 if err != nil {
176 log.G(ctx).WithError(err).Errorf("error while closing stdout npipe")
177 }
178 }
179 if b.serr != nil {
180 err := b.serr.Close()
181 if err != nil {
182 log.G(ctx).WithError(err).Errorf("error while closing stderr npipe")
183 }
184 }
185 })
186 b.binaryCloser.Do(func() {
187 done := make(chan error, 1)
188 go func() {
189 done <- b.cmd.Wait()
190 }()
191
192 select {
193 case err := <-done:
194 if err != nil {
195 log.G(ctx).WithError(err).Errorf("error while waiting for binary cmd to finish")
196 }
197 case <-time.After(binaryCmdWaitTimeout):
198 log.G(ctx).Errorf("timeout while waiting for binaryIO process to finish. Killing")
199 err := b.cmd.Process.Kill()
200 if err != nil {
201 log.G(ctx).WithError(err).Errorf("error while killing binaryIO process")
202 }
203 }
204 })
205 }
206
// CloseStdin is a no-op: binaryIO has no stdin stream to close.
func (b *binaryIO) CloseStdin(_ context.Context) {}
208
// Stdin always returns nil; binaryIO does not provide a stdin reader.
func (b *binaryIO) Stdin() io.Reader {
	return nil
}
212
// StdinPath always returns the empty string; binaryIO has no stdin pipe path.
func (b *binaryIO) StdinPath() string {
	return ""
}
216
// Stdout returns the writer backed by the stdout pipe.
func (b *binaryIO) Stdout() io.Writer {
	return b.sout
}
220
// StdoutPath returns the path of the stdout pipe.
func (b *binaryIO) StdoutPath() string {
	return b.stdout
}
224
// Stderr returns the writer backed by the stderr pipe.
func (b *binaryIO) Stderr() io.Writer {
	return b.serr
}
228
// StderrPath returns the path of the stderr pipe.
func (b *binaryIO) StderrPath() string {
	return b.stderr
}
232
// Terminal always reports false for binaryIO.
func (b *binaryIO) Terminal() bool {
	return false
}
236
// pipe is a named-pipe endpoint created by openNPipe. It owns the listener
// and the single connection accepted on it.
type pipe struct {
	// l is the listener returned by winio.ListenPipe.
	l net.Listener
	// con is the accepted connection; set only when Accept succeeded.
	con net.Conn
	// conErr is the error from Accept; set only when Accept failed.
	conErr error
	// conWg is released once the accept goroutine has set con or conErr.
	// Read, Write and Close wait on it before touching either field.
	conWg sync.WaitGroup
}
243
244 func openNPipe(path string) (io.ReadWriteCloser, error) {
245 l, err := winio.ListenPipe(path, nil)
246 if err != nil {
247 return nil, err
248 }
249
250 p := &pipe{l: l}
251 p.conWg.Add(1)
252
253 go func() {
254 defer p.conWg.Done()
255 c, err := l.Accept()
256 if err != nil {
257 p.conErr = err
258 return
259 }
260 p.con = c
261 }()
262 return p, nil
263 }
264
265 func (p *pipe) Write(b []byte) (int, error) {
266 p.conWg.Wait()
267 if p.conErr != nil {
268 return 0, errors.Wrap(p.conErr, "connection error")
269 }
270 return p.con.Write(b)
271 }
272
273 func (p *pipe) Read(b []byte) (int, error) {
274 p.conWg.Wait()
275 if p.conErr != nil {
276 return 0, errors.Wrap(p.conErr, "connection error")
277 }
278 return p.con.Read(b)
279 }
280
281 func (p *pipe) Close() error {
282 if err := p.l.Close(); err != nil {
283 log.G(context.TODO()).WithError(err).Debug("error closing pipe listener")
284 }
285 p.conWg.Wait()
286 if p.con != nil {
287 return p.con.Close()
288 }
289 return p.conErr
290 }
291
View as plain text