...

Source file src/github.com/docker/distribution/registry/client/blob_writer.go

Documentation: github.com/docker/distribution/registry/client

     1  package client
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"fmt"
     7  	"io"
     8  	"io/ioutil"
     9  	"net/http"
    10  	"time"
    11  
    12  	"github.com/docker/distribution"
    13  )
    14  
// httpBlobUpload holds the client-side state of a single blob upload that is
// driven over HTTP by the methods declared on it in this file.
type httpBlobUpload struct {
	statter distribution.BlobStatter // used by Commit to stat the blob after the PUT succeeds
	client  *http.Client             // performs every HTTP request issued by the methods below

	uuid      string    // refreshed from the Docker-Upload-UUID response header; returned by ID
	startedAt time.Time // returned by StartedAt

	location string // always the last value of the location header.
	offset   int64  // start of the Content-Range sent by Write; returned by Size
	closed   bool   // set to true by Close
}
    26  
    27  func (hbu *httpBlobUpload) Reader() (io.ReadCloser, error) {
    28  	panic("Not implemented")
    29  }
    30  
    31  func (hbu *httpBlobUpload) handleErrorResponse(resp *http.Response) error {
    32  	if resp.StatusCode == http.StatusNotFound {
    33  		return distribution.ErrBlobUploadUnknown
    34  	}
    35  	return HandleErrorResponse(resp)
    36  }
    37  
    38  func (hbu *httpBlobUpload) ReadFrom(r io.Reader) (n int64, err error) {
    39  	req, err := http.NewRequest("PATCH", hbu.location, ioutil.NopCloser(r))
    40  	if err != nil {
    41  		return 0, err
    42  	}
    43  	defer req.Body.Close()
    44  
    45  	req.Header.Set("Content-Type", "application/octet-stream")
    46  
    47  	resp, err := hbu.client.Do(req)
    48  	if err != nil {
    49  		return 0, err
    50  	}
    51  
    52  	if !SuccessStatus(resp.StatusCode) {
    53  		return 0, hbu.handleErrorResponse(resp)
    54  	}
    55  
    56  	hbu.uuid = resp.Header.Get("Docker-Upload-UUID")
    57  	hbu.location, err = sanitizeLocation(resp.Header.Get("Location"), hbu.location)
    58  	if err != nil {
    59  		return 0, err
    60  	}
    61  	rng := resp.Header.Get("Range")
    62  	var start, end int64
    63  	if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil {
    64  		return 0, err
    65  	} else if n != 2 || end < start {
    66  		return 0, fmt.Errorf("bad range format: %s", rng)
    67  	}
    68  
    69  	return (end - start + 1), nil
    70  
    71  }
    72  
    73  func (hbu *httpBlobUpload) Write(p []byte) (n int, err error) {
    74  	req, err := http.NewRequest("PATCH", hbu.location, bytes.NewReader(p))
    75  	if err != nil {
    76  		return 0, err
    77  	}
    78  	req.Header.Set("Content-Range", fmt.Sprintf("%d-%d", hbu.offset, hbu.offset+int64(len(p)-1)))
    79  	req.Header.Set("Content-Length", fmt.Sprintf("%d", len(p)))
    80  	req.Header.Set("Content-Type", "application/octet-stream")
    81  
    82  	resp, err := hbu.client.Do(req)
    83  	if err != nil {
    84  		return 0, err
    85  	}
    86  
    87  	if !SuccessStatus(resp.StatusCode) {
    88  		return 0, hbu.handleErrorResponse(resp)
    89  	}
    90  
    91  	hbu.uuid = resp.Header.Get("Docker-Upload-UUID")
    92  	hbu.location, err = sanitizeLocation(resp.Header.Get("Location"), hbu.location)
    93  	if err != nil {
    94  		return 0, err
    95  	}
    96  	rng := resp.Header.Get("Range")
    97  	var start, end int
    98  	if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil {
    99  		return 0, err
   100  	} else if n != 2 || end < start {
   101  		return 0, fmt.Errorf("bad range format: %s", rng)
   102  	}
   103  
   104  	return (end - start + 1), nil
   105  
   106  }
   107  
// Size returns the upload offset currently recorded on hbu.
func (hbu *httpBlobUpload) Size() int64 {
	return hbu.offset
}
   111  
// ID returns the upload UUID held by hbu. ReadFrom and Write refresh it from
// the Docker-Upload-UUID header of each successful response.
func (hbu *httpBlobUpload) ID() string {
	return hbu.uuid
}
   115  
// StartedAt returns the start time stored on hbu.
func (hbu *httpBlobUpload) StartedAt() time.Time {
	return hbu.startedAt
}
   119  
   120  func (hbu *httpBlobUpload) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
   121  	// TODO(dmcgowan): Check if already finished, if so just fetch
   122  	req, err := http.NewRequest("PUT", hbu.location, nil)
   123  	if err != nil {
   124  		return distribution.Descriptor{}, err
   125  	}
   126  
   127  	values := req.URL.Query()
   128  	values.Set("digest", desc.Digest.String())
   129  	req.URL.RawQuery = values.Encode()
   130  
   131  	resp, err := hbu.client.Do(req)
   132  	if err != nil {
   133  		return distribution.Descriptor{}, err
   134  	}
   135  	defer resp.Body.Close()
   136  
   137  	if !SuccessStatus(resp.StatusCode) {
   138  		return distribution.Descriptor{}, hbu.handleErrorResponse(resp)
   139  	}
   140  
   141  	return hbu.statter.Stat(ctx, desc.Digest)
   142  }
   143  
   144  func (hbu *httpBlobUpload) Cancel(ctx context.Context) error {
   145  	req, err := http.NewRequest("DELETE", hbu.location, nil)
   146  	if err != nil {
   147  		return err
   148  	}
   149  	resp, err := hbu.client.Do(req)
   150  	if err != nil {
   151  		return err
   152  	}
   153  	defer resp.Body.Close()
   154  
   155  	if resp.StatusCode == http.StatusNotFound || SuccessStatus(resp.StatusCode) {
   156  		return nil
   157  	}
   158  	return hbu.handleErrorResponse(resp)
   159  }
   160  
// Close sets the closed flag on hbu. It sends no request and always returns
// nil.
func (hbu *httpBlobUpload) Close() error {
	hbu.closed = true
	return nil
}
   165  

View as plain text