1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package ociunify
16
17 import (
18 "context"
19 "encoding/base64"
20 "encoding/json"
21 "fmt"
22 "io"
23
24 "cuelabs.dev/go/oci/ociregistry"
25 )
26
27 func (u unifier) PushBlob(ctx context.Context, repo string, desc ociregistry.Descriptor, r io.Reader) (ociregistry.Descriptor, error) {
28 resultc := make(chan t2[ociregistry.Descriptor])
29 onePush := func(ri ociregistry.Interface, r *io.PipeReader) {
30 desc, err := ri.PushBlob(ctx, repo, desc, r)
31 r.CloseWithError(err)
32 resultc <- t2[ociregistry.Descriptor]{desc, err}
33 }
34 pr0, pw0 := io.Pipe()
35 pr1, pw1 := io.Pipe()
36 go onePush(u.r0, pr0)
37 go onePush(u.r1, pr1)
38 go func() {
39 _, err := io.Copy(io.MultiWriter(pw0, pw1), r)
40 pw0.CloseWithError(err)
41 pw1.CloseWithError(err)
42 }()
43 r0 := <-resultc
44 r1 := <-resultc
45 if (r0.err == nil) == (r1.err == nil) {
46 return r0.get()
47 }
48 return ociregistry.Descriptor{}, fmt.Errorf("one push succeeded where the other failed (TODO better error)")
49 }
50
51 func (u unifier) PushManifest(ctx context.Context, repo string, tag string, contents []byte, mediaType string) (ociregistry.Descriptor, error) {
52 r0, r1 := both(u, func(r ociregistry.Interface, _ int) t2[ociregistry.Descriptor] {
53 return mk2(r.PushManifest(ctx, repo, tag, contents, mediaType))
54 })
55 if (r0.err == nil) == (r1.err == nil) {
56 return r0.get()
57 }
58 return ociregistry.Descriptor{}, fmt.Errorf("one push succeeded where the other failed (TODO better error)")
59 }
60
61 func (u unifier) PushBlobChunked(ctx context.Context, repo string, chunkSize int) (ociregistry.BlobWriter, error) {
62 r0, r1 := both(u, func(r ociregistry.Interface, i int) t2[ociregistry.BlobWriter] {
63 return mk2(r.PushBlobChunked(ctx, repo, chunkSize))
64 })
65 if r0.err != nil || r1.err != nil {
66 r0.close()
67 r1.close()
68 return nil, bothResults(r0, r1).err
69 }
70 w0, w1 := r0.x, r1.x
71 size := w0.Size()
72 return &unifiedBlobWriter{
73 w: [2]ociregistry.BlobWriter{w0, w1},
74 size: size,
75 }, nil
76 }
77
78 func (u unifier) PushBlobChunkedResume(ctx context.Context, repo, id string, offset int64, chunkSize int) (ociregistry.BlobWriter, error) {
79 data, err := base64.RawURLEncoding.DecodeString(id)
80 if err != nil {
81 return nil, fmt.Errorf("malformed ID: %v", err)
82 }
83 var ids []string
84 if err := json.Unmarshal(data, &ids); err != nil {
85 return nil, fmt.Errorf("malformed ID %q: %v", id, err)
86 }
87 if len(ids) != 2 {
88 return nil, fmt.Errorf("malformed ID %q (expected two elements)", id)
89 }
90 r0, r1 := both(u, func(r ociregistry.Interface, i int) t2[ociregistry.BlobWriter] {
91 return mk2(r.PushBlobChunkedResume(ctx, repo, ids[i], offset, chunkSize))
92 })
93 if r0.err != nil || r1.err != nil {
94 r0.close()
95 r1.close()
96 return nil, bothResults(r0, r1).err
97 }
98 w0, w1 := r0.x, r1.x
99 size := w0.Size()
100 if w1.Size() != size {
101 r0.close()
102 r1.close()
103 return nil, fmt.Errorf("registries do not agree on upload size; please start upload again")
104 }
105 return &unifiedBlobWriter{
106 w: [2]ociregistry.BlobWriter{w0, w1},
107 size: size,
108 }, nil
109 }
110
111 func (u unifier) MountBlob(ctx context.Context, fromRepo, toRepo string, digest ociregistry.Digest) (ociregistry.Descriptor, error) {
112 return bothResults(both(u,
113 func(r ociregistry.Interface, _ int) t2[ociregistry.Descriptor] {
114 return mk2(r.MountBlob(ctx, fromRepo, toRepo, digest))
115 },
116 )).get()
117 }
118
119 type unifiedBlobWriter struct {
120 u unifier
121 w [2]ociregistry.BlobWriter
122 size int64
123 }
124
125 func (w *unifiedBlobWriter) Write(buf []byte) (int, error) {
126 r := bothResults(both(w.u, func(_ ociregistry.Interface, i int) t2[int] {
127 return mk2(w.w[i].Write(buf))
128 }))
129 if r.err != nil {
130 return 0, r.err
131 }
132 w.size += int64(len(buf))
133 return len(buf), nil
134 }
135
136 func (w *unifiedBlobWriter) Close() error {
137 return bothResults(both(w.u, func(_ ociregistry.Interface, i int) t1 {
138 return mk1(w.w[i].Close())
139 })).err
140 }
141
142 func (w *unifiedBlobWriter) Cancel() error {
143 return bothResults(both(w.u, func(_ ociregistry.Interface, i int) t1 {
144 return mk1(w.w[i].Cancel())
145 })).err
146 }
147
148 func (w *unifiedBlobWriter) Size() int64 {
149 return w.size
150 }
151
152 func (w *unifiedBlobWriter) ChunkSize() int {
153
154
155
156 s1, s2 := w.w[0].ChunkSize(), w.w[1].ChunkSize()
157 if s2 > s1 {
158 return s2
159 }
160 return s1
161 }
162
163 func (w *unifiedBlobWriter) ID() string {
164 data, _ := json.Marshal([]string{w.w[0].ID(), w.w[1].ID()})
165 return base64.RawURLEncoding.EncodeToString(data)
166 }
167
168 func (w *unifiedBlobWriter) Commit(digest ociregistry.Digest) (ociregistry.Descriptor, error) {
169 return bothResults(both(w.u, func(_ ociregistry.Interface, i int) t2[ociregistry.Descriptor] {
170 return mk2(w.w[i].Commit(digest))
171 })).get()
172 }
173
View as plain text