
Source file src/github.com/docker/distribution/registry/storage/blobwriter.go

Documentation: github.com/docker/distribution/registry/storage

package storage

import (
	"context"
	"errors"
	"fmt"
	"io"
	"path"
	"time"

	"github.com/docker/distribution"
	dcontext "github.com/docker/distribution/context"
	storagedriver "github.com/docker/distribution/registry/storage/driver"
	"github.com/opencontainers/go-digest"
	"github.com/sirupsen/logrus"
)

var (
	errResumableDigestNotAvailable = errors.New("resumable digest not available")
)

const (
	// digestSha256Empty is the canonical sha256 digest of empty data
	digestSha256Empty = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
)

// blobWriter is used to control the various aspects of resumable
// blob upload.
type blobWriter struct {
	ctx       context.Context
	blobStore *linkedBlobStore

	id        string
	startedAt time.Time
	digester  digest.Digester
	written   int64 // tracks the number of bytes written to the digester

	fileWriter storagedriver.FileWriter
	driver     storagedriver.StorageDriver
	path       string

	resumableDigestEnabled bool
	committed              bool
}

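// Compile-time assertion that *blobWriter satisfies the
// distribution.BlobWriter interface.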
var _ distribution.BlobWriter = &blobWriter{}

// ID returns the identifier for this upload.
func (bw *blobWriter) ID() string {
	return bw.id
}

func (bw *blobWriter) StartedAt() time.Time {
	return bw.startedAt
}

// Commit marks the upload as completed, returning a valid descriptor. The
// final size and digest are checked against the provided descriptor.
func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
	dcontext.GetLogger(ctx).Debug("(*blobWriter).Commit")

	if err := bw.fileWriter.Commit(); err != nil {
		return distribution.Descriptor{}, err
	}

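	// Close the writer (its error is ignored) and capture the final size
	// before validating the blob.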
	bw.Close()
	desc.Size = bw.Size()

	canonical, err := bw.validateBlob(ctx, desc)
	if err != nil {
		return distribution.Descriptor{}, err
	}

	if err := bw.moveBlob(ctx, canonical); err != nil {
		return distribution.Descriptor{}, err
	}

	if err := bw.blobStore.linkBlob(ctx, canonical, desc.Digest); err != nil {
		return distribution.Descriptor{}, err
	}

	if err := bw.removeResources(ctx); err != nil {
		return distribution.Descriptor{}, err
	}

	err = bw.blobStore.blobAccessController.SetDescriptor(ctx, canonical.Digest, canonical)
	if err != nil {
		return distribution.Descriptor{}, err
	}

	bw.committed = true
	return canonical, nil
}

// Cancel the blob upload process, releasing any resources associated with
// the writer and canceling the operation.
func (bw *blobWriter) Cancel(ctx context.Context) error {
	dcontext.GetLogger(ctx).Debug("(*blobWriter).Cancel")
	if err := bw.fileWriter.Cancel(); err != nil {
		return err
	}

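	// Closing is best-effort during cancellation: a failure is logged but
	// does not stop the upload's resources from being removed.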
	if err := bw.Close(); err != nil {
		dcontext.GetLogger(ctx).Errorf("error closing blobwriter: %s", err)
	}

	return bw.removeResources(ctx)
}

func (bw *blobWriter) Size() int64 {
	return bw.fileWriter.Size()
}

func (bw *blobWriter) Write(p []byte) (int, error) {
	// Ensure that the current write offset matches how many bytes have been
	// written to the digester. If not, we need to update the digest state to
	// match the current write position.
	if err := bw.resumeDigest(bw.blobStore.ctx); err != nil && err != errResumableDigestNotAvailable {
		return 0, err
	}

	_, err := bw.fileWriter.Write(p)
	if err != nil {
		return 0, err
	}

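	// Mirror the bytes into the digester so the running digest stays in
	// sync with the data persisted by the file writer.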
	n, err := bw.digester.Hash().Write(p)
	bw.written += int64(n)

	return n, err
}

func (bw *blobWriter) ReadFrom(r io.Reader) (n int64, err error) {
	// Ensure that the current write offset matches how many bytes have been
	// written to the digester. If not, we need to update the digest state to
	// match the current write position.
	if err := bw.resumeDigest(bw.blobStore.ctx); err != nil && err != errResumableDigestNotAvailable {
		return 0, err
	}

	// Using a TeeReader instead of MultiWriter ensures Copy returns
	// the amount written to the digester as well as ensuring that we
	// write to the fileWriter first.
	tee := io.TeeReader(r, bw.fileWriter)
	nn, err := io.Copy(bw.digester.Hash(), tee)
	bw.written += nn

	return nn, err
}

func (bw *blobWriter) Close() error {
	if bw.committed {
		return errors.New("blobwriter close after commit")
	}

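	// Persist the current hash state so a later request can resume this
	// upload without re-hashing the bytes already written.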
	if err := bw.storeHashState(bw.blobStore.ctx); err != nil && err != errResumableDigestNotAvailable {
		return err
	}

	return bw.fileWriter.Close()
}

// validateBlob checks the data against the digest, returning an error if it
// does not match. The canonical descriptor is returned.
func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
	var (
		verified, fullHash bool
		canonical          digest.Digest
	)

	if desc.Digest == "" {
		// if no digest is provided, we have nothing to validate
		// against. We don't really want to support this for the registry.
		return distribution.Descriptor{}, distribution.ErrBlobInvalidDigest{
			Reason: fmt.Errorf("cannot validate against empty digest"),
		}
	}

	var size int64

	// Stat the on-disk file
	if fi, err := bw.driver.Stat(ctx, bw.path); err != nil {
		switch err := err.(type) {
		case storagedriver.PathNotFoundError:
			// NOTE(stevvooe): We really don't care if the file is
			// not actually present for the reader. We now assume
			// that the desc length is zero.
			desc.Size = 0
		default:
			// Any other error we want propagated up the stack.
			return distribution.Descriptor{}, err
		}
	} else {
		if fi.IsDir() {
			return distribution.Descriptor{}, fmt.Errorf("unexpected directory at upload location %q", bw.path)
		}

		size = fi.Size()
	}

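	// Reconcile the size supplied by the caller with the size observed on
	// disk; a zero or negative provided size means the size is unknown.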
	if desc.Size > 0 {
		if desc.Size != size {
			return distribution.Descriptor{}, distribution.ErrBlobInvalidLength
		}
	} else {
		// if a zero or negative length was provided, we assume the caller
		// doesn't know or care about the length.
		desc.Size = size
	}

	// TODO(stevvooe): This section is very meandering. Need to be broken down
	// to be a lot more clear.

	if err := bw.resumeDigest(ctx); err == nil {
		canonical = bw.digester.Digest()

		if canonical.Algorithm() == desc.Digest.Algorithm() {
			// Common case: client and server prefer the same canonical digest
			// algorithm - currently SHA256.
			verified = desc.Digest == canonical
		} else {
			// The client wants to use a different digest algorithm. They'll just
			// have to be patient and wait for us to download and re-hash the
			// uploaded content using that digest algorithm.
			fullHash = true
		}
	} else if err == errResumableDigestNotAvailable {
		// Not using resumable digests, so we need to hash the entire layer.
		fullHash = true
	} else {
		return distribution.Descriptor{}, err
	}

	if fullHash {
		// a fantastic optimization: if the written data and the size are
		// the same, we don't need to read the data from the backend. This is
		// because we've written the entire file in the lifecycle of the
		// current instance.
		if bw.written == size && digest.Canonical == desc.Digest.Algorithm() {
			canonical = bw.digester.Digest()
			verified = desc.Digest == canonical
		}

		// If the check based on size fails, we fall back to the slowest of
		// paths. We may be able to make the size-based check a stronger
		// guarantee, so this may be defensive.
		if !verified {
			digester := digest.Canonical.Digester()
			verifier := desc.Digest.Verifier()

			// Read the file from the backend driver and validate it.
			fr, err := newFileReader(ctx, bw.driver, bw.path, desc.Size)
			if err != nil {
				return distribution.Descriptor{}, err
			}
			defer fr.Close()

			tr := io.TeeReader(fr, digester.Hash())

			if _, err := io.Copy(verifier, tr); err != nil {
				return distribution.Descriptor{}, err
			}

			canonical = digester.Digest()
			verified = verifier.Verified()
		}
	}

	if !verified {
		dcontext.GetLoggerWithFields(ctx,
			map[interface{}]interface{}{
				"canonical": canonical,
				"provided":  desc.Digest,
			}, "canonical", "provided").
			Errorf("canonical digest does not match provided digest")
		return distribution.Descriptor{}, distribution.ErrBlobInvalidDigest{
			Digest: desc.Digest,
			Reason: fmt.Errorf("content does not match digest"),
		}
	}

	// update desc with canonical hash
	desc.Digest = canonical

	if desc.MediaType == "" {
		desc.MediaType = "application/octet-stream"
	}

	return desc, nil
}

// moveBlob moves the data into its final, hash-qualified destination,
// identified by dgst. The layer should be validated before commencing the
// move.
func (bw *blobWriter) moveBlob(ctx context.Context, desc distribution.Descriptor) error {
	blobPath, err := pathFor(blobDataPathSpec{
		digest: desc.Digest,
	})

	if err != nil {
		return err
	}

	// Check for existence
	if _, err := bw.blobStore.driver.Stat(ctx, blobPath); err != nil {
		switch err := err.(type) {
		case storagedriver.PathNotFoundError:
			break // ensure that it doesn't exist.
		default:
			return err
		}
	} else {
		// If the path exists, we can assume that the content has already
		// been uploaded, since the blob storage is content-addressable.
		// While it may be corrupted, detection of such corruption belongs
		// elsewhere.
		return nil
	}

	// If no data was received, we may not actually have a file on disk. Check
	// the size here and write a zero-length file to blobPath if this is the
	// case. For the most part, this should only ever happen with zero-length
	// blobs.
	if _, err := bw.blobStore.driver.Stat(ctx, bw.path); err != nil {
		switch err := err.(type) {
		case storagedriver.PathNotFoundError:
			// HACK(stevvooe): This is slightly dangerous: if we verify above,
			// get a hash, then the underlying file is deleted, we risk moving
			// a zero-length blob into a nonzero-length blob location. To
			// prevent this horrid thing, we employ the hack of only allowing
			// this to happen for the digest of an empty blob.
			if desc.Digest == digestSha256Empty {
				return bw.blobStore.driver.PutContent(ctx, blobPath, []byte{})
			}

			// We let this fail during the move below.
			logrus.
				WithField("upload.id", bw.ID()).
				WithField("digest", desc.Digest).Warnf("attempted to move zero-length content with non-zero digest")
		default:
			return err // unrelated error
		}
	}

	// TODO(stevvooe): We should also write the mediatype when executing this move.

	return bw.blobStore.driver.Move(ctx, bw.path, blobPath)
}

// removeResources should clean up all resources associated with the upload
// instance. An error will be returned if the clean up cannot proceed. If the
// resources are already not present, no error will be returned.
func (bw *blobWriter) removeResources(ctx context.Context) error {
	dataPath, err := pathFor(uploadDataPathSpec{
		name: bw.blobStore.repository.Named().Name(),
		id:   bw.id,
	})

	if err != nil {
		return err
	}

	// Resolve and delete the containing directory, which should include any
	// upload related files.
	dirPath := path.Dir(dataPath)
	if err := bw.blobStore.driver.Delete(ctx, dirPath); err != nil {
		switch err := err.(type) {
		case storagedriver.PathNotFoundError:
			break // already gone!
		default:
			// This should be uncommon enough that returning an error
			// should be okay. At this point, the upload should be mostly
			// complete, but perhaps the backend became inaccessible.
			dcontext.GetLogger(ctx).Errorf("unable to delete layer upload resources %q: %v", dirPath, err)
			return err
		}
	}

	return nil
}

func (bw *blobWriter) Reader() (io.ReadCloser, error) {
	// todo(richardscothern): Change to exponential backoff, i=0.5, e=2, n=4
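	// The upload file may not be visible to Stat immediately (for example,
	// on an eventually consistent backend), so poll a few times before
	// opening a reader.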
	try := 1
	for try <= 5 {
		_, err := bw.driver.Stat(bw.ctx, bw.path)
		if err == nil {
			break
		}
		switch err.(type) {
		case storagedriver.PathNotFoundError:
			dcontext.GetLogger(bw.ctx).Debugf("Nothing found on try %d, sleeping...", try)
			time.Sleep(1 * time.Second)
			try++
		default:
			return nil, err
		}
	}

	readCloser, err := bw.driver.Reader(bw.ctx, bw.path, 0)
	if err != nil {
		return nil, err
	}

	return readCloser, nil
}

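For orientation, the sketch below shows how a caller typically drives a BlobWriter end to end. It is illustrative only: uploadBlob is a hypothetical helper, blobs stands in for a distribution.BlobStore obtained elsewhere (for example from a repository), and error handling is kept minimal.

package example

import (
	"context"
	"io"

	"github.com/docker/distribution"
	"github.com/opencontainers/go-digest"
)

// uploadBlob streams r into a blob store and commits the result under the
// digest computed while copying. Hypothetical helper for illustration.
func uploadBlob(ctx context.Context, blobs distribution.BlobStore, r io.Reader) (distribution.Descriptor, error) {
	bw, err := blobs.Create(ctx)
	if err != nil {
		return distribution.Descriptor{}, err
	}

	// Tee the stream through a local digester; ReadFrom feeds the writer's
	// own digester too, but the caller must name a digest at Commit time.
	digester := digest.Canonical.Digester()
	if _, err := bw.ReadFrom(io.TeeReader(r, digester.Hash())); err != nil {
		bw.Cancel(ctx)
		return distribution.Descriptor{}, err
	}

	// Commit runs the validate/move/link sequence shown above; on failure,
	// Cancel tears down the upload's resources.
	desc, err := bw.Commit(ctx, distribution.Descriptor{Digest: digester.Digest()})
	if err != nil {
		bw.Cancel(ctx)
		return distribution.Descriptor{}, err
	}

	return desc, nil
}

Note that Commit returns the canonical descriptor: per validateBlob above, its digest is the canonical (SHA-256) form and the media type defaults to application/octet-stream when none was provided.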