1
2
3
4 package stdio
5
6 import (
7 "io"
8 "os"
9 "strings"
10 "sync"
11
12 "github.com/Microsoft/hcsshim/internal/guest/transport"
13 "github.com/pkg/errors"
14 "github.com/sirupsen/logrus"
15 )
16
17
18
19 type ConnectionSet struct {
20 In, Out, Err transport.Connection
21 }
22
23
24 func (s *ConnectionSet) Close() error {
25 var err error
26 if s.In != nil {
27 if cerr := s.In.Close(); cerr != nil {
28 err = errors.Wrap(cerr, "failed Close on stdin")
29 }
30 s.In = nil
31 }
32 if s.Out != nil {
33 if cerr := s.Out.Close(); cerr != nil && err == nil {
34 err = errors.Wrap(cerr, "failed Close on stdout")
35 }
36 s.Out = nil
37 }
38 if s.Err != nil {
39 if cerr := s.Err.Close(); cerr != nil && err == nil {
40 err = errors.Wrap(cerr, "failed Close on stderr")
41 }
42 s.Err = nil
43 }
44 return err
45 }
46
47
48
// FileSet groups the files that back a process's three standard streams. Any
// of the fields may be nil; Close skips nil entries.
type FileSet struct {
	// In is the file used for stdin.
	In *os.File
	// Out is the file used for stdout.
	Out *os.File
	// Err is the file used for stderr.
	Err *os.File
}
52
53
54 func (fs *FileSet) Close() error {
55 var err error
56 if fs.In != nil {
57 if cerr := fs.In.Close(); cerr != nil {
58 err = errors.Wrap(cerr, "failed Close on stdin")
59 }
60 fs.In = nil
61 }
62 if fs.Out != nil {
63 if cerr := fs.Out.Close(); cerr != nil && err == nil {
64 err = errors.Wrap(cerr, "failed Close on stdout")
65 }
66 fs.Out = nil
67 }
68 if fs.Err != nil {
69 if cerr := fs.Err.Close(); cerr != nil && err == nil {
70 err = errors.Wrap(cerr, "failed Close on stderr")
71 }
72 fs.Err = nil
73 }
74 return err
75 }
76
77
78
79 func (s *ConnectionSet) Files() (_ *FileSet, err error) {
80 fs := &FileSet{}
81 defer func() {
82 if err != nil {
83 fs.Close()
84 }
85 }()
86 if s.In != nil {
87 fs.In, err = s.In.File()
88 if err != nil {
89 return nil, errors.Wrap(err, "failed to dup stdin socket for command")
90 }
91 }
92 if s.Out != nil {
93 fs.Out, err = s.Out.File()
94 if err != nil {
95 return nil, errors.Wrap(err, "failed to dup stdout socket for command")
96 }
97 }
98 if s.Err != nil {
99 fs.Err, err = s.Err.File()
100 if err != nil {
101 return nil, errors.Wrap(err, "failed to dup stderr socket for command")
102 }
103 }
104 return fs, nil
105 }
106
107
108
109 func NewPipeRelay(s *ConnectionSet) (_ *PipeRelay, err error) {
110 pr := &PipeRelay{s: s}
111 defer func() {
112 if err != nil {
113 pr.closePipes()
114 }
115 }()
116
117 if s == nil || s.In != nil {
118 pr.pipes[0], pr.pipes[1], err = os.Pipe()
119 if err != nil {
120 return nil, errors.Wrap(err, "failed to create stdin pipe relay")
121 }
122 }
123 if s == nil || s.Out != nil {
124 pr.pipes[2], pr.pipes[3], err = os.Pipe()
125 if err != nil {
126 return nil, errors.Wrap(err, "failed to create stdout pipe relay")
127 }
128 }
129 if s == nil || s.Err != nil {
130 pr.pipes[4], pr.pipes[5], err = os.Pipe()
131 if err != nil {
132 return nil, errors.Wrap(err, "failed to create stderr pipe relay")
133 }
134 }
135 return pr, nil
136 }
137
138
139
140 type PipeRelay struct {
141 wg sync.WaitGroup
142 s *ConnectionSet
143
144 pipes [6]*os.File
145 }
146
147
148
149 func (pr *PipeRelay) ReplaceConnectionSet(s *ConnectionSet) {
150 pr.s = s
151 }
152
153
154
155 func (pr *PipeRelay) Files() (*FileSet, error) {
156 fs := new(FileSet)
157 if pr.s == nil || pr.s.In != nil {
158 fs.In = pr.pipes[0]
159 }
160 if pr.s == nil || pr.s.Out != nil {
161 fs.Out = pr.pipes[3]
162 }
163 if pr.s == nil || pr.s.Err != nil {
164 fs.Err = pr.pipes[5]
165 }
166 return fs, nil
167 }
168
169 func copyAndCleanClose(c transport.Connection, r io.Reader, name string) {
170 if n, err := io.Copy(c, r); err != nil {
171 logrus.WithFields(logrus.Fields{
172 logrus.ErrorKey: err,
173 "bytes": n,
174 "file": name,
175 }).Error("opengcs::PipeRelay::copyAndCleanClose - error copying from pipe")
176 }
177
178
179
180 if err := c.CloseWrite(); err == nil {
181 var b [1]byte
182 _, err = c.Read(b[:])
183 if err == nil {
184 err = errors.New("unexpected data in socket")
185 }
186 if err != io.EOF {
187 logrus.WithFields(logrus.Fields{
188 logrus.ErrorKey: err,
189 "file": name,
190 }).Error("opengcs::PipeRelay::copyAndCleanClose - error reading for clean close")
191 }
192 } else {
193 logrus.WithFields(logrus.Fields{
194 logrus.ErrorKey: err,
195 "file": name,
196 }).Error("opengcs::PipeRelay::copyAndCleanClose - error shutting down socket")
197 }
198 if err := c.Close(); err != nil {
199 logrus.WithFields(logrus.Fields{
200 logrus.ErrorKey: err,
201 "file": name,
202 }).Error("opengcs::PipeRelay::copyAndCleanClose - error closing socket")
203 }
204 }
205
206
207
208 func (pr *PipeRelay) Start() {
209 if pr.s.In != nil {
210 pr.wg.Add(1)
211 go func() {
212 if n, err := io.Copy(pr.pipes[1], pr.s.In); err != nil {
213 logrus.WithFields(logrus.Fields{
214 logrus.ErrorKey: err,
215 "bytes": n,
216 }).Error("opengcs::PipeRelay::Start - error copying stdin to pipe")
217 }
218 if err := pr.pipes[1].Close(); err != nil {
219 logrus.WithFields(logrus.Fields{
220 logrus.ErrorKey: err,
221 }).Error("opengcs::PipeRelay::Start - error closing stdin write pipe")
222 }
223 pr.pipes[1] = nil
224 pr.wg.Done()
225 }()
226 }
227 if pr.s.Out != nil {
228 pr.wg.Add(1)
229 go func() {
230 copyAndCleanClose(pr.s.Out, pr.pipes[2], "stdout")
231 pr.wg.Done()
232 }()
233 }
234 if pr.s.Err != nil {
235 pr.wg.Add(1)
236 go func() {
237 copyAndCleanClose(pr.s.Err, pr.pipes[4], "stderr")
238 pr.wg.Done()
239 }()
240 }
241 }
242
243
244
245 func (pr *PipeRelay) Wait() {
246
247
248
249
250 if pr.s != nil && pr.s.In != nil {
251 _ = pr.s.In.CloseRead()
252 }
253
254 pr.wg.Wait()
255 pr.closePipes()
256 if pr.s != nil {
257 pr.s.Close()
258 }
259 }
260
261
262
263
264
265 func (pr *PipeRelay) CloseUnusedPipes() {
266 if pr.s == nil {
267 pr.closePipes()
268 } else {
269 if pr.s.In == nil {
270
271 pr.pipes[1].Close()
272 }
273 if pr.s.Out == nil {
274
275 pr.pipes[2].Close()
276 }
277 if pr.s.Err == nil {
278
279 pr.pipes[4].Close()
280 }
281 }
282 }
283
284 func (pr *PipeRelay) closePipes() {
285 for i := 0; i < len(pr.pipes); i++ {
286 if pr.pipes[i] != nil {
287 if err := pr.pipes[i].Close(); err != nil {
288 if !strings.Contains(err.Error(), "file already closed") {
289 logrus.WithFields(logrus.Fields{
290 logrus.ErrorKey: err,
291 }).Error("opengcs::PipeRelay::closePipes - error closing relay pipe")
292 }
293 }
294 pr.pipes[i] = nil
295 }
296 }
297 }
298
299
300 func NewTtyRelay(s *ConnectionSet, pty *os.File) *TtyRelay {
301 return &TtyRelay{s: s, pty: pty}
302 }
303
304
305 type TtyRelay struct {
306 m sync.Mutex
307 closed bool
308 wg sync.WaitGroup
309 s *ConnectionSet
310 pty *os.File
311 }
312
313
314
315 func (r *TtyRelay) ReplaceConnectionSet(s *ConnectionSet) {
316 r.s = s
317 }
318
319
320 func (r *TtyRelay) ResizeConsole(height, width uint16) error {
321 r.m.Lock()
322 defer r.m.Unlock()
323
324 if r.closed {
325 return nil
326 }
327 return ResizeConsole(r.pty, height, width)
328 }
329
330
331
332 func (r *TtyRelay) Start() {
333 if r.s.In != nil {
334 r.wg.Add(1)
335 go func() {
336 if _, err := io.Copy(r.pty, r.s.In); err != nil {
337 logrus.WithFields(logrus.Fields{
338 logrus.ErrorKey: err,
339 }).Error("opengcs::TtyRelay::Start - error copying stdin to pty")
340 }
341 r.wg.Done()
342 }()
343 }
344 if r.s.Out != nil {
345 r.wg.Add(1)
346 go func() {
347 if _, err := io.Copy(r.s.Out, r.pty); err != nil {
348 logrus.WithFields(logrus.Fields{
349 logrus.ErrorKey: err,
350 }).Error("opengcs::TtyRelay::Start - error copying pty to stdout")
351 }
352 r.wg.Done()
353 }()
354 }
355 }
356
357
358
359 func (r *TtyRelay) Wait() {
360
361
362
363
364 if r.s != nil && r.s.In != nil {
365 _ = r.s.In.CloseRead()
366 }
367
368
369 r.wg.Wait()
370
371 r.m.Lock()
372 defer r.m.Unlock()
373
374 r.pty.Close()
375 r.closed = true
376 if r.s != nil {
377 r.s.Close()
378 }
379 }
380
View as plain text