...

Source file src/github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/exec_hcs.go

Documentation: github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1

     1  //go:build windows
     2  
     3  package main
     4  
     5  import (
     6  	"context"
     7  	"sync"
     8  	"time"
     9  
    10  	eventstypes "github.com/containerd/containerd/api/events"
    11  	containerd_v1_types "github.com/containerd/containerd/api/types/task"
    12  	"github.com/containerd/containerd/errdefs"
    13  	"github.com/containerd/containerd/runtime"
    14  	"github.com/containerd/containerd/runtime/v2/task"
    15  	"github.com/opencontainers/runtime-spec/specs-go"
    16  	"github.com/pkg/errors"
    17  	"github.com/sirupsen/logrus"
    18  	"go.opencensus.io/trace"
    19  
    20  	"github.com/Microsoft/hcsshim/internal/cmd"
    21  	"github.com/Microsoft/hcsshim/internal/cow"
    22  	"github.com/Microsoft/hcsshim/internal/hcs"
    23  	"github.com/Microsoft/hcsshim/internal/log"
    24  	"github.com/Microsoft/hcsshim/internal/oc"
    25  	"github.com/Microsoft/hcsshim/internal/protocol/guestresource"
    26  	"github.com/Microsoft/hcsshim/internal/signals"
    27  	"github.com/Microsoft/hcsshim/internal/uvm"
    28  	"github.com/Microsoft/hcsshim/osversion"
    29  )
    30  
    31  // newHcsExec creates an exec to track the lifetime of `spec` in `c` which is
    32  // actually created on the call to `Start()`. If `id==tid` then this is the init
    33  // exec and the exec will also start `c` on the call to `Start()` before execing
    34  // the process `spec.Process`.
    35  func newHcsExec(
    36  	ctx context.Context,
    37  	events publisher,
    38  	tid string,
    39  	host *uvm.UtilityVM,
    40  	c cow.Container,
    41  	id, bundle string,
    42  	isWCOW bool,
    43  	spec *specs.Process,
    44  	io cmd.UpstreamIO) shimExec {
    45  	log.G(ctx).WithFields(logrus.Fields{
    46  		"tid":    tid,
    47  		"eid":    id, // Init exec ID is always same as Task ID
    48  		"bundle": bundle,
    49  		"wcow":   isWCOW,
    50  	}).Debug("newHcsExec")
    51  
    52  	he := &hcsExec{
    53  		events:      events,
    54  		tid:         tid,
    55  		host:        host,
    56  		c:           c,
    57  		id:          id,
    58  		bundle:      bundle,
    59  		isWCOW:      isWCOW,
    60  		spec:        spec,
    61  		io:          io,
    62  		processDone: make(chan struct{}),
    63  		state:       shimExecStateCreated,
    64  		exitStatus:  255, // By design for non-exited process status.
    65  		exited:      make(chan struct{}),
    66  	}
    67  	go he.waitForContainerExit()
    68  	return he
    69  }
    70  
    71  var _ = (shimExec)(&hcsExec{})
    72  
    73  type hcsExec struct {
    74  	events publisher
    75  	// tid is the task id of the container hosting this process.
    76  	//
    77  	// This MUST be treated as read only in the lifetime of the exec.
    78  	tid string
    79  	// host is the hosting VM for `c`. If `host==nil` this exec MUST be a
    80  	// process isolated WCOW exec.
    81  	//
    82  	// This MUST be treated as read only in the lifetime of the exec.
    83  	host *uvm.UtilityVM
    84  	// c is the hosting container for this exec.
    85  	//
    86  	// This MUST be treated as read only in the lifetime of the exec.
    87  	c cow.Container
    88  	// id is the id of this process.
    89  	//
    90  	// This MUST be treated as read only in the lifetime of the exec.
    91  	id string
    92  	// bundle is the on disk path to the folder containing the `process.json`
    93  	// describing this process. If `id==tid` the process is described in the
    94  	// `config.json`.
    95  	//
    96  	// This MUST be treated as read only in the lifetime of the exec.
    97  	bundle string
    98  	// isWCOW is set to `true` when this process is part of a Windows OCI spec.
    99  	//
   100  	// This MUST be treated as read only in the lifetime of the exec.
   101  	isWCOW bool
   102  	// spec is the OCI Process spec that was passed in at create time. This is
   103  	// stored because we don't actually create the process until the call to
   104  	// `Start`.
   105  	//
   106  	// This MUST be treated as read only in the lifetime of the exec.
   107  	spec *specs.Process
   108  	// io is the upstream io connections used for copying between the upstream
   109  	// io and the downstream io. The upstream IO MUST already be connected at
   110  	// create time in order to be valid.
   111  	//
   112  	// This MUST be treated as read only in the lifetime of the exec.
   113  	io              cmd.UpstreamIO
   114  	processDone     chan struct{}
   115  	processDoneOnce sync.Once
   116  
   117  	// sl is the state lock that MUST be held to safely read/write any of the
   118  	// following members.
   119  	sl         sync.Mutex
   120  	state      shimExecState
   121  	pid        int
   122  	exitStatus uint32
   123  	exitedAt   time.Time
   124  	p          *cmd.Cmd
   125  
   126  	// exited is a wait block which waits async for the process to exit.
   127  	exited     chan struct{}
   128  	exitedOnce sync.Once
   129  }
   130  
   131  func (he *hcsExec) ID() string {
   132  	return he.id
   133  }
   134  
   135  func (he *hcsExec) Pid() int {
   136  	he.sl.Lock()
   137  	defer he.sl.Unlock()
   138  	return he.pid
   139  }
   140  
   141  func (he *hcsExec) State() shimExecState {
   142  	he.sl.Lock()
   143  	defer he.sl.Unlock()
   144  	return he.state
   145  }
   146  
   147  func (he *hcsExec) Status() *task.StateResponse {
   148  	he.sl.Lock()
   149  	defer he.sl.Unlock()
   150  
   151  	var s containerd_v1_types.Status
   152  	switch he.state {
   153  	case shimExecStateCreated:
   154  		s = containerd_v1_types.StatusCreated
   155  	case shimExecStateRunning:
   156  		s = containerd_v1_types.StatusRunning
   157  	case shimExecStateExited:
   158  		s = containerd_v1_types.StatusStopped
   159  	}
   160  
   161  	return &task.StateResponse{
   162  		ID:         he.tid,
   163  		ExecID:     he.id,
   164  		Bundle:     he.bundle,
   165  		Pid:        uint32(he.pid),
   166  		Status:     s,
   167  		Stdin:      he.io.StdinPath(),
   168  		Stdout:     he.io.StdoutPath(),
   169  		Stderr:     he.io.StderrPath(),
   170  		Terminal:   he.io.Terminal(),
   171  		ExitStatus: he.exitStatus,
   172  		ExitedAt:   he.exitedAt,
   173  	}
   174  }
   175  
   176  func (he *hcsExec) startInternal(ctx context.Context, initializeContainer bool) (err error) {
   177  	he.sl.Lock()
   178  	defer he.sl.Unlock()
   179  	if he.state != shimExecStateCreated {
   180  		return newExecInvalidStateError(he.tid, he.id, he.state, "start")
   181  	}
   182  	defer func() {
   183  		if err != nil {
   184  			he.exitFromCreatedL(ctx, 1)
   185  		}
   186  	}()
   187  	if initializeContainer {
   188  		err = he.c.Start(ctx)
   189  		if err != nil {
   190  			return err
   191  		}
   192  		defer func() {
   193  			if err != nil {
   194  				_ = he.c.Terminate(ctx)
   195  				he.c.Close()
   196  			}
   197  		}()
   198  	}
   199  	cmd := &cmd.Cmd{
   200  		Host:   he.c,
   201  		Stdin:  he.io.Stdin(),
   202  		Stdout: he.io.Stdout(),
   203  		Stderr: he.io.Stderr(),
   204  		Log: log.G(ctx).WithFields(logrus.Fields{
   205  			"tid": he.tid,
   206  			"eid": he.id,
   207  		}),
   208  		CopyAfterExitTimeout: time.Second * 1,
   209  	}
   210  	if he.isWCOW || he.id != he.tid {
   211  		// An init exec passes the process as part of the config. We only pass
   212  		// the spec if this is a true exec.
   213  		cmd.Spec = he.spec
   214  	}
   215  	err = cmd.Start()
   216  	if err != nil {
   217  		return err
   218  	}
   219  	he.p = cmd
   220  
   221  	// Assign the PID and transition the state.
   222  	he.pid = he.p.Process.Pid()
   223  	he.state = shimExecStateRunning
   224  
   225  	// Publish the task/exec start event. This MUST happen before waitForExit to
   226  	// avoid publishing the exit previous to the start.
   227  	if he.id != he.tid {
   228  		if err := he.events.publishEvent(
   229  			ctx,
   230  			runtime.TaskExecStartedEventTopic,
   231  			&eventstypes.TaskExecStarted{
   232  				ContainerID: he.tid,
   233  				ExecID:      he.id,
   234  				Pid:         uint32(he.pid),
   235  			}); err != nil {
   236  			return err
   237  		}
   238  	} else {
   239  		if err := he.events.publishEvent(
   240  			ctx,
   241  			runtime.TaskStartEventTopic,
   242  			&eventstypes.TaskStart{
   243  				ContainerID: he.tid,
   244  				Pid:         uint32(he.pid),
   245  			}); err != nil {
   246  			return err
   247  		}
   248  	}
   249  
   250  	// wait in the background for the exit.
   251  	go he.waitForExit()
   252  	return nil
   253  }
   254  
   255  func (he *hcsExec) Start(ctx context.Context) (err error) {
   256  	// If he.id == he.tid then this is the init exec.
   257  	// We need to initialize the container itself before starting this exec.
   258  	return he.startInternal(ctx, he.id == he.tid)
   259  }
   260  
   261  func (he *hcsExec) Kill(ctx context.Context, signal uint32) error {
   262  	he.sl.Lock()
   263  	defer he.sl.Unlock()
   264  	switch he.state {
   265  	case shimExecStateCreated:
   266  		he.exitFromCreatedL(ctx, 1)
   267  		return nil
   268  	case shimExecStateRunning:
   269  		supported := false
   270  		if osversion.Build() >= osversion.RS5 {
   271  			supported = he.host == nil || he.host.SignalProcessSupported()
   272  		}
   273  		var options interface{}
   274  		var err error
   275  		if he.isWCOW {
   276  			var opt *guestresource.SignalProcessOptionsWCOW
   277  			opt, err = signals.ValidateWCOW(int(signal), supported)
   278  			if opt != nil {
   279  				options = opt
   280  			}
   281  		} else {
   282  			var opt *guestresource.SignalProcessOptionsLCOW
   283  			opt, err = signals.ValidateLCOW(int(signal), supported)
   284  			if opt != nil {
   285  				options = opt
   286  			}
   287  		}
   288  		if err != nil {
   289  			return errors.Wrapf(errdefs.ErrFailedPrecondition, "signal %d: %v", signal, err)
   290  		}
   291  		var delivered bool
   292  		if supported && options != nil {
   293  			if he.isWCOW {
   294  				// Servercore images block on signaling and wait until the target process
   295  				// is terminated to return to the caller. This causes issues when graceful
   296  				// termination of containers is requested (Bug36689012).
   297  				// To fix this, we deliver the signal to the target process in a separate background
   298  				// thread so that the caller can wait for the desired timeout before sending
   299  				// a SIGKILL to the process.
   300  				// TODO: We can get rid of these changes once the fix to support graceful termination is
   301  				// made in windows.
   302  				go func() {
   303  					signalDelivered, deliveryErr := he.p.Process.Signal(ctx, options)
   304  
   305  					if deliveryErr != nil {
   306  						if !hcs.IsAlreadyStopped(deliveryErr) {
   307  							// Process is not already stopped and there was a signal delivery error to this process
   308  							log.G(ctx).WithField("err", deliveryErr).Errorf("Error in delivering signal %d, to pid: %d", signal, he.pid)
   309  						}
   310  					}
   311  					if !signalDelivered {
   312  						log.G(ctx).Errorf("Error: NotFound; exec: '%s' in task: '%s' not found", he.id, he.tid)
   313  					}
   314  				}()
   315  				delivered, err = true, nil
   316  			} else {
   317  				delivered, err = he.p.Process.Signal(ctx, options)
   318  			}
   319  		} else {
   320  			// legacy path before signals support OR if WCOW with signals
   321  			// support needs to issue a terminate.
   322  			delivered, err = he.p.Process.Kill(ctx)
   323  		}
   324  		if err != nil {
   325  			if hcs.IsAlreadyStopped(err) {
   326  				// Desired state is actual state. No use in erroring out just because we couldn't kill
   327  				// an already dead process.
   328  				return nil
   329  			}
   330  			return err
   331  		}
   332  		if !delivered {
   333  			return errors.Wrapf(errdefs.ErrNotFound, "exec: '%s' in task: '%s' not found", he.id, he.tid)
   334  		}
   335  		return nil
   336  	case shimExecStateExited:
   337  		return errors.Wrapf(errdefs.ErrNotFound, "exec: '%s' in task: '%s' not found", he.id, he.tid)
   338  	default:
   339  		return newExecInvalidStateError(he.tid, he.id, he.state, "kill")
   340  	}
   341  }
   342  
   343  func (he *hcsExec) ResizePty(ctx context.Context, width, height uint32) error {
   344  	he.sl.Lock()
   345  	defer he.sl.Unlock()
   346  	if !he.io.Terminal() {
   347  		return errors.Wrapf(errdefs.ErrFailedPrecondition, "exec: '%s' in task: '%s' is not a tty", he.id, he.tid)
   348  	}
   349  
   350  	if he.state == shimExecStateRunning {
   351  		return he.p.Process.ResizeConsole(ctx, uint16(width), uint16(height))
   352  	}
   353  	return nil
   354  }
   355  
   356  func (he *hcsExec) CloseIO(ctx context.Context, stdin bool) error {
   357  	// If we have any upstream IO we close the upstream connection. This will
   358  	// unblock the `io.Copy` in the `Start()` call which will signal
   359  	// `he.p.CloseStdin()`. If `he.io.Stdin()` is already closed this is safe to
   360  	// call multiple times.
   361  	he.io.CloseStdin(ctx)
   362  	return nil
   363  }
   364  
   365  func (he *hcsExec) Wait() *task.StateResponse {
   366  	<-he.exited
   367  	return he.Status()
   368  }
   369  
   370  func (he *hcsExec) ForceExit(ctx context.Context, status int) {
   371  	he.sl.Lock()
   372  	defer he.sl.Unlock()
   373  	if he.state != shimExecStateExited {
   374  		switch he.state {
   375  		case shimExecStateCreated:
   376  			he.exitFromCreatedL(ctx, status)
   377  		case shimExecStateRunning:
   378  			// Kill the process to unblock `he.waitForExit`
   379  			_, _ = he.p.Process.Kill(ctx)
   380  		}
   381  	}
   382  }
   383  
   384  // exitFromCreatedL transitions the shim to the exited state from the created
   385  // state. It is the callers responsibility to hold `he.sl` for the durration of
   386  // this transition.
   387  //
   388  // This call is idempotent and will not affect any state if the shim is already
   389  // in the `shimExecStateExited` state.
   390  //
   391  // To transition for a created state the following must be done:
   392  //
   393  // 1. Issue `he.processDoneCancel` to unblock the goroutine
   394  // `he.waitForContainerExit()`.
   395  //
   396  // 2. Set `he.state`, `he.exitStatus` and `he.exitedAt` to the exited values.
   397  //
   398  // 3. Release any upstream IO resources that were never used in a copy.
   399  //
   400  // 4. Close `he.exited` channel to unblock any waiters who might have called
   401  // `Create`/`Wait`/`Start` which is a valid pattern.
   402  //
   403  // We DO NOT send the async `TaskExit` event because we never would have sent
   404  // the `TaskStart`/`TaskExecStarted` event.
   405  func (he *hcsExec) exitFromCreatedL(ctx context.Context, status int) {
   406  	if he.state != shimExecStateExited {
   407  		// Avoid logging the force if we already exited gracefully
   408  		log.G(ctx).WithField("status", status).Debug("hcsExec::exitFromCreatedL")
   409  
   410  		// Unblock the container exit goroutine
   411  		he.processDoneOnce.Do(func() { close(he.processDone) })
   412  		// Transition this exec
   413  		he.state = shimExecStateExited
   414  		he.exitStatus = uint32(status)
   415  		he.exitedAt = time.Now()
   416  		// Release all upstream IO connections (if any)
   417  		he.io.Close(ctx)
   418  		// Free any waiters
   419  		he.exitedOnce.Do(func() {
   420  			close(he.exited)
   421  		})
   422  	}
   423  }
   424  
   425  // waitForExit waits for the `he.p` to exit. This MUST only be called after a
   426  // successful call to `Create` and MUST not be called more than once.
   427  //
   428  // This MUST be called via a goroutine.
   429  //
   430  // In the case of an exit from a running process the following must be done:
   431  //
   432  // 1. Wait for `he.p` to exit.
   433  //
   434  // 2. Issue `he.processDoneCancel` to unblock the goroutine
   435  // `he.waitForContainerExit()` (if still running). We do this early to avoid the
   436  // container exit also attempting to kill the process. However this race
   437  // condition is safe and handled.
   438  //
   439  // 3. Capture the process exit code and set `he.state`, `he.exitStatus` and
   440  // `he.exitedAt` to the exited values.
   441  //
   442  // 4. Wait for all IO to complete and release any upstream IO connections.
   443  //
   444  // 5. Send the async `TaskExit` to upstream listeners of any events.
   445  //
   446  // 6. Close `he.exited` channel to unblock any waiters who might have called
   447  // `Create`/`Wait`/`Start` which is a valid pattern.
   448  func (he *hcsExec) waitForExit() {
   449  	var err error // this will only save the last error, since we dont return early on error
   450  	ctx, span := oc.StartSpan(context.Background(), "hcsExec::waitForExit")
   451  	defer span.End()
   452  	defer func() { oc.SetSpanStatus(span, err) }()
   453  	span.AddAttributes(
   454  		trace.StringAttribute("tid", he.tid),
   455  		trace.StringAttribute("eid", he.id))
   456  
   457  	err = he.p.Process.Wait()
   458  	if err != nil {
   459  		log.G(ctx).WithError(err).Error("failed process Wait")
   460  	}
   461  
   462  	// Issue the process cancellation to unblock the container wait as early as
   463  	// possible.
   464  	he.processDoneOnce.Do(func() { close(he.processDone) })
   465  
   466  	code, err := he.p.Process.ExitCode()
   467  	if err != nil {
   468  		log.G(ctx).WithError(err).Error("failed to get ExitCode")
   469  	} else {
   470  		log.G(ctx).WithField("exitCode", code).Debug("exited")
   471  	}
   472  
   473  	he.sl.Lock()
   474  	he.state = shimExecStateExited
   475  	he.exitStatus = uint32(code)
   476  	he.exitedAt = time.Now()
   477  	he.sl.Unlock()
   478  
   479  	// Wait for all IO copies to complete and free the resources.
   480  	_ = he.p.Wait()
   481  	he.io.Close(ctx)
   482  
   483  	// Only send the `runtime.TaskExitEventTopic` notification if this is a true
   484  	// exec. For the `init` exec this is handled in task teardown.
   485  	if he.tid != he.id {
   486  		// We had a valid process so send the exited notification.
   487  		if err := he.events.publishEvent(
   488  			ctx,
   489  			runtime.TaskExitEventTopic,
   490  			&eventstypes.TaskExit{
   491  				ContainerID: he.tid,
   492  				ID:          he.id,
   493  				Pid:         uint32(he.pid),
   494  				ExitStatus:  he.exitStatus,
   495  				ExitedAt:    he.exitedAt,
   496  			}); err != nil {
   497  			log.G(ctx).WithError(err).Error("failed to publish TaskExitEvent")
   498  		}
   499  	}
   500  
   501  	// Free any waiters.
   502  	he.exitedOnce.Do(func() {
   503  		close(he.exited)
   504  	})
   505  }
   506  
   507  // waitForContainerExit waits for `he.c` to exit. Depending on the exec's state
   508  // will forcibly transition this exec to the exited state and unblock any
   509  // waiters.
   510  //
   511  // This MUST be called via a goroutine at exec create.
   512  func (he *hcsExec) waitForContainerExit() {
   513  	ctx, span := oc.StartSpan(context.Background(), "hcsExec::waitForContainerExit")
   514  	defer span.End()
   515  	span.AddAttributes(
   516  		trace.StringAttribute("tid", he.tid),
   517  		trace.StringAttribute("eid", he.id))
   518  
   519  	// wait for container or process to exit and ckean up resrources
   520  	select {
   521  	case <-he.c.WaitChannel():
   522  		// Container exited first. We need to force the process into the exited
   523  		// state and cleanup any resources
   524  		he.sl.Lock()
   525  		switch he.state {
   526  		case shimExecStateCreated:
   527  			he.exitFromCreatedL(ctx, 1)
   528  		case shimExecStateRunning:
   529  			// Kill the process to unblock `he.waitForExit`.
   530  			_, _ = he.p.Process.Kill(ctx)
   531  		}
   532  		he.sl.Unlock()
   533  	case <-he.processDone:
   534  		// Process exited first. This is the normal case do nothing because
   535  		// `he.waitForExit` will release any waiters.
   536  	}
   537  }
   538  

View as plain text