...

Source file src/github.com/Microsoft/hcsshim/internal/cmd/io_binary.go

Documentation: github.com/Microsoft/hcsshim/internal/cmd

     1  //go:build windows
     2  
     3  package cmd
     4  
import (
	"context"
	"fmt"
	"io"
	"net"
	"net/url"
	"os/exec"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/Microsoft/go-winio"
	"github.com/containerd/containerd/namespaces"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"

	"github.com/Microsoft/hcsshim/internal/log"
)
    24  
const (
	// binaryPipeFmt is the named pipe path template. NewBinaryIO fills it in with
	// the container ID and the pipe's role ("stdout", "stderr" or "wait").
	binaryPipeFmt         = `\\.\pipe\binary-%s-%s`
	// binaryCmdWaitTimeout bounds how long binaryIO.Close waits for the logging
	// driver process to finish before killing it.
	binaryCmdWaitTimeout  = 10 * time.Second
	// binaryCmdStartTimeout bounds how long NewBinaryIO waits for the logging
	// driver to signal readiness on the wait pipe.
	binaryCmdStartTimeout = 10 * time.Second
)
    30  
    31  // NewBinaryIO runs a custom binary process for pluggable shim logging driver.
    32  //
    33  // Container's IO will be redirected to the logging driver via named pipes, which are
    34  // passed as "CONTAINER_STDOUT", "CONTAINER_STDERR" environment variables. The logging
    35  // driver MUST dial a wait pipe passed via "CONTAINER_WAIT" environment variable AND CLOSE
    36  // it to indicate that it's ready to consume the IO. For customer's convenience container ID
    37  // and namespace are also passed via "CONTAINER_ID" and "CONTAINER_NAMESPACE".
    38  //
    39  // The path to the logging driver can be provided via a URL's host/path. Additional arguments
    40  // can be passed to the logger via URL query params
    41  func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ UpstreamIO, err error) {
    42  	ns, err := namespaces.NamespaceRequired(ctx)
    43  	if err != nil {
    44  		ns = namespaces.Default
    45  	}
    46  
    47  	var stdoutPipe, stderrPipe, waitPipe io.ReadWriteCloser
    48  
    49  	stdoutPipePath := fmt.Sprintf(binaryPipeFmt, id, "stdout")
    50  	stdoutPipe, err = openNPipe(stdoutPipePath)
    51  	if err != nil {
    52  		return nil, err
    53  	}
    54  
    55  	stderrPipePath := fmt.Sprintf(binaryPipeFmt, id, "stderr")
    56  	stderrPipe, err = openNPipe(stderrPipePath)
    57  	if err != nil {
    58  		return nil, err
    59  	}
    60  
    61  	waitPipePath := fmt.Sprintf(binaryPipeFmt, id, "wait")
    62  	waitPipe, err = openNPipe(waitPipePath)
    63  	if err != nil {
    64  		return nil, err
    65  	}
    66  	defer func() {
    67  		if err := waitPipe.Close(); err != nil {
    68  			log.G(ctx).WithError(err).Errorf("error closing wait pipe: %s", waitPipePath)
    69  		}
    70  	}()
    71  
    72  	envs := []string{
    73  		"CONTAINER_ID=" + id,
    74  		"CONTAINER_NAMESPACE=" + ns,
    75  		"CONTAINER_STDOUT=" + stdoutPipePath,
    76  		"CONTAINER_STDERR=" + stderrPipePath,
    77  		"CONTAINER_WAIT=" + waitPipePath,
    78  	}
    79  	cmd, err := newBinaryCmd(ctx, uri, envs)
    80  	if err != nil {
    81  		return nil, err
    82  	}
    83  
    84  	if err := cmd.Start(); err != nil {
    85  		return nil, err
    86  	}
    87  
    88  	errCh := make(chan error, 1)
    89  	// Wait for logging driver to signal to the wait pipe that it's ready to consume IO
    90  	go func() {
    91  		b := make([]byte, 1)
    92  		if _, err := waitPipe.Read(b); err != nil && err != io.EOF {
    93  			errCh <- err
    94  			return
    95  		}
    96  		errCh <- nil
    97  	}()
    98  
    99  	select {
   100  	case err = <-errCh:
   101  		if err != nil {
   102  			return nil, errors.Wrap(err, "failed to start binary logger")
   103  		}
   104  	case <-time.After(binaryCmdStartTimeout):
   105  		return nil, errors.New("failed to start binary logger: timeout")
   106  	}
   107  
   108  	log.G(ctx).WithFields(logrus.Fields{
   109  		"containerID":        id,
   110  		"containerNamespace": ns,
   111  		"binaryCmd":          cmd.String(),
   112  		"binaryProcessID":    cmd.Process.Pid,
   113  	}).Debug("binary io process started")
   114  
   115  	return &binaryIO{
   116  		cmd:    cmd,
   117  		stdout: stdoutPipePath,
   118  		sout:   stdoutPipe,
   119  		stderr: stderrPipePath,
   120  		serr:   stderrPipe,
   121  	}, nil
   122  }
   123  
   124  // sanitizePath parses the URL object and returns a clean path to the logging driver
   125  func sanitizePath(uri *url.URL) string {
   126  	path := filepath.Clean(uri.Path)
   127  
   128  	if strings.Contains(path, `:\`) {
   129  		return strings.TrimPrefix(path, "\\")
   130  	}
   131  
   132  	return path
   133  }
   134  
   135  func newBinaryCmd(ctx context.Context, uri *url.URL, envs []string) (*exec.Cmd, error) {
   136  	if uri.Path == "" {
   137  		return nil, errors.New("no logging driver path provided")
   138  	}
   139  
   140  	var args []string
   141  	for k, vs := range uri.Query() {
   142  		args = append(args, k)
   143  		if len(vs) > 0 && vs[0] != "" {
   144  			args = append(args, vs[0])
   145  		}
   146  	}
   147  
   148  	execPath := sanitizePath(uri)
   149  
   150  	cmd := exec.CommandContext(ctx, execPath, args...)
   151  	cmd.Env = append(cmd.Env, envs...)
   152  
   153  	return cmd, nil
   154  }
   155  
// Compile-time check that *binaryIO satisfies UpstreamIO.
var _ UpstreamIO = &binaryIO{}

// binaryIO implements the UpstreamIO interface to enable shim pluggable logging.
// It is created by NewBinaryIO and holds the logging driver process together
// with the pipes used for container stdout and stderr.
type binaryIO struct {
	// cmd is the logging driver process.
	cmd *exec.Cmd

	// binaryCloser ensures Close waits for (or kills) cmd only once.
	binaryCloser sync.Once

	// stdout and stderr are the pipe paths reported by StdoutPath and StderrPath.
	stdout, stderr string

	// sout and serr are the pipe endpoints returned by Stdout and Stderr.
	sout, serr io.ReadWriteCloser
	// soutCloser ensures Close closes both sout and serr only once.
	soutCloser sync.Once
}
   169  
   170  // Close named pipes for container stdout and stderr and wait for the binary process to finish.
   171  func (b *binaryIO) Close(ctx context.Context) {
   172  	b.soutCloser.Do(func() {
   173  		if b.sout != nil {
   174  			err := b.sout.Close()
   175  			if err != nil {
   176  				log.G(ctx).WithError(err).Errorf("error while closing stdout npipe")
   177  			}
   178  		}
   179  		if b.serr != nil {
   180  			err := b.serr.Close()
   181  			if err != nil {
   182  				log.G(ctx).WithError(err).Errorf("error while closing stderr npipe")
   183  			}
   184  		}
   185  	})
   186  	b.binaryCloser.Do(func() {
   187  		done := make(chan error, 1)
   188  		go func() {
   189  			done <- b.cmd.Wait()
   190  		}()
   191  
   192  		select {
   193  		case err := <-done:
   194  			if err != nil {
   195  				log.G(ctx).WithError(err).Errorf("error while waiting for binary cmd to finish")
   196  			}
   197  		case <-time.After(binaryCmdWaitTimeout):
   198  			log.G(ctx).Errorf("timeout while waiting for binaryIO process to finish. Killing")
   199  			err := b.cmd.Process.Kill()
   200  			if err != nil {
   201  				log.G(ctx).WithError(err).Errorf("error while killing binaryIO process")
   202  			}
   203  		}
   204  	})
   205  }
   206  
// CloseStdin is a no-op: binaryIO exposes no stdin stream (Stdin returns nil).
func (b *binaryIO) CloseStdin(_ context.Context) {}
   208  
// Stdin always returns nil; binaryIO exposes no stdin stream.
func (b *binaryIO) Stdin() io.Reader {
	return nil
}
   212  
// StdinPath always returns the empty string; binaryIO has no stdin pipe.
func (b *binaryIO) StdinPath() string {
	return ""
}
   216  
// Stdout returns the writer backed by the stdout pipe (the sout field).
func (b *binaryIO) Stdout() io.Writer {
	return b.sout
}
   220  
// StdoutPath returns the path of the stdout pipe (the stdout field).
func (b *binaryIO) StdoutPath() string {
	return b.stdout
}
   224  
// Stderr returns the writer backed by the stderr pipe (the serr field).
func (b *binaryIO) Stderr() io.Writer {
	return b.serr
}
   228  
// StderrPath returns the path of the stderr pipe (the stderr field).
func (b *binaryIO) StderrPath() string {
	return b.stderr
}
   232  
// Terminal always returns false.
func (b *binaryIO) Terminal() bool {
	return false
}
   236  
// pipe is an io.ReadWriteCloser over a named pipe listener. openNPipe starts a
// goroutine that accepts a single connection; Read, Write and Close wait on
// conWg for that accept to finish before touching con or conErr.
type pipe struct {
	// l is the listener the connection is accepted from.
	l      net.Listener
	// con is the accepted connection; it stays nil if the accept failed.
	con    net.Conn
	// conErr is the error returned by the accept, if any.
	conErr error
	// conWg is released once the accept goroutine has set con or conErr.
	conWg  sync.WaitGroup
}
   243  
   244  func openNPipe(path string) (io.ReadWriteCloser, error) {
   245  	l, err := winio.ListenPipe(path, nil)
   246  	if err != nil {
   247  		return nil, err
   248  	}
   249  
   250  	p := &pipe{l: l}
   251  	p.conWg.Add(1)
   252  
   253  	go func() {
   254  		defer p.conWg.Done()
   255  		c, err := l.Accept()
   256  		if err != nil {
   257  			p.conErr = err
   258  			return
   259  		}
   260  		p.con = c
   261  	}()
   262  	return p, nil
   263  }
   264  
   265  func (p *pipe) Write(b []byte) (int, error) {
   266  	p.conWg.Wait()
   267  	if p.conErr != nil {
   268  		return 0, errors.Wrap(p.conErr, "connection error")
   269  	}
   270  	return p.con.Write(b)
   271  }
   272  
   273  func (p *pipe) Read(b []byte) (int, error) {
   274  	p.conWg.Wait()
   275  	if p.conErr != nil {
   276  		return 0, errors.Wrap(p.conErr, "connection error")
   277  	}
   278  	return p.con.Read(b)
   279  }
   280  
   281  func (p *pipe) Close() error {
   282  	if err := p.l.Close(); err != nil {
   283  		log.G(context.TODO()).WithError(err).Debug("error closing pipe listener")
   284  	}
   285  	p.conWg.Wait()
   286  	if p.con != nil {
   287  		return p.con.Close()
   288  	}
   289  	return p.conErr
   290  }
   291  

View as plain text