...

Source file src/github.com/google/go-containerregistry/pkg/v1/remote/pusher.go

Documentation: github.com/google/go-containerregistry/pkg/v1/remote

     1  // Copyright 2023 Google LLC All Rights Reserved.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //    http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package remote
    16  
    17  import (
    18  	"bytes"
    19  	"context"
    20  	"errors"
    21  	"fmt"
    22  	"net/http"
    23  	"net/url"
    24  	"sync"
    25  
    26  	"github.com/google/go-containerregistry/pkg/logs"
    27  	"github.com/google/go-containerregistry/pkg/name"
    28  	v1 "github.com/google/go-containerregistry/pkg/v1"
    29  	"github.com/google/go-containerregistry/pkg/v1/partial"
    30  	"github.com/google/go-containerregistry/pkg/v1/remote/transport"
    31  	"github.com/google/go-containerregistry/pkg/v1/stream"
    32  	"github.com/google/go-containerregistry/pkg/v1/types"
    33  	"golang.org/x/sync/errgroup"
    34  )
    35  
    36  type manifest interface {
    37  	Taggable
    38  	partial.Describable
    39  }
    40  
    41  // key is either v1.Hash or v1.Layer (for stream.Layer)
    42  type workers struct {
    43  	// map[v1.Hash|v1.Layer]*sync.Once
    44  	onces sync.Map
    45  
    46  	// map[v1.Hash|v1.Layer]error
    47  	errors sync.Map
    48  }
    49  
    50  func nop() error {
    51  	return nil
    52  }
    53  
    54  func (w *workers) err(digest v1.Hash) error {
    55  	v, ok := w.errors.Load(digest)
    56  	if !ok || v == nil {
    57  		return nil
    58  	}
    59  	return v.(error)
    60  }
    61  
    62  func (w *workers) Do(digest v1.Hash, f func() error) error {
    63  	// We don't care if it was loaded or not because the sync.Once will do it for us.
    64  	once, _ := w.onces.LoadOrStore(digest, &sync.Once{})
    65  
    66  	once.(*sync.Once).Do(func() {
    67  		w.errors.Store(digest, f())
    68  	})
    69  
    70  	err := w.err(digest)
    71  	if err != nil {
    72  		// Allow this to be retried by another caller.
    73  		w.onces.Delete(digest)
    74  	}
    75  	return err
    76  }
    77  
    78  func (w *workers) Stream(layer v1.Layer, f func() error) error {
    79  	// We don't care if it was loaded or not because the sync.Once will do it for us.
    80  	once, _ := w.onces.LoadOrStore(layer, &sync.Once{})
    81  
    82  	once.(*sync.Once).Do(func() {
    83  		w.errors.Store(layer, f())
    84  	})
    85  
    86  	v, ok := w.errors.Load(layer)
    87  	if !ok || v == nil {
    88  		return nil
    89  	}
    90  
    91  	return v.(error)
    92  }
    93  
    94  type Pusher struct {
    95  	o *options
    96  
    97  	// map[name.Repository]*repoWriter
    98  	writers sync.Map
    99  }
   100  
   101  func NewPusher(options ...Option) (*Pusher, error) {
   102  	o, err := makeOptions(options...)
   103  	if err != nil {
   104  		return nil, err
   105  	}
   106  
   107  	return newPusher(o), nil
   108  }
   109  
   110  func newPusher(o *options) *Pusher {
   111  	if o.pusher != nil {
   112  		return o.pusher
   113  	}
   114  	return &Pusher{
   115  		o: o,
   116  	}
   117  }
   118  
   119  func (p *Pusher) writer(ctx context.Context, repo name.Repository, o *options) (*repoWriter, error) {
   120  	v, _ := p.writers.LoadOrStore(repo, &repoWriter{
   121  		repo: repo,
   122  		o:    o,
   123  	})
   124  	rw := v.(*repoWriter)
   125  	return rw, rw.init(ctx)
   126  }
   127  
   128  func (p *Pusher) Push(ctx context.Context, ref name.Reference, t Taggable) error {
   129  	w, err := p.writer(ctx, ref.Context(), p.o)
   130  	if err != nil {
   131  		return err
   132  	}
   133  	return w.writeManifest(ctx, ref, t)
   134  }
   135  
   136  func (p *Pusher) Upload(ctx context.Context, repo name.Repository, l v1.Layer) error {
   137  	w, err := p.writer(ctx, repo, p.o)
   138  	if err != nil {
   139  		return err
   140  	}
   141  	return w.writeLayer(ctx, l)
   142  }
   143  
   144  func (p *Pusher) Delete(ctx context.Context, ref name.Reference) error {
   145  	w, err := p.writer(ctx, ref.Context(), p.o)
   146  	if err != nil {
   147  		return err
   148  	}
   149  
   150  	u := url.URL{
   151  		Scheme: ref.Context().Registry.Scheme(),
   152  		Host:   ref.Context().RegistryStr(),
   153  		Path:   fmt.Sprintf("/v2/%s/manifests/%s", ref.Context().RepositoryStr(), ref.Identifier()),
   154  	}
   155  
   156  	req, err := http.NewRequest(http.MethodDelete, u.String(), nil)
   157  	if err != nil {
   158  		return err
   159  	}
   160  
   161  	resp, err := w.w.client.Do(req.WithContext(ctx))
   162  	if err != nil {
   163  		return err
   164  	}
   165  	defer resp.Body.Close()
   166  
   167  	return transport.CheckError(resp, http.StatusOK, http.StatusAccepted)
   168  
   169  	// TODO(jason): If the manifest had a `subject`, and if the registry
   170  	// doesn't support Referrers, update the index pointed to by the
   171  	// subject's fallback tag to remove the descriptor for this manifest.
   172  }
   173  
   174  type repoWriter struct {
   175  	repo name.Repository
   176  	o    *options
   177  	once sync.Once
   178  
   179  	w   *writer
   180  	err error
   181  
   182  	work *workers
   183  }
   184  
   185  // this will run once per repoWriter instance
   186  func (rw *repoWriter) init(ctx context.Context) error {
   187  	rw.once.Do(func() {
   188  		rw.work = &workers{}
   189  		rw.w, rw.err = makeWriter(ctx, rw.repo, nil, rw.o)
   190  	})
   191  	return rw.err
   192  }
   193  
   194  func (rw *repoWriter) writeDeps(ctx context.Context, m manifest) error {
   195  	if img, ok := m.(v1.Image); ok {
   196  		return rw.writeLayers(ctx, img)
   197  	}
   198  
   199  	if idx, ok := m.(v1.ImageIndex); ok {
   200  		return rw.writeChildren(ctx, idx)
   201  	}
   202  
   203  	// This has no deps, not an error (e.g. something you want to just PUT).
   204  	return nil
   205  }
   206  
   207  type describable struct {
   208  	desc v1.Descriptor
   209  }
   210  
   211  func (d describable) Digest() (v1.Hash, error) {
   212  	return d.desc.Digest, nil
   213  }
   214  
   215  func (d describable) Size() (int64, error) {
   216  	return d.desc.Size, nil
   217  }
   218  
   219  func (d describable) MediaType() (types.MediaType, error) {
   220  	return d.desc.MediaType, nil
   221  }
   222  
   223  type tagManifest struct {
   224  	Taggable
   225  	partial.Describable
   226  }
   227  
   228  func taggableToManifest(t Taggable) (manifest, error) {
   229  	if m, ok := t.(manifest); ok {
   230  		return m, nil
   231  	}
   232  
   233  	if d, ok := t.(*Descriptor); ok {
   234  		if d.MediaType.IsIndex() {
   235  			return d.ImageIndex()
   236  		}
   237  
   238  		if d.MediaType.IsImage() {
   239  			return d.Image()
   240  		}
   241  
   242  		if d.MediaType.IsSchema1() {
   243  			return d.Schema1()
   244  		}
   245  
   246  		return tagManifest{t, describable{d.toDesc()}}, nil
   247  	}
   248  
   249  	desc := v1.Descriptor{
   250  		// A reasonable default if Taggable doesn't implement MediaType.
   251  		MediaType: types.DockerManifestSchema2,
   252  	}
   253  
   254  	b, err := t.RawManifest()
   255  	if err != nil {
   256  		return nil, err
   257  	}
   258  
   259  	if wmt, ok := t.(withMediaType); ok {
   260  		desc.MediaType, err = wmt.MediaType()
   261  		if err != nil {
   262  			return nil, err
   263  		}
   264  	}
   265  
   266  	desc.Digest, desc.Size, err = v1.SHA256(bytes.NewReader(b))
   267  	if err != nil {
   268  		return nil, err
   269  	}
   270  
   271  	return tagManifest{t, describable{desc}}, nil
   272  }
   273  
   274  func (rw *repoWriter) writeManifest(ctx context.Context, ref name.Reference, t Taggable) error {
   275  	m, err := taggableToManifest(t)
   276  	if err != nil {
   277  		return err
   278  	}
   279  
   280  	needDeps := true
   281  
   282  	digest, err := m.Digest()
   283  	if errors.Is(err, stream.ErrNotComputed) {
   284  		if err := rw.writeDeps(ctx, m); err != nil {
   285  			return err
   286  		}
   287  
   288  		needDeps = false
   289  
   290  		digest, err = m.Digest()
   291  		if err != nil {
   292  			return err
   293  		}
   294  	} else if err != nil {
   295  		return err
   296  	}
   297  
   298  	// This may be a lazy child where we have no ref until digest is computed.
   299  	if ref == nil {
   300  		ref = rw.repo.Digest(digest.String())
   301  	}
   302  
   303  	// For tags, we want to do this check outside of our Work.Do closure because
   304  	// we don't want to dedupe based on the manifest digest.
   305  	_, byTag := ref.(name.Tag)
   306  	if byTag {
   307  		if exists, err := rw.manifestExists(ctx, ref, t); err != nil {
   308  			return err
   309  		} else if exists {
   310  			return nil
   311  		}
   312  	}
   313  
   314  	// The following work.Do will get deduped by digest, so it won't happen unless
   315  	// this tag happens to be the first commitManifest to run for that digest.
   316  	needPut := byTag
   317  
   318  	if err := rw.work.Do(digest, func() error {
   319  		if !byTag {
   320  			if exists, err := rw.manifestExists(ctx, ref, t); err != nil {
   321  				return err
   322  			} else if exists {
   323  				return nil
   324  			}
   325  		}
   326  
   327  		if needDeps {
   328  			if err := rw.writeDeps(ctx, m); err != nil {
   329  				return err
   330  			}
   331  		}
   332  
   333  		needPut = false
   334  		return rw.commitManifest(ctx, ref, m)
   335  	}); err != nil {
   336  		return err
   337  	}
   338  
   339  	if !needPut {
   340  		return nil
   341  	}
   342  
   343  	// Only runs for tags that got deduped by digest.
   344  	return rw.commitManifest(ctx, ref, m)
   345  }
   346  
   347  func (rw *repoWriter) writeChildren(ctx context.Context, idx v1.ImageIndex) error {
   348  	children, err := partial.Manifests(idx)
   349  	if err != nil {
   350  		return err
   351  	}
   352  
   353  	g, ctx := errgroup.WithContext(ctx)
   354  	g.SetLimit(rw.o.jobs)
   355  
   356  	for _, child := range children {
   357  		child := child
   358  		if err := rw.writeChild(ctx, child, g); err != nil {
   359  			return err
   360  		}
   361  	}
   362  
   363  	return g.Wait()
   364  }
   365  
   366  func (rw *repoWriter) writeChild(ctx context.Context, child partial.Describable, g *errgroup.Group) error {
   367  	switch child := child.(type) {
   368  	case v1.ImageIndex:
   369  		// For recursive index, we want to do a depth-first launching of goroutines
   370  		// to avoid deadlocking.
   371  		//
   372  		// Note that this is rare, so the impact of this should be really small.
   373  		return rw.writeManifest(ctx, nil, child)
   374  	case v1.Image:
   375  		g.Go(func() error {
   376  			return rw.writeManifest(ctx, nil, child)
   377  		})
   378  	case v1.Layer:
   379  		g.Go(func() error {
   380  			return rw.writeLayer(ctx, child)
   381  		})
   382  	default:
   383  		// This can't happen.
   384  		return fmt.Errorf("encountered unknown child: %T", child)
   385  	}
   386  	return nil
   387  }
   388  
   389  // TODO: Consider caching some representation of the tags/digests in the destination
   390  // repository as a hint to avoid this optimistic check in cases where we will most
   391  // likely have to do a PUT anyway, e.g. if we are overwriting a tag we just wrote.
   392  func (rw *repoWriter) manifestExists(ctx context.Context, ref name.Reference, t Taggable) (bool, error) {
   393  	f := &fetcher{
   394  		target: ref.Context(),
   395  		client: rw.w.client,
   396  	}
   397  
   398  	m, err := taggableToManifest(t)
   399  	if err != nil {
   400  		return false, err
   401  	}
   402  
   403  	digest, err := m.Digest()
   404  	if err != nil {
   405  		// Possibly due to streaming layers.
   406  		return false, nil
   407  	}
   408  	got, err := f.headManifest(ctx, ref, allManifestMediaTypes)
   409  	if err != nil {
   410  		var terr *transport.Error
   411  		if errors.As(err, &terr) {
   412  			if terr.StatusCode == http.StatusNotFound {
   413  				return false, nil
   414  			}
   415  
   416  			// We treat a 403 here as non-fatal because this existence check is an optimization and
   417  			// some registries will return a 403 instead of a 404 in certain situations.
   418  			// E.g. https://jfrog.atlassian.net/browse/RTFACT-13797
   419  			if terr.StatusCode == http.StatusForbidden {
   420  				logs.Debug.Printf("manifestExists unexpected 403: %v", err)
   421  				return false, nil
   422  			}
   423  		}
   424  
   425  		return false, err
   426  	}
   427  
   428  	if digest != got.Digest {
   429  		// Mark that we saw this digest in the registry so we don't have to check it again.
   430  		rw.work.Do(got.Digest, nop)
   431  
   432  		return false, nil
   433  	}
   434  
   435  	if tag, ok := ref.(name.Tag); ok {
   436  		logs.Progress.Printf("existing manifest: %s@%s", tag.Identifier(), got.Digest)
   437  	} else {
   438  		logs.Progress.Print("existing manifest: ", got.Digest)
   439  	}
   440  
   441  	return true, nil
   442  }
   443  
   444  func (rw *repoWriter) commitManifest(ctx context.Context, ref name.Reference, m manifest) error {
   445  	if rw.o.progress != nil {
   446  		size, err := m.Size()
   447  		if err != nil {
   448  			return err
   449  		}
   450  		rw.o.progress.total(size)
   451  	}
   452  
   453  	return rw.w.commitManifest(ctx, m, ref)
   454  }
   455  
   456  func (rw *repoWriter) writeLayers(pctx context.Context, img v1.Image) error {
   457  	ls, err := img.Layers()
   458  	if err != nil {
   459  		return err
   460  	}
   461  
   462  	g, ctx := errgroup.WithContext(pctx)
   463  	g.SetLimit(rw.o.jobs)
   464  
   465  	for _, l := range ls {
   466  		l := l
   467  
   468  		g.Go(func() error {
   469  			return rw.writeLayer(ctx, l)
   470  		})
   471  	}
   472  
   473  	mt, err := img.MediaType()
   474  	if err != nil {
   475  		return err
   476  	}
   477  
   478  	if mt.IsSchema1() {
   479  		return g.Wait()
   480  	}
   481  
   482  	cl, err := partial.ConfigLayer(img)
   483  	if errors.Is(err, stream.ErrNotComputed) {
   484  		if err := g.Wait(); err != nil {
   485  			return err
   486  		}
   487  
   488  		cl, err := partial.ConfigLayer(img)
   489  		if err != nil {
   490  			return err
   491  		}
   492  
   493  		return rw.writeLayer(pctx, cl)
   494  	} else if err != nil {
   495  		return err
   496  	}
   497  
   498  	g.Go(func() error {
   499  		return rw.writeLayer(ctx, cl)
   500  	})
   501  
   502  	return g.Wait()
   503  }
   504  
   505  func (rw *repoWriter) writeLayer(ctx context.Context, l v1.Layer) error {
   506  	// Skip any non-distributable things.
   507  	mt, err := l.MediaType()
   508  	if err != nil {
   509  		return err
   510  	}
   511  	if !mt.IsDistributable() && !rw.o.allowNondistributableArtifacts {
   512  		return nil
   513  	}
   514  
   515  	digest, err := l.Digest()
   516  	if err != nil {
   517  		if errors.Is(err, stream.ErrNotComputed) {
   518  			return rw.lazyWriteLayer(ctx, l)
   519  		}
   520  		return err
   521  	}
   522  
   523  	return rw.work.Do(digest, func() error {
   524  		if rw.o.progress != nil {
   525  			size, err := l.Size()
   526  			if err != nil {
   527  				return err
   528  			}
   529  			rw.o.progress.total(size)
   530  		}
   531  		return rw.w.uploadOne(ctx, l)
   532  	})
   533  }
   534  
   535  func (rw *repoWriter) lazyWriteLayer(ctx context.Context, l v1.Layer) error {
   536  	return rw.work.Stream(l, func() error {
   537  		if err := rw.w.uploadOne(ctx, l); err != nil {
   538  			return err
   539  		}
   540  
   541  		// Mark this upload completed.
   542  		digest, err := l.Digest()
   543  		if err != nil {
   544  			return err
   545  		}
   546  
   547  		rw.work.Do(digest, nop)
   548  
   549  		if rw.o.progress != nil {
   550  			size, err := l.Size()
   551  			if err != nil {
   552  				return err
   553  			}
   554  			rw.o.progress.total(size)
   555  		}
   556  
   557  		return nil
   558  	})
   559  }
   560  

View as plain text