1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package ociclient
16
17 import (
18 "bytes"
19 "context"
20 "fmt"
21 "io"
22 "net/http"
23 "net/url"
24 "strconv"
25 "sync"
26
27 "github.com/opencontainers/go-digest"
28
29 "cuelabs.dev/go/oci/ociregistry"
30 "cuelabs.dev/go/oci/ociregistry/internal/ocirequest"
31 "cuelabs.dev/go/oci/ociregistry/ociauth"
32 )
33
34
35
36 func (c *client) PushManifest(ctx context.Context, repo string, tag string, contents []byte, mediaType string) (ociregistry.Descriptor, error) {
37 if mediaType == "" {
38 return ociregistry.Descriptor{}, fmt.Errorf("PushManifest called with empty mediaType")
39 }
40 desc := ociregistry.Descriptor{
41 Digest: digest.FromBytes(contents),
42 Size: int64(len(contents)),
43 MediaType: mediaType,
44 }
45
46 rreq := &ocirequest.Request{
47 Kind: ocirequest.ReqManifestPut,
48 Repo: repo,
49 Tag: tag,
50 Digest: string(desc.Digest),
51 }
52 req, err := newRequest(ctx, rreq, bytes.NewReader(contents))
53 req.Header.Set("Content-Type", mediaType)
54 req.ContentLength = desc.Size
55 resp, err := c.do(req, http.StatusCreated)
56 if err != nil {
57 return ociregistry.Descriptor{}, err
58 }
59 resp.Body.Close()
60 return desc, nil
61 }
62
63 func (c *client) MountBlob(ctx context.Context, fromRepo, toRepo string, dig ociregistry.Digest) (ociregistry.Descriptor, error) {
64 rreq := &ocirequest.Request{
65 Kind: ocirequest.ReqBlobMount,
66 Repo: toRepo,
67 FromRepo: fromRepo,
68 Digest: string(dig),
69 }
70 resp, err := c.doRequest(ctx, rreq, http.StatusCreated, http.StatusAccepted)
71 if err != nil {
72 return ociregistry.Descriptor{}, err
73 }
74 resp.Body.Close()
75 if resp.StatusCode == http.StatusAccepted {
76
77
78
79 return ociregistry.Descriptor{}, fmt.Errorf("registry does not support mounts: %w", ociregistry.ErrUnsupported)
80 }
81 return descriptorFromResponse(resp, dig, false)
82 }
83
84 func (c *client) PushBlob(ctx context.Context, repo string, desc ociregistry.Descriptor, r io.Reader) (_ ociregistry.Descriptor, _err error) {
85
86
87
88
89 rreq := &ocirequest.Request{
90 Kind: ocirequest.ReqBlobStartUpload,
91 Repo: repo,
92 }
93 req, err := newRequest(ctx, rreq, nil)
94 if err != nil {
95 return ociregistry.Descriptor{}, err
96 }
97 resp, err := c.do(req, http.StatusAccepted)
98 if err != nil {
99 return ociregistry.Descriptor{}, err
100 }
101 resp.Body.Close()
102 location, err := locationFromResponse(resp)
103 if err != nil {
104 return ociregistry.Descriptor{}, err
105 }
106
107
108
109 ctx = ociauth.ContextWithRequestInfo(ctx, ociauth.RequestInfo{
110 RequiredScope: scopeForRequest(rreq),
111 })
112
113
114 req, err = http.NewRequestWithContext(ctx, "PUT", "", r)
115 if err != nil {
116 return ociregistry.Descriptor{}, err
117 }
118 req.URL = urlWithDigest(location, string(desc.Digest))
119 req.ContentLength = desc.Size
120 req.Header.Set("Content-Type", "application/octet-stream")
121
122 req.Header.Set("Content-Range", ocirequest.RangeString(0, desc.Size))
123 resp, err = c.do(req, http.StatusCreated)
124 if err != nil {
125 return ociregistry.Descriptor{}, err
126 }
127 defer closeOnError(&_err, resp.Body)
128 resp.Body.Close()
129 return desc, nil
130 }
131
132
133
134
135 const defaultChunkSize = 64 * 1024
136
137 func (c *client) PushBlobChunked(ctx context.Context, repo string, chunkSize int) (ociregistry.BlobWriter, error) {
138 if chunkSize <= 0 {
139 chunkSize = defaultChunkSize
140 }
141 resp, err := c.doRequest(ctx, &ocirequest.Request{
142 Kind: ocirequest.ReqBlobStartUpload,
143 Repo: repo,
144 }, http.StatusAccepted)
145 if err != nil {
146 return nil, err
147 }
148 resp.Body.Close()
149 location, err := locationFromResponse(resp)
150 if err != nil {
151 return nil, err
152 }
153 ctx = ociauth.ContextWithRequestInfo(ctx, ociauth.RequestInfo{
154 RequiredScope: ociauth.NewScope(ociauth.ResourceScope{
155 ResourceType: "repository",
156 Resource: repo,
157 Action: "push",
158 }),
159 })
160 return &blobWriter{
161 ctx: ctx,
162 client: c,
163 chunkSize: chunkSizeFromResponse(resp, chunkSize),
164 chunk: make([]byte, 0, chunkSize),
165 location: location,
166 }, nil
167 }
168
169 func (c *client) PushBlobChunkedResume(ctx context.Context, repo string, id string, offset int64, chunkSize int) (ociregistry.BlobWriter, error) {
170 if id == "" {
171 return nil, fmt.Errorf("id must be non-empty to resume a chunked upload")
172 }
173 if chunkSize <= 0 {
174 chunkSize = defaultChunkSize
175 }
176 var location *url.URL
177 switch {
178 case offset == -1:
179
180
181
182 ctx := ociauth.ContextWithRequestInfo(ctx, ociauth.RequestInfo{
183 RequiredScope: ociauth.NewScope(ociauth.ResourceScope{
184 ResourceType: "repository",
185 Resource: repo,
186 Action: "push",
187 }, ociauth.ResourceScope{
188 ResourceType: "repository",
189 Resource: repo,
190 Action: "pull",
191 }),
192 })
193 req, err := http.NewRequestWithContext(ctx, "GET", id, nil)
194 if err != nil {
195 return nil, err
196 }
197 resp, err := c.do(req, http.StatusNoContent)
198 if err != nil {
199 return nil, fmt.Errorf("cannot recover chunk offset: %v", err)
200 }
201 location, err = locationFromResponse(resp)
202 if err != nil {
203 return nil, fmt.Errorf("cannot get location from response: %v", err)
204 }
205 rangeStr := resp.Header.Get("Range")
206 p0, p1, ok := ocirequest.ParseRange(rangeStr)
207 if !ok {
208 return nil, fmt.Errorf("invalid range %q in response", rangeStr)
209 }
210 if p0 != 0 {
211 return nil, fmt.Errorf("range %q does not start with 0", rangeStr)
212 }
213 chunkSize = chunkSizeFromResponse(resp, chunkSize)
214 offset = p1
215 case offset < 0:
216 return nil, fmt.Errorf("invalid offset; must be -1 or non-negative")
217 default:
218 var err error
219 location, err = url.Parse(id)
220 if err != nil {
221 return nil, fmt.Errorf("provided ID is not a valid location URL")
222 }
223 }
224 ctx = ociauth.ContextWithRequestInfo(ctx, ociauth.RequestInfo{
225 RequiredScope: ociauth.NewScope(ociauth.ResourceScope{
226 ResourceType: "repository",
227 Resource: repo,
228 Action: "push",
229 }),
230 })
231 return &blobWriter{
232 ctx: ctx,
233 client: c,
234 chunkSize: chunkSize,
235 size: offset,
236 flushed: offset,
237 location: location,
238 }, nil
239 }
240
241 type blobWriter struct {
242 client *client
243 chunkSize int
244 ctx context.Context
245
246
247 mu sync.Mutex
248 closed bool
249 chunk []byte
250 closeErr error
251
252
253
254 size int64
255
256
257
258 flushed int64
259 location *url.URL
260 }
261
262 type doResult struct {
263 resp *http.Response
264 err error
265 }
266
267 func (w *blobWriter) Write(buf []byte) (int, error) {
268 w.mu.Lock()
269 defer w.mu.Unlock()
270
271
272
273
274
275 if len(w.chunk)+len(buf) > w.chunkSize {
276 if err := w.flush(buf, ""); err != nil {
277 return 0, err
278 }
279 } else {
280 if w.chunk == nil {
281 w.chunk = make([]byte, 0, w.chunkSize)
282 }
283 w.chunk = append(w.chunk, buf...)
284 }
285 w.size += int64(len(buf))
286 return len(buf), nil
287 }
288
289
290
291
292 func (w *blobWriter) flush(buf []byte, commitDigest ociregistry.Digest) error {
293 if commitDigest == "" && len(buf)+len(w.chunk) == 0 {
294 return nil
295 }
296
297 method := "PATCH"
298 expect := http.StatusAccepted
299 reqURL := w.location
300 if commitDigest != "" {
301
302
303 method = "PUT"
304 expect = http.StatusCreated
305 reqURL = urlWithDigest(reqURL, string(commitDigest))
306 }
307 req, err := http.NewRequestWithContext(w.ctx, method, "", concatBody(w.chunk, buf))
308 if err != nil {
309 return fmt.Errorf("cannot make PATCH request: %v", err)
310 }
311 req.URL = reqURL
312 req.ContentLength = int64(len(w.chunk) + len(buf))
313
314
315 req.Header.Set("Content-Range", ocirequest.RangeString(w.flushed, w.flushed+req.ContentLength))
316 resp, err := w.client.do(req, expect)
317 if err != nil {
318 return err
319 }
320 resp.Body.Close()
321 location, err := locationFromResponse(resp)
322 if err != nil {
323 return fmt.Errorf("bad Location in response: %v", err)
324 }
325
326 w.location = location
327 w.flushed += req.ContentLength
328 w.chunk = w.chunk[:0]
329 return nil
330 }
331
332 func concatBody(b1, b2 []byte) io.Reader {
333 if len(b1)+len(b2) == 0 {
334 return nil
335 }
336 if len(b1) == 0 {
337 return bytes.NewReader(b2)
338 }
339 if len(b2) == 0 {
340 return bytes.NewReader(b1)
341 }
342 return io.MultiReader(
343 bytes.NewReader(b1),
344 bytes.NewReader(b2),
345 )
346 }
347
348 func (w *blobWriter) Close() error {
349 w.mu.Lock()
350 defer w.mu.Unlock()
351 if w.closed {
352 return w.closeErr
353 }
354 err := w.flush(nil, "")
355 w.closed = true
356 w.closeErr = err
357 return err
358 }
359
360 func (w *blobWriter) Size() int64 {
361 w.mu.Lock()
362 defer w.mu.Unlock()
363 return w.size
364 }
365
366 func (w *blobWriter) ChunkSize() int {
367 return w.chunkSize
368 }
369
370 func (w *blobWriter) ID() string {
371 w.mu.Lock()
372 defer w.mu.Unlock()
373 return w.location.String()
374 }
375
376 func (w *blobWriter) Commit(digest ociregistry.Digest) (ociregistry.Descriptor, error) {
377 if digest == "" {
378 return ociregistry.Descriptor{}, fmt.Errorf("cannot commit with an empty digest")
379 }
380 w.mu.Lock()
381 defer w.mu.Unlock()
382 if err := w.flush(nil, digest); err != nil {
383 return ociregistry.Descriptor{}, fmt.Errorf("cannot flush data before commit: %v", err)
384 }
385 return ociregistry.Descriptor{
386 MediaType: "application/octet-stream",
387 Size: w.size,
388 Digest: digest,
389 }, nil
390 }
391
392 func (w *blobWriter) Cancel() error {
393 return nil
394 }
395
396
397
398
399
400
401 func urlWithDigest(u0 *url.URL, digest string) *url.URL {
402 u := *u0
403 digest = url.QueryEscape(digest)
404 switch {
405 case u.ForceQuery:
406
407 u.RawQuery = "digest=" + digest
408 u.ForceQuery = false
409 case u.RawQuery != "":
410
411 u.RawQuery += "&digest=" + digest
412 default:
413 u.RawQuery = "digest=" + digest
414 }
415 return &u
416 }
417
418
419 func chunkSizeFromResponse(resp *http.Response, chunkSize int) int {
420 minChunkSize, err := strconv.Atoi(resp.Header.Get("OCI-Chunk-Min-Length"))
421 if err == nil && minChunkSize > chunkSize {
422 return minChunkSize
423 }
424 return chunkSize
425 }
426
View as plain text