1
2
3
18
19 package fifo
20
21 import (
22 "context"
23 "io"
24 "os"
25 "path/filepath"
26 "sync"
27 "syscall"
28 "testing"
29 "time"
30
31 "github.com/stretchr/testify/assert"
32 )
33
34 func TestFifoCancel(t *testing.T) {
35 tmpdir, err := os.MkdirTemp("", "fifos")
36 assert.NoError(t, err)
37 defer os.RemoveAll(tmpdir)
38
39 leakCheckWg = &sync.WaitGroup{}
40 defer func() {
41 leakCheckWg = nil
42 }()
43
44 f, err := OpenFifo(context.Background(), filepath.Join(tmpdir, "f0"), syscall.O_RDONLY|syscall.O_NONBLOCK, 0600)
45 assert.Exactly(t, nil, f)
46 assert.NotNil(t, err)
47
48 assert.NoError(t, checkWgDone(leakCheckWg))
49
50 ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
51 defer cancel()
52
53 f, err = OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
54 assert.NoError(t, err)
55
56 b := make([]byte, 32)
57 n, err := f.Read(b)
58 assert.Equal(t, n, 0)
59 assert.Equal(t, err, ErrReadClosed)
60
61 select {
62 case <-ctx.Done():
63 default:
64 t.Fatal("context should have been done")
65 }
66 assert.NoError(t, checkWgDone(leakCheckWg))
67 }
68
69 func TestFifoReadWrite(t *testing.T) {
70 tmpdir, err := os.MkdirTemp("", "fifos")
71 assert.NoError(t, err)
72 defer os.RemoveAll(tmpdir)
73
74 leakCheckWg = &sync.WaitGroup{}
75 defer func() {
76 leakCheckWg = nil
77 }()
78
79 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
80 defer cancel()
81
82 r, err := OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
83 assert.NoError(t, err)
84
85 w, err := OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
86 assert.NoError(t, err)
87
88 _, err = w.Write([]byte("foo"))
89 assert.NoError(t, err)
90
91 b := make([]byte, 32)
92 n, err := r.Read(b)
93 assert.NoError(t, err)
94 assert.Equal(t, string(b[:n]), "foo")
95
96 err = r.Close()
97 assert.NoError(t, err)
98
99 _, err = w.Write([]byte("bar"))
100 assert.NotNil(t, err)
101
102 assert.NoError(t, checkWgDone(leakCheckWg))
103
104 cancel()
105 ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
106 defer cancel()
107
108 w, err = OpenFifo(ctx, filepath.Join(tmpdir, "f1"), syscall.O_CREAT|syscall.O_WRONLY|syscall.O_NONBLOCK, 0600)
109 assert.NoError(t, err)
110
111 written := make(chan struct{})
112 go func() {
113 w.Write([]byte("baz"))
114 close(written)
115 }()
116
117 time.Sleep(200 * time.Millisecond)
118
119 r, err = OpenFifo(ctx, filepath.Join(tmpdir, "f1"), syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
120 assert.NoError(t, err)
121 n, err = r.Read(b)
122 assert.NoError(t, err)
123 assert.Equal(t, string(b[:n]), "baz")
124 select {
125 case <-written:
126 case <-time.After(500 * time.Millisecond):
127 t.Fatal("content should have been written")
128 }
129
130 _, err = w.Write([]byte("barbar"))
131 assert.NoError(t, err)
132 err = w.Close()
133 assert.NoError(t, err)
134 n, err = r.Read(b)
135 assert.NoError(t, err)
136 assert.Equal(t, string(b[:n]), "barbar")
137 n, err = r.Read(b)
138 assert.Equal(t, n, 0)
139 assert.Equal(t, err, io.EOF)
140 n, err = r.Read(b)
141 assert.Equal(t, n, 0)
142 assert.Equal(t, err, io.EOF)
143
144 assert.NoError(t, checkWgDone(leakCheckWg))
145 }
146
147 func TestFifoCancelOneSide(t *testing.T) {
148 tmpdir, err := os.MkdirTemp("", "fifos")
149 assert.NoError(t, err)
150 defer os.RemoveAll(tmpdir)
151
152 leakCheckWg = &sync.WaitGroup{}
153 defer func() {
154 leakCheckWg = nil
155 }()
156
157 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
158 defer cancel()
159
160 f, err := OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
161 assert.NoError(t, err)
162
163 read := make(chan struct{})
164 b := make([]byte, 32)
165 go func() {
166 _, err = f.Read(b)
167 close(read)
168 }()
169
170 select {
171 case <-read:
172 t.Fatal("read should have blocked")
173 case <-time.After(time.Second):
174 }
175
176 cerr := f.Close()
177 assert.NoError(t, cerr)
178
179 select {
180 case <-read:
181 case <-time.After(time.Second):
182 t.Fatal("read should have unblocked")
183 }
184
185 assert.Equal(t, err, ErrReadClosed)
186
187 assert.NoError(t, checkWgDone(leakCheckWg))
188 }
189
190 func TestFifoBlocking(t *testing.T) {
191 tmpdir, err := os.MkdirTemp("", "fifos")
192 assert.NoError(t, err)
193 defer os.RemoveAll(tmpdir)
194
195 leakCheckWg = &sync.WaitGroup{}
196 defer func() {
197 leakCheckWg = nil
198 }()
199
200 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
201 defer cancel()
202
203 f, err := OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_RDONLY|syscall.O_CREAT, 0600)
204 assert.Exactly(t, nil, f)
205 assert.EqualError(t, err, "context deadline exceeded")
206
207 select {
208 case <-ctx.Done():
209 default:
210 t.Fatal("context should have been completed")
211 }
212
213 assert.NoError(t, checkWgDone(leakCheckWg))
214
215 cancel()
216 ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
217 defer cancel()
218
219 var rerr error
220 var r io.ReadCloser
221 readerOpen := make(chan struct{})
222 go func() {
223 r, rerr = OpenFifo(ctx, filepath.Join(tmpdir, "f1"), syscall.O_RDONLY|syscall.O_CREAT, 0600)
224 close(readerOpen)
225 }()
226
227 time.Sleep(500 * time.Millisecond)
228 w, err := OpenFifo(ctx, filepath.Join(tmpdir, "f1"), syscall.O_WRONLY, 0)
229 assert.NoError(t, err)
230
231 select {
232 case <-readerOpen:
233 case <-time.After(time.Second):
234 t.Fatal("writer should have unblocke reader")
235 }
236
237 assert.NoError(t, rerr)
238
239 _, err = w.Write([]byte("foobar"))
240 assert.NoError(t, err)
241
242 b := make([]byte, 32)
243 n, err := r.Read(b)
244 assert.NoError(t, err)
245 assert.Equal(t, string(b[:n]), "foobar")
246
247 assert.NoError(t, checkWgDone(leakCheckWg))
248
249 err = w.Close()
250 assert.NoError(t, err)
251 n, err = r.Read(b)
252 assert.Equal(t, n, 0)
253 assert.Equal(t, err, io.EOF)
254
255 assert.NoError(t, checkWgDone(leakCheckWg))
256 }
257
258 func TestFifoORDWR(t *testing.T) {
259 tmpdir, err := os.MkdirTemp("", "fifos")
260 assert.NoError(t, err)
261 defer os.RemoveAll(tmpdir)
262
263 leakCheckWg = &sync.WaitGroup{}
264 defer func() {
265 leakCheckWg = nil
266 }()
267
268 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
269 defer cancel()
270
271 f, err := OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_RDWR|syscall.O_CREAT, 0600)
272 assert.NoError(t, err)
273
274 _, err = f.Write([]byte("foobar"))
275 assert.NoError(t, err)
276
277 b := make([]byte, 32)
278 n, err := f.Read(b)
279 assert.NoError(t, err)
280 assert.Equal(t, string(b[:n]), "foobar")
281
282 r1, err := OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
283 assert.NoError(t, err)
284
285 _, err = f.Write([]byte("barbar"))
286 assert.NoError(t, err)
287
288 n, err = r1.Read(b)
289 assert.NoError(t, err)
290 assert.Equal(t, string(b[:n]), "barbar")
291
292 r2, err := OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_RDONLY, 0)
293 assert.NoError(t, err)
294
295 _, err = f.Write([]byte("barbaz"))
296 assert.NoError(t, err)
297
298 n, err = r2.Read(b)
299 assert.NoError(t, err)
300 assert.Equal(t, string(b[:n]), "barbaz")
301
302 err = r2.Close()
303 assert.NoError(t, err)
304
305 _, err = f.Write([]byte("bar123"))
306 assert.NoError(t, err)
307
308 n, err = r1.Read(b)
309 assert.NoError(t, err)
310 assert.Equal(t, string(b[:n]), "bar123")
311
312 err = r1.Close()
313 assert.NoError(t, err)
314
315 _, err = f.Write([]byte("bar456"))
316 assert.NoError(t, err)
317
318 r2, err = OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_RDONLY, 0)
319 assert.NoError(t, err)
320
321 n, err = r2.Read(b)
322 assert.NoError(t, err)
323 assert.Equal(t, string(b[:n]), "bar456")
324
325 err = f.Close()
326 assert.NoError(t, err)
327
328 _, err = r2.Read(b)
329 assert.EqualError(t, err, io.EOF.Error())
330
331 assert.NoError(t, checkWgDone(leakCheckWg))
332 }
333
334 func TestFifoCloseError(t *testing.T) {
335 tmpdir, err := os.MkdirTemp("", "fifos")
336 assert.NoError(t, err)
337 defer os.RemoveAll(tmpdir)
338
339 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
340 defer cancel()
341
342 w, err := OpenFifo(ctx, filepath.Join(tmpdir, t.Name()), syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
343 assert.NoError(t, err)
344 w.Close()
345
346 data := []byte("hello world!")
347 _, err = w.Write(data)
348 assert.Equal(t, ErrWriteClosed, err)
349
350 r, err := OpenFifo(ctx, filepath.Join(tmpdir, t.Name()), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
351 assert.NoError(t, err)
352 r.Close()
353
354 buf := make([]byte, len(data))
355 _, err = r.Read(buf)
356 assert.Equal(t, ErrReadClosed, err)
357 }
358
359 func TestFifoCloseWhileReading(t *testing.T) {
360 tmpdir, err := os.MkdirTemp("", "fifos")
361 assert.NoError(t, err)
362 defer os.RemoveAll(tmpdir)
363
364 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
365 defer cancel()
366
367 r, err := OpenFifo(ctx, filepath.Join(tmpdir, t.Name()), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
368 assert.NoError(t, err)
369
370 read := make(chan struct{})
371 readErr := make(chan error)
372
373 go func() {
374 buf := make([]byte, 32)
375 _, err := r.Read(buf)
376
377 if err != nil {
378 readErr <- err
379 return
380 }
381
382 close(read)
383
384 }()
385
386 time.Sleep(500 * time.Millisecond)
387 r.Close()
388
389 select {
390 case <-read:
391 t.Fatal("Read should not succeed")
392 case err := <-readErr:
393 assert.Equal(t, ErrReadClosed, err)
394 case <-time.After(500 * time.Millisecond):
395 t.Fatal("Read should not be blocked")
396 }
397 }
398
399 func TestFifoCloseWhileReadingAndWriting(t *testing.T) {
400 tmpdir, err := os.MkdirTemp("", "fifos")
401 assert.NoError(t, err)
402 defer os.RemoveAll(tmpdir)
403
404 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
405 defer cancel()
406
407 r, err := OpenFifo(ctx, filepath.Join(tmpdir, t.Name()), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
408 assert.NoError(t, err)
409
410 w, err := OpenFifo(ctx, filepath.Join(tmpdir, t.Name()), syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0)
411 assert.NoError(t, err)
412
413 read := make(chan struct{})
414 readErr := make(chan error)
415 wBuffer := []byte("foo")
416
417 go func() {
418 buf := make([]byte, 32)
419 _, err := r.Read(buf)
420
421 if err != nil {
422 readErr <- err
423 return
424 }
425
426 close(read)
427 }()
428
429 time.Sleep(500 * time.Millisecond)
430
431
432
433 r.Close()
434
435
436 _, err = w.Write(wBuffer)
437 assert.Error(t, err)
438
439 select {
440 case <-read:
441 t.Fatal("Read should not succeed")
442 case err := <-readErr:
443 assert.Error(t, err)
444 case <-time.After(500 * time.Millisecond):
445 t.Fatal("Read should not be blocked")
446 }
447 }
448
449 func TestFifoWrongRdWrError(t *testing.T) {
450 tmpdir, err := os.MkdirTemp("", "fifos")
451 assert.NoError(t, err)
452 defer os.RemoveAll(tmpdir)
453
454 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
455 defer cancel()
456
457 r, err := OpenFifo(ctx, filepath.Join(tmpdir, t.Name()), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
458 assert.NoError(t, err)
459
460 data := []byte("hello world!")
461 _, err = r.Write(data)
462 assert.Equal(t, ErrWrToRDONLY, err)
463
464 w, err := OpenFifo(ctx, filepath.Join(tmpdir, t.Name()), syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
465 assert.NoError(t, err)
466
467 buf := make([]byte, len(data))
468 _, err = w.Read(buf)
469 assert.Equal(t, ErrRdFrmWRONLY, err)
470 }
471
// checkWgDone waits for wg to drain, giving up after three seconds.
// It returns nil if wg.Wait returns in time, and the timeout context's
// error otherwise.
func checkWgDone(wg *sync.WaitGroup) error {
	deadline, stop := context.WithTimeout(context.Background(), 3*time.Second)
	defer stop()

	// wg.Wait cannot be selected on directly, so a helper goroutine turns
	// its return into a channel close.
	drained := make(chan struct{})
	go func() {
		defer close(drained)
		wg.Wait()
	}()

	select {
	case <-deadline.Done():
		return deadline.Err()
	case <-drained:
		return nil
	}
}
487
View as plain text