...

Source file src/github.com/Microsoft/go-winio/file.go

Documentation: github.com/Microsoft/go-winio

     1  //go:build windows
     2  // +build windows
     3  
     4  package winio
     5  
     6  import (
     7  	"errors"
     8  	"io"
     9  	"runtime"
    10  	"sync"
    11  	"sync/atomic"
    12  	"syscall"
    13  	"time"
    14  
    15  	"golang.org/x/sys/windows"
    16  )
    17  
    18  //sys cancelIoEx(file syscall.Handle, o *syscall.Overlapped) (err error) = CancelIoEx
    19  //sys createIoCompletionPort(file syscall.Handle, port syscall.Handle, key uintptr, threadCount uint32) (newport syscall.Handle, err error) = CreateIoCompletionPort
    20  //sys getQueuedCompletionStatus(port syscall.Handle, bytes *uint32, key *uintptr, o **ioOperation, timeout uint32) (err error) = GetQueuedCompletionStatus
    21  //sys setFileCompletionNotificationModes(h syscall.Handle, flags uint8) (err error) = SetFileCompletionNotificationModes
    22  //sys wsaGetOverlappedResult(h syscall.Handle, o *syscall.Overlapped, bytes *uint32, wait bool, flags *uint32) (err error) = ws2_32.WSAGetOverlappedResult
    23  
    24  type atomicBool int32
    25  
    26  func (b *atomicBool) isSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 }
    27  func (b *atomicBool) setFalse()   { atomic.StoreInt32((*int32)(b), 0) }
    28  func (b *atomicBool) setTrue()    { atomic.StoreInt32((*int32)(b), 1) }
    29  
    30  //revive:disable-next-line:predeclared Keep "new" to maintain consistency with "atomic" pkg
    31  func (b *atomicBool) swap(new bool) bool {
    32  	var newInt int32
    33  	if new {
    34  		newInt = 1
    35  	}
    36  	return atomic.SwapInt32((*int32)(b), newInt) == 1
    37  }
    38  
    39  var (
    40  	ErrFileClosed = errors.New("file has already been closed")
    41  	ErrTimeout    = &timeoutError{}
    42  )
    43  
    44  type timeoutError struct{}
    45  
    46  func (*timeoutError) Error() string   { return "i/o timeout" }
    47  func (*timeoutError) Timeout() bool   { return true }
    48  func (*timeoutError) Temporary() bool { return true }
    49  
// timeoutChan is a signal-only channel: no values are sent on it, and a
// deadlineHandler closes it to announce that its deadline has expired.
type timeoutChan chan struct{}
    51  
    52  var ioInitOnce sync.Once
    53  var ioCompletionPort syscall.Handle
    54  
// ioResult contains the result of an asynchronous IO operation, as delivered
// by ioCompletionProcessor on the operation's channel.
type ioResult struct {
	// bytes is the byte count reported for the completed operation.
	bytes uint32
	// err is the error reported for the completed operation, nil on success.
	err   error
}
    60  
// ioOperation represents an outstanding asynchronous Win32 IO.
//
// NOTE(review): the address of o is what gets passed to ReadFile/WriteFile,
// while getQueuedCompletionStatus hands the completed pointer back typed as
// *ioOperation. That presumably requires o to remain the first field; confirm
// before reordering.
type ioOperation struct {
	// o is the overlapped structure handed to the Win32 IO call.
	o  syscall.Overlapped
	// ch receives the single ioResult for this operation from
	// ioCompletionProcessor.
	ch chan ioResult
}
    66  
    67  func initIO() {
    68  	h, err := createIoCompletionPort(syscall.InvalidHandle, 0, 0, 0xffffffff)
    69  	if err != nil {
    70  		panic(err)
    71  	}
    72  	ioCompletionPort = h
    73  	go ioCompletionProcessor(h)
    74  }
    75  
// win32File implements Reader, Writer, and Closer on a Win32 handle without blocking in a syscall.
// It takes ownership of this handle and closes it in closeHandle (reached via Close).
//
// NOTE(review): an earlier version of this comment said the handle is closed
// when the value is garbage collected, but no finalizer is registered in the
// code visible here; confirm before relying on that.
type win32File struct {
	// handle is the wrapped Win32 handle; closeHandle resets it to 0.
	handle        syscall.Handle
	// wg counts in-flight IO: prepareIO adds to it, Read/Write call Done, and
	// closeHandle waits on it before closing the handle.
	wg            sync.WaitGroup
	// wgLock orders the closing check and wg.Add in prepareIO against the
	// closing flag being raised in closeHandle.
	wgLock        sync.RWMutex
	// closing is raised exactly once by closeHandle; new IO is refused afterwards.
	closing       atomicBool
	// socket makes asyncIO fetch the error through wsaGetOverlappedResult when
	// an operation fails. It is not set anywhere in the code visible here.
	socket        bool
	// readDeadline and writeDeadline hold the deadline state for Read and Write.
	readDeadline  deadlineHandler
	writeDeadline deadlineHandler
}
    87  
// deadlineHandler tracks one IO deadline (read or write) for a win32File and
// signals its expiry to in-flight operations.
type deadlineHandler struct {
	// setLock serializes calls to set.
	setLock     sync.Mutex
	// channel is closed when the deadline expires; set replaces it with a
	// fresh channel once it has been closed.
	channel     timeoutChan
	// channelLock guards the channel field: asyncIO takes it to snapshot the
	// channel and set takes it to replace the channel.
	channelLock sync.RWMutex
	// timer is the pending expiry timer, or nil when none is armed.
	timer       *time.Timer
	// timedout is true once the deadline has expired and is cleared by set.
	// Read and Write check it before issuing IO.
	timedout    atomicBool
}
    95  
    96  // makeWin32File makes a new win32File from an existing file handle.
    97  func makeWin32File(h syscall.Handle) (*win32File, error) {
    98  	f := &win32File{handle: h}
    99  	ioInitOnce.Do(initIO)
   100  	_, err := createIoCompletionPort(h, ioCompletionPort, 0, 0xffffffff)
   101  	if err != nil {
   102  		return nil, err
   103  	}
   104  	err = setFileCompletionNotificationModes(h, windows.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS|windows.FILE_SKIP_SET_EVENT_ON_HANDLE)
   105  	if err != nil {
   106  		return nil, err
   107  	}
   108  	f.readDeadline.channel = make(timeoutChan)
   109  	f.writeDeadline.channel = make(timeoutChan)
   110  	return f, nil
   111  }
   112  
   113  func MakeOpenFile(h syscall.Handle) (io.ReadWriteCloser, error) {
   114  	// If we return the result of makeWin32File directly, it can result in an
   115  	// interface-wrapped nil, rather than a nil interface value.
   116  	f, err := makeWin32File(h)
   117  	if err != nil {
   118  		return nil, err
   119  	}
   120  	return f, nil
   121  }
   122  
   123  // closeHandle closes the resources associated with a Win32 handle.
   124  func (f *win32File) closeHandle() {
   125  	f.wgLock.Lock()
   126  	// Atomically set that we are closing, releasing the resources only once.
   127  	if !f.closing.swap(true) {
   128  		f.wgLock.Unlock()
   129  		// cancel all IO and wait for it to complete
   130  		_ = cancelIoEx(f.handle, nil)
   131  		f.wg.Wait()
   132  		// at this point, no new IO can start
   133  		syscall.Close(f.handle)
   134  		f.handle = 0
   135  	} else {
   136  		f.wgLock.Unlock()
   137  	}
   138  }
   139  
// Close closes a win32File by delegating to closeHandle, which cancels
// outstanding IO, waits for it to finish and closes the handle. Calls after
// the first do nothing. Close always returns nil.
func (f *win32File) Close() error {
	f.closeHandle()
	return nil
}
   145  
// IsClosed checks if the file has been closed. It reports true as soon as
// closeHandle has raised the closing flag, which may be before the handle
// itself has been closed.
func (f *win32File) IsClosed() bool {
	return f.closing.isSet()
}
   150  
   151  // prepareIO prepares for a new IO operation.
   152  // The caller must call f.wg.Done() when the IO is finished, prior to Close() returning.
   153  func (f *win32File) prepareIO() (*ioOperation, error) {
   154  	f.wgLock.RLock()
   155  	if f.closing.isSet() {
   156  		f.wgLock.RUnlock()
   157  		return nil, ErrFileClosed
   158  	}
   159  	f.wg.Add(1)
   160  	f.wgLock.RUnlock()
   161  	c := &ioOperation{}
   162  	c.ch = make(chan ioResult)
   163  	return c, nil
   164  }
   165  
   166  // ioCompletionProcessor processes completed async IOs forever.
   167  func ioCompletionProcessor(h syscall.Handle) {
   168  	for {
   169  		var bytes uint32
   170  		var key uintptr
   171  		var op *ioOperation
   172  		err := getQueuedCompletionStatus(h, &bytes, &key, &op, syscall.INFINITE)
   173  		if op == nil {
   174  			panic(err)
   175  		}
   176  		op.ch <- ioResult{bytes, err}
   177  	}
   178  }
   179  
   180  // todo: helsaawy - create an asyncIO version that takes a context
   181  
   182  // asyncIO processes the return value from ReadFile or WriteFile, blocking until
   183  // the operation has actually completed.
   184  func (f *win32File) asyncIO(c *ioOperation, d *deadlineHandler, bytes uint32, err error) (int, error) {
   185  	if err != syscall.ERROR_IO_PENDING { //nolint:errorlint // err is Errno
   186  		return int(bytes), err
   187  	}
   188  
   189  	if f.closing.isSet() {
   190  		_ = cancelIoEx(f.handle, &c.o)
   191  	}
   192  
   193  	var timeout timeoutChan
   194  	if d != nil {
   195  		d.channelLock.Lock()
   196  		timeout = d.channel
   197  		d.channelLock.Unlock()
   198  	}
   199  
   200  	var r ioResult
   201  	select {
   202  	case r = <-c.ch:
   203  		err = r.err
   204  		if err == syscall.ERROR_OPERATION_ABORTED { //nolint:errorlint // err is Errno
   205  			if f.closing.isSet() {
   206  				err = ErrFileClosed
   207  			}
   208  		} else if err != nil && f.socket {
   209  			// err is from Win32. Query the overlapped structure to get the winsock error.
   210  			var bytes, flags uint32
   211  			err = wsaGetOverlappedResult(f.handle, &c.o, &bytes, false, &flags)
   212  		}
   213  	case <-timeout:
   214  		_ = cancelIoEx(f.handle, &c.o)
   215  		r = <-c.ch
   216  		err = r.err
   217  		if err == syscall.ERROR_OPERATION_ABORTED { //nolint:errorlint // err is Errno
   218  			err = ErrTimeout
   219  		}
   220  	}
   221  
   222  	// runtime.KeepAlive is needed, as c is passed via native
   223  	// code to ioCompletionProcessor, c must remain alive
   224  	// until the channel read is complete.
   225  	// todo: (de)allocate *ioOperation via win32 heap functions, instead of needing to KeepAlive?
   226  	runtime.KeepAlive(c)
   227  	return int(r.bytes), err
   228  }
   229  
   230  // Read reads from a file handle.
   231  func (f *win32File) Read(b []byte) (int, error) {
   232  	c, err := f.prepareIO()
   233  	if err != nil {
   234  		return 0, err
   235  	}
   236  	defer f.wg.Done()
   237  
   238  	if f.readDeadline.timedout.isSet() {
   239  		return 0, ErrTimeout
   240  	}
   241  
   242  	var bytes uint32
   243  	err = syscall.ReadFile(f.handle, b, &bytes, &c.o)
   244  	n, err := f.asyncIO(c, &f.readDeadline, bytes, err)
   245  	runtime.KeepAlive(b)
   246  
   247  	// Handle EOF conditions.
   248  	if err == nil && n == 0 && len(b) != 0 {
   249  		return 0, io.EOF
   250  	} else if err == syscall.ERROR_BROKEN_PIPE { //nolint:errorlint // err is Errno
   251  		return 0, io.EOF
   252  	} else {
   253  		return n, err
   254  	}
   255  }
   256  
   257  // Write writes to a file handle.
   258  func (f *win32File) Write(b []byte) (int, error) {
   259  	c, err := f.prepareIO()
   260  	if err != nil {
   261  		return 0, err
   262  	}
   263  	defer f.wg.Done()
   264  
   265  	if f.writeDeadline.timedout.isSet() {
   266  		return 0, ErrTimeout
   267  	}
   268  
   269  	var bytes uint32
   270  	err = syscall.WriteFile(f.handle, b, &bytes, &c.o)
   271  	n, err := f.asyncIO(c, &f.writeDeadline, bytes, err)
   272  	runtime.KeepAlive(b)
   273  	return n, err
   274  }
   275  
// SetReadDeadline sets the deadline for Read calls by forwarding to the read
// deadlineHandler. A zero deadline clears any existing deadline.
func (f *win32File) SetReadDeadline(deadline time.Time) error {
	return f.readDeadline.set(deadline)
}
   279  
// SetWriteDeadline sets the deadline for Write calls by forwarding to the
// write deadlineHandler. A zero deadline clears any existing deadline.
func (f *win32File) SetWriteDeadline(deadline time.Time) error {
	return f.writeDeadline.set(deadline)
}
   283  
// Flush calls FlushFileBuffers on the underlying handle and returns its error.
//
// NOTE(review): unlike Read and Write this does not go through prepareIO, so
// it is not coordinated with a concurrent Close; confirm that callers
// never flush while closing.
func (f *win32File) Flush() error {
	return syscall.FlushFileBuffers(f.handle)
}
   287  
// Fd returns the underlying Win32 handle as a uintptr. Once closeHandle has
// run the stored handle is 0.
func (f *win32File) Fd() uintptr {
	return uintptr(f.handle)
}
   291  
   292  func (d *deadlineHandler) set(deadline time.Time) error {
   293  	d.setLock.Lock()
   294  	defer d.setLock.Unlock()
   295  
   296  	if d.timer != nil {
   297  		if !d.timer.Stop() {
   298  			<-d.channel
   299  		}
   300  		d.timer = nil
   301  	}
   302  	d.timedout.setFalse()
   303  
   304  	select {
   305  	case <-d.channel:
   306  		d.channelLock.Lock()
   307  		d.channel = make(chan struct{})
   308  		d.channelLock.Unlock()
   309  	default:
   310  	}
   311  
   312  	if deadline.IsZero() {
   313  		return nil
   314  	}
   315  
   316  	timeoutIO := func() {
   317  		d.timedout.setTrue()
   318  		close(d.channel)
   319  	}
   320  
   321  	now := time.Now()
   322  	duration := deadline.Sub(now)
   323  	if deadline.After(now) {
   324  		// Deadline is in the future, set a timer to wait
   325  		d.timer = time.AfterFunc(duration, timeoutIO)
   326  	} else {
   327  		// Deadline is in the past. Cancel all pending IO now.
   328  		timeoutIO()
   329  	}
   330  	return nil
   331  }
   332  

View as plain text