...

Source file src/github.com/docker/distribution/registry/storage/linkedblobstore.go

Documentation: github.com/docker/distribution/registry/storage

     1  package storage
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"net/http"
     7  	"path"
     8  	"time"
     9  
    10  	"github.com/distribution/reference"
    11  	"github.com/docker/distribution"
    12  	dcontext "github.com/docker/distribution/context"
    13  	"github.com/docker/distribution/registry/storage/driver"
    14  	"github.com/docker/distribution/uuid"
    15  	"github.com/opencontainers/go-digest"
    16  )
    17  
// linkPathFunc describes a function that can resolve a link based on the
// repository name and digest. It returns the path of the link for dgst within
// the repository called name, or an error if that path cannot be produced.
type linkPathFunc func(name string, dgst digest.Digest) (string, error)
    21  
// linkedBlobStore provides a full BlobService that namespaces the blobs to a
// given repository. Effectively, it manages the links in a given repository
// that grant access to the global blob store.
//
// Reads are gated by blobAccessController (see Stat), content is served
// through blobServer, and writes go to the embedded blobStore before being
// linked into the repository.
type linkedBlobStore struct {
	*blobStore
	registry               *registry
	blobServer             distribution.BlobServer
	blobAccessController   distribution.BlobDescriptorService
	repository             distribution.Repository
	ctx                    context.Context // only to be used where context can't come through method args
	deleteEnabled          bool
	resumableDigestEnabled bool

	// linkPathFns specifies one or more path functions allowing one to
	// control the repository blob link set to which the blob store
	// dispatches. This is required because manifest and layer blobs have not
	// yet been fully merged. At some point, this functionality should be
	// removed and the blob links folder should be merged. The first entry is
	// treated as the "canonical" link location and will be used for writes.
	linkPathFns []linkPathFunc

	// linkDirectoryPathSpec locates the root directories in which one might
	// find links. Enumerate walks the path derived from it.
	linkDirectoryPathSpec pathSpec
}
    46  
    47  var _ distribution.BlobStore = &linkedBlobStore{}
    48  
    49  func (lbs *linkedBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
    50  	return lbs.blobAccessController.Stat(ctx, dgst)
    51  }
    52  
    53  func (lbs *linkedBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
    54  	canonical, err := lbs.Stat(ctx, dgst) // access check
    55  	if err != nil {
    56  		return nil, err
    57  	}
    58  
    59  	return lbs.blobStore.Get(ctx, canonical.Digest)
    60  }
    61  
    62  func (lbs *linkedBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
    63  	canonical, err := lbs.Stat(ctx, dgst) // access check
    64  	if err != nil {
    65  		return nil, err
    66  	}
    67  
    68  	return lbs.blobStore.Open(ctx, canonical.Digest)
    69  }
    70  
    71  func (lbs *linkedBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
    72  	canonical, err := lbs.Stat(ctx, dgst) // access check
    73  	if err != nil {
    74  		return err
    75  	}
    76  
    77  	if canonical.MediaType != "" {
    78  		// Set the repository local content type.
    79  		w.Header().Set("Content-Type", canonical.MediaType)
    80  	}
    81  
    82  	return lbs.blobServer.ServeBlob(ctx, w, r, canonical.Digest)
    83  }
    84  
    85  func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
    86  	dgst := digest.FromBytes(p)
    87  	// Place the data in the blob store first.
    88  	desc, err := lbs.blobStore.Put(ctx, mediaType, p)
    89  	if err != nil {
    90  		dcontext.GetLogger(ctx).Errorf("error putting into main store: %v", err)
    91  		return distribution.Descriptor{}, err
    92  	}
    93  
    94  	if err := lbs.blobAccessController.SetDescriptor(ctx, dgst, desc); err != nil {
    95  		return distribution.Descriptor{}, err
    96  	}
    97  
    98  	// TODO(stevvooe): Write out mediatype if incoming differs from what is
    99  	// returned by Put above. Note that we should allow updates for a given
   100  	// repository.
   101  
   102  	return desc, lbs.linkBlob(ctx, desc)
   103  }
   104  
   105  type optionFunc func(interface{}) error
   106  
   107  func (f optionFunc) Apply(v interface{}) error {
   108  	return f(v)
   109  }
   110  
   111  // WithMountFrom returns a BlobCreateOption which designates that the blob should be
   112  // mounted from the given canonical reference.
   113  func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption {
   114  	return optionFunc(func(v interface{}) error {
   115  		opts, ok := v.(*distribution.CreateOptions)
   116  		if !ok {
   117  			return fmt.Errorf("unexpected options type: %T", v)
   118  		}
   119  
   120  		opts.Mount.ShouldMount = true
   121  		opts.Mount.From = ref
   122  
   123  		return nil
   124  	})
   125  }
   126  
   127  // Writer begins a blob write session, returning a handle.
   128  func (lbs *linkedBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
   129  	dcontext.GetLogger(ctx).Debug("(*linkedBlobStore).Writer")
   130  
   131  	var opts distribution.CreateOptions
   132  
   133  	for _, option := range options {
   134  		err := option.Apply(&opts)
   135  		if err != nil {
   136  			return nil, err
   137  		}
   138  	}
   139  
   140  	if opts.Mount.ShouldMount {
   141  		desc, err := lbs.mount(ctx, opts.Mount.From, opts.Mount.From.Digest(), opts.Mount.Stat)
   142  		if err == nil {
   143  			// Mount successful, no need to initiate an upload session
   144  			return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc}
   145  		}
   146  	}
   147  
   148  	uuid := uuid.Generate().String()
   149  	startedAt := time.Now().UTC()
   150  
   151  	path, err := pathFor(uploadDataPathSpec{
   152  		name: lbs.repository.Named().Name(),
   153  		id:   uuid,
   154  	})
   155  
   156  	if err != nil {
   157  		return nil, err
   158  	}
   159  
   160  	startedAtPath, err := pathFor(uploadStartedAtPathSpec{
   161  		name: lbs.repository.Named().Name(),
   162  		id:   uuid,
   163  	})
   164  
   165  	if err != nil {
   166  		return nil, err
   167  	}
   168  
   169  	// Write a startedat file for this upload
   170  	if err := lbs.blobStore.driver.PutContent(ctx, startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
   171  		return nil, err
   172  	}
   173  
   174  	return lbs.newBlobUpload(ctx, uuid, path, startedAt, false)
   175  }
   176  
   177  func (lbs *linkedBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
   178  	dcontext.GetLogger(ctx).Debug("(*linkedBlobStore).Resume")
   179  
   180  	startedAtPath, err := pathFor(uploadStartedAtPathSpec{
   181  		name: lbs.repository.Named().Name(),
   182  		id:   id,
   183  	})
   184  
   185  	if err != nil {
   186  		return nil, err
   187  	}
   188  
   189  	startedAtBytes, err := lbs.blobStore.driver.GetContent(ctx, startedAtPath)
   190  	if err != nil {
   191  		switch err := err.(type) {
   192  		case driver.PathNotFoundError:
   193  			return nil, distribution.ErrBlobUploadUnknown
   194  		default:
   195  			return nil, err
   196  		}
   197  	}
   198  
   199  	startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes))
   200  	if err != nil {
   201  		return nil, err
   202  	}
   203  
   204  	path, err := pathFor(uploadDataPathSpec{
   205  		name: lbs.repository.Named().Name(),
   206  		id:   id,
   207  	})
   208  
   209  	if err != nil {
   210  		return nil, err
   211  	}
   212  
   213  	return lbs.newBlobUpload(ctx, id, path, startedAt, true)
   214  }
   215  
   216  func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
   217  	if !lbs.deleteEnabled {
   218  		return distribution.ErrUnsupported
   219  	}
   220  
   221  	// Ensure the blob is available for deletion
   222  	_, err := lbs.blobAccessController.Stat(ctx, dgst)
   223  	if err != nil {
   224  		return err
   225  	}
   226  
   227  	err = lbs.blobAccessController.Clear(ctx, dgst)
   228  	if err != nil {
   229  		return err
   230  	}
   231  
   232  	return nil
   233  }
   234  
   235  func (lbs *linkedBlobStore) Enumerate(ctx context.Context, ingestor func(digest.Digest) error) error {
   236  	rootPath, err := pathFor(lbs.linkDirectoryPathSpec)
   237  	if err != nil {
   238  		return err
   239  	}
   240  	return lbs.driver.Walk(ctx, rootPath, func(fileInfo driver.FileInfo) error {
   241  		// exit early if directory...
   242  		if fileInfo.IsDir() {
   243  			return nil
   244  		}
   245  		filePath := fileInfo.Path()
   246  
   247  		// check if it's a link
   248  		_, fileName := path.Split(filePath)
   249  		if fileName != "link" {
   250  			return nil
   251  		}
   252  
   253  		// read the digest found in link
   254  		digest, err := lbs.blobStore.readlink(ctx, filePath)
   255  		if err != nil {
   256  			return err
   257  		}
   258  
   259  		// ensure this conforms to the linkPathFns
   260  		_, err = lbs.Stat(ctx, digest)
   261  		if err != nil {
   262  			// we expect this error to occur so we move on
   263  			if err == distribution.ErrBlobUnknown {
   264  				return nil
   265  			}
   266  			return err
   267  		}
   268  
   269  		err = ingestor(digest)
   270  		if err != nil {
   271  			return err
   272  		}
   273  
   274  		return nil
   275  	})
   276  }
   277  
   278  func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest, sourceStat *distribution.Descriptor) (distribution.Descriptor, error) {
   279  	var stat distribution.Descriptor
   280  	if sourceStat == nil {
   281  		// look up the blob info from the sourceRepo if not already provided
   282  		repo, err := lbs.registry.Repository(ctx, sourceRepo)
   283  		if err != nil {
   284  			return distribution.Descriptor{}, err
   285  		}
   286  		stat, err = repo.Blobs(ctx).Stat(ctx, dgst)
   287  		if err != nil {
   288  			return distribution.Descriptor{}, err
   289  		}
   290  	} else {
   291  		// use the provided blob info
   292  		stat = *sourceStat
   293  	}
   294  
   295  	desc := distribution.Descriptor{
   296  		Size: stat.Size,
   297  
   298  		// NOTE(stevvooe): The central blob store firewalls media types from
   299  		// other users. The caller should look this up and override the value
   300  		// for the specific repository.
   301  		MediaType: "application/octet-stream",
   302  		Digest:    dgst,
   303  	}
   304  	return desc, lbs.linkBlob(ctx, desc)
   305  }
   306  
   307  // newBlobUpload allocates a new upload controller with the given state.
   308  func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time, append bool) (distribution.BlobWriter, error) {
   309  	fw, err := lbs.driver.Writer(ctx, path, append)
   310  	if err != nil {
   311  		return nil, err
   312  	}
   313  
   314  	bw := &blobWriter{
   315  		ctx:                    ctx,
   316  		blobStore:              lbs,
   317  		id:                     uuid,
   318  		startedAt:              startedAt,
   319  		digester:               digest.Canonical.Digester(),
   320  		fileWriter:             fw,
   321  		driver:                 lbs.driver,
   322  		path:                   path,
   323  		resumableDigestEnabled: lbs.resumableDigestEnabled,
   324  	}
   325  
   326  	return bw, nil
   327  }
   328  
   329  // linkBlob links a valid, written blob into the registry under the named
   330  // repository for the upload controller.
   331  func (lbs *linkedBlobStore) linkBlob(ctx context.Context, canonical distribution.Descriptor, aliases ...digest.Digest) error {
   332  	dgsts := append([]digest.Digest{canonical.Digest}, aliases...)
   333  
   334  	// TODO(stevvooe): Need to write out mediatype for only canonical hash
   335  	// since we don't care about the aliases. They are generally unused except
   336  	// for tarsum but those versions don't care about mediatype.
   337  
   338  	// Don't make duplicate links.
   339  	seenDigests := make(map[digest.Digest]struct{}, len(dgsts))
   340  
   341  	// only use the first link
   342  	linkPathFn := lbs.linkPathFns[0]
   343  
   344  	for _, dgst := range dgsts {
   345  		if _, seen := seenDigests[dgst]; seen {
   346  			continue
   347  		}
   348  		seenDigests[dgst] = struct{}{}
   349  
   350  		blobLinkPath, err := linkPathFn(lbs.repository.Named().Name(), dgst)
   351  		if err != nil {
   352  			return err
   353  		}
   354  
   355  		if err := lbs.blobStore.link(ctx, blobLinkPath, canonical.Digest); err != nil {
   356  			return err
   357  		}
   358  	}
   359  
   360  	return nil
   361  }
   362  
// linkedBlobStatter answers descriptor queries for a single repository. Its
// Stat method resolves a digest through the repository's link files and then
// stats the link target in the embedded blob store.
type linkedBlobStatter struct {
	*blobStore
	repository distribution.Repository

	// linkPathFns specifies one or more path functions allowing one to
	// control the repository blob link set to which the blob store
	// dispatches. This is required because manifest and layer blobs have not
	// yet been fully merged. At some point, this functionality should be
	// removed and the blob links folder should be merged. The first entry is
	// treated as the "canonical" link location and will be used for writes.
	linkPathFns []linkPathFunc
}
   375  
   376  var _ distribution.BlobDescriptorService = &linkedBlobStatter{}
   377  
   378  func (lbs *linkedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
   379  	var (
   380  		found  bool
   381  		target digest.Digest
   382  	)
   383  
   384  	// try the many link path functions until we get success or an error that
   385  	// is not PathNotFoundError.
   386  	for _, linkPathFn := range lbs.linkPathFns {
   387  		var err error
   388  		target, err = lbs.resolveWithLinkFunc(ctx, dgst, linkPathFn)
   389  
   390  		if err == nil {
   391  			found = true
   392  			break // success!
   393  		}
   394  
   395  		switch err := err.(type) {
   396  		case driver.PathNotFoundError:
   397  			// do nothing, just move to the next linkPathFn
   398  		default:
   399  			return distribution.Descriptor{}, err
   400  		}
   401  	}
   402  
   403  	if !found {
   404  		return distribution.Descriptor{}, distribution.ErrBlobUnknown
   405  	}
   406  
   407  	if target != dgst {
   408  		// Track when we are doing cross-digest domain lookups. ie, sha512 to sha256.
   409  		dcontext.GetLogger(ctx).Warnf("looking up blob with canonical target: %v -> %v", dgst, target)
   410  	}
   411  
   412  	// TODO(stevvooe): Look up repository local mediatype and replace that on
   413  	// the returned descriptor.
   414  
   415  	return lbs.blobStore.statter.Stat(ctx, target)
   416  }
   417  
   418  func (lbs *linkedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) (err error) {
   419  	// clear any possible existence of a link described in linkPathFns
   420  	for _, linkPathFn := range lbs.linkPathFns {
   421  		blobLinkPath, err := linkPathFn(lbs.repository.Named().Name(), dgst)
   422  		if err != nil {
   423  			return err
   424  		}
   425  
   426  		err = lbs.blobStore.driver.Delete(ctx, blobLinkPath)
   427  		if err != nil {
   428  			switch err := err.(type) {
   429  			case driver.PathNotFoundError:
   430  				continue // just ignore this error and continue
   431  			default:
   432  				return err
   433  			}
   434  		}
   435  	}
   436  
   437  	return nil
   438  }
   439  
   440  // resolveTargetWithFunc allows us to read a link to a resource with different
   441  // linkPathFuncs to let us try a few different paths before returning not
   442  // found.
   443  func (lbs *linkedBlobStatter) resolveWithLinkFunc(ctx context.Context, dgst digest.Digest, linkPathFn linkPathFunc) (digest.Digest, error) {
   444  	blobLinkPath, err := linkPathFn(lbs.repository.Named().Name(), dgst)
   445  	if err != nil {
   446  		return "", err
   447  	}
   448  
   449  	return lbs.blobStore.readlink(ctx, blobLinkPath)
   450  }
   451  
// SetDescriptor is a no-op that always returns nil. All three arguments are
// ignored.
func (lbs *linkedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
	// The canonical descriptor for a blob is set at the commit phase of upload
	return nil
}
   456  
   457  // blobLinkPath provides the path to the blob link, also known as layers.
   458  func blobLinkPath(name string, dgst digest.Digest) (string, error) {
   459  	return pathFor(layerLinkPathSpec{name: name, digest: dgst})
   460  }
   461  
   462  // manifestRevisionLinkPath provides the path to the manifest revision link.
   463  func manifestRevisionLinkPath(name string, dgst digest.Digest) (string, error) {
   464  	return pathFor(manifestRevisionLinkPathSpec{name: name, revision: dgst})
   465  }
   466  

View as plain text