...

Source file src/github.com/Microsoft/hcsshim/internal/guest/runtime/hcsv2/process.go

Documentation: github.com/Microsoft/hcsshim/internal/guest/runtime/hcsv2

     1  //go:build linux
     2  // +build linux
     3  
     4  package hcsv2
     5  
     6  import (
     7  	"context"
     8  	"fmt"
     9  	"os/exec"
    10  	"sync"
    11  	"syscall"
    12  
    13  	"github.com/Microsoft/hcsshim/internal/guest/gcserr"
    14  	"github.com/Microsoft/hcsshim/internal/guest/runtime"
    15  	"github.com/Microsoft/hcsshim/internal/guest/stdio"
    16  	"github.com/Microsoft/hcsshim/internal/log"
    17  	"github.com/Microsoft/hcsshim/internal/logfields"
    18  	"github.com/Microsoft/hcsshim/internal/oc"
    19  	oci "github.com/opencontainers/runtime-spec/specs-go"
    20  	"github.com/pkg/errors"
    21  	"github.com/sirupsen/logrus"
    22  	"go.opencensus.io/trace"
    23  )
    24  
    25  type Process interface {
    26  	// Kill sends `signal` to the process.
    27  	//
    28  	// If the process has already exited returns `gcserr.HrErrNotFound` by contract.
    29  	Kill(ctx context.Context, signal syscall.Signal) error
    30  	// Pid returns the process id of the process.
    31  	Pid() int
    32  	// ResizeConsole resizes the tty to `height`x`width` for the process.
    33  	ResizeConsole(ctx context.Context, height, width uint16) error
    34  	// Wait returns a channel that can be used to wait for the process to exit
    35  	// and gather the exit code. The second channel must be signaled from the
    36  	// caller when the caller has completed its use of this call to Wait.
    37  	Wait() (<-chan int, chan<- bool)
    38  }
    39  
    40  // Process is a struct that defines the lifetime and operations associated with
    41  // an oci.Process.
    42  type containerProcess struct {
    43  	// c is the owning container
    44  	c    *Container
    45  	spec *oci.Process
    46  	// cid is the container id that owns this process.
    47  	cid string
    48  
    49  	process runtime.Process
    50  	pid     uint32
    51  	// init is `true` if this is the container process itself
    52  	init bool
    53  
    54  	// This is only valid post the exitWg
    55  	exitCode int
    56  	// exitWg is marked as done as soon as the underlying
    57  	// (runtime.Process).Wait() call returns, and exitCode has been updated.
    58  	exitWg sync.WaitGroup
    59  
    60  	// Used to allow addition/removal to the writersWg after an initial wait has
    61  	// already been issued. It is not safe to call Add/Done without holding this
    62  	// lock.
    63  	writersSyncRoot sync.Mutex
    64  	// Used to track the number of writers that need to finish
    65  	// before the process can be marked for cleanup.
    66  	writersWg sync.WaitGroup
    67  	// Used to track the 1st caller to the writersWg that successfully
    68  	// acknowledges it wrote the exit response.
    69  	writersCalled bool
    70  }
    71  
    72  var _ Process = &containerProcess{}
    73  
    74  // newProcess returns a containerProcess struct that has been initialized with
    75  // an outstanding wait for process exit, and post exit an outstanding wait for
    76  // process cleanup to release all resources once at least 1 waiter has
    77  // successfully written the exit response.
    78  func newProcess(c *Container, spec *oci.Process, process runtime.Process, pid uint32, init bool) *containerProcess {
    79  	p := &containerProcess{
    80  		c:       c,
    81  		spec:    spec,
    82  		process: process,
    83  		init:    init,
    84  		cid:     c.id,
    85  		pid:     pid,
    86  	}
    87  	p.exitWg.Add(1)
    88  	p.writersWg.Add(1)
    89  	go func() {
    90  		ctx, span := oc.StartSpan(context.Background(), "newProcess::waitBackground")
    91  		defer span.End()
    92  		span.AddAttributes(
    93  			trace.StringAttribute(logfields.ContainerID, p.cid),
    94  			trace.Int64Attribute(logfields.ProcessID, int64(p.pid)))
    95  
    96  		// Wait for the process to exit
    97  		exitCode, err := p.process.Wait()
    98  		if err != nil {
    99  			log.G(ctx).WithError(err).Error("failed to wait for runc process")
   100  		}
   101  		p.exitCode = exitCode
   102  		log.G(ctx).WithField("exitCode", p.exitCode).Debug("process exited")
   103  
   104  		// Free any process waiters
   105  		p.exitWg.Done()
   106  
   107  		// Schedule the removal of this process object from the map once at
   108  		// least one waiter has read the result
   109  		go func() {
   110  			p.writersWg.Wait()
   111  			// cleanup the process state
   112  			if derr := p.process.Delete(); derr != nil {
   113  				log.G(ctx).WithFields(logrus.Fields{
   114  					"cid": p.cid,
   115  					"pid": p.pid,
   116  				}).Debugf("process cleanup error: %s", derr)
   117  			}
   118  			c.processesMutex.Lock()
   119  
   120  			_, span := oc.StartSpan(context.Background(), "newProcess::waitBackground::waitAllWaiters")
   121  			defer span.End()
   122  			span.AddAttributes(
   123  				trace.StringAttribute("cid", p.cid),
   124  				trace.Int64Attribute("pid", int64(p.pid)))
   125  
   126  			delete(c.processes, p.pid)
   127  			c.processesMutex.Unlock()
   128  		}()
   129  	}()
   130  	return p
   131  }
   132  
   133  // Kill sends 'signal' to the process.
   134  //
   135  // If the process has already exited returns `gcserr.HrErrNotFound` by contract.
   136  func (p *containerProcess) Kill(_ context.Context, signal syscall.Signal) error {
   137  	if err := syscall.Kill(int(p.pid), signal); err != nil {
   138  		if errors.Is(err, syscall.ESRCH) {
   139  			return gcserr.NewHresultError(gcserr.HrErrNotFound)
   140  		}
   141  		return err
   142  	}
   143  
   144  	if p.init {
   145  		p.c.setExitType(signal)
   146  	}
   147  
   148  	return nil
   149  }
   150  
   151  func (p *containerProcess) Pid() int {
   152  	return int(p.pid)
   153  }
   154  
   155  // ResizeConsole resizes the tty to `height`x`width` for the process.
   156  func (p *containerProcess) ResizeConsole(_ context.Context, height, width uint16) error {
   157  	tty := p.process.Tty()
   158  	if tty == nil {
   159  		return fmt.Errorf("pid: %d, is not a tty and cannot be resized", p.pid)
   160  	}
   161  	return tty.ResizeConsole(height, width)
   162  }
   163  
   164  // Wait returns a channel that can be used to wait for the process to exit and
   165  // gather the exit code. The second channel must be signaled from the caller
   166  // when the caller has completed its use of this call to Wait.
   167  func (p *containerProcess) Wait() (<-chan int, chan<- bool) {
   168  	ctx, span := oc.StartSpan(context.Background(), "opengcs::containerProcess::Wait")
   169  	span.AddAttributes(
   170  		trace.StringAttribute("cid", p.cid),
   171  		trace.Int64Attribute("pid", int64(p.pid)))
   172  
   173  	exitCodeChan := make(chan int, 1)
   174  	doneChan := make(chan bool)
   175  
   176  	// Increment our waiters for this waiter
   177  	p.writersSyncRoot.Lock()
   178  	p.writersWg.Add(1)
   179  	p.writersSyncRoot.Unlock()
   180  
   181  	go func() {
   182  		bgExitCodeChan := make(chan int, 1)
   183  		go func() {
   184  			p.exitWg.Wait()
   185  			bgExitCodeChan <- p.exitCode
   186  		}()
   187  
   188  		// Wait for the exit code or the caller to stop waiting.
   189  		select {
   190  		case exitCode := <-bgExitCodeChan:
   191  			exitCodeChan <- exitCode
   192  
   193  			// The caller got the exit code. Wait for them to tell us they have
   194  			// issued the write
   195  			<-doneChan
   196  			p.writersSyncRoot.Lock()
   197  			// Decrement this waiter
   198  			log.G(ctx).Debug("wait completed, releasing wait count")
   199  
   200  			p.writersWg.Done()
   201  			if !p.writersCalled {
   202  				// We have at least 1 response for the exit code for this
   203  				// process. Decrement the release waiter that will free the
   204  				// process resources when the writersWg hits 0
   205  				log.G(ctx).Debug("first wait completed, releasing first wait count")
   206  
   207  				p.writersCalled = true
   208  				p.writersWg.Done()
   209  			}
   210  			p.writersSyncRoot.Unlock()
   211  			span.End()
   212  
   213  		case <-doneChan:
   214  			// In this case the caller timed out before the process exited. Just
   215  			// decrement the waiter but since no exit code we just deal with our
   216  			// waiter.
   217  			p.writersSyncRoot.Lock()
   218  			log.G(ctx).Debug("wait canceled before exit, releasing wait count")
   219  
   220  			p.writersWg.Done()
   221  			p.writersSyncRoot.Unlock()
   222  			span.End()
   223  		}
   224  	}()
   225  	return exitCodeChan, doneChan
   226  }
   227  
   228  func newExternalProcess(ctx context.Context, cmd *exec.Cmd, tty *stdio.TtyRelay, onRemove func(pid int)) (*externalProcess, error) {
   229  	ep := &externalProcess{
   230  		cmd:       cmd,
   231  		tty:       tty,
   232  		waitBlock: make(chan struct{}),
   233  		remove:    onRemove,
   234  	}
   235  	if err := cmd.Start(); err != nil {
   236  		return nil, errors.Wrap(err, "failed to call Start for external process")
   237  	}
   238  	if tty != nil {
   239  		tty.Start()
   240  	}
   241  	go func() {
   242  		_ = cmd.Wait()
   243  		ep.exitCode = cmd.ProcessState.ExitCode()
   244  		log.G(ctx).WithFields(logrus.Fields{
   245  			"pid":      cmd.Process.Pid,
   246  			"exitCode": ep.exitCode,
   247  		}).Debug("external process exited")
   248  		if ep.tty != nil {
   249  			ep.tty.Wait()
   250  		}
   251  		close(ep.waitBlock)
   252  	}()
   253  	return ep, nil
   254  }
   255  
   256  type externalProcess struct {
   257  	cmd *exec.Cmd
   258  	tty *stdio.TtyRelay
   259  
   260  	waitBlock chan struct{}
   261  	exitCode  int
   262  
   263  	removeOnce sync.Once
   264  	remove     func(pid int)
   265  }
   266  
   267  var _ Process = &externalProcess{}
   268  
   269  func (ep *externalProcess) Kill(_ context.Context, signal syscall.Signal) error {
   270  	if err := syscall.Kill(ep.cmd.Process.Pid, signal); err != nil {
   271  		if errors.Is(err, syscall.ESRCH) {
   272  			return gcserr.NewHresultError(gcserr.HrErrNotFound)
   273  		}
   274  		return err
   275  	}
   276  	return nil
   277  }
   278  
   279  func (ep *externalProcess) Pid() int {
   280  	return ep.cmd.Process.Pid
   281  }
   282  
   283  func (ep *externalProcess) ResizeConsole(_ context.Context, height, width uint16) error {
   284  	if ep.tty == nil {
   285  		return fmt.Errorf("pid: %d, is not a tty and cannot be resized", ep.cmd.Process.Pid)
   286  	}
   287  	return ep.tty.ResizeConsole(height, width)
   288  }
   289  
   290  func (ep *externalProcess) Wait() (<-chan int, chan<- bool) {
   291  	_, span := oc.StartSpan(context.Background(), "opengcs::externalProcess::Wait")
   292  	span.AddAttributes(trace.Int64Attribute("pid", int64(ep.cmd.Process.Pid)))
   293  
   294  	exitCodeChan := make(chan int, 1)
   295  	doneChan := make(chan bool)
   296  
   297  	go func() {
   298  		defer close(exitCodeChan)
   299  
   300  		// Wait for the exit code or the caller to stop waiting.
   301  		select {
   302  		case <-ep.waitBlock:
   303  			// Process exited send the exit code and wait for caller to close.
   304  			exitCodeChan <- ep.exitCode
   305  			<-doneChan
   306  			// At least one waiter was successful, remove this external process.
   307  			ep.removeOnce.Do(func() {
   308  				ep.remove(ep.cmd.Process.Pid)
   309  			})
   310  		case <-doneChan:
   311  			// Caller closed early, do nothing.
   312  		}
   313  	}()
   314  	return exitCodeChan, doneChan
   315  }
   316  

View as plain text