...

Source file src/github.com/Microsoft/hcsshim/internal/hcs/system.go

Documentation: github.com/Microsoft/hcsshim/internal/hcs

     1  //go:build windows
     2  
     3  package hcs
     4  
     5  import (
     6  	"context"
     7  	"encoding/json"
     8  	"errors"
     9  	"fmt"
    10  	"strings"
    11  	"sync"
    12  	"syscall"
    13  	"time"
    14  
    15  	"github.com/Microsoft/hcsshim/internal/cow"
    16  	"github.com/Microsoft/hcsshim/internal/hcs/schema1"
    17  	hcsschema "github.com/Microsoft/hcsshim/internal/hcs/schema2"
    18  	"github.com/Microsoft/hcsshim/internal/jobobject"
    19  	"github.com/Microsoft/hcsshim/internal/log"
    20  	"github.com/Microsoft/hcsshim/internal/logfields"
    21  	"github.com/Microsoft/hcsshim/internal/oc"
    22  	"github.com/Microsoft/hcsshim/internal/timeout"
    23  	"github.com/Microsoft/hcsshim/internal/vmcompute"
    24  	"github.com/sirupsen/logrus"
    25  	"go.opencensus.io/trace"
    26  )
    27  
// System is a handle to an HCS compute system (a container or a VM-based
// system). It owns the vmcompute handle, the notification callback registered
// for that handle, and the state used to observe the system's exit.
type System struct {
	// handleLock guards handle: Close takes the write lock, while operations
	// that use the handle take the read lock.
	handleLock     sync.RWMutex
	handle         vmcompute.HcsSystem // zero once Close has succeeded
	id             string
	// callbackNumber is the key of this system's entry in callbackMap; it is
	// assigned by registerCallback.
	callbackNumber uintptr

	// closedWaitOnce makes sure waitBlock is closed (and waitError recorded)
	// exactly once, by whichever of waitBackground or Close gets there first.
	closedWaitOnce sync.Once
	// waitBlock is closed when the system has exited or been closed.
	waitBlock      chan struct{}
	waitError      error
	// exitError records an unexpected exit reported by HCS; see ExitError.
	exitError      error
	// os, typ and owner are lower-cased values cached by getCachedProperties.
	os, typ, owner string
	// startTime is set by a successful Start; it stays the zero time when the
	// system was not started through this value.
	startTime      time.Time
}
    41  
    42  var _ cow.Container = &System{}
    43  var _ cow.ProcessHost = &System{}
    44  
    45  func newSystem(id string) *System {
    46  	return &System{
    47  		id:        id,
    48  		waitBlock: make(chan struct{}),
    49  	}
    50  }
    51  
    52  // Implementation detail for silo naming, this should NOT be relied upon very heavily.
    53  func siloNameFmt(containerID string) string {
    54  	return fmt.Sprintf(`\Container_%s`, containerID)
    55  }
    56  
// CreateComputeSystem creates a new compute system with the given configuration but does not start it.
//
// hcsDocumentInterface is marshalled to JSON and handed to HCS as the creation
// document. When HCS reports success or a pending operation, a notification
// callback is registered, and from then on every error return closes the
// System again through the deferred cleanup below. The call then waits for
// the create-completed notification, passing timeout.SystemCreate as the
// limit. On a timeout the system is terminated (best effort). On success a
// goroutine is started to wait for the system's exit, and the system's
// type/OS/owner are cached before the System is returned.
func CreateComputeSystem(ctx context.Context, id string, hcsDocumentInterface interface{}) (_ *System, err error) {
	operation := "hcs::CreateComputeSystem"

	// hcsCreateComputeSystemContext is an async operation. Start the outer span
	// here to measure the full create time.
	ctx, span := oc.StartSpan(ctx, operation)
	defer span.End()
	defer func() { oc.SetSpanStatus(span, err) }()
	span.AddAttributes(trace.StringAttribute("cid", id))

	computeSystem := newSystem(id)

	hcsDocumentB, err := json.Marshal(hcsDocumentInterface)
	if err != nil {
		return nil, err
	}

	hcsDocument := string(hcsDocumentB)

	var (
		identity    syscall.Handle // passed as the zero handle
		resultJSON  string
		createError error
	)
	computeSystem.handle, resultJSON, createError = vmcompute.HcsCreateComputeSystem(ctx, id, hcsDocument, identity)
	if createError == nil || IsPending(createError) {
		// err is the named result, so this cleanup observes every error
		// returned from here on, including the getCachedProperties failure.
		defer func() {
			if err != nil {
				computeSystem.Close()
			}
		}()
		// Register before waiting below: processAsyncHcsResult is given
		// computeSystem.callbackNumber to find the completion notification.
		if err = computeSystem.registerCallback(ctx); err != nil {
			// Terminate the compute system if it still exists. We're okay to
			// ignore a failure here.
			_ = computeSystem.Terminate(ctx)
			return nil, makeSystemError(computeSystem, operation, err, nil)
		}
	}

	events, err := processAsyncHcsResult(ctx, createError, resultJSON, computeSystem.callbackNumber,
		hcsNotificationSystemCreateCompleted, &timeout.SystemCreate)
	if err != nil {
		if err == ErrTimeout {
			// Terminate the compute system if it still exists. We're okay to
			// ignore a failure here.
			_ = computeSystem.Terminate(ctx)
		}
		return nil, makeSystemError(computeSystem, operation, err, events)
	}
	// waitBackground must run exactly once per handle; it releases Wait
	// callers when the exit notification arrives.
	go computeSystem.waitBackground()
	if err = computeSystem.getCachedProperties(ctx); err != nil {
		return nil, err
	}
	return computeSystem, nil
}
   113  
   114  // OpenComputeSystem opens an existing compute system by ID.
   115  func OpenComputeSystem(ctx context.Context, id string) (*System, error) {
   116  	operation := "hcs::OpenComputeSystem"
   117  
   118  	computeSystem := newSystem(id)
   119  	handle, resultJSON, err := vmcompute.HcsOpenComputeSystem(ctx, id)
   120  	events := processHcsResult(ctx, resultJSON)
   121  	if err != nil {
   122  		return nil, makeSystemError(computeSystem, operation, err, events)
   123  	}
   124  	computeSystem.handle = handle
   125  	defer func() {
   126  		if err != nil {
   127  			computeSystem.Close()
   128  		}
   129  	}()
   130  	if err = computeSystem.registerCallback(ctx); err != nil {
   131  		return nil, makeSystemError(computeSystem, operation, err, nil)
   132  	}
   133  	go computeSystem.waitBackground()
   134  	if err = computeSystem.getCachedProperties(ctx); err != nil {
   135  		return nil, err
   136  	}
   137  	return computeSystem, nil
   138  }
   139  
   140  func (computeSystem *System) getCachedProperties(ctx context.Context) error {
   141  	props, err := computeSystem.Properties(ctx)
   142  	if err != nil {
   143  		return err
   144  	}
   145  	computeSystem.typ = strings.ToLower(props.SystemType)
   146  	computeSystem.os = strings.ToLower(props.RuntimeOSType)
   147  	computeSystem.owner = strings.ToLower(props.Owner)
   148  	if computeSystem.os == "" && computeSystem.typ == "container" {
   149  		// Pre-RS5 HCS did not return the OS, but it only supported containers
   150  		// that ran Windows.
   151  		computeSystem.os = "windows"
   152  	}
   153  	return nil
   154  }
   155  
   156  // OS returns the operating system of the compute system, "linux" or "windows".
   157  func (computeSystem *System) OS() string {
   158  	return computeSystem.os
   159  }
   160  
   161  // IsOCI returns whether processes in the compute system should be created via
   162  // OCI.
   163  func (computeSystem *System) IsOCI() bool {
   164  	return computeSystem.os == "linux" && computeSystem.typ == "container"
   165  }
   166  
   167  // GetComputeSystems gets a list of the compute systems on the system that match the query
   168  func GetComputeSystems(ctx context.Context, q schema1.ComputeSystemQuery) ([]schema1.ContainerProperties, error) {
   169  	operation := "hcs::GetComputeSystems"
   170  
   171  	queryb, err := json.Marshal(q)
   172  	if err != nil {
   173  		return nil, err
   174  	}
   175  
   176  	computeSystemsJSON, resultJSON, err := vmcompute.HcsEnumerateComputeSystems(ctx, string(queryb))
   177  	events := processHcsResult(ctx, resultJSON)
   178  	if err != nil {
   179  		return nil, &HcsError{Op: operation, Err: err, Events: events}
   180  	}
   181  
   182  	if computeSystemsJSON == "" {
   183  		return nil, ErrUnexpectedValue
   184  	}
   185  	computeSystems := []schema1.ContainerProperties{}
   186  	if err = json.Unmarshal([]byte(computeSystemsJSON), &computeSystems); err != nil {
   187  		return nil, err
   188  	}
   189  
   190  	return computeSystems, nil
   191  }
   192  
   193  // Start synchronously starts the computeSystem.
   194  func (computeSystem *System) Start(ctx context.Context) (err error) {
   195  	operation := "hcs::System::Start"
   196  
   197  	// hcsStartComputeSystemContext is an async operation. Start the outer span
   198  	// here to measure the full start time.
   199  	ctx, span := oc.StartSpan(ctx, operation)
   200  	defer span.End()
   201  	defer func() { oc.SetSpanStatus(span, err) }()
   202  	span.AddAttributes(trace.StringAttribute("cid", computeSystem.id))
   203  
   204  	computeSystem.handleLock.RLock()
   205  	defer computeSystem.handleLock.RUnlock()
   206  
   207  	// prevent starting an exited system because waitblock we do not recreate waitBlock
   208  	// or rerun waitBackground, so we have no way to be notified of it closing again
   209  	if computeSystem.handle == 0 {
   210  		return makeSystemError(computeSystem, operation, ErrAlreadyClosed, nil)
   211  	}
   212  
   213  	resultJSON, err := vmcompute.HcsStartComputeSystem(ctx, computeSystem.handle, "")
   214  	events, err := processAsyncHcsResult(ctx, err, resultJSON, computeSystem.callbackNumber,
   215  		hcsNotificationSystemStartCompleted, &timeout.SystemStart)
   216  	if err != nil {
   217  		return makeSystemError(computeSystem, operation, err, events)
   218  	}
   219  	computeSystem.startTime = time.Now()
   220  	return nil
   221  }
   222  
   223  // ID returns the compute system's identifier.
   224  func (computeSystem *System) ID() string {
   225  	return computeSystem.id
   226  }
   227  
   228  // Shutdown requests a compute system shutdown.
   229  func (computeSystem *System) Shutdown(ctx context.Context) error {
   230  	computeSystem.handleLock.RLock()
   231  	defer computeSystem.handleLock.RUnlock()
   232  
   233  	operation := "hcs::System::Shutdown"
   234  
   235  	if computeSystem.handle == 0 || computeSystem.stopped() {
   236  		return nil
   237  	}
   238  
   239  	resultJSON, err := vmcompute.HcsShutdownComputeSystem(ctx, computeSystem.handle, "")
   240  	events := processHcsResult(ctx, resultJSON)
   241  	switch err {
   242  	case nil, ErrVmcomputeAlreadyStopped, ErrComputeSystemDoesNotExist, ErrVmcomputeOperationPending:
   243  	default:
   244  		return makeSystemError(computeSystem, operation, err, events)
   245  	}
   246  	return nil
   247  }
   248  
   249  // Terminate requests a compute system terminate.
   250  func (computeSystem *System) Terminate(ctx context.Context) error {
   251  	computeSystem.handleLock.RLock()
   252  	defer computeSystem.handleLock.RUnlock()
   253  
   254  	operation := "hcs::System::Terminate"
   255  
   256  	if computeSystem.handle == 0 || computeSystem.stopped() {
   257  		return nil
   258  	}
   259  
   260  	resultJSON, err := vmcompute.HcsTerminateComputeSystem(ctx, computeSystem.handle, "")
   261  	events := processHcsResult(ctx, resultJSON)
   262  	switch err {
   263  	case nil, ErrVmcomputeAlreadyStopped, ErrComputeSystemDoesNotExist, ErrVmcomputeOperationPending:
   264  	default:
   265  		return makeSystemError(computeSystem, operation, err, events)
   266  	}
   267  	return nil
   268  }
   269  
   270  // waitBackground waits for the compute system exit notification. Once received
   271  // sets `computeSystem.waitError` (if any) and unblocks all `Wait` calls.
   272  //
   273  // This MUST be called exactly once per `computeSystem.handle` but `Wait` is
   274  // safe to call multiple times.
   275  func (computeSystem *System) waitBackground() {
   276  	operation := "hcs::System::waitBackground"
   277  	ctx, span := oc.StartSpan(context.Background(), operation)
   278  	defer span.End()
   279  	span.AddAttributes(trace.StringAttribute("cid", computeSystem.id))
   280  
   281  	err := waitForNotification(ctx, computeSystem.callbackNumber, hcsNotificationSystemExited, nil)
   282  	switch err {
   283  	case nil:
   284  		log.G(ctx).Debug("system exited")
   285  	case ErrVmcomputeUnexpectedExit:
   286  		log.G(ctx).Debug("unexpected system exit")
   287  		computeSystem.exitError = makeSystemError(computeSystem, operation, err, nil)
   288  		err = nil
   289  	default:
   290  		err = makeSystemError(computeSystem, operation, err, nil)
   291  	}
   292  	computeSystem.closedWaitOnce.Do(func() {
   293  		computeSystem.waitError = err
   294  		close(computeSystem.waitBlock)
   295  	})
   296  	oc.SetSpanStatus(span, err)
   297  }
   298  
   299  func (computeSystem *System) WaitChannel() <-chan struct{} {
   300  	return computeSystem.waitBlock
   301  }
   302  
   303  func (computeSystem *System) WaitError() error {
   304  	return computeSystem.waitError
   305  }
   306  
   307  // Wait synchronously waits for the compute system to shutdown or terminate. If
   308  // the compute system has already exited returns the previous error (if any).
   309  func (computeSystem *System) Wait() error {
   310  	<-computeSystem.WaitChannel()
   311  	return computeSystem.WaitError()
   312  }
   313  
   314  // stopped returns true if the compute system stopped.
   315  func (computeSystem *System) stopped() bool {
   316  	select {
   317  	case <-computeSystem.waitBlock:
   318  		return true
   319  	default:
   320  	}
   321  	return false
   322  }
   323  
   324  // ExitError returns an error describing the reason the compute system terminated.
   325  func (computeSystem *System) ExitError() error {
   326  	if !computeSystem.stopped() {
   327  		return errors.New("container not exited")
   328  	}
   329  	if computeSystem.waitError != nil {
   330  		return computeSystem.waitError
   331  	}
   332  	return computeSystem.exitError
   333  }
   334  
   335  // Properties returns the requested container properties targeting a V1 schema container.
   336  func (computeSystem *System) Properties(ctx context.Context, types ...schema1.PropertyType) (*schema1.ContainerProperties, error) {
   337  	computeSystem.handleLock.RLock()
   338  	defer computeSystem.handleLock.RUnlock()
   339  
   340  	operation := "hcs::System::Properties"
   341  
   342  	if computeSystem.handle == 0 {
   343  		return nil, makeSystemError(computeSystem, operation, ErrAlreadyClosed, nil)
   344  	}
   345  
   346  	queryBytes, err := json.Marshal(schema1.PropertyQuery{PropertyTypes: types})
   347  	if err != nil {
   348  		return nil, makeSystemError(computeSystem, operation, err, nil)
   349  	}
   350  
   351  	propertiesJSON, resultJSON, err := vmcompute.HcsGetComputeSystemProperties(ctx, computeSystem.handle, string(queryBytes))
   352  	events := processHcsResult(ctx, resultJSON)
   353  	if err != nil {
   354  		return nil, makeSystemError(computeSystem, operation, err, events)
   355  	}
   356  
   357  	if propertiesJSON == "" {
   358  		return nil, ErrUnexpectedValue
   359  	}
   360  	properties := &schema1.ContainerProperties{}
   361  	if err := json.Unmarshal([]byte(propertiesJSON), properties); err != nil {
   362  		return nil, makeSystemError(computeSystem, operation, err, nil)
   363  	}
   364  
   365  	return properties, nil
   366  }
   367  
   368  // queryInProc handles querying for container properties without reaching out to HCS. `props`
   369  // will be updated to contain any data returned from the queries present in `types`. If any properties
   370  // failed to be queried they will be tallied up and returned in as the first return value. Failures on
   371  // query are NOT considered errors; the only failure case for this method is if the containers job object
   372  // cannot be opened.
   373  func (computeSystem *System) queryInProc(
   374  	ctx context.Context,
   375  	props *hcsschema.Properties,
   376  	types []hcsschema.PropertyType,
   377  ) ([]hcsschema.PropertyType, error) {
   378  	// In the future we can make use of some new functionality in the HCS that allows you
   379  	// to pass a job object for HCS to use for the container. Currently, the only way we'll
   380  	// be able to open the job/silo is if we're running as SYSTEM.
   381  	jobOptions := &jobobject.Options{
   382  		UseNTVariant: true,
   383  		Name:         siloNameFmt(computeSystem.id),
   384  	}
   385  	job, err := jobobject.Open(ctx, jobOptions)
   386  	if err != nil {
   387  		return nil, err
   388  	}
   389  	defer job.Close()
   390  
   391  	var fallbackQueryTypes []hcsschema.PropertyType
   392  	for _, propType := range types {
   393  		switch propType {
   394  		case hcsschema.PTStatistics:
   395  			// Handle a bad caller asking for the same type twice. No use in re-querying if this is
   396  			// filled in already.
   397  			if props.Statistics == nil {
   398  				props.Statistics, err = computeSystem.statisticsInProc(job)
   399  				if err != nil {
   400  					log.G(ctx).WithError(err).Warn("failed to get statistics in-proc")
   401  
   402  					fallbackQueryTypes = append(fallbackQueryTypes, propType)
   403  				}
   404  			}
   405  		default:
   406  			fallbackQueryTypes = append(fallbackQueryTypes, propType)
   407  		}
   408  	}
   409  
   410  	return fallbackQueryTypes, nil
   411  }
   412  
   413  // statisticsInProc emulates what HCS does to grab statistics for a given container with a small
   414  // change to make grabbing the private working set total much more efficient.
   415  func (computeSystem *System) statisticsInProc(job *jobobject.JobObject) (*hcsschema.Statistics, error) {
   416  	// Start timestamp for these stats before we grab them to match HCS
   417  	timestamp := time.Now()
   418  
   419  	memInfo, err := job.QueryMemoryStats()
   420  	if err != nil {
   421  		return nil, err
   422  	}
   423  
   424  	processorInfo, err := job.QueryProcessorStats()
   425  	if err != nil {
   426  		return nil, err
   427  	}
   428  
   429  	storageInfo, err := job.QueryStorageStats()
   430  	if err != nil {
   431  		return nil, err
   432  	}
   433  
   434  	// This calculates the private working set more efficiently than HCS does. HCS calls NtQuerySystemInformation
   435  	// with the class SystemProcessInformation which returns an array containing system information for *every*
   436  	// process running on the machine. They then grab the pids that are running in the container and filter down
   437  	// the entries in the array to only what's running in that silo and start tallying up the total. This doesn't
   438  	// work well as performance should get worse if more processess are running on the machine in general and not
   439  	// just in the container. All of the additional information besides the WorkingSetPrivateSize field is ignored
   440  	// as well which isn't great and is wasted work to fetch.
   441  	//
   442  	// HCS only let's you grab statistics in an all or nothing fashion, so we can't just grab the private
   443  	// working set ourselves and ask for everything else separately. The optimization we can make here is
   444  	// to open the silo ourselves and do the same queries for the rest of the info, as well as calculating
   445  	// the private working set in a more efficient manner by:
   446  	//
   447  	// 1. Find the pids running in the silo
   448  	// 2. Get a process handle for every process (only need PROCESS_QUERY_LIMITED_INFORMATION access)
   449  	// 3. Call NtQueryInformationProcess on each process with the class ProcessVmCounters
   450  	// 4. Tally up the total using the field PrivateWorkingSetSize in VM_COUNTERS_EX2.
   451  	privateWorkingSet, err := job.QueryPrivateWorkingSet()
   452  	if err != nil {
   453  		return nil, err
   454  	}
   455  
   456  	return &hcsschema.Statistics{
   457  		Timestamp:          timestamp,
   458  		ContainerStartTime: computeSystem.startTime,
   459  		Uptime100ns:        uint64(time.Since(computeSystem.startTime).Nanoseconds()) / 100,
   460  		Memory: &hcsschema.MemoryStats{
   461  			MemoryUsageCommitBytes:            memInfo.JobMemory,
   462  			MemoryUsageCommitPeakBytes:        memInfo.PeakJobMemoryUsed,
   463  			MemoryUsagePrivateWorkingSetBytes: privateWorkingSet,
   464  		},
   465  		Processor: &hcsschema.ProcessorStats{
   466  			RuntimeKernel100ns: uint64(processorInfo.TotalKernelTime),
   467  			RuntimeUser100ns:   uint64(processorInfo.TotalUserTime),
   468  			TotalRuntime100ns:  uint64(processorInfo.TotalKernelTime + processorInfo.TotalUserTime),
   469  		},
   470  		Storage: &hcsschema.StorageStats{
   471  			ReadCountNormalized:  uint64(storageInfo.ReadStats.IoCount),
   472  			ReadSizeBytes:        storageInfo.ReadStats.TotalSize,
   473  			WriteCountNormalized: uint64(storageInfo.WriteStats.IoCount),
   474  			WriteSizeBytes:       storageInfo.WriteStats.TotalSize,
   475  		},
   476  	}, nil
   477  }
   478  
   479  // hcsPropertiesV2Query is a helper to make a HcsGetComputeSystemProperties call using the V2 schema property types.
   480  func (computeSystem *System) hcsPropertiesV2Query(ctx context.Context, types []hcsschema.PropertyType) (*hcsschema.Properties, error) {
   481  	operation := "hcs::System::PropertiesV2"
   482  
   483  	if computeSystem.handle == 0 {
   484  		return nil, makeSystemError(computeSystem, operation, ErrAlreadyClosed, nil)
   485  	}
   486  
   487  	queryBytes, err := json.Marshal(hcsschema.PropertyQuery{PropertyTypes: types})
   488  	if err != nil {
   489  		return nil, makeSystemError(computeSystem, operation, err, nil)
   490  	}
   491  
   492  	propertiesJSON, resultJSON, err := vmcompute.HcsGetComputeSystemProperties(ctx, computeSystem.handle, string(queryBytes))
   493  	events := processHcsResult(ctx, resultJSON)
   494  	if err != nil {
   495  		return nil, makeSystemError(computeSystem, operation, err, events)
   496  	}
   497  
   498  	if propertiesJSON == "" {
   499  		return nil, ErrUnexpectedValue
   500  	}
   501  	props := &hcsschema.Properties{}
   502  	if err := json.Unmarshal([]byte(propertiesJSON), props); err != nil {
   503  		return nil, makeSystemError(computeSystem, operation, err, nil)
   504  	}
   505  
   506  	return props, nil
   507  }
   508  
   509  // PropertiesV2 returns the requested compute systems properties targeting a V2 schema compute system.
   510  func (computeSystem *System) PropertiesV2(ctx context.Context, types ...hcsschema.PropertyType) (_ *hcsschema.Properties, err error) {
   511  	computeSystem.handleLock.RLock()
   512  	defer computeSystem.handleLock.RUnlock()
   513  
   514  	// Let HCS tally up the total for VM based queries instead of querying ourselves.
   515  	if computeSystem.typ != "container" {
   516  		return computeSystem.hcsPropertiesV2Query(ctx, types)
   517  	}
   518  
   519  	// Define a starter Properties struct with the default fields returned from every
   520  	// query. Owner is only returned from Statistics but it's harmless to include.
   521  	properties := &hcsschema.Properties{
   522  		Id:            computeSystem.id,
   523  		SystemType:    computeSystem.typ,
   524  		RuntimeOsType: computeSystem.os,
   525  		Owner:         computeSystem.owner,
   526  	}
   527  
   528  	logEntry := log.G(ctx)
   529  	// First lets try and query ourselves without reaching to HCS. If any of the queries fail
   530  	// we'll take note and fallback to querying HCS for any of the failed types.
   531  	fallbackTypes, err := computeSystem.queryInProc(ctx, properties, types)
   532  	if err == nil && len(fallbackTypes) == 0 {
   533  		return properties, nil
   534  	} else if err != nil {
   535  		logEntry = logEntry.WithError(fmt.Errorf("failed to query compute system properties in-proc: %w", err))
   536  		fallbackTypes = types
   537  	}
   538  
   539  	logEntry.WithFields(logrus.Fields{
   540  		logfields.ContainerID: computeSystem.id,
   541  		"propertyTypes":       fallbackTypes,
   542  	}).Info("falling back to HCS for property type queries")
   543  
   544  	hcsProperties, err := computeSystem.hcsPropertiesV2Query(ctx, fallbackTypes)
   545  	if err != nil {
   546  		return nil, err
   547  	}
   548  
   549  	// Now add in anything that we might have successfully queried in process.
   550  	if properties.Statistics != nil {
   551  		hcsProperties.Statistics = properties.Statistics
   552  		hcsProperties.Owner = properties.Owner
   553  	}
   554  
   555  	// For future support for querying processlist in-proc as well.
   556  	if properties.ProcessList != nil {
   557  		hcsProperties.ProcessList = properties.ProcessList
   558  	}
   559  
   560  	return hcsProperties, nil
   561  }
   562  
   563  // Pause pauses the execution of the computeSystem. This feature is not enabled in TP5.
   564  func (computeSystem *System) Pause(ctx context.Context) (err error) {
   565  	operation := "hcs::System::Pause"
   566  
   567  	// hcsPauseComputeSystemContext is an async operation. Start the outer span
   568  	// here to measure the full pause time.
   569  	ctx, span := oc.StartSpan(ctx, operation)
   570  	defer span.End()
   571  	defer func() { oc.SetSpanStatus(span, err) }()
   572  	span.AddAttributes(trace.StringAttribute("cid", computeSystem.id))
   573  
   574  	computeSystem.handleLock.RLock()
   575  	defer computeSystem.handleLock.RUnlock()
   576  
   577  	if computeSystem.handle == 0 {
   578  		return makeSystemError(computeSystem, operation, ErrAlreadyClosed, nil)
   579  	}
   580  
   581  	resultJSON, err := vmcompute.HcsPauseComputeSystem(ctx, computeSystem.handle, "")
   582  	events, err := processAsyncHcsResult(ctx, err, resultJSON, computeSystem.callbackNumber,
   583  		hcsNotificationSystemPauseCompleted, &timeout.SystemPause)
   584  	if err != nil {
   585  		return makeSystemError(computeSystem, operation, err, events)
   586  	}
   587  
   588  	return nil
   589  }
   590  
   591  // Resume resumes the execution of the computeSystem. This feature is not enabled in TP5.
   592  func (computeSystem *System) Resume(ctx context.Context) (err error) {
   593  	operation := "hcs::System::Resume"
   594  
   595  	// hcsResumeComputeSystemContext is an async operation. Start the outer span
   596  	// here to measure the full restore time.
   597  	ctx, span := oc.StartSpan(ctx, operation)
   598  	defer span.End()
   599  	defer func() { oc.SetSpanStatus(span, err) }()
   600  	span.AddAttributes(trace.StringAttribute("cid", computeSystem.id))
   601  
   602  	computeSystem.handleLock.RLock()
   603  	defer computeSystem.handleLock.RUnlock()
   604  
   605  	if computeSystem.handle == 0 {
   606  		return makeSystemError(computeSystem, operation, ErrAlreadyClosed, nil)
   607  	}
   608  
   609  	resultJSON, err := vmcompute.HcsResumeComputeSystem(ctx, computeSystem.handle, "")
   610  	events, err := processAsyncHcsResult(ctx, err, resultJSON, computeSystem.callbackNumber,
   611  		hcsNotificationSystemResumeCompleted, &timeout.SystemResume)
   612  	if err != nil {
   613  		return makeSystemError(computeSystem, operation, err, events)
   614  	}
   615  
   616  	return nil
   617  }
   618  
   619  // Save the compute system
   620  func (computeSystem *System) Save(ctx context.Context, options interface{}) (err error) {
   621  	operation := "hcs::System::Save"
   622  
   623  	// hcsSaveComputeSystemContext is an async operation. Start the outer span
   624  	// here to measure the full save time.
   625  	ctx, span := oc.StartSpan(ctx, operation)
   626  	defer span.End()
   627  	defer func() { oc.SetSpanStatus(span, err) }()
   628  	span.AddAttributes(trace.StringAttribute("cid", computeSystem.id))
   629  
   630  	saveOptions, err := json.Marshal(options)
   631  	if err != nil {
   632  		return err
   633  	}
   634  
   635  	computeSystem.handleLock.RLock()
   636  	defer computeSystem.handleLock.RUnlock()
   637  
   638  	if computeSystem.handle == 0 {
   639  		return makeSystemError(computeSystem, operation, ErrAlreadyClosed, nil)
   640  	}
   641  
   642  	result, err := vmcompute.HcsSaveComputeSystem(ctx, computeSystem.handle, string(saveOptions))
   643  	events, err := processAsyncHcsResult(ctx, err, result, computeSystem.callbackNumber,
   644  		hcsNotificationSystemSaveCompleted, &timeout.SystemSave)
   645  	if err != nil {
   646  		return makeSystemError(computeSystem, operation, err, events)
   647  	}
   648  
   649  	return nil
   650  }
   651  
   652  func (computeSystem *System) createProcess(ctx context.Context, operation string, c interface{}) (*Process, *vmcompute.HcsProcessInformation, error) {
   653  	computeSystem.handleLock.RLock()
   654  	defer computeSystem.handleLock.RUnlock()
   655  
   656  	if computeSystem.handle == 0 {
   657  		return nil, nil, makeSystemError(computeSystem, operation, ErrAlreadyClosed, nil)
   658  	}
   659  
   660  	configurationb, err := json.Marshal(c)
   661  	if err != nil {
   662  		return nil, nil, makeSystemError(computeSystem, operation, err, nil)
   663  	}
   664  
   665  	configuration := string(configurationb)
   666  	processInfo, processHandle, resultJSON, err := vmcompute.HcsCreateProcess(ctx, computeSystem.handle, configuration)
   667  	events := processHcsResult(ctx, resultJSON)
   668  	if err != nil {
   669  		if v2, ok := c.(*hcsschema.ProcessParameters); ok {
   670  			operation += ": " + v2.CommandLine
   671  		} else if v1, ok := c.(*schema1.ProcessConfig); ok {
   672  			operation += ": " + v1.CommandLine
   673  		}
   674  		return nil, nil, makeSystemError(computeSystem, operation, err, events)
   675  	}
   676  
   677  	log.G(ctx).WithField("pid", processInfo.ProcessId).Debug("created process pid")
   678  	return newProcess(processHandle, int(processInfo.ProcessId), computeSystem), &processInfo, nil
   679  }
   680  
   681  // CreateProcess launches a new process within the computeSystem.
   682  func (computeSystem *System) CreateProcess(ctx context.Context, c interface{}) (cow.Process, error) {
   683  	operation := "hcs::System::CreateProcess"
   684  	process, processInfo, err := computeSystem.createProcess(ctx, operation, c)
   685  	if err != nil {
   686  		return nil, err
   687  	}
   688  	defer func() {
   689  		if err != nil {
   690  			process.Close()
   691  		}
   692  	}()
   693  
   694  	pipes, err := makeOpenFiles([]syscall.Handle{processInfo.StdInput, processInfo.StdOutput, processInfo.StdError})
   695  	if err != nil {
   696  		return nil, makeSystemError(computeSystem, operation, err, nil)
   697  	}
   698  	process.stdin = pipes[0]
   699  	process.stdout = pipes[1]
   700  	process.stderr = pipes[2]
   701  	process.hasCachedStdio = true
   702  
   703  	if err = process.registerCallback(ctx); err != nil {
   704  		return nil, makeSystemError(computeSystem, operation, err, nil)
   705  	}
   706  	go process.waitBackground()
   707  
   708  	return process, nil
   709  }
   710  
   711  // OpenProcess gets an interface to an existing process within the computeSystem.
   712  func (computeSystem *System) OpenProcess(ctx context.Context, pid int) (*Process, error) {
   713  	computeSystem.handleLock.RLock()
   714  	defer computeSystem.handleLock.RUnlock()
   715  
   716  	operation := "hcs::System::OpenProcess"
   717  
   718  	if computeSystem.handle == 0 {
   719  		return nil, makeSystemError(computeSystem, operation, ErrAlreadyClosed, nil)
   720  	}
   721  
   722  	processHandle, resultJSON, err := vmcompute.HcsOpenProcess(ctx, computeSystem.handle, uint32(pid))
   723  	events := processHcsResult(ctx, resultJSON)
   724  	if err != nil {
   725  		return nil, makeSystemError(computeSystem, operation, err, events)
   726  	}
   727  
   728  	process := newProcess(processHandle, pid, computeSystem)
   729  	if err = process.registerCallback(ctx); err != nil {
   730  		return nil, makeSystemError(computeSystem, operation, err, nil)
   731  	}
   732  	go process.waitBackground()
   733  
   734  	return process, nil
   735  }
   736  
   737  // Close cleans up any state associated with the compute system but does not terminate or wait for it.
   738  func (computeSystem *System) Close() (err error) {
   739  	operation := "hcs::System::Close"
   740  	ctx, span := oc.StartSpan(context.Background(), operation)
   741  	defer span.End()
   742  	defer func() { oc.SetSpanStatus(span, err) }()
   743  	span.AddAttributes(trace.StringAttribute("cid", computeSystem.id))
   744  
   745  	computeSystem.handleLock.Lock()
   746  	defer computeSystem.handleLock.Unlock()
   747  
   748  	// Don't double free this
   749  	if computeSystem.handle == 0 {
   750  		return nil
   751  	}
   752  
   753  	if err = computeSystem.unregisterCallback(ctx); err != nil {
   754  		return makeSystemError(computeSystem, operation, err, nil)
   755  	}
   756  
   757  	err = vmcompute.HcsCloseComputeSystem(ctx, computeSystem.handle)
   758  	if err != nil {
   759  		return makeSystemError(computeSystem, operation, err, nil)
   760  	}
   761  
   762  	computeSystem.handle = 0
   763  	computeSystem.closedWaitOnce.Do(func() {
   764  		computeSystem.waitError = ErrAlreadyClosed
   765  		close(computeSystem.waitBlock)
   766  	})
   767  
   768  	return nil
   769  }
   770  
   771  func (computeSystem *System) registerCallback(ctx context.Context) error {
   772  	callbackContext := &notificationWatcherContext{
   773  		channels: newSystemChannels(),
   774  		systemID: computeSystem.id,
   775  	}
   776  
   777  	callbackMapLock.Lock()
   778  	callbackNumber := nextCallback
   779  	nextCallback++
   780  	callbackMap[callbackNumber] = callbackContext
   781  	callbackMapLock.Unlock()
   782  
   783  	callbackHandle, err := vmcompute.HcsRegisterComputeSystemCallback(ctx, computeSystem.handle,
   784  		notificationWatcherCallback, callbackNumber)
   785  	if err != nil {
   786  		return err
   787  	}
   788  	callbackContext.handle = callbackHandle
   789  	computeSystem.callbackNumber = callbackNumber
   790  
   791  	return nil
   792  }
   793  
   794  func (computeSystem *System) unregisterCallback(ctx context.Context) error {
   795  	callbackNumber := computeSystem.callbackNumber
   796  
   797  	callbackMapLock.RLock()
   798  	callbackContext := callbackMap[callbackNumber]
   799  	callbackMapLock.RUnlock()
   800  
   801  	if callbackContext == nil {
   802  		return nil
   803  	}
   804  
   805  	handle := callbackContext.handle
   806  
   807  	if handle == 0 {
   808  		return nil
   809  	}
   810  
   811  	// hcsUnregisterComputeSystemCallback has its own synchronization
   812  	// to wait for all callbacks to complete. We must NOT hold the callbackMapLock.
   813  	err := vmcompute.HcsUnregisterComputeSystemCallback(ctx, handle)
   814  	if err != nil {
   815  		return err
   816  	}
   817  
   818  	closeChannels(callbackContext.channels)
   819  
   820  	callbackMapLock.Lock()
   821  	delete(callbackMap, callbackNumber)
   822  	callbackMapLock.Unlock()
   823  
   824  	handle = 0 //nolint:ineffassign
   825  
   826  	return nil
   827  }
   828  
   829  // Modify the System by sending a request to HCS
   830  func (computeSystem *System) Modify(ctx context.Context, config interface{}) error {
   831  	computeSystem.handleLock.RLock()
   832  	defer computeSystem.handleLock.RUnlock()
   833  
   834  	operation := "hcs::System::Modify"
   835  
   836  	if computeSystem.handle == 0 {
   837  		return makeSystemError(computeSystem, operation, ErrAlreadyClosed, nil)
   838  	}
   839  
   840  	requestBytes, err := json.Marshal(config)
   841  	if err != nil {
   842  		return err
   843  	}
   844  
   845  	requestJSON := string(requestBytes)
   846  	resultJSON, err := vmcompute.HcsModifyComputeSystem(ctx, computeSystem.handle, requestJSON)
   847  	events := processHcsResult(ctx, resultJSON)
   848  	if err != nil {
   849  		return makeSystemError(computeSystem, operation, err, events)
   850  	}
   851  
   852  	return nil
   853  }
   854  

View as plain text