1
2
3 package cmd
4
import (
	"context"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"net"
	"sync"
	"syscall"
	"time"

	winio "github.com/Microsoft/go-winio"
	"github.com/Microsoft/go-winio/pkg/guid"
	"github.com/Microsoft/hcsshim/internal/log"
	"github.com/cenkalti/backoff/v4"
	"github.com/sirupsen/logrus"
	"golang.org/x/sys/windows"
)
22
// init seeds the global math/rand source with the current wall-clock time (in
// nanoseconds) when the package is loaded.
//
// NOTE(review): presumably this keeps the randomized retry intervals from
// repeating across process runs — confirm against the backoff package.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
27
28
29
30
31 func NewNpipeIO(ctx context.Context, stdin, stdout, stderr string, terminal bool, retryTimeout time.Duration) (_ UpstreamIO, err error) {
32 log.G(ctx).WithFields(logrus.Fields{
33 "stdin": stdin,
34 "stdout": stdout,
35 "stderr": stderr,
36 "terminal": terminal,
37 }).Debug("NewNpipeIO")
38
39 nio := &npipeio{
40 stdin: stdin,
41 stdout: stdout,
42 stderr: stderr,
43 terminal: terminal,
44 }
45 defer func() {
46 if err != nil {
47 nio.Close(ctx)
48 }
49 }()
50
51 if stdin != "" {
52 c, err := winio.DialPipeContext(ctx, stdin)
53 if err != nil {
54 return nil, err
55 }
56
57
58
59
60 nio.sin = c
61 }
62 if stdout != "" {
63 c, err := winio.DialPipeContext(ctx, stdout)
64 if err != nil {
65 return nil, err
66 }
67 nio.sout = &nPipeRetryWriter{ctx, c, stdout, newBackOff(retryTimeout)}
68 }
69 if stderr != "" {
70 c, err := winio.DialPipeContext(ctx, stderr)
71 if err != nil {
72 return nil, err
73 }
74 nio.serr = &nPipeRetryWriter{ctx, c, stderr, newBackOff(retryTimeout)}
75 }
76 return nio, nil
77 }
78
79
80
// nPipeRetryWriter wraps a named pipe connection so that a write which hits a
// disconnected pipe redials pipePath with backoff and then carries on writing
// (see Write and retryDialPipe).
type nPipeRetryWriter struct {
	// ctx supplies the logger used for the reconnect log entries in Write.
	ctx context.Context
	// Conn is the current pipe connection; Write replaces it after a
	// successful redial.
	net.Conn
	// pipePath is the pipe address that is redialed after a disconnect.
	pipePath string
	// backOff yields the wait between redial attempts and signals, via
	// backoff.Stop, when to give up.
	backOff backoff.BackOff
}
87
88
89
90 func newBackOff(timeout time.Duration) backoff.BackOff {
91 return &backoff.ExponentialBackOff{
92
93 InitialInterval: time.Millisecond * 200,
94 RandomizationFactor: backoff.DefaultRandomizationFactor,
95 Multiplier: backoff.DefaultMultiplier,
96
97
98 MaxInterval: time.Minute * 1,
99
100 MaxElapsedTime: timeout,
101 Stop: backoff.Stop,
102 Clock: backoff.SystemClock,
103 }
104 }
105
106 func (nprw *nPipeRetryWriter) Write(p []byte) (n int, err error) {
107 var currBufPos int
108 for {
109
110
111 n, err = nprw.Conn.Write(p[currBufPos:])
112 currBufPos += n
113 if err != nil {
114
115 if isDisconnectedErr(err) {
116
117 log.G(nprw.ctx).WithFields(logrus.Fields{
118 "address": nprw.pipePath,
119 logrus.ErrorKey: err,
120 }).Error("Named pipe disconnected, retrying dial")
121
122
123 nprw.Conn.Close()
124 newConn, retryErr := nprw.retryDialPipe()
125 if retryErr == nil {
126 log.G(nprw.ctx).WithField("address", nprw.pipePath).Info("Succeeded in reconnecting to named pipe")
127
128 nprw.Conn = newConn
129 continue
130 }
131 err = retryErr
132 }
133 }
134 return currBufPos, err
135 }
136 }
137
138
139
140
141
142 func (nprw *nPipeRetryWriter) retryDialPipe() (net.Conn, error) {
143
144
145 nprw.backOff.Reset()
146 for {
147 backOffTime := nprw.backOff.NextBackOff()
148
149
150
151 conn, err := winio.DialPipe(nprw.pipePath, nil)
152 if err == nil {
153 return conn, nil
154 }
155
156 if backOffTime == backoff.Stop {
157 return nil, fmt.Errorf("reached timeout while retrying dial on %s", nprw.pipePath)
158 }
159 time.Sleep(backOffTime)
160 }
161 }
162
163
164
165 func isDisconnectedErr(err error) bool {
166 if serr, ok := err.(syscall.Errno); ok {
167
168 return serr == windows.ERROR_NO_DATA || serr == windows.ERROR_PIPE_NOT_CONNECTED || serr == windows.ERROR_BROKEN_PIPE
169 }
170 return false
171 }
172
173 var _ = (UpstreamIO)(&npipeio{})
174
175 type npipeio struct {
176
177
178
179 stdin, stdout, stderr string
180
181
182
183 terminal bool
184
185
186
187
188
189 sin io.ReadCloser
190 sinCloser sync.Once
191
192
193
194
195
196 sout, serr io.WriteCloser
197 outErrCloser sync.Once
198 }
199
200 func (nio *npipeio) Close(ctx context.Context) {
201 nio.sinCloser.Do(func() {
202 if nio.sin != nil {
203 log.G(ctx).Debug("npipeio::sinCloser")
204 nio.sin.Close()
205 }
206 })
207 nio.outErrCloser.Do(func() {
208 if nio.sout != nil {
209 log.G(ctx).Debug("npipeio::outErrCloser - stdout")
210 nio.sout.Close()
211 }
212 if nio.serr != nil {
213 log.G(ctx).Debug("npipeio::outErrCloser - stderr")
214 nio.serr.Close()
215 }
216 })
217 }
218
219 func (nio *npipeio) CloseStdin(ctx context.Context) {
220 nio.sinCloser.Do(func() {
221 if nio.sin != nil {
222 log.G(ctx).Debug("npipeio::sinCloser")
223 nio.sin.Close()
224 }
225 })
226 }
227
// Stdin returns the stdin connection as an io.Reader. The result is nil when
// no stdin connection was set.
func (nio *npipeio) Stdin() io.Reader {
	return nio.sin
}
231
// StdinPath returns the stdin pipe path stored on nio.
func (nio *npipeio) StdinPath() string {
	return nio.stdin
}
235
// Stdout returns the stdout writer. The result is nil when no stdout
// connection was set.
func (nio *npipeio) Stdout() io.Writer {
	return nio.sout
}
239
// StdoutPath returns the stdout pipe path stored on nio.
func (nio *npipeio) StdoutPath() string {
	return nio.stdout
}
243
// Stderr returns the stderr writer. The result is nil when no stderr
// connection was set.
func (nio *npipeio) Stderr() io.Writer {
	return nio.serr
}
247
// StderrPath returns the stderr pipe path stored on nio.
func (nio *npipeio) StderrPath() string {
	return nio.stderr
}
251
// Terminal returns the terminal flag stored on nio.
func (nio *npipeio) Terminal() bool {
	return nio.terminal
}
255
256
257
258
259
260
261 func CreatePipeAndListen(f interface{}, in bool) (string, error) {
262 p, l, err := CreateNamedPipeListener()
263 if err != nil {
264 return "", err
265 }
266 go func() {
267 c, err := l.Accept()
268 if err != nil {
269 logrus.WithError(err).Error("failed to accept pipe")
270 return
271 }
272
273 if in {
274 _, _ = io.Copy(c, f.(io.Reader))
275 c.Close()
276 } else {
277 _, _ = io.Copy(f.(io.Writer), c)
278 }
279 }()
280 return p, nil
281 }
282
283
284
285 func CreateNamedPipeListener() (string, net.Listener, error) {
286 g, err := guid.NewV4()
287 if err != nil {
288 return "", nil, err
289 }
290 p := `\\.\pipe\` + g.String()
291 l, err := winio.ListenPipe(p, nil)
292 if err != nil {
293 return "", nil, err
294 }
295 return p, l, nil
296 }
297
View as plain text