...

Source file src/github.com/Microsoft/hcsshim/internal/guest/stdio/stdio.go

Documentation: github.com/Microsoft/hcsshim/internal/guest/stdio

     1  //go:build linux
     2  // +build linux
     3  
     4  package stdio
     5  
     6  import (
     7  	"io"
     8  	"os"
     9  	"strings"
    10  	"sync"
    11  
    12  	"github.com/Microsoft/hcsshim/internal/guest/transport"
    13  	"github.com/pkg/errors"
    14  	"github.com/sirupsen/logrus"
    15  )
    16  
    17  // ConnectionSet is a structure defining the readers and writers the Core
    18  // implementation should forward a process's stdio through.
    19  type ConnectionSet struct {
    20  	In, Out, Err transport.Connection
    21  }
    22  
    23  // Close closes each stdio connection.
    24  func (s *ConnectionSet) Close() error {
    25  	var err error
    26  	if s.In != nil {
    27  		if cerr := s.In.Close(); cerr != nil {
    28  			err = errors.Wrap(cerr, "failed Close on stdin")
    29  		}
    30  		s.In = nil
    31  	}
    32  	if s.Out != nil {
    33  		if cerr := s.Out.Close(); cerr != nil && err == nil {
    34  			err = errors.Wrap(cerr, "failed Close on stdout")
    35  		}
    36  		s.Out = nil
    37  	}
    38  	if s.Err != nil {
    39  		if cerr := s.Err.Close(); cerr != nil && err == nil {
    40  			err = errors.Wrap(cerr, "failed Close on stderr")
    41  		}
    42  		s.Err = nil
    43  	}
    44  	return err
    45  }
    46  
    47  // FileSet represents the stdio of a process. It contains os.File types for
    48  // in, out, err.
    49  type FileSet struct {
    50  	In, Out, Err *os.File
    51  }
    52  
    53  // Close closes all the FileSet handles.
    54  func (fs *FileSet) Close() error {
    55  	var err error
    56  	if fs.In != nil {
    57  		if cerr := fs.In.Close(); cerr != nil {
    58  			err = errors.Wrap(cerr, "failed Close on stdin")
    59  		}
    60  		fs.In = nil
    61  	}
    62  	if fs.Out != nil {
    63  		if cerr := fs.Out.Close(); cerr != nil && err == nil {
    64  			err = errors.Wrap(cerr, "failed Close on stdout")
    65  		}
    66  		fs.Out = nil
    67  	}
    68  	if fs.Err != nil {
    69  		if cerr := fs.Err.Close(); cerr != nil && err == nil {
    70  			err = errors.Wrap(cerr, "failed Close on stderr")
    71  		}
    72  		fs.Err = nil
    73  	}
    74  	return err
    75  }
    76  
    77  // Files returns a FileSet with an os.File for each connection
    78  // in the connection set.
    79  func (s *ConnectionSet) Files() (_ *FileSet, err error) {
    80  	fs := &FileSet{}
    81  	defer func() {
    82  		if err != nil {
    83  			fs.Close()
    84  		}
    85  	}()
    86  	if s.In != nil {
    87  		fs.In, err = s.In.File()
    88  		if err != nil {
    89  			return nil, errors.Wrap(err, "failed to dup stdin socket for command")
    90  		}
    91  	}
    92  	if s.Out != nil {
    93  		fs.Out, err = s.Out.File()
    94  		if err != nil {
    95  			return nil, errors.Wrap(err, "failed to dup stdout socket for command")
    96  		}
    97  	}
    98  	if s.Err != nil {
    99  		fs.Err, err = s.Err.File()
   100  		if err != nil {
   101  			return nil, errors.Wrap(err, "failed to dup stderr socket for command")
   102  		}
   103  	}
   104  	return fs, nil
   105  }
   106  
   107  // NewPipeRelay returns a new pipe relay wrapping the given connection stdin,
   108  // stdout, stderr set. If s is nil will assume al stdin, stdout, stderr pipes.
   109  func NewPipeRelay(s *ConnectionSet) (_ *PipeRelay, err error) {
   110  	pr := &PipeRelay{s: s}
   111  	defer func() {
   112  		if err != nil {
   113  			pr.closePipes()
   114  		}
   115  	}()
   116  
   117  	if s == nil || s.In != nil {
   118  		pr.pipes[0], pr.pipes[1], err = os.Pipe()
   119  		if err != nil {
   120  			return nil, errors.Wrap(err, "failed to create stdin pipe relay")
   121  		}
   122  	}
   123  	if s == nil || s.Out != nil {
   124  		pr.pipes[2], pr.pipes[3], err = os.Pipe()
   125  		if err != nil {
   126  			return nil, errors.Wrap(err, "failed to create stdout pipe relay")
   127  		}
   128  	}
   129  	if s == nil || s.Err != nil {
   130  		pr.pipes[4], pr.pipes[5], err = os.Pipe()
   131  		if err != nil {
   132  			return nil, errors.Wrap(err, "failed to create stderr pipe relay")
   133  		}
   134  	}
   135  	return pr, nil
   136  }
   137  
   138  // PipeRelay is a relay built to expose a pipe interface
   139  // for stdin, stdout, stderr on top of a ConnectionSet.
   140  type PipeRelay struct {
   141  	wg sync.WaitGroup
   142  	s  *ConnectionSet
   143  	// pipes format is stdin [0 read, 1 write], stdout [2 read, 3 write], stderr [4 read, 5 write].
   144  	pipes [6]*os.File
   145  }
   146  
   147  // ReplaceConnectionSet allows the caller to add a new destination set after
   148  // creating the relay. This can only be called previous to the call to Start.
   149  func (pr *PipeRelay) ReplaceConnectionSet(s *ConnectionSet) {
   150  	pr.s = s
   151  }
   152  
   153  // Files returns a FileSet with an os.File for each connection
   154  // in the connection set.
   155  func (pr *PipeRelay) Files() (*FileSet, error) {
   156  	fs := new(FileSet)
   157  	if pr.s == nil || pr.s.In != nil {
   158  		fs.In = pr.pipes[0]
   159  	}
   160  	if pr.s == nil || pr.s.Out != nil {
   161  		fs.Out = pr.pipes[3]
   162  	}
   163  	if pr.s == nil || pr.s.Err != nil {
   164  		fs.Err = pr.pipes[5]
   165  	}
   166  	return fs, nil
   167  }
   168  
   169  func copyAndCleanClose(c transport.Connection, r io.Reader, name string) {
   170  	if n, err := io.Copy(c, r); err != nil {
   171  		logrus.WithFields(logrus.Fields{
   172  			logrus.ErrorKey: err,
   173  			"bytes":         n,
   174  			"file":          name,
   175  		}).Error("opengcs::PipeRelay::copyAndCleanClose - error copying from pipe")
   176  	}
   177  	// Shut down the write end of the socket, then read a byte (which should
   178  	// yield EOF) to wait for the other endpoint to finish reading and close
   179  	// the connection.
   180  	if err := c.CloseWrite(); err == nil {
   181  		var b [1]byte
   182  		_, err = c.Read(b[:])
   183  		if err == nil {
   184  			err = errors.New("unexpected data in socket")
   185  		}
   186  		if err != io.EOF { //nolint:errorlint
   187  			logrus.WithFields(logrus.Fields{
   188  				logrus.ErrorKey: err,
   189  				"file":          name,
   190  			}).Error("opengcs::PipeRelay::copyAndCleanClose - error reading for clean close")
   191  		}
   192  	} else {
   193  		logrus.WithFields(logrus.Fields{
   194  			logrus.ErrorKey: err,
   195  			"file":          name,
   196  		}).Error("opengcs::PipeRelay::copyAndCleanClose - error shutting down socket")
   197  	}
   198  	if err := c.Close(); err != nil {
   199  		logrus.WithFields(logrus.Fields{
   200  			logrus.ErrorKey: err,
   201  			"file":          name,
   202  		}).Error("opengcs::PipeRelay::copyAndCleanClose - error closing socket")
   203  	}
   204  }
   205  
   206  // Start starts the relay operation. The caller must call Wait to wait
   207  // for the relay to finish and release the associated resources.
   208  func (pr *PipeRelay) Start() {
   209  	if pr.s.In != nil {
   210  		pr.wg.Add(1)
   211  		go func() {
   212  			if n, err := io.Copy(pr.pipes[1], pr.s.In); err != nil {
   213  				logrus.WithFields(logrus.Fields{
   214  					logrus.ErrorKey: err,
   215  					"bytes":         n,
   216  				}).Error("opengcs::PipeRelay::Start - error copying stdin to pipe")
   217  			}
   218  			if err := pr.pipes[1].Close(); err != nil {
   219  				logrus.WithFields(logrus.Fields{
   220  					logrus.ErrorKey: err,
   221  				}).Error("opengcs::PipeRelay::Start - error closing stdin write pipe")
   222  			}
   223  			pr.pipes[1] = nil
   224  			pr.wg.Done()
   225  		}()
   226  	}
   227  	if pr.s.Out != nil {
   228  		pr.wg.Add(1)
   229  		go func() {
   230  			copyAndCleanClose(pr.s.Out, pr.pipes[2], "stdout")
   231  			pr.wg.Done()
   232  		}()
   233  	}
   234  	if pr.s.Err != nil {
   235  		pr.wg.Add(1)
   236  		go func() {
   237  			copyAndCleanClose(pr.s.Err, pr.pipes[4], "stderr")
   238  			pr.wg.Done()
   239  		}()
   240  	}
   241  }
   242  
   243  // Wait waits for the relaying to finish and closes the associated
   244  // pipes and connections.
   245  func (pr *PipeRelay) Wait() {
   246  	// Close stdin so that the copying goroutine is safely unblocked; this is necessary
   247  	// because the host expects stdin to be closed before it will report process
   248  	// exit back to the client, and the client expects the process notification before
   249  	// it will close its side of stdin (which io.Copy is waiting on in the copying goroutine).
   250  	if pr.s != nil && pr.s.In != nil {
   251  		_ = pr.s.In.CloseRead()
   252  	}
   253  
   254  	pr.wg.Wait()
   255  	pr.closePipes()
   256  	if pr.s != nil {
   257  		pr.s.Close()
   258  	}
   259  }
   260  
   261  // CloseUnusedPipes gives the caller the ability to close any pipes that do not
   262  // have a corresponding entry on the ConnectionSet. This is to be used in
   263  // conjunction with NewPipeRelay where s is nil which wil open all pipes and
   264  // later calling ReplaceConnectionSet with the actual connections.
   265  func (pr *PipeRelay) CloseUnusedPipes() {
   266  	if pr.s == nil {
   267  		pr.closePipes()
   268  	} else {
   269  		if pr.s.In == nil {
   270  			// Write end of stdin
   271  			pr.pipes[1].Close()
   272  		}
   273  		if pr.s.Out == nil {
   274  			// Read end of stdout
   275  			pr.pipes[2].Close()
   276  		}
   277  		if pr.s.Err == nil {
   278  			// Read end of stderr
   279  			pr.pipes[4].Close()
   280  		}
   281  	}
   282  }
   283  
   284  func (pr *PipeRelay) closePipes() {
   285  	for i := 0; i < len(pr.pipes); i++ {
   286  		if pr.pipes[i] != nil {
   287  			if err := pr.pipes[i].Close(); err != nil {
   288  				if !strings.Contains(err.Error(), "file already closed") {
   289  					logrus.WithFields(logrus.Fields{
   290  						logrus.ErrorKey: err,
   291  					}).Error("opengcs::PipeRelay::closePipes - error closing relay pipe")
   292  				}
   293  			}
   294  			pr.pipes[i] = nil
   295  		}
   296  	}
   297  }
   298  
   299  // NewTtyRelay returns a new TTY relay for a given master PTY file.
   300  func NewTtyRelay(s *ConnectionSet, pty *os.File) *TtyRelay {
   301  	return &TtyRelay{s: s, pty: pty}
   302  }
   303  
   304  // TtyRelay relays IO between a set of stdio connections and a master PTY file.
   305  type TtyRelay struct {
   306  	m      sync.Mutex
   307  	closed bool
   308  	wg     sync.WaitGroup
   309  	s      *ConnectionSet
   310  	pty    *os.File
   311  }
   312  
   313  // ReplaceConnectionSet allows the caller to add a new destination set after
   314  // creating the relay. This can only be called previous to the call to Start.
   315  func (r *TtyRelay) ReplaceConnectionSet(s *ConnectionSet) {
   316  	r.s = s
   317  }
   318  
   319  // ResizeConsole sends the appropriate resize to a pTTY FD.
   320  func (r *TtyRelay) ResizeConsole(height, width uint16) error {
   321  	r.m.Lock()
   322  	defer r.m.Unlock()
   323  
   324  	if r.closed {
   325  		return nil
   326  	}
   327  	return ResizeConsole(r.pty, height, width)
   328  }
   329  
   330  // Start starts the relay operation. The caller must call Wait to wait
   331  // for the relay to finish and release the associated resources.
   332  func (r *TtyRelay) Start() {
   333  	if r.s.In != nil {
   334  		r.wg.Add(1)
   335  		go func() {
   336  			if _, err := io.Copy(r.pty, r.s.In); err != nil {
   337  				logrus.WithFields(logrus.Fields{
   338  					logrus.ErrorKey: err,
   339  				}).Error("opengcs::TtyRelay::Start - error copying stdin to pty")
   340  			}
   341  			r.wg.Done()
   342  		}()
   343  	}
   344  	if r.s.Out != nil {
   345  		r.wg.Add(1)
   346  		go func() {
   347  			if _, err := io.Copy(r.s.Out, r.pty); err != nil {
   348  				logrus.WithFields(logrus.Fields{
   349  					logrus.ErrorKey: err,
   350  				}).Error("opengcs::TtyRelay::Start - error copying pty to stdout")
   351  			}
   352  			r.wg.Done()
   353  		}()
   354  	}
   355  }
   356  
   357  // Wait waits for the relaying to finish and closes the associated
   358  // files and connections.
   359  func (r *TtyRelay) Wait() {
   360  	// Close stdin so that the copying goroutine is safely unblocked; this is necessary
   361  	// because the host expects stdin to be closed before it will report process
   362  	// exit back to the client, and the client expects the process notification before
   363  	// it will close its side of stdin (which io.Copy is waiting on in the copying goroutine).
   364  	if r.s != nil && r.s.In != nil {
   365  		_ = r.s.In.CloseRead()
   366  	}
   367  
   368  	// Wait for all users of stdioSet and master to finish before closing them.
   369  	r.wg.Wait()
   370  
   371  	r.m.Lock()
   372  	defer r.m.Unlock()
   373  
   374  	r.pty.Close()
   375  	r.closed = true
   376  	if r.s != nil {
   377  		r.s.Close()
   378  	}
   379  }
   380  

View as plain text