...

Source file src/cuelabs.dev/go/oci/ociregistry/ociunify/writer.go

Documentation: cuelabs.dev/go/oci/ociregistry/ociunify

     1  // Copyright 2023 CUE Labs AG
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package ociunify
    16  
    17  import (
    18  	"context"
    19  	"encoding/base64"
    20  	"encoding/json"
    21  	"fmt"
    22  	"io"
    23  
    24  	"cuelabs.dev/go/oci/ociregistry"
    25  )
    26  
    27  func (u unifier) PushBlob(ctx context.Context, repo string, desc ociregistry.Descriptor, r io.Reader) (ociregistry.Descriptor, error) {
    28  	resultc := make(chan t2[ociregistry.Descriptor])
    29  	onePush := func(ri ociregistry.Interface, r *io.PipeReader) {
    30  		desc, err := ri.PushBlob(ctx, repo, desc, r)
    31  		r.CloseWithError(err)
    32  		resultc <- t2[ociregistry.Descriptor]{desc, err}
    33  	}
    34  	pr0, pw0 := io.Pipe()
    35  	pr1, pw1 := io.Pipe()
    36  	go onePush(u.r0, pr0)
    37  	go onePush(u.r1, pr1)
    38  	go func() {
    39  		_, err := io.Copy(io.MultiWriter(pw0, pw1), r)
    40  		pw0.CloseWithError(err)
    41  		pw1.CloseWithError(err)
    42  	}()
    43  	r0 := <-resultc
    44  	r1 := <-resultc
    45  	if (r0.err == nil) == (r1.err == nil) {
    46  		return r0.get()
    47  	}
    48  	return ociregistry.Descriptor{}, fmt.Errorf("one push succeeded where the other failed (TODO better error)")
    49  }
    50  
    51  func (u unifier) PushManifest(ctx context.Context, repo string, tag string, contents []byte, mediaType string) (ociregistry.Descriptor, error) {
    52  	r0, r1 := both(u, func(r ociregistry.Interface, _ int) t2[ociregistry.Descriptor] {
    53  		return mk2(r.PushManifest(ctx, repo, tag, contents, mediaType))
    54  	})
    55  	if (r0.err == nil) == (r1.err == nil) {
    56  		return r0.get()
    57  	}
    58  	return ociregistry.Descriptor{}, fmt.Errorf("one push succeeded where the other failed (TODO better error)")
    59  }
    60  
    61  func (u unifier) PushBlobChunked(ctx context.Context, repo string, chunkSize int) (ociregistry.BlobWriter, error) {
    62  	r0, r1 := both(u, func(r ociregistry.Interface, i int) t2[ociregistry.BlobWriter] {
    63  		return mk2(r.PushBlobChunked(ctx, repo, chunkSize))
    64  	})
    65  	if r0.err != nil || r1.err != nil {
    66  		r0.close()
    67  		r1.close()
    68  		return nil, bothResults(r0, r1).err
    69  	}
    70  	w0, w1 := r0.x, r1.x
    71  	size := w0.Size() // assumed to agree with w1.Size
    72  	return &unifiedBlobWriter{
    73  		w:    [2]ociregistry.BlobWriter{w0, w1},
    74  		size: size,
    75  	}, nil
    76  }
    77  
    78  func (u unifier) PushBlobChunkedResume(ctx context.Context, repo, id string, offset int64, chunkSize int) (ociregistry.BlobWriter, error) {
    79  	data, err := base64.RawURLEncoding.DecodeString(id)
    80  	if err != nil {
    81  		return nil, fmt.Errorf("malformed ID: %v", err)
    82  	}
    83  	var ids []string
    84  	if err := json.Unmarshal(data, &ids); err != nil {
    85  		return nil, fmt.Errorf("malformed ID %q: %v", id, err)
    86  	}
    87  	if len(ids) != 2 {
    88  		return nil, fmt.Errorf("malformed ID %q (expected two elements)", id)
    89  	}
    90  	r0, r1 := both(u, func(r ociregistry.Interface, i int) t2[ociregistry.BlobWriter] {
    91  		return mk2(r.PushBlobChunkedResume(ctx, repo, ids[i], offset, chunkSize))
    92  	})
    93  	if r0.err != nil || r1.err != nil {
    94  		r0.close()
    95  		r1.close()
    96  		return nil, bothResults(r0, r1).err
    97  	}
    98  	w0, w1 := r0.x, r1.x
    99  	size := w0.Size()
   100  	if w1.Size() != size {
   101  		r0.close()
   102  		r1.close()
   103  		return nil, fmt.Errorf("registries do not agree on upload size; please start upload again")
   104  	}
   105  	return &unifiedBlobWriter{
   106  		w:    [2]ociregistry.BlobWriter{w0, w1},
   107  		size: size,
   108  	}, nil
   109  }
   110  
   111  func (u unifier) MountBlob(ctx context.Context, fromRepo, toRepo string, digest ociregistry.Digest) (ociregistry.Descriptor, error) {
   112  	return bothResults(both(u,
   113  		func(r ociregistry.Interface, _ int) t2[ociregistry.Descriptor] {
   114  			return mk2(r.MountBlob(ctx, fromRepo, toRepo, digest))
   115  		},
   116  	)).get()
   117  }
   118  
   119  type unifiedBlobWriter struct {
   120  	u    unifier
   121  	w    [2]ociregistry.BlobWriter
   122  	size int64
   123  }
   124  
   125  func (w *unifiedBlobWriter) Write(buf []byte) (int, error) {
   126  	r := bothResults(both(w.u, func(_ ociregistry.Interface, i int) t2[int] {
   127  		return mk2(w.w[i].Write(buf))
   128  	}))
   129  	if r.err != nil {
   130  		return 0, r.err
   131  	}
   132  	w.size += int64(len(buf))
   133  	return len(buf), nil
   134  }
   135  
   136  func (w *unifiedBlobWriter) Close() error {
   137  	return bothResults(both(w.u, func(_ ociregistry.Interface, i int) t1 {
   138  		return mk1(w.w[i].Close())
   139  	})).err
   140  }
   141  
   142  func (w *unifiedBlobWriter) Cancel() error {
   143  	return bothResults(both(w.u, func(_ ociregistry.Interface, i int) t1 {
   144  		return mk1(w.w[i].Cancel())
   145  	})).err
   146  }
   147  
   148  func (w *unifiedBlobWriter) Size() int64 {
   149  	return w.size
   150  }
   151  
   152  func (w *unifiedBlobWriter) ChunkSize() int {
   153  	// ChunkSize can be derived from the server's required minimum, so take the maximum between both.
   154  	// ChunkSize is usually a cheap method, so there's no need to call both concurrently.
   155  	// TODO(mvdan): replace with max when we can assume Go 1.21
   156  	s1, s2 := w.w[0].ChunkSize(), w.w[1].ChunkSize()
   157  	if s2 > s1 {
   158  		return s2
   159  	}
   160  	return s1
   161  }
   162  
   163  func (w *unifiedBlobWriter) ID() string {
   164  	data, _ := json.Marshal([]string{w.w[0].ID(), w.w[1].ID()})
   165  	return base64.RawURLEncoding.EncodeToString(data)
   166  }
   167  
   168  func (w *unifiedBlobWriter) Commit(digest ociregistry.Digest) (ociregistry.Descriptor, error) {
   169  	return bothResults(both(w.u, func(_ ociregistry.Interface, i int) t2[ociregistry.Descriptor] {
   170  		return mk2(w.w[i].Commit(digest))
   171  	})).get()
   172  }
   173  

View as plain text