...

Source file src/github.com/containerd/fifo/fifo.go

Documentation: github.com/containerd/fifo

     1  //go:build !windows
     2  
     3  /*
     4     Copyright The containerd Authors.
     5  
     6     Licensed under the Apache License, Version 2.0 (the "License");
     7     you may not use this file except in compliance with the License.
     8     You may obtain a copy of the License at
     9  
    10         http://www.apache.org/licenses/LICENSE-2.0
    11  
    12     Unless required by applicable law or agreed to in writing, software
    13     distributed under the License is distributed on an "AS IS" BASIS,
    14     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    15     See the License for the specific language governing permissions and
    16     limitations under the License.
    17  */
    18  
    19  package fifo
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"io"
    25  	"os"
    26  	"runtime"
    27  	"sync"
    28  	"syscall"
    29  
    30  	"golang.org/x/sys/unix"
    31  )
    32  
    33  type fifo struct {
    34  	flag        int
    35  	opened      chan struct{}
    36  	closed      chan struct{}
    37  	closing     chan struct{}
    38  	err         error
    39  	file        *os.File
    40  	closingOnce sync.Once // close has been called
    41  	closedOnce  sync.Once // fifo is closed
    42  	handle      *handle
    43  }
    44  
    45  var leakCheckWg *sync.WaitGroup
    46  
    47  // OpenFifoDup2 is same as OpenFifo, but additionally creates a copy of the FIFO file descriptor with dup2 syscall.
    48  func OpenFifoDup2(ctx context.Context, fn string, flag int, perm os.FileMode, fd int) (io.ReadWriteCloser, error) {
    49  	f, err := openFifo(ctx, fn, flag, perm)
    50  	if err != nil {
    51  		return nil, fmt.Errorf("fifo error: %w", err)
    52  	}
    53  
    54  	if err := unix.Dup2(int(f.file.Fd()), fd); err != nil {
    55  		_ = f.Close()
    56  		return nil, fmt.Errorf("dup2 error: %w", err)
    57  	}
    58  
    59  	return f, nil
    60  }
    61  
    62  // OpenFifo opens a fifo. Returns io.ReadWriteCloser.
    63  // Context can be used to cancel this function until open(2) has not returned.
    64  // Accepted flags:
    65  //   - syscall.O_CREAT - create new fifo if one doesn't exist
    66  //   - syscall.O_RDONLY - open fifo only from reader side
    67  //   - syscall.O_WRONLY - open fifo only from writer side
    68  //   - syscall.O_RDWR - open fifo from both sides, never block on syscall level
    69  //   - syscall.O_NONBLOCK - return io.ReadWriteCloser even if other side of the
    70  //     fifo isn't open. read/write will be connected after the actual fifo is
    71  //     open or after fifo is closed.
    72  func OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) {
    73  	fifo, err := openFifo(ctx, fn, flag, perm)
    74  	if fifo == nil {
    75  		// Do not return a non-nil ReadWriteCloser((*fifo)(nil)) value
    76  		// as that can confuse callers.
    77  		return nil, err
    78  	}
    79  	return fifo, err
    80  }
    81  
    82  func openFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (*fifo, error) {
    83  	if _, err := os.Stat(fn); err != nil {
    84  		if os.IsNotExist(err) && flag&syscall.O_CREAT != 0 {
    85  			if err := syscall.Mkfifo(fn, uint32(perm&os.ModePerm)); err != nil && !os.IsExist(err) {
    86  				return nil, fmt.Errorf("error creating fifo %v: %w", fn, err)
    87  			}
    88  		} else {
    89  			return nil, err
    90  		}
    91  	}
    92  
    93  	block := flag&syscall.O_NONBLOCK == 0 || flag&syscall.O_RDWR != 0
    94  
    95  	flag &= ^syscall.O_CREAT
    96  	flag &= ^syscall.O_NONBLOCK
    97  
    98  	h, err := getHandle(fn)
    99  	if err != nil {
   100  		return nil, err
   101  	}
   102  
   103  	f := &fifo{
   104  		handle:  h,
   105  		flag:    flag,
   106  		opened:  make(chan struct{}),
   107  		closed:  make(chan struct{}),
   108  		closing: make(chan struct{}),
   109  	}
   110  
   111  	wg := leakCheckWg
   112  	if wg != nil {
   113  		wg.Add(2)
   114  	}
   115  
   116  	go func() {
   117  		if wg != nil {
   118  			defer wg.Done()
   119  		}
   120  		select {
   121  		case <-ctx.Done():
   122  			select {
   123  			case <-f.opened:
   124  			default:
   125  				f.Close()
   126  			}
   127  		case <-f.opened:
   128  		case <-f.closed:
   129  		}
   130  	}()
   131  	go func() {
   132  		if wg != nil {
   133  			defer wg.Done()
   134  		}
   135  		var file *os.File
   136  		fn, err := h.Path()
   137  		if err == nil {
   138  			file, err = os.OpenFile(fn, flag, 0)
   139  		}
   140  		select {
   141  		case <-f.closing:
   142  			if err == nil {
   143  				select {
   144  				case <-ctx.Done():
   145  					err = ctx.Err()
   146  				default:
   147  					err = fmt.Errorf("fifo %v was closed before opening", h.Name())
   148  				}
   149  				if file != nil {
   150  					file.Close()
   151  				}
   152  			}
   153  		default:
   154  		}
   155  		if err != nil {
   156  			f.closedOnce.Do(func() {
   157  				f.err = err
   158  				close(f.closed)
   159  			})
   160  			return
   161  		}
   162  		f.file = file
   163  		close(f.opened)
   164  	}()
   165  	if block {
   166  		select {
   167  		case <-f.opened:
   168  		case <-f.closed:
   169  			return nil, f.err
   170  		}
   171  	}
   172  	return f, nil
   173  }
   174  
   175  // Read from a fifo to a byte array.
   176  func (f *fifo) Read(b []byte) (int, error) {
   177  	if f.flag&syscall.O_WRONLY > 0 {
   178  		return 0, ErrRdFrmWRONLY
   179  	}
   180  	select {
   181  	case <-f.opened:
   182  		return f.file.Read(b)
   183  	default:
   184  	}
   185  	select {
   186  	case <-f.opened:
   187  		return f.file.Read(b)
   188  	case <-f.closed:
   189  		return 0, ErrReadClosed
   190  	}
   191  }
   192  
   193  // Write from byte array to a fifo.
   194  func (f *fifo) Write(b []byte) (int, error) {
   195  	if f.flag&(syscall.O_WRONLY|syscall.O_RDWR) == 0 {
   196  		return 0, ErrWrToRDONLY
   197  	}
   198  	select {
   199  	case <-f.opened:
   200  		return f.file.Write(b)
   201  	default:
   202  	}
   203  	select {
   204  	case <-f.opened:
   205  		return f.file.Write(b)
   206  	case <-f.closed:
   207  		return 0, ErrWriteClosed
   208  	}
   209  }
   210  
   211  // Close the fifo. Next reads/writes will error. This method can also be used
   212  // before open(2) has returned and fifo was never opened.
   213  func (f *fifo) Close() (retErr error) {
   214  	for {
   215  		if f == nil {
   216  			return
   217  		}
   218  
   219  		select {
   220  		case <-f.closed:
   221  			f.handle.Close()
   222  			return
   223  		default:
   224  			select {
   225  			case <-f.opened:
   226  				f.closedOnce.Do(func() {
   227  					retErr = f.file.Close()
   228  					f.err = retErr
   229  					close(f.closed)
   230  				})
   231  			default:
   232  				if f.flag&syscall.O_RDWR != 0 {
   233  					runtime.Gosched()
   234  					break
   235  				}
   236  				f.closingOnce.Do(func() {
   237  					close(f.closing)
   238  				})
   239  				reverseMode := syscall.O_WRONLY
   240  				if f.flag&syscall.O_WRONLY > 0 {
   241  					reverseMode = syscall.O_RDONLY
   242  				}
   243  				fn, err := f.handle.Path()
   244  				// if Close() is called concurrently(shouldn't) it may cause error
   245  				// because handle is closed
   246  				select {
   247  				case <-f.closed:
   248  				default:
   249  					if err != nil {
   250  						// Path has become invalid. We will leak a goroutine.
   251  						// This case should not happen in linux.
   252  						f.closedOnce.Do(func() {
   253  							f.err = err
   254  							close(f.closed)
   255  						})
   256  						<-f.closed
   257  						break
   258  					}
   259  					f, err := os.OpenFile(fn, reverseMode|syscall.O_NONBLOCK, 0)
   260  					if err == nil {
   261  						f.Close()
   262  					}
   263  					runtime.Gosched()
   264  				}
   265  			}
   266  		}
   267  	}
   268  }
   269  

View as plain text