...

Source file src/github.com/Microsoft/hcsshim/internal/jobobject/iocp.go

Documentation: github.com/Microsoft/hcsshim/internal/jobobject

     1  //go:build windows
     2  
     3  package jobobject
     4  
     5  import (
     6  	"context"
     7  	"fmt"
     8  	"sync"
     9  	"unsafe"
    10  
    11  	"github.com/Microsoft/hcsshim/internal/log"
    12  	"github.com/Microsoft/hcsshim/internal/queue"
    13  	"github.com/Microsoft/hcsshim/internal/winapi"
    14  	"github.com/sirupsen/logrus"
    15  	"golang.org/x/sys/windows"
    16  )
    17  
    18  var (
    19  	ioInitOnce sync.Once
    20  	initIOErr  error
    21  	// Global iocp handle that will be re-used for every job object
    22  	ioCompletionPort windows.Handle
    23  	// Mapping of job handle to queue to place notifications in.
    24  	jobMap sync.Map
    25  )
    26  
    27  // MsgAllProcessesExited is a type representing a message that every process in a job has exited.
    28  type MsgAllProcessesExited struct{}
    29  
    30  // MsgUnimplemented represents a message that we are aware of, but that isn't implemented currently.
    31  // This should not be treated as an error.
    32  type MsgUnimplemented struct{}
    33  
    34  // pollIOCP polls the io completion port forever.
    35  func pollIOCP(ctx context.Context, iocpHandle windows.Handle) {
    36  	var (
    37  		overlapped uintptr
    38  		code       uint32
    39  		key        uintptr
    40  	)
    41  
    42  	for {
    43  		err := windows.GetQueuedCompletionStatus(iocpHandle, &code, &key, (**windows.Overlapped)(unsafe.Pointer(&overlapped)), windows.INFINITE)
    44  		if err != nil {
    45  			log.G(ctx).WithError(err).Error("failed to poll for job object message")
    46  			continue
    47  		}
    48  		if val, ok := jobMap.Load(key); ok {
    49  			msq, ok := val.(*queue.MessageQueue)
    50  			if !ok {
    51  				log.G(ctx).WithField("value", msq).Warn("encountered non queue type in job map")
    52  				continue
    53  			}
    54  			notification, err := parseMessage(code, overlapped)
    55  			if err != nil {
    56  				log.G(ctx).WithFields(logrus.Fields{
    57  					"code":       code,
    58  					"overlapped": overlapped,
    59  				}).Warn("failed to parse job object message")
    60  				continue
    61  			}
    62  			if err := msq.Enqueue(notification); err == queue.ErrQueueClosed {
    63  				// Write will only return an error when the queue is closed.
    64  				// The only time a queue would ever be closed is when we call `Close` on
    65  				// the job it belongs to which also removes it from the jobMap, so something
    66  				// went wrong here. We can't return as this is reading messages for all jobs
    67  				// so just log it and move on.
    68  				log.G(ctx).WithFields(logrus.Fields{
    69  					"code":       code,
    70  					"overlapped": overlapped,
    71  				}).Warn("tried to write to a closed queue")
    72  				continue
    73  			}
    74  		} else {
    75  			log.G(ctx).Warn("received a message for a job not present in the mapping")
    76  		}
    77  	}
    78  }
    79  
    80  func parseMessage(code uint32, overlapped uintptr) (interface{}, error) {
    81  	// Check code and parse out relevant information related to that notification
    82  	// that we care about. For now all we handle is the message that all processes
    83  	// in the job have exited.
    84  	switch code {
    85  	case winapi.JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO:
    86  		return MsgAllProcessesExited{}, nil
    87  	// Other messages for completeness and a check to make sure that if we fall
    88  	// into the default case that this is a code we don't know how to handle.
    89  	case winapi.JOB_OBJECT_MSG_END_OF_JOB_TIME:
    90  	case winapi.JOB_OBJECT_MSG_END_OF_PROCESS_TIME:
    91  	case winapi.JOB_OBJECT_MSG_ACTIVE_PROCESS_LIMIT:
    92  	case winapi.JOB_OBJECT_MSG_NEW_PROCESS:
    93  	case winapi.JOB_OBJECT_MSG_EXIT_PROCESS:
    94  	case winapi.JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS:
    95  	case winapi.JOB_OBJECT_MSG_PROCESS_MEMORY_LIMIT:
    96  	case winapi.JOB_OBJECT_MSG_JOB_MEMORY_LIMIT:
    97  	case winapi.JOB_OBJECT_MSG_NOTIFICATION_LIMIT:
    98  	default:
    99  		return nil, fmt.Errorf("unknown job notification type: %d", code)
   100  	}
   101  	return MsgUnimplemented{}, nil
   102  }
   103  
   104  // Assigns an IO completion port to get notified of events for the registered job
   105  // object.
   106  func attachIOCP(job windows.Handle, iocp windows.Handle) error {
   107  	info := winapi.JOBOBJECT_ASSOCIATE_COMPLETION_PORT{
   108  		CompletionKey:  job,
   109  		CompletionPort: iocp,
   110  	}
   111  	_, err := windows.SetInformationJobObject(job, windows.JobObjectAssociateCompletionPortInformation, uintptr(unsafe.Pointer(&info)), uint32(unsafe.Sizeof(info)))
   112  	return err
   113  }
   114  

View as plain text