...

Source file src/cuelabs.dev/go/oci/ociregistry/ociclient/writer.go

Documentation: cuelabs.dev/go/oci/ociregistry/ociclient

     1  // Copyright 2023 CUE Labs AG
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package ociclient
    16  
    17  import (
    18  	"bytes"
    19  	"context"
    20  	"fmt"
    21  	"io"
    22  	"net/http"
    23  	"net/url"
    24  	"strconv"
    25  	"sync"
    26  
    27  	"github.com/opencontainers/go-digest"
    28  
    29  	"cuelabs.dev/go/oci/ociregistry"
    30  	"cuelabs.dev/go/oci/ociregistry/internal/ocirequest"
    31  	"cuelabs.dev/go/oci/ociregistry/ociauth"
    32  )
    33  
    34  // This file implements the ociregistry.Writer methods.
    35  
    36  func (c *client) PushManifest(ctx context.Context, repo string, tag string, contents []byte, mediaType string) (ociregistry.Descriptor, error) {
    37  	if mediaType == "" {
    38  		return ociregistry.Descriptor{}, fmt.Errorf("PushManifest called with empty mediaType")
    39  	}
    40  	desc := ociregistry.Descriptor{
    41  		Digest:    digest.FromBytes(contents),
    42  		Size:      int64(len(contents)),
    43  		MediaType: mediaType,
    44  	}
    45  
    46  	rreq := &ocirequest.Request{
    47  		Kind:   ocirequest.ReqManifestPut,
    48  		Repo:   repo,
    49  		Tag:    tag,
    50  		Digest: string(desc.Digest),
    51  	}
    52  	req, err := newRequest(ctx, rreq, bytes.NewReader(contents))
    53  	req.Header.Set("Content-Type", mediaType)
    54  	req.ContentLength = desc.Size
    55  	resp, err := c.do(req, http.StatusCreated)
    56  	if err != nil {
    57  		return ociregistry.Descriptor{}, err
    58  	}
    59  	resp.Body.Close()
    60  	return desc, nil
    61  }
    62  
    63  func (c *client) MountBlob(ctx context.Context, fromRepo, toRepo string, dig ociregistry.Digest) (ociregistry.Descriptor, error) {
    64  	rreq := &ocirequest.Request{
    65  		Kind:     ocirequest.ReqBlobMount,
    66  		Repo:     toRepo,
    67  		FromRepo: fromRepo,
    68  		Digest:   string(dig),
    69  	}
    70  	resp, err := c.doRequest(ctx, rreq, http.StatusCreated, http.StatusAccepted)
    71  	if err != nil {
    72  		return ociregistry.Descriptor{}, err
    73  	}
    74  	resp.Body.Close()
    75  	if resp.StatusCode == http.StatusAccepted {
    76  		// Mount isn't supported and technically the upload session has begun,
    77  		// but we aren't in a great position to be able to continue it, so let's just
    78  		// return Unsupported.
    79  		return ociregistry.Descriptor{}, fmt.Errorf("registry does not support mounts: %w", ociregistry.ErrUnsupported)
    80  	}
    81  	return descriptorFromResponse(resp, dig, false)
    82  }
    83  
    84  func (c *client) PushBlob(ctx context.Context, repo string, desc ociregistry.Descriptor, r io.Reader) (_ ociregistry.Descriptor, _err error) {
    85  	// TODO use the single-post blob-upload method (ReqBlobUploadBlob)
    86  	// See:
    87  	//	https://github.com/distribution/distribution/issues/4065
    88  	//	https://github.com/golang/go/issues/63152
    89  	rreq := &ocirequest.Request{
    90  		Kind: ocirequest.ReqBlobStartUpload,
    91  		Repo: repo,
    92  	}
    93  	req, err := newRequest(ctx, rreq, nil)
    94  	if err != nil {
    95  		return ociregistry.Descriptor{}, err
    96  	}
    97  	resp, err := c.do(req, http.StatusAccepted)
    98  	if err != nil {
    99  		return ociregistry.Descriptor{}, err
   100  	}
   101  	resp.Body.Close()
   102  	location, err := locationFromResponse(resp)
   103  	if err != nil {
   104  		return ociregistry.Descriptor{}, err
   105  	}
   106  
   107  	// We've got the upload location. Now PUT the content.
   108  
   109  	ctx = ociauth.ContextWithRequestInfo(ctx, ociauth.RequestInfo{
   110  		RequiredScope: scopeForRequest(rreq),
   111  	})
   112  	// Note: we can't use ocirequest.Request here because that's
   113  	// specific to the ociserver implementation in this case.
   114  	req, err = http.NewRequestWithContext(ctx, "PUT", "", r)
   115  	if err != nil {
   116  		return ociregistry.Descriptor{}, err
   117  	}
   118  	req.URL = urlWithDigest(location, string(desc.Digest))
   119  	req.ContentLength = desc.Size
   120  	req.Header.Set("Content-Type", "application/octet-stream")
   121  	// TODO: per the spec, the content-range header here is unnecessary.
   122  	req.Header.Set("Content-Range", ocirequest.RangeString(0, desc.Size))
   123  	resp, err = c.do(req, http.StatusCreated)
   124  	if err != nil {
   125  		return ociregistry.Descriptor{}, err
   126  	}
   127  	defer closeOnError(&_err, resp.Body)
   128  	resp.Body.Close()
   129  	return desc, nil
   130  }
   131  
   132  // TODO is this a reasonable default? We have to
   133  // weigh up in-memory cost vs round-trip overhead.
   134  // TODO: make this default configurable.
   135  const defaultChunkSize = 64 * 1024
   136  
   137  func (c *client) PushBlobChunked(ctx context.Context, repo string, chunkSize int) (ociregistry.BlobWriter, error) {
   138  	if chunkSize <= 0 {
   139  		chunkSize = defaultChunkSize
   140  	}
   141  	resp, err := c.doRequest(ctx, &ocirequest.Request{
   142  		Kind: ocirequest.ReqBlobStartUpload,
   143  		Repo: repo,
   144  	}, http.StatusAccepted)
   145  	if err != nil {
   146  		return nil, err
   147  	}
   148  	resp.Body.Close()
   149  	location, err := locationFromResponse(resp)
   150  	if err != nil {
   151  		return nil, err
   152  	}
   153  	ctx = ociauth.ContextWithRequestInfo(ctx, ociauth.RequestInfo{
   154  		RequiredScope: ociauth.NewScope(ociauth.ResourceScope{
   155  			ResourceType: "repository",
   156  			Resource:     repo,
   157  			Action:       "push",
   158  		}),
   159  	})
   160  	return &blobWriter{
   161  		ctx:       ctx,
   162  		client:    c,
   163  		chunkSize: chunkSizeFromResponse(resp, chunkSize),
   164  		chunk:     make([]byte, 0, chunkSize),
   165  		location:  location,
   166  	}, nil
   167  }
   168  
   169  func (c *client) PushBlobChunkedResume(ctx context.Context, repo string, id string, offset int64, chunkSize int) (ociregistry.BlobWriter, error) {
   170  	if id == "" {
   171  		return nil, fmt.Errorf("id must be non-empty to resume a chunked upload")
   172  	}
   173  	if chunkSize <= 0 {
   174  		chunkSize = defaultChunkSize
   175  	}
   176  	var location *url.URL
   177  	switch {
   178  	case offset == -1:
   179  		// Try to find what offset we're meant to be writing at
   180  		// by doing a GET to the location.
   181  		// TODO does resuming an upload require push or pull scope or both?
   182  		ctx := ociauth.ContextWithRequestInfo(ctx, ociauth.RequestInfo{
   183  			RequiredScope: ociauth.NewScope(ociauth.ResourceScope{
   184  				ResourceType: "repository",
   185  				Resource:     repo,
   186  				Action:       "push",
   187  			}, ociauth.ResourceScope{
   188  				ResourceType: "repository",
   189  				Resource:     repo,
   190  				Action:       "pull",
   191  			}),
   192  		})
   193  		req, err := http.NewRequestWithContext(ctx, "GET", id, nil)
   194  		if err != nil {
   195  			return nil, err
   196  		}
   197  		resp, err := c.do(req, http.StatusNoContent)
   198  		if err != nil {
   199  			return nil, fmt.Errorf("cannot recover chunk offset: %v", err)
   200  		}
   201  		location, err = locationFromResponse(resp)
   202  		if err != nil {
   203  			return nil, fmt.Errorf("cannot get location from response: %v", err)
   204  		}
   205  		rangeStr := resp.Header.Get("Range")
   206  		p0, p1, ok := ocirequest.ParseRange(rangeStr)
   207  		if !ok {
   208  			return nil, fmt.Errorf("invalid range %q in response", rangeStr)
   209  		}
   210  		if p0 != 0 {
   211  			return nil, fmt.Errorf("range %q does not start with 0", rangeStr)
   212  		}
   213  		chunkSize = chunkSizeFromResponse(resp, chunkSize)
   214  		offset = p1
   215  	case offset < 0:
   216  		return nil, fmt.Errorf("invalid offset; must be -1 or non-negative")
   217  	default:
   218  		var err error
   219  		location, err = url.Parse(id) // Note that this mirrors [BlobWriter.ID].
   220  		if err != nil {
   221  			return nil, fmt.Errorf("provided ID is not a valid location URL")
   222  		}
   223  	}
   224  	ctx = ociauth.ContextWithRequestInfo(ctx, ociauth.RequestInfo{
   225  		RequiredScope: ociauth.NewScope(ociauth.ResourceScope{
   226  			ResourceType: "repository",
   227  			Resource:     repo,
   228  			Action:       "push",
   229  		}),
   230  	})
   231  	return &blobWriter{
   232  		ctx:       ctx,
   233  		client:    c,
   234  		chunkSize: chunkSize,
   235  		size:      offset,
   236  		flushed:   offset,
   237  		location:  location,
   238  	}, nil
   239  }
   240  
// blobWriter is the ociregistry.BlobWriter returned by PushBlobChunked and
// PushBlobChunkedResume. Data passed to Write is buffered in chunk and sent
// to the upload location by flush: as a PATCH while writing, or as the
// closing PUT when the blob is committed.
type blobWriter struct {
	// client, chunkSize and ctx are set when the writer is created and are
	// not modified by any method in this file.
	client    *client
	chunkSize int // Write flushes once buffered data would exceed this.
	// ctx is used for every request the writer makes. It is stored here
	// because Write, Close and Commit take no context argument.
	ctx context.Context

	// mu guards the fields below it.
	mu       sync.Mutex
	closed   bool   // set by the first call to Close
	chunk    []byte // data written but not yet sent to the server
	closeErr error  // result of the first Close, returned by later calls

	// size holds the size of the entire upload as seen from the
	// client perspective. Each successful call to Write increases this
	// immediately. For a resumed upload it starts at the resume offset.
	size int64

	// flushed holds the size of the upload as flushed to the server.
	// Each successfully flushed chunk increases this.
	flushed int64
	// location is the current upload URL. It is replaced by the Location
	// header of each successful flush response.
	location *url.URL
}
   261  
// doResult holds an *http.Response together with an error, presumably the
// outcome of a single request.
// NOTE(review): doResult is not referenced anywhere in this file. It may be
// used elsewhere in the package or be a leftover; confirm before removing.
type doResult struct {
	resp *http.Response
	err  error
}
   266  
   267  func (w *blobWriter) Write(buf []byte) (int, error) {
   268  	w.mu.Lock()
   269  	defer w.mu.Unlock()
   270  
   271  	// We use > rather than >= here so that using a chunk size of 100
   272  	// and writing 100 bytes does not actually flush, which would result in a PATCH
   273  	// then followed by an empty-bodied PUT with the call to Commit.
   274  	// Instead, we want the writes to not flush at all, and Commit to PUT the entire chunk.
   275  	if len(w.chunk)+len(buf) > w.chunkSize {
   276  		if err := w.flush(buf, ""); err != nil {
   277  			return 0, err
   278  		}
   279  	} else {
   280  		if w.chunk == nil {
   281  			w.chunk = make([]byte, 0, w.chunkSize)
   282  		}
   283  		w.chunk = append(w.chunk, buf...)
   284  	}
   285  	w.size += int64(len(buf))
   286  	return len(buf), nil
   287  }
   288  
   289  // flush flushes any outstanding upload data to the server.
   290  // If commitDigest is non-empty, this is the final segment of data in the blob:
   291  // the blob is being committed and the digest should hold the digest of the entire blob content.
   292  func (w *blobWriter) flush(buf []byte, commitDigest ociregistry.Digest) error {
   293  	if commitDigest == "" && len(buf)+len(w.chunk) == 0 {
   294  		return nil
   295  	}
   296  	// Start a new PATCH request to send the currently outstanding data.
   297  	method := "PATCH"
   298  	expect := http.StatusAccepted
   299  	reqURL := w.location
   300  	if commitDigest != "" {
   301  		// This is the final piece of data, so send it as the final PUT request
   302  		// (committing the whole blob) which avoids an extra round trip.
   303  		method = "PUT"
   304  		expect = http.StatusCreated
   305  		reqURL = urlWithDigest(reqURL, string(commitDigest))
   306  	}
   307  	req, err := http.NewRequestWithContext(w.ctx, method, "", concatBody(w.chunk, buf))
   308  	if err != nil {
   309  		return fmt.Errorf("cannot make PATCH request: %v", err)
   310  	}
   311  	req.URL = reqURL
   312  	req.ContentLength = int64(len(w.chunk) + len(buf))
   313  	// TODO: per the spec, the content-range header here is unnecessary
   314  	// if we are doing a final PUT without a body.
   315  	req.Header.Set("Content-Range", ocirequest.RangeString(w.flushed, w.flushed+req.ContentLength))
   316  	resp, err := w.client.do(req, expect)
   317  	if err != nil {
   318  		return err
   319  	}
   320  	resp.Body.Close()
   321  	location, err := locationFromResponse(resp)
   322  	if err != nil {
   323  		return fmt.Errorf("bad Location in response: %v", err)
   324  	}
   325  	// TODO is there something we could be doing with the Range header in the response?
   326  	w.location = location
   327  	w.flushed += req.ContentLength
   328  	w.chunk = w.chunk[:0]
   329  	return nil
   330  }
   331  
   332  func concatBody(b1, b2 []byte) io.Reader {
   333  	if len(b1)+len(b2) == 0 {
   334  		return nil // note that net/http treats a nil request body differently
   335  	}
   336  	if len(b1) == 0 {
   337  		return bytes.NewReader(b2)
   338  	}
   339  	if len(b2) == 0 {
   340  		return bytes.NewReader(b1)
   341  	}
   342  	return io.MultiReader(
   343  		bytes.NewReader(b1),
   344  		bytes.NewReader(b2),
   345  	)
   346  }
   347  
   348  func (w *blobWriter) Close() error {
   349  	w.mu.Lock()
   350  	defer w.mu.Unlock()
   351  	if w.closed {
   352  		return w.closeErr
   353  	}
   354  	err := w.flush(nil, "")
   355  	w.closed = true
   356  	w.closeErr = err
   357  	return err
   358  }
   359  
   360  func (w *blobWriter) Size() int64 {
   361  	w.mu.Lock()
   362  	defer w.mu.Unlock()
   363  	return w.size
   364  }
   365  
   366  func (w *blobWriter) ChunkSize() int {
   367  	return w.chunkSize
   368  }
   369  
   370  func (w *blobWriter) ID() string {
   371  	w.mu.Lock()
   372  	defer w.mu.Unlock()
   373  	return w.location.String()
   374  }
   375  
   376  func (w *blobWriter) Commit(digest ociregistry.Digest) (ociregistry.Descriptor, error) {
   377  	if digest == "" {
   378  		return ociregistry.Descriptor{}, fmt.Errorf("cannot commit with an empty digest")
   379  	}
   380  	w.mu.Lock()
   381  	defer w.mu.Unlock()
   382  	if err := w.flush(nil, digest); err != nil {
   383  		return ociregistry.Descriptor{}, fmt.Errorf("cannot flush data before commit: %v", err)
   384  	}
   385  	return ociregistry.Descriptor{
   386  		MediaType: "application/octet-stream",
   387  		Size:      w.size,
   388  		Digest:    digest,
   389  	}, nil
   390  }
   391  
// Cancel is currently a no-op. It returns nil without contacting the
// registry, so any server-side upload session is left in place and buffered
// data is simply not sent.
// NOTE(review): consider whether the upload session should be cancelled on
// the server here.
func (w *blobWriter) Cancel() error {
	return nil
}
   395  
   396  // urlWithDigest returns u with the digest query parameter set, taking care not
   397  // to disrupt the initial URL (thus avoiding the charge of "manually
   398  // assembing the location; see [here].
   399  //
   400  // [here]: https://github.com/opencontainers/distribution-spec/blob/main/spec.md#post-then-put
   401  func urlWithDigest(u0 *url.URL, digest string) *url.URL {
   402  	u := *u0
   403  	digest = url.QueryEscape(digest)
   404  	switch {
   405  	case u.ForceQuery:
   406  		// The URL already ended in a "?" with no actual query parameters.
   407  		u.RawQuery = "digest=" + digest
   408  		u.ForceQuery = false
   409  	case u.RawQuery != "":
   410  		// There's already a query parameter present.
   411  		u.RawQuery += "&digest=" + digest
   412  	default:
   413  		u.RawQuery = "digest=" + digest
   414  	}
   415  	return &u
   416  }
   417  
   418  // See https://github.com/opencontainers/distribution-spec/blob/main/spec.md#pushing-a-blob-in-chunks
   419  func chunkSizeFromResponse(resp *http.Response, chunkSize int) int {
   420  	minChunkSize, err := strconv.Atoi(resp.Header.Get("OCI-Chunk-Min-Length"))
   421  	if err == nil && minChunkSize > chunkSize {
   422  		return minChunkSize
   423  	}
   424  	return chunkSize
   425  }
   426  

View as plain text