1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package ociunify
17
18 import (
19 "context"
20 "fmt"
21 "io"
22
23 "cuelabs.dev/go/oci/ociregistry"
24 )
25
// Options holds the configuration accepted by New.
type Options struct {
	// ReadPolicy selects how read operations are spread over the two
	// registries. The zero value is ReadSequential.
	ReadPolicy ReadPolicy
}

// ReadPolicy determines the strategy used when reading from the
// two underlying registries.
type ReadPolicy int

const (
	// ReadSequential asks the first registry and falls back to the
	// second one only when the first returns an error.
	ReadSequential = ReadPolicy(iota)

	// ReadConcurrent asks both registries at the same time and uses
	// the first result that arrives without an error.
	ReadConcurrent
)
36
37
38
39
40
41
42
43
44
45 func New(r0, r1 ociregistry.Interface, opts *Options) ociregistry.Interface {
46 if opts == nil {
47 opts = new(Options)
48 }
49 return unifier{
50 r0: r0,
51 r1: r1,
52 opts: *opts,
53 }
54 }
55
56 type unifier struct {
57 r0, r1 ociregistry.Interface
58 opts Options
59 *ociregistry.Funcs
60 }
61
// bothResults merges the outcomes of one operation that was run
// against both registries.
//
// If neither result carries an error, r0 is returned unchanged (r1 is
// discarded without being closed). Otherwise an error result is built
// with mkErr; its error wraps whichever of the underlying errors
// occurred and says which registry (r0, r1 or both) failed.
func bothResults[T result[T]](r0, r1 T) T {
	err0, err1 := r0.error(), r1.error()
	// mkErr is invoked on the zero value so that nothing from either
	// input leaks into the error result.
	var zero T
	switch {
	case err0 == nil && err1 == nil:
		return r0
	case err0 != nil && err1 != nil:
		return zero.mkErr(fmt.Errorf("r0 and r1 failed: %w; %w", err0, err1))
	case err0 != nil:
		return zero.mkErr(fmt.Errorf("r0 failed: %w", err0))
	default:
		return zero.mkErr(fmt.Errorf("r1 failed: %w", err1))
	}
}

// result is the constraint satisfied by the value types that carry the
// outcome of a registry call through the helpers in this file.
type result[T any] interface {
	// mkErr returns a T that carries err. Callers in this file invoke
	// it on a zero T, so implementations must not rely on receiver state.
	mkErr(err error) T
	// error returns the error held by the result, or nil on success.
	error() error
	// close is called on a result that is being discarded.
	close()
}
81
82
83 func both[T any](u unifier, f func(r ociregistry.Interface, i int) T) (T, T) {
84 c0, c1 := make(chan T), make(chan T)
85 go func() {
86 c0 <- f(u.r0, 0)
87 }()
88 go func() {
89 c1 <- f(u.r1, 1)
90 }()
91 return <-c0, <-c1
92 }
93
94
95
96
97 func runRead[T result[T]](ctx context.Context, u unifier, f func(ctx context.Context, r ociregistry.Interface, i int) T) T {
98 r, cancel := runReadWithCancel(ctx, u, f)
99 cancel()
100 return r
101 }
102
103
104
105
106 func runReadWithCancel[T result[T]](ctx context.Context, u unifier, f func(ctx context.Context, r ociregistry.Interface, i int) T) (T, func()) {
107 switch u.opts.ReadPolicy {
108 case ReadConcurrent:
109 return runReadConcurrent(ctx, u, f)
110 case ReadSequential:
111 return runReadSequential(ctx, u, f), func() {}
112 default:
113 panic("unreachable")
114 }
115 }
116
117 func runReadSequential[T result[T]](ctx context.Context, u unifier, f func(ctx context.Context, r ociregistry.Interface, i int) T) T {
118 r := f(ctx, u.r0, 0)
119 if err := r.error(); err == nil {
120 return r
121 }
122 return f(ctx, u.r1, 1)
123 }
124
125 func runReadConcurrent[T result[T]](ctx context.Context, u unifier, f func(ctx context.Context, r ociregistry.Interface, i int) T) (T, func()) {
126 done := make(chan struct{})
127 defer close(done)
128 type result struct {
129 r T
130 cancel func()
131 }
132 c := make(chan result)
133 sender := func(f func(context.Context, ociregistry.Interface, int) T, reg ociregistry.Interface, i int) {
134 ctx, cancel := context.WithCancel(ctx)
135 r := f(ctx, reg, i)
136 select {
137 case c <- result{r, cancel}:
138 case <-done:
139 r.close()
140 cancel()
141 }
142 }
143 go sender(f, u.r0, 0)
144 go sender(f, u.r1, 1)
145 select {
146 case r := <-c:
147 if r.r.error() == nil {
148 return r.r, r.cancel
149 }
150 r.cancel()
151 case <-ctx.Done():
152 return (*new(T)).mkErr(ctx.Err()), func() {}
153 }
154
155 select {
156 case r := <-c:
157 return r.r, r.cancel
158 case <-ctx.Done():
159 return (*new(T)).mkErr(ctx.Err()), func() {}
160 }
161 }
162
// mk1 wraps err (which may be nil) in a t1.
func mk1(err error) t1 {
	return t1{err: err}
}

// t1 is the result type for operations whose only outcome is an error.
type t1 struct {
	err error
}

// close does nothing: a t1 holds no value to close.
func (t1) close() {}

// error returns the wrapped error, or nil on success.
func (r t1) error() error {
	return r.err
}

// mkErr returns a t1 holding err; the receiver is ignored.
func (t1) mkErr(err error) t1 {
	return mk1(err)
}
180
181 func mk2[T any](x T, err error) t2[T] {
182 return t2[T]{x, err}
183 }
184
185 type t2[T any] struct {
186 x T
187 err error
188 }
189
190 func (t t2[T]) close() {
191 if closer, ok := any(t.x).(io.Closer); ok {
192 closer.Close()
193 }
194 }
195
196 func (t t2[T]) get() (T, error) {
197 return t.x, t.err
198 }
199
200 func (t t2[T]) error() error {
201 return t.err
202 }
203
204 func (t t2[T]) mkErr(err error) t2[T] {
205 return t2[T]{*new(T), err}
206 }
207
View as plain text