...

Source file src/github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/service.go

Documentation: github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1

     1  //go:build windows
     2  
     3  package main
     4  
     5  import (
     6  	"context"
     7  	"os"
     8  	"runtime"
     9  	"strings"
    10  	"sync"
    11  	"sync/atomic"
    12  	"time"
    13  
    14  	"github.com/Microsoft/hcsshim/internal/extendedtask"
    15  	"github.com/Microsoft/hcsshim/internal/oc"
    16  	"github.com/Microsoft/hcsshim/internal/shimdiag"
    17  	"github.com/containerd/containerd/errdefs"
    18  	"github.com/containerd/containerd/runtime/v2/task"
    19  	google_protobuf1 "github.com/gogo/protobuf/types"
    20  	"go.opencensus.io/trace"
    21  )
    22  
    23  type ServiceOptions struct {
    24  	Events    publisher
    25  	TID       string
    26  	IsSandbox bool
    27  }
    28  
    29  type ServiceOption func(*ServiceOptions)
    30  
    31  func WithEventPublisher(e publisher) ServiceOption {
    32  	return func(o *ServiceOptions) {
    33  		o.Events = e
    34  	}
    35  }
    36  func WithTID(tid string) ServiceOption {
    37  	return func(o *ServiceOptions) {
    38  		o.TID = tid
    39  	}
    40  }
    41  func WithIsSandbox(s bool) ServiceOption {
    42  	return func(o *ServiceOptions) {
    43  		o.IsSandbox = s
    44  	}
    45  }
    46  
    47  type service struct {
    48  	events publisher
    49  	// tid is the original task id to be served. This can either be a single
    50  	// task or represent the POD sandbox task id. The first call to Create MUST
    51  	// match this id or the shim is considered to be invalid.
    52  	//
    53  	// This MUST be treated as readonly for the lifetime of the shim.
    54  	tid string
    55  	// isSandbox specifies if `tid` is a POD sandbox. If `false` the shim will
    56  	// reject all calls to `Create` where `tid` does not match. If `true`
    57  	// multiple calls to `Create` are allowed as long as the workload containers
    58  	// all have the same parent task id.
    59  	//
    60  	// This MUST be treated as readonly for the lifetime of the shim.
    61  	isSandbox bool
    62  
    63  	// taskOrPod is either the `pod` this shim is tracking if `isSandbox ==
    64  	// true` or it is the `task` this shim is tracking. If no call to `Create`
    65  	// has taken place yet `taskOrPod.Load()` MUST return `nil`.
    66  	taskOrPod atomic.Value
    67  
    68  	// cl is the create lock. Since each shim MUST only track a single task or
    69  	// POD. `cl` is used to create the task or POD sandbox. It SHOULD NOT be
    70  	// taken when creating tasks in a POD sandbox as they can happen
    71  	// concurrently.
    72  	cl sync.Mutex
    73  
    74  	// shutdown is closed to signal a shutdown request is received
    75  	shutdown chan struct{}
    76  	// shutdownOnce is responsible for closing `shutdown` and any other necessary cleanup
    77  	shutdownOnce sync.Once
    78  	// gracefulShutdown dictates whether to shutdown gracefully and clean up resources
    79  	// or exit immediately
    80  	gracefulShutdown bool
    81  }
    82  
    83  var _ = (task.TaskService)(&service{})
    84  
    85  func NewService(o ...ServiceOption) (svc *service, err error) {
    86  	var opts ServiceOptions
    87  	for _, op := range o {
    88  		op(&opts)
    89  	}
    90  
    91  	svc = &service{
    92  		events:    opts.Events,
    93  		tid:       opts.TID,
    94  		isSandbox: opts.IsSandbox,
    95  		shutdown:  make(chan struct{}),
    96  	}
    97  	return svc, nil
    98  }
    99  
   100  func (s *service) State(ctx context.Context, req *task.StateRequest) (resp *task.StateResponse, err error) {
   101  	ctx, span := oc.StartSpan(ctx, "State")
   102  	defer span.End()
   103  	defer func() {
   104  		if resp != nil {
   105  			span.AddAttributes(
   106  				trace.StringAttribute("status", resp.Status.String()),
   107  				trace.Int64Attribute("exitStatus", int64(resp.ExitStatus)),
   108  				trace.StringAttribute("exitedAt", resp.ExitedAt.String()))
   109  		}
   110  		oc.SetSpanStatus(span, err)
   111  	}()
   112  
   113  	span.AddAttributes(
   114  		trace.StringAttribute("tid", req.ID),
   115  		trace.StringAttribute("eid", req.ExecID))
   116  
   117  	if s.isSandbox {
   118  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   119  	}
   120  
   121  	r, e := s.stateInternal(ctx, req)
   122  	return r, errdefs.ToGRPC(e)
   123  }
   124  
   125  func (s *service) Create(ctx context.Context, req *task.CreateTaskRequest) (resp *task.CreateTaskResponse, err error) {
   126  	ctx, span := oc.StartSpan(ctx, "Create")
   127  	defer span.End()
   128  	defer func() {
   129  		if resp != nil {
   130  			span.AddAttributes(trace.Int64Attribute("pid", int64(resp.Pid)))
   131  		}
   132  		oc.SetSpanStatus(span, err)
   133  	}()
   134  
   135  	span.AddAttributes(
   136  		trace.StringAttribute("tid", req.ID),
   137  		trace.StringAttribute("bundle", req.Bundle),
   138  		// trace.StringAttribute("rootfs", req.Rootfs), TODO: JTERRY75 -
   139  		// OpenCensus doesnt support slice like our logrus hook
   140  		trace.BoolAttribute("terminal", req.Terminal),
   141  		trace.StringAttribute("stdin", req.Stdin),
   142  		trace.StringAttribute("stdout", req.Stdout),
   143  		trace.StringAttribute("stderr", req.Stderr),
   144  		trace.StringAttribute("checkpoint", req.Checkpoint),
   145  		trace.StringAttribute("parentcheckpoint", req.ParentCheckpoint))
   146  
   147  	if s.isSandbox {
   148  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   149  	}
   150  
   151  	r, e := s.createInternal(ctx, req)
   152  	return r, errdefs.ToGRPC(e)
   153  }
   154  
   155  func (s *service) Start(ctx context.Context, req *task.StartRequest) (resp *task.StartResponse, err error) {
   156  	ctx, span := oc.StartSpan(ctx, "Start")
   157  	defer span.End()
   158  	defer func() {
   159  		if resp != nil {
   160  			span.AddAttributes(trace.Int64Attribute("pid", int64(resp.Pid)))
   161  		}
   162  		oc.SetSpanStatus(span, err)
   163  	}()
   164  
   165  	span.AddAttributes(
   166  		trace.StringAttribute("tid", req.ID),
   167  		trace.StringAttribute("eid", req.ExecID))
   168  
   169  	if s.isSandbox {
   170  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   171  	}
   172  
   173  	r, e := s.startInternal(ctx, req)
   174  	return r, errdefs.ToGRPC(e)
   175  }
   176  
   177  func (s *service) Delete(ctx context.Context, req *task.DeleteRequest) (resp *task.DeleteResponse, err error) {
   178  	ctx, span := oc.StartSpan(ctx, "Delete")
   179  	defer span.End()
   180  	defer func() {
   181  		if resp != nil {
   182  			span.AddAttributes(
   183  				trace.Int64Attribute("pid", int64(resp.Pid)),
   184  				trace.Int64Attribute("exitStatus", int64(resp.ExitStatus)),
   185  				trace.StringAttribute("exitedAt", resp.ExitedAt.String()))
   186  		}
   187  		oc.SetSpanStatus(span, err)
   188  	}()
   189  
   190  	span.AddAttributes(
   191  		trace.StringAttribute("tid", req.ID),
   192  		trace.StringAttribute("eid", req.ExecID))
   193  
   194  	if s.isSandbox {
   195  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   196  	}
   197  
   198  	r, e := s.deleteInternal(ctx, req)
   199  	return r, errdefs.ToGRPC(e)
   200  }
   201  
   202  func (s *service) Pids(ctx context.Context, req *task.PidsRequest) (_ *task.PidsResponse, err error) {
   203  	ctx, span := oc.StartSpan(ctx, "Pids")
   204  	defer span.End()
   205  	defer func() { oc.SetSpanStatus(span, err) }()
   206  
   207  	span.AddAttributes(trace.StringAttribute("tid", req.ID))
   208  
   209  	if s.isSandbox {
   210  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   211  	}
   212  
   213  	r, e := s.pidsInternal(ctx, req)
   214  	return r, errdefs.ToGRPC(e)
   215  }
   216  
   217  func (s *service) Pause(ctx context.Context, req *task.PauseRequest) (_ *google_protobuf1.Empty, err error) {
   218  	ctx, span := oc.StartSpan(ctx, "Pause")
   219  	defer span.End()
   220  	defer func() { oc.SetSpanStatus(span, err) }()
   221  
   222  	span.AddAttributes(trace.StringAttribute("tid", req.ID))
   223  
   224  	if s.isSandbox {
   225  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   226  	}
   227  
   228  	r, e := s.pauseInternal(ctx, req)
   229  	return r, errdefs.ToGRPC(e)
   230  }
   231  
   232  func (s *service) Resume(ctx context.Context, req *task.ResumeRequest) (_ *google_protobuf1.Empty, err error) {
   233  	ctx, span := oc.StartSpan(ctx, "Resume")
   234  	defer span.End()
   235  	defer func() { oc.SetSpanStatus(span, err) }()
   236  
   237  	span.AddAttributes(trace.StringAttribute("tid", req.ID))
   238  
   239  	if s.isSandbox {
   240  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   241  	}
   242  
   243  	r, e := s.resumeInternal(ctx, req)
   244  	return r, errdefs.ToGRPC(e)
   245  }
   246  
   247  func (s *service) Checkpoint(ctx context.Context, req *task.CheckpointTaskRequest) (_ *google_protobuf1.Empty, err error) {
   248  	ctx, span := oc.StartSpan(ctx, "Checkpoint")
   249  	defer span.End()
   250  	defer func() { oc.SetSpanStatus(span, err) }()
   251  
   252  	span.AddAttributes(
   253  		trace.StringAttribute("tid", req.ID),
   254  		trace.StringAttribute("path", req.Path))
   255  
   256  	if s.isSandbox {
   257  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   258  	}
   259  
   260  	r, e := s.checkpointInternal(ctx, req)
   261  	return r, errdefs.ToGRPC(e)
   262  }
   263  
   264  func (s *service) Kill(ctx context.Context, req *task.KillRequest) (_ *google_protobuf1.Empty, err error) {
   265  	ctx, span := oc.StartSpan(ctx, "Kill")
   266  	defer span.End()
   267  	defer func() { oc.SetSpanStatus(span, err) }()
   268  
   269  	span.AddAttributes(
   270  		trace.StringAttribute("tid", req.ID),
   271  		trace.StringAttribute("eid", req.ExecID),
   272  		trace.Int64Attribute("signal", int64(req.Signal)),
   273  		trace.BoolAttribute("all", req.All))
   274  
   275  	if s.isSandbox {
   276  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   277  	}
   278  
   279  	r, e := s.killInternal(ctx, req)
   280  	return r, errdefs.ToGRPC(e)
   281  }
   282  
   283  func (s *service) Exec(ctx context.Context, req *task.ExecProcessRequest) (_ *google_protobuf1.Empty, err error) {
   284  	ctx, span := oc.StartSpan(ctx, "Exec")
   285  	defer span.End()
   286  	defer func() { oc.SetSpanStatus(span, err) }()
   287  
   288  	span.AddAttributes(
   289  		trace.StringAttribute("tid", req.ID),
   290  		trace.StringAttribute("eid", req.ExecID),
   291  		trace.BoolAttribute("terminal", req.Terminal),
   292  		trace.StringAttribute("stdin", req.Stdin),
   293  		trace.StringAttribute("stdout", req.Stdout),
   294  		trace.StringAttribute("stderr", req.Stderr))
   295  
   296  	if s.isSandbox {
   297  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   298  	}
   299  
   300  	r, e := s.execInternal(ctx, req)
   301  	return r, errdefs.ToGRPC(e)
   302  }
   303  
   304  func (s *service) DiagExecInHost(ctx context.Context, req *shimdiag.ExecProcessRequest) (_ *shimdiag.ExecProcessResponse, err error) {
   305  	ctx, span := oc.StartSpan(ctx, "DiagExecInHost")
   306  	defer span.End()
   307  	defer func() { oc.SetSpanStatus(span, err) }()
   308  
   309  	span.AddAttributes(
   310  		trace.StringAttribute("args", strings.Join(req.Args, " ")),
   311  		trace.StringAttribute("workdir", req.Workdir),
   312  		trace.BoolAttribute("terminal", req.Terminal),
   313  		trace.StringAttribute("stdin", req.Stdin),
   314  		trace.StringAttribute("stdout", req.Stdout),
   315  		trace.StringAttribute("stderr", req.Stderr))
   316  
   317  	if s.isSandbox {
   318  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   319  	}
   320  
   321  	r, e := s.diagExecInHostInternal(ctx, req)
   322  	return r, errdefs.ToGRPC(e)
   323  }
   324  
   325  func (s *service) DiagShare(ctx context.Context, req *shimdiag.ShareRequest) (_ *shimdiag.ShareResponse, err error) {
   326  	ctx, span := oc.StartSpan(ctx, "DiagShare")
   327  	defer span.End()
   328  	defer func() { oc.SetSpanStatus(span, err) }()
   329  
   330  	span.AddAttributes(
   331  		trace.StringAttribute("hostpath", req.HostPath),
   332  		trace.StringAttribute("uvmpath", req.UvmPath),
   333  		trace.BoolAttribute("readonly", req.ReadOnly))
   334  
   335  	if s.isSandbox {
   336  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   337  	}
   338  
   339  	r, e := s.diagShareInternal(ctx, req)
   340  	return r, errdefs.ToGRPC(e)
   341  }
   342  
   343  func (s *service) DiagTasks(ctx context.Context, req *shimdiag.TasksRequest) (_ *shimdiag.TasksResponse, err error) {
   344  	ctx, span := oc.StartSpan(ctx, "DiagTasks")
   345  	defer span.End()
   346  	defer func() { oc.SetSpanStatus(span, err) }()
   347  
   348  	span.AddAttributes(
   349  		trace.BoolAttribute("execs", req.Execs))
   350  
   351  	if s.isSandbox {
   352  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   353  	}
   354  
   355  	r, e := s.diagTasksInternal(ctx, req)
   356  	return r, errdefs.ToGRPC(e)
   357  }
   358  
   359  func (s *service) ResizePty(ctx context.Context, req *task.ResizePtyRequest) (_ *google_protobuf1.Empty, err error) {
   360  	ctx, span := oc.StartSpan(ctx, "ResizePty")
   361  	defer span.End()
   362  	defer func() { oc.SetSpanStatus(span, err) }()
   363  
   364  	span.AddAttributes(
   365  		trace.StringAttribute("tid", req.ID),
   366  		trace.StringAttribute("eid", req.ExecID),
   367  		trace.Int64Attribute("width", int64(req.Width)),
   368  		trace.Int64Attribute("height", int64(req.Height)))
   369  
   370  	if s.isSandbox {
   371  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   372  	}
   373  
   374  	r, e := s.resizePtyInternal(ctx, req)
   375  	return r, errdefs.ToGRPC(e)
   376  }
   377  
   378  func (s *service) CloseIO(ctx context.Context, req *task.CloseIORequest) (_ *google_protobuf1.Empty, err error) {
   379  	ctx, span := oc.StartSpan(ctx, "CloseIO")
   380  	defer span.End()
   381  	defer func() { oc.SetSpanStatus(span, err) }()
   382  
   383  	span.AddAttributes(
   384  		trace.StringAttribute("tid", req.ID),
   385  		trace.StringAttribute("eid", req.ExecID),
   386  		trace.BoolAttribute("stdin", req.Stdin))
   387  
   388  	if s.isSandbox {
   389  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   390  	}
   391  
   392  	r, e := s.closeIOInternal(ctx, req)
   393  	return r, errdefs.ToGRPC(e)
   394  }
   395  
   396  func (s *service) Update(ctx context.Context, req *task.UpdateTaskRequest) (_ *google_protobuf1.Empty, err error) {
   397  	ctx, span := oc.StartSpan(ctx, "Update")
   398  	defer span.End()
   399  	defer func() { oc.SetSpanStatus(span, err) }()
   400  
   401  	span.AddAttributes(trace.StringAttribute("tid", req.ID))
   402  
   403  	if s.isSandbox {
   404  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   405  	}
   406  
   407  	r, e := s.updateInternal(ctx, req)
   408  	return r, errdefs.ToGRPC(e)
   409  }
   410  
   411  func (s *service) Wait(ctx context.Context, req *task.WaitRequest) (resp *task.WaitResponse, err error) {
   412  	ctx, span := oc.StartSpan(ctx, "Wait")
   413  	defer span.End()
   414  	defer func() {
   415  		if resp != nil {
   416  			span.AddAttributes(
   417  				trace.Int64Attribute("exitStatus", int64(resp.ExitStatus)),
   418  				trace.StringAttribute("exitedAt", resp.ExitedAt.String()))
   419  		}
   420  		oc.SetSpanStatus(span, err)
   421  	}()
   422  
   423  	span.AddAttributes(
   424  		trace.StringAttribute("tid", req.ID),
   425  		trace.StringAttribute("eid", req.ExecID))
   426  
   427  	if s.isSandbox {
   428  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   429  	}
   430  
   431  	r, e := s.waitInternal(ctx, req)
   432  	return r, errdefs.ToGRPC(e)
   433  }
   434  
   435  func (s *service) Stats(ctx context.Context, req *task.StatsRequest) (_ *task.StatsResponse, err error) {
   436  	ctx, span := oc.StartSpan(ctx, "Stats")
   437  	defer span.End()
   438  	defer func() { oc.SetSpanStatus(span, err) }()
   439  
   440  	span.AddAttributes(trace.StringAttribute("tid", req.ID))
   441  
   442  	if s.isSandbox {
   443  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   444  	}
   445  
   446  	r, e := s.statsInternal(ctx, req)
   447  	return r, errdefs.ToGRPC(e)
   448  }
   449  
   450  func (s *service) Connect(ctx context.Context, req *task.ConnectRequest) (resp *task.ConnectResponse, err error) {
   451  	ctx, span := oc.StartSpan(ctx, "Connect")
   452  	defer span.End()
   453  	defer func() {
   454  		if resp != nil {
   455  			span.AddAttributes(
   456  				trace.Int64Attribute("shimPid", int64(resp.ShimPid)),
   457  				trace.Int64Attribute("taskPid", int64(resp.TaskPid)),
   458  				trace.StringAttribute("version", resp.Version))
   459  		}
   460  		oc.SetSpanStatus(span, err)
   461  	}()
   462  
   463  	span.AddAttributes(trace.StringAttribute("tid", req.ID))
   464  
   465  	if s.isSandbox {
   466  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   467  	}
   468  
   469  	r, e := s.connectInternal(ctx, req)
   470  	return r, errdefs.ToGRPC(e)
   471  }
   472  
   473  func (s *service) Shutdown(ctx context.Context, req *task.ShutdownRequest) (_ *google_protobuf1.Empty, err error) {
   474  	ctx, span := oc.StartSpan(ctx, "Shutdown")
   475  	defer span.End()
   476  	defer func() { oc.SetSpanStatus(span, err) }()
   477  
   478  	span.AddAttributes(trace.StringAttribute("tid", req.ID))
   479  
   480  	if s.isSandbox {
   481  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   482  	}
   483  
   484  	r, e := s.shutdownInternal(ctx, req)
   485  	return r, errdefs.ToGRPC(e)
   486  }
   487  
   488  func (s *service) DiagStacks(ctx context.Context, req *shimdiag.StacksRequest) (*shimdiag.StacksResponse, error) {
   489  	if s == nil {
   490  		return nil, nil
   491  	}
   492  	ctx, span := oc.StartSpan(ctx, "DiagStacks")
   493  	defer span.End()
   494  
   495  	span.AddAttributes(trace.StringAttribute("tid", s.tid))
   496  
   497  	if s.isSandbox {
   498  		span.AddAttributes(trace.StringAttribute("pod-id", s.tid))
   499  	}
   500  
   501  	buf := make([]byte, 4096)
   502  	for {
   503  		buf = buf[:runtime.Stack(buf, true)]
   504  		if len(buf) < cap(buf) {
   505  			break
   506  		}
   507  		buf = make([]byte, 2*len(buf))
   508  	}
   509  	resp := &shimdiag.StacksResponse{Stacks: string(buf)}
   510  
   511  	t, _ := s.getTask(s.tid)
   512  	if t != nil {
   513  		ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
   514  		defer cancel()
   515  		resp.GuestStacks = t.DumpGuestStacks(ctx)
   516  	}
   517  	return resp, nil
   518  }
   519  
   520  func (s *service) DiagPid(ctx context.Context, req *shimdiag.PidRequest) (*shimdiag.PidResponse, error) {
   521  	if s == nil {
   522  		return nil, nil
   523  	}
   524  	ctx, span := oc.StartSpan(ctx, "DiagPid") //nolint:ineffassign,staticcheck
   525  	defer span.End()
   526  
   527  	span.AddAttributes(trace.StringAttribute("tid", s.tid))
   528  
   529  	return &shimdiag.PidResponse{
   530  		Pid: int32(os.Getpid()),
   531  	}, nil
   532  }
   533  
   534  func (s *service) ComputeProcessorInfo(ctx context.Context, req *extendedtask.ComputeProcessorInfoRequest) (*extendedtask.ComputeProcessorInfoResponse, error) {
   535  	ctx, span := oc.StartSpan(ctx, "ComputeProcessorInfo")
   536  	defer span.End()
   537  
   538  	span.AddAttributes(trace.StringAttribute("tid", s.tid))
   539  
   540  	r, e := s.computeProcessorInfoInternal(ctx, req)
   541  	return r, errdefs.ToGRPC(e)
   542  }
   543  
   544  func (s *service) Done() <-chan struct{} {
   545  	return s.shutdown
   546  }
   547  
   548  func (s *service) IsShutdown() bool {
   549  	select {
   550  	case <-s.shutdown:
   551  		return true
   552  	default:
   553  		return false
   554  	}
   555  }
   556  

View as plain text