...

Source file src/cuelabs.dev/go/oci/ociregistry/ociunify/unify.go

Documentation: cuelabs.dev/go/oci/ociregistry/ociunify

     1  // Copyright 2023 CUE Labs AG
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  // Package ociunify unifies two OCI registries into one.
    16  package ociunify
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"io"
    22  
    23  	"cuelabs.dev/go/oci/ociregistry"
    24  )
    25  
    26  type Options struct {
    27  	ReadPolicy ReadPolicy
    28  }
    29  
    30  type ReadPolicy int
    31  
    32  const (
    33  	ReadSequential ReadPolicy = iota
    34  	ReadConcurrent
    35  )
    36  
    37  // New returns a registry that unifies the contents from both
    38  // the given registries. If there's a conflict, (for example a tag resolves
    39  // to a different thing on both repositories), it returns an error
    40  // for requests that specifically read the value, or omits the conflicting
    41  // item for list requests.
    42  //
    43  // Writes write to both repositories. Reads of immutable data
    44  // come from either.
    45  func New(r0, r1 ociregistry.Interface, opts *Options) ociregistry.Interface {
    46  	if opts == nil {
    47  		opts = new(Options)
    48  	}
    49  	return unifier{
    50  		r0:   r0,
    51  		r1:   r1,
    52  		opts: *opts,
    53  	}
    54  }
    55  
    56  type unifier struct {
    57  	r0, r1 ociregistry.Interface
    58  	opts   Options
    59  	*ociregistry.Funcs
    60  }
    61  
    62  func bothResults[T result[T]](r0, r1 T) T {
    63  	if r0.error() == nil && r1.error() == nil {
    64  		return r0
    65  	}
    66  	var zero T
    67  	if r0.error() != nil && r1.error() != nil {
    68  		return zero.mkErr(fmt.Errorf("r0 and r1 failed: %w; %w", r0.error(), r1.error()))
    69  	}
    70  	if r0.error() != nil {
    71  		return zero.mkErr(fmt.Errorf("r0 failed: %w", r0.error()))
    72  	}
    73  	return zero.mkErr(fmt.Errorf("r1 failed: %w", r1.error()))
    74  }
    75  
    76  type result[T any] interface {
    77  	error() error
    78  	close()
    79  	mkErr(err error) T
    80  }
    81  
    82  // both returns the results from calling f on both registries concurrently.
    83  func both[T any](u unifier, f func(r ociregistry.Interface, i int) T) (T, T) {
    84  	c0, c1 := make(chan T), make(chan T)
    85  	go func() {
    86  		c0 <- f(u.r0, 0)
    87  	}()
    88  	go func() {
    89  		c1 <- f(u.r1, 1)
    90  	}()
    91  	return <-c0, <-c1
    92  }
    93  
    94  // runRead calls f concurrently on each registry.
    95  // It returns the result from the first one that returns without error.
    96  // This should not be used if the return value is affected by cancelling the context.
    97  func runRead[T result[T]](ctx context.Context, u unifier, f func(ctx context.Context, r ociregistry.Interface, i int) T) T {
    98  	r, cancel := runReadWithCancel(ctx, u, f)
    99  	cancel()
   100  	return r
   101  }
   102  
   103  // runReadWithCancel calls f concurrently on each registry.
   104  // It returns the result from the first one that returns without error
   105  // and a cancel function that should be called when the returned value is done with.
   106  func runReadWithCancel[T result[T]](ctx context.Context, u unifier, f func(ctx context.Context, r ociregistry.Interface, i int) T) (T, func()) {
   107  	switch u.opts.ReadPolicy {
   108  	case ReadConcurrent:
   109  		return runReadConcurrent(ctx, u, f)
   110  	case ReadSequential:
   111  		return runReadSequential(ctx, u, f), func() {}
   112  	default:
   113  		panic("unreachable")
   114  	}
   115  }
   116  
   117  func runReadSequential[T result[T]](ctx context.Context, u unifier, f func(ctx context.Context, r ociregistry.Interface, i int) T) T {
   118  	r := f(ctx, u.r0, 0)
   119  	if err := r.error(); err == nil {
   120  		return r
   121  	}
   122  	return f(ctx, u.r1, 1)
   123  }
   124  
   125  func runReadConcurrent[T result[T]](ctx context.Context, u unifier, f func(ctx context.Context, r ociregistry.Interface, i int) T) (T, func()) {
   126  	done := make(chan struct{})
   127  	defer close(done)
   128  	type result struct {
   129  		r      T
   130  		cancel func()
   131  	}
   132  	c := make(chan result)
   133  	sender := func(f func(context.Context, ociregistry.Interface, int) T, reg ociregistry.Interface, i int) {
   134  		ctx, cancel := context.WithCancel(ctx)
   135  		r := f(ctx, reg, i)
   136  		select {
   137  		case c <- result{r, cancel}:
   138  		case <-done:
   139  			r.close()
   140  			cancel()
   141  		}
   142  	}
   143  	go sender(f, u.r0, 0)
   144  	go sender(f, u.r1, 1)
   145  	select {
   146  	case r := <-c:
   147  		if r.r.error() == nil {
   148  			return r.r, r.cancel
   149  		}
   150  		r.cancel()
   151  	case <-ctx.Done():
   152  		return (*new(T)).mkErr(ctx.Err()), func() {}
   153  	}
   154  	// The first result was a failure. Try for the second, which might work.
   155  	select {
   156  	case r := <-c:
   157  		return r.r, r.cancel
   158  	case <-ctx.Done():
   159  		return (*new(T)).mkErr(ctx.Err()), func() {}
   160  	}
   161  }
   162  
   163  func mk1(err error) t1 {
   164  	return t1{err}
   165  }
   166  
   167  type t1 struct {
   168  	err error
   169  }
   170  
   171  func (t1) close() {}
   172  
   173  func (t t1) error() error {
   174  	return t.err
   175  }
   176  
   177  func (t1) mkErr(err error) t1 {
   178  	return t1{err}
   179  }
   180  
   181  func mk2[T any](x T, err error) t2[T] {
   182  	return t2[T]{x, err}
   183  }
   184  
   185  type t2[T any] struct {
   186  	x   T
   187  	err error
   188  }
   189  
   190  func (t t2[T]) close() {
   191  	if closer, ok := any(t.x).(io.Closer); ok {
   192  		closer.Close()
   193  	}
   194  }
   195  
   196  func (t t2[T]) get() (T, error) {
   197  	return t.x, t.err
   198  }
   199  
   200  func (t t2[T]) error() error {
   201  	return t.err
   202  }
   203  
   204  func (t t2[T]) mkErr(err error) t2[T] {
   205  	return t2[T]{*new(T), err}
   206  }
   207  

View as plain text