...

Source file src/github.com/docker/distribution/registry/client/transport/http_reader.go

Documentation: github.com/docker/distribution/registry/client/transport

     1  package transport
     2  
     3  import (
     4  	"errors"
     5  	"fmt"
     6  	"io"
     7  	"net/http"
     8  	"regexp"
     9  	"strconv"
    10  )
    11  
    12  var (
    13  	contentRangeRegexp = regexp.MustCompile(`bytes ([0-9]+)-([0-9]+)/([0-9]+|\\*)`)
    14  
    15  	// ErrWrongCodeForByteRange is returned if the client sends a request
    16  	// with a Range header but the server returns a 2xx or 3xx code other
    17  	// than 206 Partial Content.
    18  	ErrWrongCodeForByteRange = errors.New("expected HTTP 206 from byte range request")
    19  )
    20  
    21  // ReadSeekCloser combines io.ReadSeeker with io.Closer.
    22  type ReadSeekCloser interface {
    23  	io.ReadSeeker
    24  	io.Closer
    25  }
    26  
    27  // NewHTTPReadSeeker handles reading from an HTTP endpoint using a GET
    28  // request. When seeking and starting a read from a non-zero offset
    29  // the a "Range" header will be added which sets the offset.
    30  // TODO(dmcgowan): Move this into a separate utility package
    31  func NewHTTPReadSeeker(client *http.Client, url string, errorHandler func(*http.Response) error) ReadSeekCloser {
    32  	return &httpReadSeeker{
    33  		client:       client,
    34  		url:          url,
    35  		errorHandler: errorHandler,
    36  	}
    37  }
    38  
    39  type httpReadSeeker struct {
    40  	client *http.Client
    41  	url    string
    42  
    43  	// errorHandler creates an error from an unsuccessful HTTP response.
    44  	// This allows the error to be created with the HTTP response body
    45  	// without leaking the body through a returned error.
    46  	errorHandler func(*http.Response) error
    47  
    48  	size int64
    49  
    50  	// rc is the remote read closer.
    51  	rc io.ReadCloser
    52  	// readerOffset tracks the offset as of the last read.
    53  	readerOffset int64
    54  	// seekOffset allows Seek to override the offset. Seek changes
    55  	// seekOffset instead of changing readOffset directly so that
    56  	// connection resets can be delayed and possibly avoided if the
    57  	// seek is undone (i.e. seeking to the end and then back to the
    58  	// beginning).
    59  	seekOffset int64
    60  	err        error
    61  }
    62  
    63  func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) {
    64  	if hrs.err != nil {
    65  		return 0, hrs.err
    66  	}
    67  
    68  	// If we sought to a different position, we need to reset the
    69  	// connection. This logic is here instead of Seek so that if
    70  	// a seek is undone before the next read, the connection doesn't
    71  	// need to be closed and reopened. A common example of this is
    72  	// seeking to the end to determine the length, and then seeking
    73  	// back to the original position.
    74  	if hrs.readerOffset != hrs.seekOffset {
    75  		hrs.reset()
    76  	}
    77  
    78  	hrs.readerOffset = hrs.seekOffset
    79  
    80  	rd, err := hrs.reader()
    81  	if err != nil {
    82  		return 0, err
    83  	}
    84  
    85  	n, err = rd.Read(p)
    86  	hrs.seekOffset += int64(n)
    87  	hrs.readerOffset += int64(n)
    88  
    89  	return n, err
    90  }
    91  
    92  func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) {
    93  	if hrs.err != nil {
    94  		return 0, hrs.err
    95  	}
    96  
    97  	lastReaderOffset := hrs.readerOffset
    98  
    99  	if whence == io.SeekStart && hrs.rc == nil {
   100  		// If no request has been made yet, and we are seeking to an
   101  		// absolute position, set the read offset as well to avoid an
   102  		// unnecessary request.
   103  		hrs.readerOffset = offset
   104  	}
   105  
   106  	_, err := hrs.reader()
   107  	if err != nil {
   108  		hrs.readerOffset = lastReaderOffset
   109  		return 0, err
   110  	}
   111  
   112  	newOffset := hrs.seekOffset
   113  
   114  	switch whence {
   115  	case io.SeekCurrent:
   116  		newOffset += offset
   117  	case io.SeekEnd:
   118  		if hrs.size < 0 {
   119  			return 0, errors.New("content length not known")
   120  		}
   121  		newOffset = hrs.size + offset
   122  	case io.SeekStart:
   123  		newOffset = offset
   124  	}
   125  
   126  	if newOffset < 0 {
   127  		err = errors.New("cannot seek to negative position")
   128  	} else {
   129  		hrs.seekOffset = newOffset
   130  	}
   131  
   132  	return hrs.seekOffset, err
   133  }
   134  
   135  func (hrs *httpReadSeeker) Close() error {
   136  	if hrs.err != nil {
   137  		return hrs.err
   138  	}
   139  
   140  	// close and release reader chain
   141  	if hrs.rc != nil {
   142  		hrs.rc.Close()
   143  	}
   144  
   145  	hrs.rc = nil
   146  
   147  	hrs.err = errors.New("httpLayer: closed")
   148  
   149  	return nil
   150  }
   151  
   152  func (hrs *httpReadSeeker) reset() {
   153  	if hrs.err != nil {
   154  		return
   155  	}
   156  	if hrs.rc != nil {
   157  		hrs.rc.Close()
   158  		hrs.rc = nil
   159  	}
   160  }
   161  
   162  func (hrs *httpReadSeeker) reader() (io.Reader, error) {
   163  	if hrs.err != nil {
   164  		return nil, hrs.err
   165  	}
   166  
   167  	if hrs.rc != nil {
   168  		return hrs.rc, nil
   169  	}
   170  
   171  	req, err := http.NewRequest("GET", hrs.url, nil)
   172  	if err != nil {
   173  		return nil, err
   174  	}
   175  
   176  	if hrs.readerOffset > 0 {
   177  		// If we are at different offset, issue a range request from there.
   178  		req.Header.Add("Range", fmt.Sprintf("bytes=%d-", hrs.readerOffset))
   179  		// TODO: get context in here
   180  		// context.GetLogger(hrs.context).Infof("Range: %s", req.Header.Get("Range"))
   181  	}
   182  
   183  	resp, err := hrs.client.Do(req)
   184  	if err != nil {
   185  		return nil, err
   186  	}
   187  
   188  	// Normally would use client.SuccessStatus, but that would be a cyclic
   189  	// import
   190  	if resp.StatusCode >= 200 && resp.StatusCode <= 399 {
   191  		if hrs.readerOffset > 0 {
   192  			if resp.StatusCode != http.StatusPartialContent {
   193  				return nil, ErrWrongCodeForByteRange
   194  			}
   195  
   196  			contentRange := resp.Header.Get("Content-Range")
   197  			if contentRange == "" {
   198  				return nil, errors.New("no Content-Range header found in HTTP 206 response")
   199  			}
   200  
   201  			submatches := contentRangeRegexp.FindStringSubmatch(contentRange)
   202  			if len(submatches) < 4 {
   203  				return nil, fmt.Errorf("could not parse Content-Range header: %s", contentRange)
   204  			}
   205  
   206  			startByte, err := strconv.ParseUint(submatches[1], 10, 64)
   207  			if err != nil {
   208  				return nil, fmt.Errorf("could not parse start of range in Content-Range header: %s", contentRange)
   209  			}
   210  
   211  			if startByte != uint64(hrs.readerOffset) {
   212  				return nil, fmt.Errorf("received Content-Range starting at offset %d instead of requested %d", startByte, hrs.readerOffset)
   213  			}
   214  
   215  			endByte, err := strconv.ParseUint(submatches[2], 10, 64)
   216  			if err != nil {
   217  				return nil, fmt.Errorf("could not parse end of range in Content-Range header: %s", contentRange)
   218  			}
   219  
   220  			if submatches[3] == "*" {
   221  				hrs.size = -1
   222  			} else {
   223  				size, err := strconv.ParseUint(submatches[3], 10, 64)
   224  				if err != nil {
   225  					return nil, fmt.Errorf("could not parse total size in Content-Range header: %s", contentRange)
   226  				}
   227  
   228  				if endByte+1 != size {
   229  					return nil, fmt.Errorf("range in Content-Range stops before the end of the content: %s", contentRange)
   230  				}
   231  
   232  				hrs.size = int64(size)
   233  			}
   234  		} else if resp.StatusCode == http.StatusOK {
   235  			hrs.size = resp.ContentLength
   236  		} else {
   237  			hrs.size = -1
   238  		}
   239  		hrs.rc = resp.Body
   240  	} else {
   241  		defer resp.Body.Close()
   242  		if hrs.errorHandler != nil {
   243  			return nil, hrs.errorHandler(resp)
   244  		}
   245  		return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status)
   246  	}
   247  
   248  	return hrs.rc, nil
   249  }
   250  

View as plain text