...

Source file src/github.com/docker/distribution/registry/storage/filereader.go

Documentation: github.com/docker/distribution/registry/storage

     1  package storage
     2  
     3  import (
     4  	"bufio"
     5  	"bytes"
     6  	"context"
     7  	"fmt"
     8  	"io"
     9  	"io/ioutil"
    10  
    11  	storagedriver "github.com/docker/distribution/registry/storage/driver"
    12  )
    13  
    14  // TODO(stevvooe): Set an optimal buffer size here. We'll have to
    15  // understand the latency characteristics of the underlying network to
    16  // set this correctly, so we may want to leave it to the driver. For
    17  // out of process drivers, we'll have to optimize this buffer size for
    18  // local communication.
    19  const fileReaderBufferSize = 4 << 20
    20  
    21  // remoteFileReader provides a read seeker interface to files stored in
    22  // storagedriver. Used to implement part of layer interface and will be used
    23  // to implement read side of LayerUpload.
    24  type fileReader struct {
    25  	driver storagedriver.StorageDriver
    26  
    27  	ctx context.Context
    28  
    29  	// identifying fields
    30  	path string
    31  	size int64 // size is the total size, must be set.
    32  
    33  	// mutable fields
    34  	rc     io.ReadCloser // remote read closer
    35  	brd    *bufio.Reader // internal buffered io
    36  	offset int64         // offset is the current read offset
    37  	err    error         // terminal error, if set, reader is closed
    38  }
    39  
    40  // newFileReader initializes a file reader for the remote file. The reader
    41  // takes on the size and path that must be determined externally with a stat
    42  // call. The reader operates optimistically, assuming that the file is already
    43  // there.
    44  func newFileReader(ctx context.Context, driver storagedriver.StorageDriver, path string, size int64) (*fileReader, error) {
    45  	return &fileReader{
    46  		ctx:    ctx,
    47  		driver: driver,
    48  		path:   path,
    49  		size:   size,
    50  	}, nil
    51  }
    52  
    53  func (fr *fileReader) Read(p []byte) (n int, err error) {
    54  	if fr.err != nil {
    55  		return 0, fr.err
    56  	}
    57  
    58  	rd, err := fr.reader()
    59  	if err != nil {
    60  		return 0, err
    61  	}
    62  
    63  	n, err = rd.Read(p)
    64  	fr.offset += int64(n)
    65  
    66  	// Simulate io.EOR error if we reach filesize.
    67  	if err == nil && fr.offset >= fr.size {
    68  		err = io.EOF
    69  	}
    70  
    71  	return n, err
    72  }
    73  
    74  func (fr *fileReader) Seek(offset int64, whence int) (int64, error) {
    75  	if fr.err != nil {
    76  		return 0, fr.err
    77  	}
    78  
    79  	var err error
    80  	newOffset := fr.offset
    81  
    82  	switch whence {
    83  	case io.SeekCurrent:
    84  		newOffset += offset
    85  	case io.SeekEnd:
    86  		newOffset = fr.size + offset
    87  	case io.SeekStart:
    88  		newOffset = offset
    89  	}
    90  
    91  	if newOffset < 0 {
    92  		err = fmt.Errorf("cannot seek to negative position")
    93  	} else {
    94  		if fr.offset != newOffset {
    95  			fr.reset()
    96  		}
    97  
    98  		// No problems, set the offset.
    99  		fr.offset = newOffset
   100  	}
   101  
   102  	return fr.offset, err
   103  }
   104  
   105  func (fr *fileReader) Close() error {
   106  	return fr.closeWithErr(fmt.Errorf("fileReader: closed"))
   107  }
   108  
   109  // reader prepares the current reader at the lrs offset, ensuring its buffered
   110  // and ready to go.
   111  func (fr *fileReader) reader() (io.Reader, error) {
   112  	if fr.err != nil {
   113  		return nil, fr.err
   114  	}
   115  
   116  	if fr.rc != nil {
   117  		return fr.brd, nil
   118  	}
   119  
   120  	// If we don't have a reader, open one up.
   121  	rc, err := fr.driver.Reader(fr.ctx, fr.path, fr.offset)
   122  	if err != nil {
   123  		switch err := err.(type) {
   124  		case storagedriver.PathNotFoundError:
   125  			// NOTE(stevvooe): If the path is not found, we simply return a
   126  			// reader that returns io.EOF. However, we do not set fr.rc,
   127  			// allowing future attempts at getting a reader to possibly
   128  			// succeed if the file turns up later.
   129  			return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
   130  		default:
   131  			return nil, err
   132  		}
   133  	}
   134  
   135  	fr.rc = rc
   136  
   137  	if fr.brd == nil {
   138  		fr.brd = bufio.NewReaderSize(fr.rc, fileReaderBufferSize)
   139  	} else {
   140  		fr.brd.Reset(fr.rc)
   141  	}
   142  
   143  	return fr.brd, nil
   144  }
   145  
   146  // resetReader resets the reader, forcing the read method to open up a new
   147  // connection and rebuild the buffered reader. This should be called when the
   148  // offset and the reader will become out of sync, such as during a seek
   149  // operation.
   150  func (fr *fileReader) reset() {
   151  	if fr.err != nil {
   152  		return
   153  	}
   154  	if fr.rc != nil {
   155  		fr.rc.Close()
   156  		fr.rc = nil
   157  	}
   158  }
   159  
   160  func (fr *fileReader) closeWithErr(err error) error {
   161  	if fr.err != nil {
   162  		return fr.err
   163  	}
   164  
   165  	fr.err = err
   166  
   167  	// close and release reader chain
   168  	if fr.rc != nil {
   169  		fr.rc.Close()
   170  	}
   171  
   172  	fr.rc = nil
   173  	fr.brd = nil
   174  
   175  	return fr.err
   176  }
   177  

View as plain text