1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package remote
16
17 import (
18 "bytes"
19 "context"
20 "encoding/json"
21 "errors"
22 "fmt"
23 "io"
24 "net/http"
25 "net/url"
26 "sort"
27 "strings"
28 "sync"
29
30 "github.com/google/go-containerregistry/internal/redact"
31 "github.com/google/go-containerregistry/internal/retry"
32 "github.com/google/go-containerregistry/pkg/authn"
33 "github.com/google/go-containerregistry/pkg/logs"
34 "github.com/google/go-containerregistry/pkg/name"
35 v1 "github.com/google/go-containerregistry/pkg/v1"
36 "github.com/google/go-containerregistry/pkg/v1/remote/transport"
37 "github.com/google/go-containerregistry/pkg/v1/stream"
38 "github.com/google/go-containerregistry/pkg/v1/types"
39 )
40
41
// Taggable is the minimal contract a value must satisfy to be handed to Tag
// or Put: it only has to be able to produce the raw bytes of its manifest.
type Taggable interface {
	// RawManifest returns the serialized manifest, or an error if the bytes
	// cannot be produced.
	RawManifest() ([]byte, error)
}
45
46
47 func Write(ref name.Reference, img v1.Image, options ...Option) (rerr error) {
48 o, err := makeOptions(options...)
49 if err != nil {
50 return err
51 }
52 if o.progress != nil {
53 defer func() { o.progress.Close(rerr) }()
54 }
55 return newPusher(o).Push(o.context, ref, img)
56 }
57
58
59 type writer struct {
60 repo name.Repository
61 auth authn.Authenticator
62 transport http.RoundTripper
63
64 client *http.Client
65
66 progress *progress
67 backoff Backoff
68 predicate retry.Predicate
69
70 scopeLock sync.Mutex
71
72 scopeSet map[string]struct{}
73 scopes []string
74 }
75
76 func makeWriter(ctx context.Context, repo name.Repository, ls []v1.Layer, o *options) (*writer, error) {
77 auth := o.auth
78 if o.keychain != nil {
79 kauth, err := o.keychain.Resolve(repo)
80 if err != nil {
81 return nil, err
82 }
83 auth = kauth
84 }
85 scopes := scopesForUploadingImage(repo, ls)
86 tr, err := transport.NewWithContext(ctx, repo.Registry, auth, o.transport, scopes)
87 if err != nil {
88 return nil, err
89 }
90
91 scopeSet := map[string]struct{}{}
92 for _, scope := range scopes {
93 scopeSet[scope] = struct{}{}
94 }
95 return &writer{
96 repo: repo,
97 client: &http.Client{Transport: tr},
98 auth: auth,
99 transport: o.transport,
100 progress: o.progress,
101 backoff: o.retryBackoff,
102 predicate: o.retryPredicate,
103 scopes: scopes,
104 scopeSet: scopeSet,
105 }, nil
106 }
107
108
109 func (w *writer) url(path string) url.URL {
110 return url.URL{
111 Scheme: w.repo.Registry.Scheme(),
112 Host: w.repo.RegistryStr(),
113 Path: path,
114 }
115 }
116
117 func (w *writer) maybeUpdateScopes(ctx context.Context, ml *MountableLayer) error {
118 if ml.Reference.Context().String() == w.repo.String() {
119 return nil
120 }
121 if ml.Reference.Context().Registry.String() != w.repo.Registry.String() {
122 return nil
123 }
124
125 scope := ml.Reference.Scope(transport.PullScope)
126
127 w.scopeLock.Lock()
128 defer w.scopeLock.Unlock()
129
130 if _, ok := w.scopeSet[scope]; !ok {
131 w.scopeSet[scope] = struct{}{}
132 w.scopes = append(w.scopes, scope)
133
134 logs.Debug.Printf("Refreshing token to add scope %q", scope)
135 wt, err := transport.NewWithContext(ctx, w.repo.Registry, w.auth, w.transport, w.scopes)
136 if err != nil {
137 return err
138 }
139 w.client = &http.Client{Transport: wt}
140 }
141
142 return nil
143 }
144
145
146 func (w *writer) nextLocation(resp *http.Response) (string, error) {
147 loc := resp.Header.Get("Location")
148 if len(loc) == 0 {
149 return "", errors.New("missing Location header")
150 }
151 u, err := url.Parse(loc)
152 if err != nil {
153 return "", err
154 }
155
156
157
158 return resp.Request.URL.ResolveReference(u).String(), nil
159 }
160
161
162
163
164
165 func (w *writer) checkExistingBlob(ctx context.Context, h v1.Hash) (bool, error) {
166 u := w.url(fmt.Sprintf("/v2/%s/blobs/%s", w.repo.RepositoryStr(), h.String()))
167
168 req, err := http.NewRequest(http.MethodHead, u.String(), nil)
169 if err != nil {
170 return false, err
171 }
172
173 resp, err := w.client.Do(req.WithContext(ctx))
174 if err != nil {
175 return false, err
176 }
177 defer resp.Body.Close()
178
179 if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil {
180 return false, err
181 }
182
183 return resp.StatusCode == http.StatusOK, nil
184 }
185
186
187
188
189
190
191
192 func (w *writer) initiateUpload(ctx context.Context, from, mount, origin string) (location string, mounted bool, err error) {
193 u := w.url(fmt.Sprintf("/v2/%s/blobs/uploads/", w.repo.RepositoryStr()))
194 uv := url.Values{}
195 if mount != "" && from != "" {
196
197 uv.Set("mount", mount)
198 uv.Set("from", from)
199 if origin != "" {
200 uv.Set("origin", origin)
201 }
202 }
203 u.RawQuery = uv.Encode()
204
205
206 req, err := http.NewRequest(http.MethodPost, u.String(), nil)
207 if err != nil {
208 return "", false, err
209 }
210 req.Header.Set("Content-Type", "application/json")
211 resp, err := w.client.Do(req.WithContext(ctx))
212 if err != nil {
213 if from != "" {
214
215 logs.Warn.Printf("retrying without mount: %v", err)
216 return w.initiateUpload(ctx, "", "", "")
217 }
218 return "", false, err
219 }
220 defer resp.Body.Close()
221
222 if err := transport.CheckError(resp, http.StatusCreated, http.StatusAccepted); err != nil {
223 if from != "" {
224
225 logs.Warn.Printf("retrying without mount: %v", err)
226 return w.initiateUpload(ctx, "", "", "")
227 }
228 return "", false, err
229 }
230
231
232 switch resp.StatusCode {
233 case http.StatusCreated:
234
235 return "", true, nil
236 case http.StatusAccepted:
237
238 loc, err := w.nextLocation(resp)
239 return loc, false, err
240 default:
241 panic("Unreachable: initiateUpload")
242 }
243 }
244
245
246
247
248 func (w *writer) streamBlob(ctx context.Context, layer v1.Layer, streamLocation string) (commitLocation string, rerr error) {
249 reset := func() {}
250 defer func() {
251 if rerr != nil {
252 reset()
253 }
254 }()
255 blob, err := layer.Compressed()
256 if err != nil {
257 return "", err
258 }
259
260 getBody := layer.Compressed
261 if w.progress != nil {
262 var count int64
263 blob = &progressReader{rc: blob, progress: w.progress, count: &count}
264 getBody = func() (io.ReadCloser, error) {
265 blob, err := layer.Compressed()
266 if err != nil {
267 return nil, err
268 }
269 return &progressReader{rc: blob, progress: w.progress, count: &count}, nil
270 }
271 reset = func() {
272 w.progress.complete(-count)
273 }
274 }
275
276 req, err := http.NewRequest(http.MethodPatch, streamLocation, blob)
277 if err != nil {
278 return "", err
279 }
280 if _, ok := layer.(*stream.Layer); !ok {
281
282 req.GetBody = getBody
283
284
285 if size, err := layer.Size(); err == nil {
286 req.ContentLength = size
287 }
288 }
289 req.Header.Set("Content-Type", "application/octet-stream")
290
291 resp, err := w.client.Do(req.WithContext(ctx))
292 if err != nil {
293 return "", err
294 }
295 defer resp.Body.Close()
296
297 if err := transport.CheckError(resp, http.StatusNoContent, http.StatusAccepted, http.StatusCreated); err != nil {
298 return "", err
299 }
300
301
302
303 return w.nextLocation(resp)
304 }
305
306
307
308 func (w *writer) commitBlob(ctx context.Context, location, digest string) error {
309 u, err := url.Parse(location)
310 if err != nil {
311 return err
312 }
313 v := u.Query()
314 v.Set("digest", digest)
315 u.RawQuery = v.Encode()
316
317 req, err := http.NewRequest(http.MethodPut, u.String(), nil)
318 if err != nil {
319 return err
320 }
321 req.Header.Set("Content-Type", "application/octet-stream")
322
323 resp, err := w.client.Do(req.WithContext(ctx))
324 if err != nil {
325 return err
326 }
327 defer resp.Body.Close()
328
329 return transport.CheckError(resp, http.StatusCreated)
330 }
331
332
333 func (w *writer) incrProgress(written int64) {
334 if w.progress == nil {
335 return
336 }
337 w.progress.complete(written)
338 }
339
340
341 func (w *writer) uploadOne(ctx context.Context, l v1.Layer) error {
342 tryUpload := func() error {
343 ctx := retry.Never(ctx)
344 var from, mount, origin string
345 if h, err := l.Digest(); err == nil {
346
347
348 existing, err := w.checkExistingBlob(ctx, h)
349 if err != nil {
350 return err
351 }
352 if existing {
353 size, err := l.Size()
354 if err != nil {
355 return err
356 }
357 w.incrProgress(size)
358 logs.Progress.Printf("existing blob: %v", h)
359 return nil
360 }
361
362 mount = h.String()
363 }
364 if ml, ok := l.(*MountableLayer); ok {
365 if err := w.maybeUpdateScopes(ctx, ml); err != nil {
366 return err
367 }
368
369 from = ml.Reference.Context().RepositoryStr()
370 origin = ml.Reference.Context().RegistryStr()
371
372
373
374 if w.repo.RegistryStr() == name.DefaultRegistry && origin != w.repo.RegistryStr() {
375 from = ""
376 origin = ""
377 }
378 }
379
380 location, mounted, err := w.initiateUpload(ctx, from, mount, origin)
381 if err != nil {
382 return err
383 } else if mounted {
384 size, err := l.Size()
385 if err != nil {
386 return err
387 }
388 w.incrProgress(size)
389 h, err := l.Digest()
390 if err != nil {
391 return err
392 }
393 logs.Progress.Printf("mounted blob: %s", h.String())
394 return nil
395 }
396
397
398
399 mt, err := l.MediaType()
400 if err != nil {
401 return err
402 }
403 smt := string(mt)
404 if !(strings.HasSuffix(smt, "+json") || strings.HasSuffix(smt, "+yaml")) {
405 ctx = redact.NewContext(ctx, "omitting binary blobs from logs")
406 }
407
408 location, err = w.streamBlob(ctx, l, location)
409 if err != nil {
410 return err
411 }
412
413 h, err := l.Digest()
414 if err != nil {
415 return err
416 }
417 digest := h.String()
418
419 if err := w.commitBlob(ctx, location, digest); err != nil {
420 return err
421 }
422 logs.Progress.Printf("pushed blob: %s", digest)
423 return nil
424 }
425
426 return retry.Retry(tryUpload, w.predicate, w.backoff)
427 }
428
429 type withMediaType interface {
430 MediaType() (types.MediaType, error)
431 }
432
433
434
435
436
437
438
439 func unpackTaggable(t Taggable) ([]byte, *v1.Descriptor, error) {
440 if d, ok := t.(*Descriptor); ok {
441 return d.Manifest, &d.Descriptor, nil
442 }
443 b, err := t.RawManifest()
444 if err != nil {
445 return nil, nil, err
446 }
447
448
449 mt := types.DockerManifestSchema2
450
451 if wmt, ok := t.(withMediaType); ok {
452 m, err := wmt.MediaType()
453 if err != nil {
454 return nil, nil, err
455 }
456 mt = m
457 }
458
459 h, sz, err := v1.SHA256(bytes.NewReader(b))
460 if err != nil {
461 return nil, nil, err
462 }
463
464 return b, &v1.Descriptor{
465 MediaType: mt,
466 Size: sz,
467 Digest: h,
468 }, nil
469 }
470
471
472
473 func (w *writer) commitSubjectReferrers(ctx context.Context, sub name.Digest, add v1.Descriptor) error {
474
475
476 u := w.url(fmt.Sprintf("/v2/%s/referrers/%s", w.repo.RepositoryStr(), sub.DigestStr()))
477 req, err := http.NewRequest(http.MethodGet, u.String(), nil)
478 if err != nil {
479 return err
480 }
481 req.Header.Set("Accept", string(types.OCIImageIndex))
482 resp, err := w.client.Do(req.WithContext(ctx))
483 if err != nil {
484 return err
485 }
486 defer resp.Body.Close()
487
488 if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound, http.StatusBadRequest); err != nil {
489 return err
490 }
491 if resp.StatusCode == http.StatusOK {
492
493 return nil
494 }
495
496
497
498 t := fallbackTag(sub)
499 u = w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), t.Identifier()))
500 req, err = http.NewRequest(http.MethodGet, u.String(), nil)
501 if err != nil {
502 return err
503 }
504 req.Header.Set("Accept", string(types.OCIImageIndex))
505 resp, err = w.client.Do(req.WithContext(ctx))
506 if err != nil {
507 return err
508 }
509 defer resp.Body.Close()
510
511 var im v1.IndexManifest
512 if err := transport.CheckError(resp, http.StatusOK, http.StatusNotFound); err != nil {
513 return err
514 } else if resp.StatusCode == http.StatusNotFound {
515
516 im = v1.IndexManifest{
517 SchemaVersion: 2,
518 MediaType: types.OCIImageIndex,
519 Manifests: []v1.Descriptor{add},
520 }
521 } else {
522 if err := json.NewDecoder(resp.Body).Decode(&im); err != nil {
523 return err
524 }
525 if im.SchemaVersion != 2 {
526 return fmt.Errorf("fallback tag manifest is not a schema version 2: %d", im.SchemaVersion)
527 }
528 if im.MediaType != types.OCIImageIndex {
529 return fmt.Errorf("fallback tag manifest is not an OCI image index: %s", im.MediaType)
530 }
531 for _, desc := range im.Manifests {
532 if desc.Digest == add.Digest {
533
534 logs.Progress.Printf("fallback tag %s already had referrer", t.Identifier())
535 return nil
536 }
537 }
538
539 im.Manifests = append(im.Manifests, add)
540 }
541
542
543 sort.Slice(im.Manifests, func(i, j int) bool {
544 return im.Manifests[i].Digest.String() < im.Manifests[j].Digest.String()
545 })
546 logs.Progress.Printf("updating fallback tag %s with new referrer", t.Identifier())
547 return w.commitManifest(ctx, fallbackTaggable{im}, t)
548 }
549
550 type fallbackTaggable struct {
551 im v1.IndexManifest
552 }
553
554 func (f fallbackTaggable) RawManifest() ([]byte, error) { return json.Marshal(f.im) }
555 func (f fallbackTaggable) MediaType() (types.MediaType, error) { return types.OCIImageIndex, nil }
556
557
558 func (w *writer) commitManifest(ctx context.Context, t Taggable, ref name.Reference) error {
559
560 raw, err := t.RawManifest()
561 if err != nil {
562 return err
563 }
564 var mf struct {
565 MediaType types.MediaType `json:"mediaType"`
566 Subject *v1.Descriptor `json:"subject,omitempty"`
567 Config struct {
568 MediaType types.MediaType `json:"mediaType"`
569 } `json:"config"`
570 }
571 if err := json.Unmarshal(raw, &mf); err != nil {
572 return err
573 }
574
575 tryUpload := func() error {
576 ctx := retry.Never(ctx)
577 raw, desc, err := unpackTaggable(t)
578 if err != nil {
579 return err
580 }
581
582 u := w.url(fmt.Sprintf("/v2/%s/manifests/%s", w.repo.RepositoryStr(), ref.Identifier()))
583
584
585 req, err := http.NewRequest(http.MethodPut, u.String(), bytes.NewBuffer(raw))
586 if err != nil {
587 return err
588 }
589 req.Header.Set("Content-Type", string(desc.MediaType))
590
591 resp, err := w.client.Do(req.WithContext(ctx))
592 if err != nil {
593 return err
594 }
595 defer resp.Body.Close()
596
597 if err := transport.CheckError(resp, http.StatusOK, http.StatusCreated, http.StatusAccepted); err != nil {
598 return err
599 }
600
601
602
603 if mf.Subject != nil {
604 h, size, err := v1.SHA256(bytes.NewReader(raw))
605 if err != nil {
606 return err
607 }
608 desc := v1.Descriptor{
609 ArtifactType: string(mf.Config.MediaType),
610 MediaType: mf.MediaType,
611 Digest: h,
612 Size: size,
613 }
614 if err := w.commitSubjectReferrers(ctx,
615 ref.Context().Digest(mf.Subject.Digest.String()),
616 desc); err != nil {
617 return err
618 }
619 }
620
621
622 logs.Progress.Printf("%v: digest: %v size: %d", ref, desc.Digest, desc.Size)
623 w.incrProgress(int64(len(raw)))
624 return nil
625 }
626
627 return retry.Retry(tryUpload, w.predicate, w.backoff)
628 }
629
630 func scopesForUploadingImage(repo name.Repository, layers []v1.Layer) []string {
631
632 scopeSet := map[string]struct{}{}
633
634 for _, l := range layers {
635 if ml, ok := l.(*MountableLayer); ok {
636
637
638 if ml.Reference.Context().String() != repo.String() && ml.Reference.Context().Registry.String() == repo.Registry.String() {
639 scopeSet[ml.Reference.Scope(transport.PullScope)] = struct{}{}
640 }
641 }
642 }
643
644 scopes := make([]string, 0)
645
646 scopes = append(scopes, repo.Scope(transport.PushScope))
647
648 for scope := range scopeSet {
649 scopes = append(scopes, scope)
650 }
651
652 return scopes
653 }
654
655
656
657
658 func WriteIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) (rerr error) {
659 o, err := makeOptions(options...)
660 if err != nil {
661 return err
662 }
663 if o.progress != nil {
664 defer func() { o.progress.Close(rerr) }()
665 }
666 return newPusher(o).Push(o.context, ref, ii)
667 }
668
669
670 func WriteLayer(repo name.Repository, layer v1.Layer, options ...Option) (rerr error) {
671 o, err := makeOptions(options...)
672 if err != nil {
673 return err
674 }
675 if o.progress != nil {
676 defer func() { o.progress.Close(rerr) }()
677 }
678 return newPusher(o).Upload(o.context, repo, layer)
679 }
680
681
682
683
684
685
686
687
688
689
690
691
692 func Tag(tag name.Tag, t Taggable, options ...Option) error {
693 return Put(tag, t, options...)
694 }
695
696
697
698
699
700
701
702
703
704
705
706
707 func Put(ref name.Reference, t Taggable, options ...Option) error {
708 o, err := makeOptions(options...)
709 if err != nil {
710 return err
711 }
712 return newPusher(o).Push(o.context, ref, t)
713 }
714
View as plain text