...

Source file src/github.com/containerd/fifo/fifo_test.go

Documentation: github.com/containerd/fifo

     1  //go:build !windows
     2  
     3  /*
     4     Copyright The containerd Authors.
     5  
     6     Licensed under the Apache License, Version 2.0 (the "License");
     7     you may not use this file except in compliance with the License.
     8     You may obtain a copy of the License at
     9  
    10         http://www.apache.org/licenses/LICENSE-2.0
    11  
    12     Unless required by applicable law or agreed to in writing, software
    13     distributed under the License is distributed on an "AS IS" BASIS,
    14     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    15     See the License for the specific language governing permissions and
    16     limitations under the License.
    17  */
    18  
    19  package fifo
    20  
    21  import (
    22  	"context"
    23  	"io"
    24  	"os"
    25  	"path/filepath"
    26  	"sync"
    27  	"syscall"
    28  	"testing"
    29  	"time"
    30  
    31  	"github.com/stretchr/testify/assert"
    32  )
    33  
    34  func TestFifoCancel(t *testing.T) {
    35  	tmpdir, err := os.MkdirTemp("", "fifos")
    36  	assert.NoError(t, err)
    37  	defer os.RemoveAll(tmpdir)
    38  
    39  	leakCheckWg = &sync.WaitGroup{}
    40  	defer func() {
    41  		leakCheckWg = nil
    42  	}()
    43  
    44  	f, err := OpenFifo(context.Background(), filepath.Join(tmpdir, "f0"), syscall.O_RDONLY|syscall.O_NONBLOCK, 0600)
    45  	assert.Exactly(t, nil, f)
    46  	assert.NotNil(t, err)
    47  
    48  	assert.NoError(t, checkWgDone(leakCheckWg))
    49  
    50  	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    51  	defer cancel()
    52  
    53  	f, err = OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
    54  	assert.NoError(t, err)
    55  
    56  	b := make([]byte, 32)
    57  	n, err := f.Read(b)
    58  	assert.Equal(t, n, 0)
    59  	assert.Equal(t, err, ErrReadClosed)
    60  
    61  	select {
    62  	case <-ctx.Done():
    63  	default:
    64  		t.Fatal("context should have been done")
    65  	}
    66  	assert.NoError(t, checkWgDone(leakCheckWg))
    67  }
    68  
    69  func TestFifoReadWrite(t *testing.T) {
    70  	tmpdir, err := os.MkdirTemp("", "fifos")
    71  	assert.NoError(t, err)
    72  	defer os.RemoveAll(tmpdir)
    73  
    74  	leakCheckWg = &sync.WaitGroup{}
    75  	defer func() {
    76  		leakCheckWg = nil
    77  	}()
    78  
    79  	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    80  	defer cancel()
    81  
    82  	r, err := OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
    83  	assert.NoError(t, err)
    84  
    85  	w, err := OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
    86  	assert.NoError(t, err)
    87  
    88  	_, err = w.Write([]byte("foo"))
    89  	assert.NoError(t, err)
    90  
    91  	b := make([]byte, 32)
    92  	n, err := r.Read(b)
    93  	assert.NoError(t, err)
    94  	assert.Equal(t, string(b[:n]), "foo")
    95  
    96  	err = r.Close()
    97  	assert.NoError(t, err)
    98  
    99  	_, err = w.Write([]byte("bar"))
   100  	assert.NotNil(t, err)
   101  
   102  	assert.NoError(t, checkWgDone(leakCheckWg))
   103  
   104  	cancel()
   105  	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
   106  	defer cancel()
   107  
   108  	w, err = OpenFifo(ctx, filepath.Join(tmpdir, "f1"), syscall.O_CREAT|syscall.O_WRONLY|syscall.O_NONBLOCK, 0600)
   109  	assert.NoError(t, err)
   110  
   111  	written := make(chan struct{})
   112  	go func() {
   113  		w.Write([]byte("baz"))
   114  		close(written)
   115  	}()
   116  
   117  	time.Sleep(200 * time.Millisecond)
   118  
   119  	r, err = OpenFifo(ctx, filepath.Join(tmpdir, "f1"), syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
   120  	assert.NoError(t, err)
   121  	n, err = r.Read(b)
   122  	assert.NoError(t, err)
   123  	assert.Equal(t, string(b[:n]), "baz")
   124  	select {
   125  	case <-written:
   126  	case <-time.After(500 * time.Millisecond):
   127  		t.Fatal("content should have been written")
   128  	}
   129  
   130  	_, err = w.Write([]byte("barbar")) // kernel-buffer
   131  	assert.NoError(t, err)
   132  	err = w.Close()
   133  	assert.NoError(t, err)
   134  	n, err = r.Read(b)
   135  	assert.NoError(t, err)
   136  	assert.Equal(t, string(b[:n]), "barbar")
   137  	n, err = r.Read(b)
   138  	assert.Equal(t, n, 0)
   139  	assert.Equal(t, err, io.EOF)
   140  	n, err = r.Read(b)
   141  	assert.Equal(t, n, 0)
   142  	assert.Equal(t, err, io.EOF)
   143  
   144  	assert.NoError(t, checkWgDone(leakCheckWg))
   145  }
   146  
   147  func TestFifoCancelOneSide(t *testing.T) {
   148  	tmpdir, err := os.MkdirTemp("", "fifos")
   149  	assert.NoError(t, err)
   150  	defer os.RemoveAll(tmpdir)
   151  
   152  	leakCheckWg = &sync.WaitGroup{}
   153  	defer func() {
   154  		leakCheckWg = nil
   155  	}()
   156  
   157  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
   158  	defer cancel()
   159  
   160  	f, err := OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
   161  	assert.NoError(t, err)
   162  
   163  	read := make(chan struct{})
   164  	b := make([]byte, 32)
   165  	go func() {
   166  		_, err = f.Read(b)
   167  		close(read)
   168  	}()
   169  
   170  	select {
   171  	case <-read:
   172  		t.Fatal("read should have blocked")
   173  	case <-time.After(time.Second):
   174  	}
   175  
   176  	cerr := f.Close()
   177  	assert.NoError(t, cerr)
   178  
   179  	select {
   180  	case <-read:
   181  	case <-time.After(time.Second):
   182  		t.Fatal("read should have unblocked")
   183  	}
   184  
   185  	assert.Equal(t, err, ErrReadClosed)
   186  
   187  	assert.NoError(t, checkWgDone(leakCheckWg))
   188  }
   189  
   190  func TestFifoBlocking(t *testing.T) {
   191  	tmpdir, err := os.MkdirTemp("", "fifos")
   192  	assert.NoError(t, err)
   193  	defer os.RemoveAll(tmpdir)
   194  
   195  	leakCheckWg = &sync.WaitGroup{}
   196  	defer func() {
   197  		leakCheckWg = nil
   198  	}()
   199  
   200  	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
   201  	defer cancel()
   202  
   203  	f, err := OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_RDONLY|syscall.O_CREAT, 0600)
   204  	assert.Exactly(t, nil, f)
   205  	assert.EqualError(t, err, "context deadline exceeded")
   206  
   207  	select {
   208  	case <-ctx.Done():
   209  	default:
   210  		t.Fatal("context should have been completed")
   211  	}
   212  
   213  	assert.NoError(t, checkWgDone(leakCheckWg))
   214  
   215  	cancel()
   216  	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
   217  	defer cancel()
   218  
   219  	var rerr error
   220  	var r io.ReadCloser
   221  	readerOpen := make(chan struct{})
   222  	go func() {
   223  		r, rerr = OpenFifo(ctx, filepath.Join(tmpdir, "f1"), syscall.O_RDONLY|syscall.O_CREAT, 0600)
   224  		close(readerOpen)
   225  	}()
   226  
   227  	time.Sleep(500 * time.Millisecond)
   228  	w, err := OpenFifo(ctx, filepath.Join(tmpdir, "f1"), syscall.O_WRONLY, 0)
   229  	assert.NoError(t, err)
   230  
   231  	select {
   232  	case <-readerOpen:
   233  	case <-time.After(time.Second):
   234  		t.Fatal("writer should have unblocke reader")
   235  	}
   236  
   237  	assert.NoError(t, rerr)
   238  
   239  	_, err = w.Write([]byte("foobar"))
   240  	assert.NoError(t, err)
   241  
   242  	b := make([]byte, 32)
   243  	n, err := r.Read(b)
   244  	assert.NoError(t, err)
   245  	assert.Equal(t, string(b[:n]), "foobar")
   246  
   247  	assert.NoError(t, checkWgDone(leakCheckWg))
   248  
   249  	err = w.Close()
   250  	assert.NoError(t, err)
   251  	n, err = r.Read(b)
   252  	assert.Equal(t, n, 0)
   253  	assert.Equal(t, err, io.EOF)
   254  
   255  	assert.NoError(t, checkWgDone(leakCheckWg))
   256  }
   257  
   258  func TestFifoORDWR(t *testing.T) {
   259  	tmpdir, err := os.MkdirTemp("", "fifos")
   260  	assert.NoError(t, err)
   261  	defer os.RemoveAll(tmpdir)
   262  
   263  	leakCheckWg = &sync.WaitGroup{}
   264  	defer func() {
   265  		leakCheckWg = nil
   266  	}()
   267  
   268  	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
   269  	defer cancel()
   270  
   271  	f, err := OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_RDWR|syscall.O_CREAT, 0600)
   272  	assert.NoError(t, err)
   273  
   274  	_, err = f.Write([]byte("foobar"))
   275  	assert.NoError(t, err)
   276  
   277  	b := make([]byte, 32)
   278  	n, err := f.Read(b)
   279  	assert.NoError(t, err)
   280  	assert.Equal(t, string(b[:n]), "foobar")
   281  
   282  	r1, err := OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
   283  	assert.NoError(t, err)
   284  
   285  	_, err = f.Write([]byte("barbar"))
   286  	assert.NoError(t, err)
   287  
   288  	n, err = r1.Read(b)
   289  	assert.NoError(t, err)
   290  	assert.Equal(t, string(b[:n]), "barbar")
   291  
   292  	r2, err := OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_RDONLY, 0)
   293  	assert.NoError(t, err)
   294  
   295  	_, err = f.Write([]byte("barbaz"))
   296  	assert.NoError(t, err)
   297  
   298  	n, err = r2.Read(b)
   299  	assert.NoError(t, err)
   300  	assert.Equal(t, string(b[:n]), "barbaz")
   301  
   302  	err = r2.Close()
   303  	assert.NoError(t, err)
   304  
   305  	_, err = f.Write([]byte("bar123"))
   306  	assert.NoError(t, err)
   307  
   308  	n, err = r1.Read(b)
   309  	assert.NoError(t, err)
   310  	assert.Equal(t, string(b[:n]), "bar123")
   311  
   312  	err = r1.Close()
   313  	assert.NoError(t, err)
   314  
   315  	_, err = f.Write([]byte("bar456"))
   316  	assert.NoError(t, err)
   317  
   318  	r2, err = OpenFifo(ctx, filepath.Join(tmpdir, "f0"), syscall.O_RDONLY, 0)
   319  	assert.NoError(t, err)
   320  
   321  	n, err = r2.Read(b)
   322  	assert.NoError(t, err)
   323  	assert.Equal(t, string(b[:n]), "bar456")
   324  
   325  	err = f.Close()
   326  	assert.NoError(t, err)
   327  
   328  	_, err = r2.Read(b)
   329  	assert.EqualError(t, err, io.EOF.Error())
   330  
   331  	assert.NoError(t, checkWgDone(leakCheckWg))
   332  }
   333  
   334  func TestFifoCloseError(t *testing.T) {
   335  	tmpdir, err := os.MkdirTemp("", "fifos")
   336  	assert.NoError(t, err)
   337  	defer os.RemoveAll(tmpdir)
   338  
   339  	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
   340  	defer cancel()
   341  
   342  	w, err := OpenFifo(ctx, filepath.Join(tmpdir, t.Name()), syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
   343  	assert.NoError(t, err)
   344  	w.Close()
   345  
   346  	data := []byte("hello world!")
   347  	_, err = w.Write(data)
   348  	assert.Equal(t, ErrWriteClosed, err)
   349  
   350  	r, err := OpenFifo(ctx, filepath.Join(tmpdir, t.Name()), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
   351  	assert.NoError(t, err)
   352  	r.Close()
   353  
   354  	buf := make([]byte, len(data))
   355  	_, err = r.Read(buf)
   356  	assert.Equal(t, ErrReadClosed, err)
   357  }
   358  
   359  func TestFifoCloseWhileReading(t *testing.T) {
   360  	tmpdir, err := os.MkdirTemp("", "fifos")
   361  	assert.NoError(t, err)
   362  	defer os.RemoveAll(tmpdir)
   363  
   364  	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
   365  	defer cancel()
   366  
   367  	r, err := OpenFifo(ctx, filepath.Join(tmpdir, t.Name()), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
   368  	assert.NoError(t, err)
   369  
   370  	read := make(chan struct{})
   371  	readErr := make(chan error)
   372  
   373  	go func() {
   374  		buf := make([]byte, 32)
   375  		_, err := r.Read(buf)
   376  
   377  		if err != nil {
   378  			readErr <- err
   379  			return
   380  		}
   381  
   382  		close(read)
   383  
   384  	}()
   385  
   386  	time.Sleep(500 * time.Millisecond)
   387  	r.Close()
   388  
   389  	select {
   390  	case <-read:
   391  		t.Fatal("Read should not succeed")
   392  	case err := <-readErr:
   393  		assert.Equal(t, ErrReadClosed, err)
   394  	case <-time.After(500 * time.Millisecond):
   395  		t.Fatal("Read should not be blocked")
   396  	}
   397  }
   398  
   399  func TestFifoCloseWhileReadingAndWriting(t *testing.T) {
   400  	tmpdir, err := os.MkdirTemp("", "fifos")
   401  	assert.NoError(t, err)
   402  	defer os.RemoveAll(tmpdir)
   403  
   404  	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
   405  	defer cancel()
   406  
   407  	r, err := OpenFifo(ctx, filepath.Join(tmpdir, t.Name()), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
   408  	assert.NoError(t, err)
   409  
   410  	w, err := OpenFifo(ctx, filepath.Join(tmpdir, t.Name()), syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0)
   411  	assert.NoError(t, err)
   412  
   413  	read := make(chan struct{})
   414  	readErr := make(chan error)
   415  	wBuffer := []byte("foo")
   416  
   417  	go func() {
   418  		buf := make([]byte, 32)
   419  		_, err := r.Read(buf)
   420  
   421  		if err != nil {
   422  			readErr <- err
   423  			return
   424  		}
   425  
   426  		close(read)
   427  	}()
   428  
   429  	time.Sleep(500 * time.Millisecond)
   430  
   431  	// Close the reader and then write in the writer.
   432  	// The reader thread should return an error.
   433  	r.Close()
   434  
   435  	// The write should fail, the reader end of the pipe is closed.
   436  	_, err = w.Write(wBuffer)
   437  	assert.Error(t, err)
   438  
   439  	select {
   440  	case <-read:
   441  		t.Fatal("Read should not succeed")
   442  	case err := <-readErr:
   443  		assert.Error(t, err)
   444  	case <-time.After(500 * time.Millisecond):
   445  		t.Fatal("Read should not be blocked")
   446  	}
   447  }
   448  
   449  func TestFifoWrongRdWrError(t *testing.T) {
   450  	tmpdir, err := os.MkdirTemp("", "fifos")
   451  	assert.NoError(t, err)
   452  	defer os.RemoveAll(tmpdir)
   453  
   454  	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
   455  	defer cancel()
   456  
   457  	r, err := OpenFifo(ctx, filepath.Join(tmpdir, t.Name()), syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
   458  	assert.NoError(t, err)
   459  
   460  	data := []byte("hello world!")
   461  	_, err = r.Write(data)
   462  	assert.Equal(t, ErrWrToRDONLY, err)
   463  
   464  	w, err := OpenFifo(ctx, filepath.Join(tmpdir, t.Name()), syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0600)
   465  	assert.NoError(t, err)
   466  
   467  	buf := make([]byte, len(data))
   468  	_, err = w.Read(buf)
   469  	assert.Equal(t, ErrRdFrmWRONLY, err)
   470  }
   471  
   472  func checkWgDone(wg *sync.WaitGroup) error {
   473  	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
   474  	defer cancel()
   475  	done := make(chan struct{})
   476  	go func() {
   477  		wg.Wait() // No way to cancel
   478  		close(done)
   479  	}()
   480  	select {
   481  	case <-done:
   482  		return nil
   483  	case <-ctx.Done():
   484  		return ctx.Err()
   485  	}
   486  }
   487  

View as plain text