1
2
3
4 package winio
5
6 import (
7 "errors"
8 "io"
9 "runtime"
10 "sync"
11 "sync/atomic"
12 "syscall"
13 "time"
14
15 "golang.org/x/sys/windows"
16 )
17
18
19
20
21
22
23
24 type atomicBool int32
25
26 func (b *atomicBool) isSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 }
27 func (b *atomicBool) setFalse() { atomic.StoreInt32((*int32)(b), 0) }
28 func (b *atomicBool) setTrue() { atomic.StoreInt32((*int32)(b), 1) }
29
30
31 func (b *atomicBool) swap(new bool) bool {
32 var newInt int32
33 if new {
34 newInt = 1
35 }
36 return atomic.SwapInt32((*int32)(b), newInt) == 1
37 }
38
39 var (
40 ErrFileClosed = errors.New("file has already been closed")
41 ErrTimeout = &timeoutError{}
42 )
43
44 type timeoutError struct{}
45
46 func (*timeoutError) Error() string { return "i/o timeout" }
47 func (*timeoutError) Timeout() bool { return true }
48 func (*timeoutError) Temporary() bool { return true }
49
50 type timeoutChan chan struct{}
51
52 var ioInitOnce sync.Once
53 var ioCompletionPort syscall.Handle
54
55
56 type ioResult struct {
57 bytes uint32
58 err error
59 }
60
61
62 type ioOperation struct {
63 o syscall.Overlapped
64 ch chan ioResult
65 }
66
67 func initIO() {
68 h, err := createIoCompletionPort(syscall.InvalidHandle, 0, 0, 0xffffffff)
69 if err != nil {
70 panic(err)
71 }
72 ioCompletionPort = h
73 go ioCompletionProcessor(h)
74 }
75
76
77
78 type win32File struct {
79 handle syscall.Handle
80 wg sync.WaitGroup
81 wgLock sync.RWMutex
82 closing atomicBool
83 socket bool
84 readDeadline deadlineHandler
85 writeDeadline deadlineHandler
86 }
87
88 type deadlineHandler struct {
89 setLock sync.Mutex
90 channel timeoutChan
91 channelLock sync.RWMutex
92 timer *time.Timer
93 timedout atomicBool
94 }
95
96
97 func makeWin32File(h syscall.Handle) (*win32File, error) {
98 f := &win32File{handle: h}
99 ioInitOnce.Do(initIO)
100 _, err := createIoCompletionPort(h, ioCompletionPort, 0, 0xffffffff)
101 if err != nil {
102 return nil, err
103 }
104 err = setFileCompletionNotificationModes(h, windows.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS|windows.FILE_SKIP_SET_EVENT_ON_HANDLE)
105 if err != nil {
106 return nil, err
107 }
108 f.readDeadline.channel = make(timeoutChan)
109 f.writeDeadline.channel = make(timeoutChan)
110 return f, nil
111 }
112
113 func MakeOpenFile(h syscall.Handle) (io.ReadWriteCloser, error) {
114
115
116 f, err := makeWin32File(h)
117 if err != nil {
118 return nil, err
119 }
120 return f, nil
121 }
122
123
124 func (f *win32File) closeHandle() {
125 f.wgLock.Lock()
126
127 if !f.closing.swap(true) {
128 f.wgLock.Unlock()
129
130 _ = cancelIoEx(f.handle, nil)
131 f.wg.Wait()
132
133 syscall.Close(f.handle)
134 f.handle = 0
135 } else {
136 f.wgLock.Unlock()
137 }
138 }
139
140
141 func (f *win32File) Close() error {
142 f.closeHandle()
143 return nil
144 }
145
146
147 func (f *win32File) IsClosed() bool {
148 return f.closing.isSet()
149 }
150
151
152
153 func (f *win32File) prepareIO() (*ioOperation, error) {
154 f.wgLock.RLock()
155 if f.closing.isSet() {
156 f.wgLock.RUnlock()
157 return nil, ErrFileClosed
158 }
159 f.wg.Add(1)
160 f.wgLock.RUnlock()
161 c := &ioOperation{}
162 c.ch = make(chan ioResult)
163 return c, nil
164 }
165
166
167 func ioCompletionProcessor(h syscall.Handle) {
168 for {
169 var bytes uint32
170 var key uintptr
171 var op *ioOperation
172 err := getQueuedCompletionStatus(h, &bytes, &key, &op, syscall.INFINITE)
173 if op == nil {
174 panic(err)
175 }
176 op.ch <- ioResult{bytes, err}
177 }
178 }
179
180
181
182
183
184 func (f *win32File) asyncIO(c *ioOperation, d *deadlineHandler, bytes uint32, err error) (int, error) {
185 if err != syscall.ERROR_IO_PENDING {
186 return int(bytes), err
187 }
188
189 if f.closing.isSet() {
190 _ = cancelIoEx(f.handle, &c.o)
191 }
192
193 var timeout timeoutChan
194 if d != nil {
195 d.channelLock.Lock()
196 timeout = d.channel
197 d.channelLock.Unlock()
198 }
199
200 var r ioResult
201 select {
202 case r = <-c.ch:
203 err = r.err
204 if err == syscall.ERROR_OPERATION_ABORTED {
205 if f.closing.isSet() {
206 err = ErrFileClosed
207 }
208 } else if err != nil && f.socket {
209
210 var bytes, flags uint32
211 err = wsaGetOverlappedResult(f.handle, &c.o, &bytes, false, &flags)
212 }
213 case <-timeout:
214 _ = cancelIoEx(f.handle, &c.o)
215 r = <-c.ch
216 err = r.err
217 if err == syscall.ERROR_OPERATION_ABORTED {
218 err = ErrTimeout
219 }
220 }
221
222
223
224
225
226 runtime.KeepAlive(c)
227 return int(r.bytes), err
228 }
229
230
231 func (f *win32File) Read(b []byte) (int, error) {
232 c, err := f.prepareIO()
233 if err != nil {
234 return 0, err
235 }
236 defer f.wg.Done()
237
238 if f.readDeadline.timedout.isSet() {
239 return 0, ErrTimeout
240 }
241
242 var bytes uint32
243 err = syscall.ReadFile(f.handle, b, &bytes, &c.o)
244 n, err := f.asyncIO(c, &f.readDeadline, bytes, err)
245 runtime.KeepAlive(b)
246
247
248 if err == nil && n == 0 && len(b) != 0 {
249 return 0, io.EOF
250 } else if err == syscall.ERROR_BROKEN_PIPE {
251 return 0, io.EOF
252 } else {
253 return n, err
254 }
255 }
256
257
258 func (f *win32File) Write(b []byte) (int, error) {
259 c, err := f.prepareIO()
260 if err != nil {
261 return 0, err
262 }
263 defer f.wg.Done()
264
265 if f.writeDeadline.timedout.isSet() {
266 return 0, ErrTimeout
267 }
268
269 var bytes uint32
270 err = syscall.WriteFile(f.handle, b, &bytes, &c.o)
271 n, err := f.asyncIO(c, &f.writeDeadline, bytes, err)
272 runtime.KeepAlive(b)
273 return n, err
274 }
275
276 func (f *win32File) SetReadDeadline(deadline time.Time) error {
277 return f.readDeadline.set(deadline)
278 }
279
280 func (f *win32File) SetWriteDeadline(deadline time.Time) error {
281 return f.writeDeadline.set(deadline)
282 }
283
284 func (f *win32File) Flush() error {
285 return syscall.FlushFileBuffers(f.handle)
286 }
287
288 func (f *win32File) Fd() uintptr {
289 return uintptr(f.handle)
290 }
291
292 func (d *deadlineHandler) set(deadline time.Time) error {
293 d.setLock.Lock()
294 defer d.setLock.Unlock()
295
296 if d.timer != nil {
297 if !d.timer.Stop() {
298 <-d.channel
299 }
300 d.timer = nil
301 }
302 d.timedout.setFalse()
303
304 select {
305 case <-d.channel:
306 d.channelLock.Lock()
307 d.channel = make(chan struct{})
308 d.channelLock.Unlock()
309 default:
310 }
311
312 if deadline.IsZero() {
313 return nil
314 }
315
316 timeoutIO := func() {
317 d.timedout.setTrue()
318 close(d.channel)
319 }
320
321 now := time.Now()
322 duration := deadline.Sub(now)
323 if deadline.After(now) {
324
325 d.timer = time.AfterFunc(duration, timeoutIO)
326 } else {
327
328 timeoutIO()
329 }
330 return nil
331 }
332
View as plain text