...

Source file src/github.com/Microsoft/hcsshim/internal/cmd/io_npipe.go

Documentation: github.com/Microsoft/hcsshim/internal/cmd

     1  //go:build windows
     2  
     3  package cmd
     4  
import (
	"context"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"net"
	"sync"
	"syscall"
	"time"

	winio "github.com/Microsoft/go-winio"
	"github.com/Microsoft/go-winio/pkg/guid"
	"github.com/Microsoft/hcsshim/internal/log"
	"github.com/cenkalti/backoff/v4"
	"github.com/sirupsen/logrus"
	"golang.org/x/sys/windows"
)
    22  
    23  func init() {
    24  	// Need to seed for the rng in backoff.NextBackoff()
    25  	rand.Seed(time.Now().UnixNano())
    26  }
    27  
    28  // NewNpipeIO creates connected upstream io. It is the callers responsibility to validate that `if terminal == true`, `stderr == ""`. retryTimeout
    29  // refers to the timeout used to try and reconnect to the server end of the named pipe if the connection is severed. A value of 0 for retryTimeout
    30  // is treated as an infinite timeout.
    31  func NewNpipeIO(ctx context.Context, stdin, stdout, stderr string, terminal bool, retryTimeout time.Duration) (_ UpstreamIO, err error) {
    32  	log.G(ctx).WithFields(logrus.Fields{
    33  		"stdin":    stdin,
    34  		"stdout":   stdout,
    35  		"stderr":   stderr,
    36  		"terminal": terminal,
    37  	}).Debug("NewNpipeIO")
    38  
    39  	nio := &npipeio{
    40  		stdin:    stdin,
    41  		stdout:   stdout,
    42  		stderr:   stderr,
    43  		terminal: terminal,
    44  	}
    45  	defer func() {
    46  		if err != nil {
    47  			nio.Close(ctx)
    48  		}
    49  	}()
    50  
    51  	if stdin != "" {
    52  		c, err := winio.DialPipeContext(ctx, stdin)
    53  		if err != nil {
    54  			return nil, err
    55  		}
    56  		// We don't have any retry logic for stdin as there's no good way to detect that we'd even need to retry. If the process forwarding
    57  		// stdin to the container (some client interface to exec a process in a container) exited, we'll get EOF which io.Copy treats as
    58  		// success. For fifos on Linux it seems if all fd's for the write end of the pipe disappear, which is the same scenario, then
    59  		// the read end will get EOF as well.
    60  		nio.sin = c
    61  	}
    62  	if stdout != "" {
    63  		c, err := winio.DialPipeContext(ctx, stdout)
    64  		if err != nil {
    65  			return nil, err
    66  		}
    67  		nio.sout = &nPipeRetryWriter{ctx, c, stdout, newBackOff(retryTimeout)}
    68  	}
    69  	if stderr != "" {
    70  		c, err := winio.DialPipeContext(ctx, stderr)
    71  		if err != nil {
    72  			return nil, err
    73  		}
    74  		nio.serr = &nPipeRetryWriter{ctx, c, stderr, newBackOff(retryTimeout)}
    75  	}
    76  	return nio, nil
    77  }
    78  
// nPipeRetryWriter is an io.Writer that wraps a net.Conn representing a named pipe connection. The retry logic is specifically only for
// disconnect scenarios (pipe broken, server went away etc.) to attempt to re-establish a connection, and is not for retrying writes on a busy pipe.
type nPipeRetryWriter struct {
	// ctx is used (within this file) only to obtain a logger for reporting disconnects and reconnects.
	ctx context.Context
	// Conn is the current pipe connection. Write closes it and swaps in a newly dialed connection after a disconnect;
	// if redialing fails, Conn is left pointing at the already-closed connection. Its promoted Close method is what
	// closes this writer.
	net.Conn
	// pipePath is the named pipe path that is redialed after a disconnect.
	pipePath string
	// backOff paces redial attempts; retryDialPipe Resets it at the start of every reconnect cycle so it can be reused.
	backOff backoff.BackOff
}
    87  
    88  // newBackOff returns a new BackOff interface. The values chosen are fairly conservative, the main use is to get a somewhat random
    89  // retry timeout on each ask. This can help avoid flooding a server all at once.
    90  func newBackOff(timeout time.Duration) backoff.BackOff {
    91  	return &backoff.ExponentialBackOff{
    92  		// First backoff timeout will be somewhere in the 100 - 300 ms range given the default multiplier.
    93  		InitialInterval:     time.Millisecond * 200,
    94  		RandomizationFactor: backoff.DefaultRandomizationFactor,
    95  		Multiplier:          backoff.DefaultMultiplier,
    96  		// Set the max interval to a minute, seems like a sane value. We don't know how long the server will be down for, and if we reached
    97  		// this point it's been down for quite awhile.
    98  		MaxInterval: time.Minute * 1,
    99  		// `backoff.ExponentialBackoff` treats a 0 timeout as infinite, which is ideal as it's the logic we desire.
   100  		MaxElapsedTime: timeout,
   101  		Stop:           backoff.Stop,
   102  		Clock:          backoff.SystemClock,
   103  	}
   104  }
   105  
   106  func (nprw *nPipeRetryWriter) Write(p []byte) (n int, err error) {
   107  	var currBufPos int
   108  	for {
   109  		// p[currBufPos:] to handle a case where we wrote n bytes but got disconnected and now we just need to write the rest of the buffer. If this is the
   110  		// first write then the current position is 0 so we just try and write the whole buffer as usual.
   111  		n, err = nprw.Conn.Write(p[currBufPos:])
   112  		currBufPos += n
   113  		if err != nil {
   114  			// If the error is one that we can discern calls for a retry, attempt to redial the pipe.
   115  			if isDisconnectedErr(err) {
   116  				// Log that we're going to retry establishing the connection.
   117  				log.G(nprw.ctx).WithFields(logrus.Fields{
   118  					"address":       nprw.pipePath,
   119  					logrus.ErrorKey: err,
   120  				}).Error("Named pipe disconnected, retrying dial")
   121  
   122  				// Close the old conn first.
   123  				nprw.Conn.Close()
   124  				newConn, retryErr := nprw.retryDialPipe()
   125  				if retryErr == nil {
   126  					log.G(nprw.ctx).WithField("address", nprw.pipePath).Info("Succeeded in reconnecting to named pipe")
   127  
   128  					nprw.Conn = newConn
   129  					continue
   130  				}
   131  				err = retryErr
   132  			}
   133  		}
   134  		return currBufPos, err
   135  	}
   136  }
   137  
   138  // retryDialPipe is a helper to retry dialing a named pipe until the timeout of nprw.BackOff or a successful connection. This is mainly to
   139  // assist in scenarios where the server end of the pipe has crashed/went away and is no longer accepting new connections but may
   140  // come back online. The backoff used inside is to try and space out connections to the server as to not flood it all at once with connection
   141  // attempts at the same interval.
   142  func (nprw *nPipeRetryWriter) retryDialPipe() (net.Conn, error) {
   143  	// Reset the backoff object as it starts ticking down when it's created. This also ensures we can re-use it in the event the server goes
   144  	// away more than once.
   145  	nprw.backOff.Reset()
   146  	for {
   147  		backOffTime := nprw.backOff.NextBackOff()
   148  		// We don't simply use a context with a timeout and pass it to DialPipe because DialPipe only retries the connection (and thus makes use of
   149  		// the timeout) if it sees that the pipe is busy. If the server isn't up/not listening it will just error out immediately and not make use
   150  		// of the timeout passed. That's the case we're most likely in right now so we need our own retry logic on top.
   151  		conn, err := winio.DialPipe(nprw.pipePath, nil)
   152  		if err == nil {
   153  			return conn, nil
   154  		}
   155  		// Next backoff would go over our timeout. We've tried once more above due to the ordering of this check, but now we need to bail out.
   156  		if backOffTime == backoff.Stop {
   157  			return nil, fmt.Errorf("reached timeout while retrying dial on %s", nprw.pipePath)
   158  		}
   159  		time.Sleep(backOffTime)
   160  	}
   161  }
   162  
   163  // isDisconnectedErr is a helper to determine if the error received from writing to the server end of a named pipe indicates a disconnect/severed
   164  // connection. This can be used to attempt a redial if it's expected that the server will come back online at some point.
   165  func isDisconnectedErr(err error) bool {
   166  	if serr, ok := err.(syscall.Errno); ok {
   167  		// Server went away/something went wrong.
   168  		return serr == windows.ERROR_NO_DATA || serr == windows.ERROR_PIPE_NOT_CONNECTED || serr == windows.ERROR_BROKEN_PIPE
   169  	}
   170  	return false
   171  }
   172  
   173  var _ = (UpstreamIO)(&npipeio{})
   174  
   175  type npipeio struct {
   176  	// stdin, stdout, stderr are the original paths used to open the connections.
   177  	//
   178  	// They MUST be treated as readonly in the lifetime of the pipe io.
   179  	stdin, stdout, stderr string
   180  	// terminal is the original setting passed in on open.
   181  	//
   182  	// This MUST be treated as readonly in the lifetime of the pipe io.
   183  	terminal bool
   184  
   185  	// sin is the upstream `stdin` connection.
   186  	//
   187  	// `sin` MUST be treated as readonly in the lifetime of the pipe io after
   188  	// the return from `NewNpipeIO`.
   189  	sin       io.ReadCloser
   190  	sinCloser sync.Once
   191  
   192  	// sout and serr are the upstream `stdout` and `stderr` connections.
   193  	//
   194  	// `sout` and `serr` MUST be treated as readonly in the lifetime of the pipe
   195  	// io after the return from `NewNpipeIO`.
   196  	sout, serr   io.WriteCloser
   197  	outErrCloser sync.Once
   198  }
   199  
   200  func (nio *npipeio) Close(ctx context.Context) {
   201  	nio.sinCloser.Do(func() {
   202  		if nio.sin != nil {
   203  			log.G(ctx).Debug("npipeio::sinCloser")
   204  			nio.sin.Close()
   205  		}
   206  	})
   207  	nio.outErrCloser.Do(func() {
   208  		if nio.sout != nil {
   209  			log.G(ctx).Debug("npipeio::outErrCloser - stdout")
   210  			nio.sout.Close()
   211  		}
   212  		if nio.serr != nil {
   213  			log.G(ctx).Debug("npipeio::outErrCloser - stderr")
   214  			nio.serr.Close()
   215  		}
   216  	})
   217  }
   218  
   219  func (nio *npipeio) CloseStdin(ctx context.Context) {
   220  	nio.sinCloser.Do(func() {
   221  		if nio.sin != nil {
   222  			log.G(ctx).Debug("npipeio::sinCloser")
   223  			nio.sin.Close()
   224  		}
   225  	})
   226  }
   227  
   228  func (nio *npipeio) Stdin() io.Reader {
   229  	return nio.sin
   230  }
   231  
   232  func (nio *npipeio) StdinPath() string {
   233  	return nio.stdin
   234  }
   235  
   236  func (nio *npipeio) Stdout() io.Writer {
   237  	return nio.sout
   238  }
   239  
   240  func (nio *npipeio) StdoutPath() string {
   241  	return nio.stdout
   242  }
   243  
   244  func (nio *npipeio) Stderr() io.Writer {
   245  	return nio.serr
   246  }
   247  
   248  func (nio *npipeio) StderrPath() string {
   249  	return nio.stderr
   250  }
   251  
   252  func (nio *npipeio) Terminal() bool {
   253  	return nio.terminal
   254  }
   255  
   256  // CreatePipeAndListen is a helper function to create a pipe listener
   257  // and accept connections. Returns the created pipe path on success.
   258  //
   259  // If `in` is true, `f` should implement io.Reader
   260  // If `in` is false, `f` should implement io.Writer
   261  func CreatePipeAndListen(f interface{}, in bool) (string, error) {
   262  	p, l, err := CreateNamedPipeListener()
   263  	if err != nil {
   264  		return "", err
   265  	}
   266  	go func() {
   267  		c, err := l.Accept()
   268  		if err != nil {
   269  			logrus.WithError(err).Error("failed to accept pipe")
   270  			return
   271  		}
   272  
   273  		if in {
   274  			_, _ = io.Copy(c, f.(io.Reader))
   275  			c.Close()
   276  		} else {
   277  			_, _ = io.Copy(f.(io.Writer), c)
   278  		}
   279  	}()
   280  	return p, nil
   281  }
   282  
   283  // CreateNamedPipeListener is a helper function to create and return a pipe listener
   284  // and it's created path.
   285  func CreateNamedPipeListener() (string, net.Listener, error) {
   286  	g, err := guid.NewV4()
   287  	if err != nil {
   288  		return "", nil, err
   289  	}
   290  	p := `\\.\pipe\` + g.String()
   291  	l, err := winio.ListenPipe(p, nil)
   292  	if err != nil {
   293  		return "", nil, err
   294  	}
   295  	return p, l, nil
   296  }
   297  

View as plain text