1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package remote
16
17 import (
18 "bytes"
19 "context"
20 "errors"
21 "fmt"
22 "net/http"
23 "net/url"
24 "sync"
25
26 "github.com/google/go-containerregistry/pkg/logs"
27 "github.com/google/go-containerregistry/pkg/name"
28 v1 "github.com/google/go-containerregistry/pkg/v1"
29 "github.com/google/go-containerregistry/pkg/v1/partial"
30 "github.com/google/go-containerregistry/pkg/v1/remote/transport"
31 "github.com/google/go-containerregistry/pkg/v1/stream"
32 "github.com/google/go-containerregistry/pkg/v1/types"
33 "golang.org/x/sync/errgroup"
34 )
35
36 type manifest interface {
37 Taggable
38 partial.Describable
39 }
40
41
42 type workers struct {
43
44 onces sync.Map
45
46
47 errors sync.Map
48 }
49
50 func nop() error {
51 return nil
52 }
53
54 func (w *workers) err(digest v1.Hash) error {
55 v, ok := w.errors.Load(digest)
56 if !ok || v == nil {
57 return nil
58 }
59 return v.(error)
60 }
61
62 func (w *workers) Do(digest v1.Hash, f func() error) error {
63
64 once, _ := w.onces.LoadOrStore(digest, &sync.Once{})
65
66 once.(*sync.Once).Do(func() {
67 w.errors.Store(digest, f())
68 })
69
70 err := w.err(digest)
71 if err != nil {
72
73 w.onces.Delete(digest)
74 }
75 return err
76 }
77
78 func (w *workers) Stream(layer v1.Layer, f func() error) error {
79
80 once, _ := w.onces.LoadOrStore(layer, &sync.Once{})
81
82 once.(*sync.Once).Do(func() {
83 w.errors.Store(layer, f())
84 })
85
86 v, ok := w.errors.Load(layer)
87 if !ok || v == nil {
88 return nil
89 }
90
91 return v.(error)
92 }
93
94 type Pusher struct {
95 o *options
96
97
98 writers sync.Map
99 }
100
101 func NewPusher(options ...Option) (*Pusher, error) {
102 o, err := makeOptions(options...)
103 if err != nil {
104 return nil, err
105 }
106
107 return newPusher(o), nil
108 }
109
110 func newPusher(o *options) *Pusher {
111 if o.pusher != nil {
112 return o.pusher
113 }
114 return &Pusher{
115 o: o,
116 }
117 }
118
119 func (p *Pusher) writer(ctx context.Context, repo name.Repository, o *options) (*repoWriter, error) {
120 v, _ := p.writers.LoadOrStore(repo, &repoWriter{
121 repo: repo,
122 o: o,
123 })
124 rw := v.(*repoWriter)
125 return rw, rw.init(ctx)
126 }
127
128 func (p *Pusher) Push(ctx context.Context, ref name.Reference, t Taggable) error {
129 w, err := p.writer(ctx, ref.Context(), p.o)
130 if err != nil {
131 return err
132 }
133 return w.writeManifest(ctx, ref, t)
134 }
135
136 func (p *Pusher) Upload(ctx context.Context, repo name.Repository, l v1.Layer) error {
137 w, err := p.writer(ctx, repo, p.o)
138 if err != nil {
139 return err
140 }
141 return w.writeLayer(ctx, l)
142 }
143
144 func (p *Pusher) Delete(ctx context.Context, ref name.Reference) error {
145 w, err := p.writer(ctx, ref.Context(), p.o)
146 if err != nil {
147 return err
148 }
149
150 u := url.URL{
151 Scheme: ref.Context().Registry.Scheme(),
152 Host: ref.Context().RegistryStr(),
153 Path: fmt.Sprintf("/v2/%s/manifests/%s", ref.Context().RepositoryStr(), ref.Identifier()),
154 }
155
156 req, err := http.NewRequest(http.MethodDelete, u.String(), nil)
157 if err != nil {
158 return err
159 }
160
161 resp, err := w.w.client.Do(req.WithContext(ctx))
162 if err != nil {
163 return err
164 }
165 defer resp.Body.Close()
166
167 return transport.CheckError(resp, http.StatusOK, http.StatusAccepted)
168
169
170
171
172 }
173
174 type repoWriter struct {
175 repo name.Repository
176 o *options
177 once sync.Once
178
179 w *writer
180 err error
181
182 work *workers
183 }
184
185
186 func (rw *repoWriter) init(ctx context.Context) error {
187 rw.once.Do(func() {
188 rw.work = &workers{}
189 rw.w, rw.err = makeWriter(ctx, rw.repo, nil, rw.o)
190 })
191 return rw.err
192 }
193
194 func (rw *repoWriter) writeDeps(ctx context.Context, m manifest) error {
195 if img, ok := m.(v1.Image); ok {
196 return rw.writeLayers(ctx, img)
197 }
198
199 if idx, ok := m.(v1.ImageIndex); ok {
200 return rw.writeChildren(ctx, idx)
201 }
202
203
204 return nil
205 }
206
207 type describable struct {
208 desc v1.Descriptor
209 }
210
211 func (d describable) Digest() (v1.Hash, error) {
212 return d.desc.Digest, nil
213 }
214
215 func (d describable) Size() (int64, error) {
216 return d.desc.Size, nil
217 }
218
219 func (d describable) MediaType() (types.MediaType, error) {
220 return d.desc.MediaType, nil
221 }
222
223 type tagManifest struct {
224 Taggable
225 partial.Describable
226 }
227
228 func taggableToManifest(t Taggable) (manifest, error) {
229 if m, ok := t.(manifest); ok {
230 return m, nil
231 }
232
233 if d, ok := t.(*Descriptor); ok {
234 if d.MediaType.IsIndex() {
235 return d.ImageIndex()
236 }
237
238 if d.MediaType.IsImage() {
239 return d.Image()
240 }
241
242 if d.MediaType.IsSchema1() {
243 return d.Schema1()
244 }
245
246 return tagManifest{t, describable{d.toDesc()}}, nil
247 }
248
249 desc := v1.Descriptor{
250
251 MediaType: types.DockerManifestSchema2,
252 }
253
254 b, err := t.RawManifest()
255 if err != nil {
256 return nil, err
257 }
258
259 if wmt, ok := t.(withMediaType); ok {
260 desc.MediaType, err = wmt.MediaType()
261 if err != nil {
262 return nil, err
263 }
264 }
265
266 desc.Digest, desc.Size, err = v1.SHA256(bytes.NewReader(b))
267 if err != nil {
268 return nil, err
269 }
270
271 return tagManifest{t, describable{desc}}, nil
272 }
273
274 func (rw *repoWriter) writeManifest(ctx context.Context, ref name.Reference, t Taggable) error {
275 m, err := taggableToManifest(t)
276 if err != nil {
277 return err
278 }
279
280 needDeps := true
281
282 digest, err := m.Digest()
283 if errors.Is(err, stream.ErrNotComputed) {
284 if err := rw.writeDeps(ctx, m); err != nil {
285 return err
286 }
287
288 needDeps = false
289
290 digest, err = m.Digest()
291 if err != nil {
292 return err
293 }
294 } else if err != nil {
295 return err
296 }
297
298
299 if ref == nil {
300 ref = rw.repo.Digest(digest.String())
301 }
302
303
304
305 _, byTag := ref.(name.Tag)
306 if byTag {
307 if exists, err := rw.manifestExists(ctx, ref, t); err != nil {
308 return err
309 } else if exists {
310 return nil
311 }
312 }
313
314
315
316 needPut := byTag
317
318 if err := rw.work.Do(digest, func() error {
319 if !byTag {
320 if exists, err := rw.manifestExists(ctx, ref, t); err != nil {
321 return err
322 } else if exists {
323 return nil
324 }
325 }
326
327 if needDeps {
328 if err := rw.writeDeps(ctx, m); err != nil {
329 return err
330 }
331 }
332
333 needPut = false
334 return rw.commitManifest(ctx, ref, m)
335 }); err != nil {
336 return err
337 }
338
339 if !needPut {
340 return nil
341 }
342
343
344 return rw.commitManifest(ctx, ref, m)
345 }
346
347 func (rw *repoWriter) writeChildren(ctx context.Context, idx v1.ImageIndex) error {
348 children, err := partial.Manifests(idx)
349 if err != nil {
350 return err
351 }
352
353 g, ctx := errgroup.WithContext(ctx)
354 g.SetLimit(rw.o.jobs)
355
356 for _, child := range children {
357 child := child
358 if err := rw.writeChild(ctx, child, g); err != nil {
359 return err
360 }
361 }
362
363 return g.Wait()
364 }
365
366 func (rw *repoWriter) writeChild(ctx context.Context, child partial.Describable, g *errgroup.Group) error {
367 switch child := child.(type) {
368 case v1.ImageIndex:
369
370
371
372
373 return rw.writeManifest(ctx, nil, child)
374 case v1.Image:
375 g.Go(func() error {
376 return rw.writeManifest(ctx, nil, child)
377 })
378 case v1.Layer:
379 g.Go(func() error {
380 return rw.writeLayer(ctx, child)
381 })
382 default:
383
384 return fmt.Errorf("encountered unknown child: %T", child)
385 }
386 return nil
387 }
388
389
390
391
392 func (rw *repoWriter) manifestExists(ctx context.Context, ref name.Reference, t Taggable) (bool, error) {
393 f := &fetcher{
394 target: ref.Context(),
395 client: rw.w.client,
396 }
397
398 m, err := taggableToManifest(t)
399 if err != nil {
400 return false, err
401 }
402
403 digest, err := m.Digest()
404 if err != nil {
405
406 return false, nil
407 }
408 got, err := f.headManifest(ctx, ref, allManifestMediaTypes)
409 if err != nil {
410 var terr *transport.Error
411 if errors.As(err, &terr) {
412 if terr.StatusCode == http.StatusNotFound {
413 return false, nil
414 }
415
416
417
418
419 if terr.StatusCode == http.StatusForbidden {
420 logs.Debug.Printf("manifestExists unexpected 403: %v", err)
421 return false, nil
422 }
423 }
424
425 return false, err
426 }
427
428 if digest != got.Digest {
429
430 rw.work.Do(got.Digest, nop)
431
432 return false, nil
433 }
434
435 if tag, ok := ref.(name.Tag); ok {
436 logs.Progress.Printf("existing manifest: %s@%s", tag.Identifier(), got.Digest)
437 } else {
438 logs.Progress.Print("existing manifest: ", got.Digest)
439 }
440
441 return true, nil
442 }
443
444 func (rw *repoWriter) commitManifest(ctx context.Context, ref name.Reference, m manifest) error {
445 if rw.o.progress != nil {
446 size, err := m.Size()
447 if err != nil {
448 return err
449 }
450 rw.o.progress.total(size)
451 }
452
453 return rw.w.commitManifest(ctx, m, ref)
454 }
455
456 func (rw *repoWriter) writeLayers(pctx context.Context, img v1.Image) error {
457 ls, err := img.Layers()
458 if err != nil {
459 return err
460 }
461
462 g, ctx := errgroup.WithContext(pctx)
463 g.SetLimit(rw.o.jobs)
464
465 for _, l := range ls {
466 l := l
467
468 g.Go(func() error {
469 return rw.writeLayer(ctx, l)
470 })
471 }
472
473 mt, err := img.MediaType()
474 if err != nil {
475 return err
476 }
477
478 if mt.IsSchema1() {
479 return g.Wait()
480 }
481
482 cl, err := partial.ConfigLayer(img)
483 if errors.Is(err, stream.ErrNotComputed) {
484 if err := g.Wait(); err != nil {
485 return err
486 }
487
488 cl, err := partial.ConfigLayer(img)
489 if err != nil {
490 return err
491 }
492
493 return rw.writeLayer(pctx, cl)
494 } else if err != nil {
495 return err
496 }
497
498 g.Go(func() error {
499 return rw.writeLayer(ctx, cl)
500 })
501
502 return g.Wait()
503 }
504
505 func (rw *repoWriter) writeLayer(ctx context.Context, l v1.Layer) error {
506
507 mt, err := l.MediaType()
508 if err != nil {
509 return err
510 }
511 if !mt.IsDistributable() && !rw.o.allowNondistributableArtifacts {
512 return nil
513 }
514
515 digest, err := l.Digest()
516 if err != nil {
517 if errors.Is(err, stream.ErrNotComputed) {
518 return rw.lazyWriteLayer(ctx, l)
519 }
520 return err
521 }
522
523 return rw.work.Do(digest, func() error {
524 if rw.o.progress != nil {
525 size, err := l.Size()
526 if err != nil {
527 return err
528 }
529 rw.o.progress.total(size)
530 }
531 return rw.w.uploadOne(ctx, l)
532 })
533 }
534
535 func (rw *repoWriter) lazyWriteLayer(ctx context.Context, l v1.Layer) error {
536 return rw.work.Stream(l, func() error {
537 if err := rw.w.uploadOne(ctx, l); err != nil {
538 return err
539 }
540
541
542 digest, err := l.Digest()
543 if err != nil {
544 return err
545 }
546
547 rw.work.Do(digest, nop)
548
549 if rw.o.progress != nil {
550 size, err := l.Size()
551 if err != nil {
552 return err
553 }
554 rw.o.progress.total(size)
555 }
556
557 return nil
558 })
559 }
560
View as plain text