...

Source file src/cuelabs.dev/go/oci/ociregistry/ocimem/blob.go

Documentation: cuelabs.dev/go/oci/ociregistry/ocimem

     1  // Copyright 2023 CUE Labs AG
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package ocimem
    16  
    17  import (
    18  	"bytes"
    19  	"crypto/rand"
    20  	"fmt"
    21  	"sync"
    22  
    23  	"github.com/opencontainers/go-digest"
    24  
    25  	"cuelabs.dev/go/oci/ociregistry"
    26  )
    27  
    28  // NewBytesReader returns an implementation of ociregistry.BlobReader
    29  // that returns the given bytes. The returned reader will return desc from its
    30  // Descriptor method.
    31  func NewBytesReader(data []byte, desc ociregistry.Descriptor) ociregistry.BlobReader {
    32  	r := &bytesReader{
    33  		desc: desc,
    34  	}
    35  	r.r.Reset(data)
    36  	return r
    37  }
    38  
    39  type bytesReader struct {
    40  	r    bytes.Reader
    41  	desc ociregistry.Descriptor
    42  }
    43  
    44  func (r *bytesReader) Close() error {
    45  	return nil
    46  }
    47  
    48  // Descriptor implements [ociregistry.BlobReader.Descriptor].
    49  func (r *bytesReader) Descriptor() ociregistry.Descriptor {
    50  	return r.desc
    51  }
    52  
    53  func (r *bytesReader) Read(data []byte) (int, error) {
    54  	return r.r.Read(data)
    55  }
    56  
    57  // Buffer holds an in-memory implementation of ociregistry.BlobWriter.
    58  type Buffer struct {
    59  	commit           func(b *Buffer) error
    60  	mu               sync.Mutex
    61  	buf              []byte
    62  	checkStartOffset int64
    63  	uuid             string
    64  	committed        bool
    65  	desc             ociregistry.Descriptor
    66  	commitErr        error
    67  }
    68  
    69  // NewBuffer returns a buffer that calls commit with the
    70  // when [Buffer.Commit] is invoked successfully.
    71  // /
    72  // It's OK to call methods concurrently on a buffer.
    73  func NewBuffer(commit func(b *Buffer) error, uuid string) *Buffer {
    74  	if uuid == "" {
    75  		uuid = newUUID()
    76  	}
    77  	return &Buffer{
    78  		commit: commit,
    79  		uuid:   uuid,
    80  	}
    81  }
    82  
    83  func (b *Buffer) Cancel() error {
    84  	b.mu.Lock()
    85  	defer b.mu.Unlock()
    86  	b.commitErr = fmt.Errorf("upload canceled")
    87  	return nil
    88  }
    89  
    90  func (b *Buffer) Close() error {
    91  	return nil
    92  }
    93  
    94  func (b *Buffer) Size() int64 {
    95  	b.mu.Lock()
    96  	defer b.mu.Unlock()
    97  	return int64(len(b.buf))
    98  }
    99  
   100  func (b *Buffer) ChunkSize() int {
   101  	return 8 * 1024 // 8KiB; not really important
   102  }
   103  
   104  // GetBlob returns any committed data and is descriptor. It returns an error
   105  // if the data hasn't been committed or there was an error doing so.
   106  func (b *Buffer) GetBlob() (ociregistry.Descriptor, []byte, error) {
   107  	b.mu.Lock()
   108  	defer b.mu.Unlock()
   109  	if !b.committed {
   110  		return ociregistry.Descriptor{}, nil, fmt.Errorf("blob not committed")
   111  	}
   112  	if b.commitErr != nil {
   113  		return ociregistry.Descriptor{}, nil, b.commitErr
   114  	}
   115  	return b.desc, b.buf, nil
   116  }
   117  
   118  // Write implements io.Writer by writing some data to the blob.
   119  func (b *Buffer) Write(data []byte) (int, error) {
   120  	b.mu.Lock()
   121  	defer b.mu.Unlock()
   122  	if offset := b.checkStartOffset; offset != -1 {
   123  		// Can't call Buffer.Size, since we are already holding the mutex.
   124  		if int64(len(b.buf)) != offset {
   125  			return 0, fmt.Errorf("invalid offset %d in resumed upload (actual offset %d): %w", offset, len(b.buf), ociregistry.ErrRangeInvalid)
   126  		}
   127  		// Only check on the first write, since it's the start offset.
   128  		b.checkStartOffset = -1
   129  	}
   130  	b.buf = append(b.buf, data...)
   131  	return len(data), nil
   132  }
   133  
   134  func newUUID() string {
   135  	buf := make([]byte, 32)
   136  	if _, err := rand.Read(buf); err != nil {
   137  		panic(err)
   138  	}
   139  	return fmt.Sprintf("%x", buf)
   140  }
   141  
   142  // ID implements [ociregistry.BlobWriter.ID] by returning a randomly
   143  // allocated hex UUID.
   144  func (b *Buffer) ID() string {
   145  	return b.uuid
   146  }
   147  
   148  // Commit implements [ociregistry.BlobWriter.Commit] by checking
   149  // that everything looks OK and calling the commit function if so.
   150  func (b *Buffer) Commit(dig ociregistry.Digest) (_ ociregistry.Descriptor, err error) {
   151  	if err := b.checkCommit(dig); err != nil {
   152  		return ociregistry.Descriptor{}, err
   153  	}
   154  	// Note: we're careful to call this function outside of the mutex so
   155  	// that it can call locked Buffer methods OK.
   156  	if err := b.commit(b); err != nil {
   157  		b.mu.Lock()
   158  		defer b.mu.Unlock()
   159  
   160  		b.commitErr = err
   161  		return ociregistry.Descriptor{}, err
   162  	}
   163  	return ociregistry.Descriptor{
   164  		MediaType: "application/octet-stream",
   165  		Size:      int64(len(b.buf)),
   166  		Digest:    dig,
   167  	}, nil
   168  }
   169  
   170  func (b *Buffer) checkCommit(dig ociregistry.Digest) (err error) {
   171  	b.mu.Lock()
   172  	defer b.mu.Unlock()
   173  	if b.commitErr != nil {
   174  		return b.commitErr
   175  	}
   176  	defer func() {
   177  		if err != nil {
   178  			b.commitErr = err
   179  		}
   180  	}()
   181  	if digest.FromBytes(b.buf) != dig {
   182  		return fmt.Errorf("digest mismatch (sha256(%q) != %s): %w", b.buf, dig, ociregistry.ErrDigestInvalid)
   183  	}
   184  	b.desc = ociregistry.Descriptor{
   185  		MediaType: "application/octet-stream",
   186  		Digest:    dig,
   187  		Size:      int64(len(b.buf)),
   188  	}
   189  	b.committed = true
   190  	return nil
   191  }
   192  

View as plain text