...

Source file src/github.com/docker/distribution/registry/storage/blobwriter_resumable.go

Documentation: github.com/docker/distribution/registry/storage

     1  //go:build !noresumabledigest
     2  // +build !noresumabledigest
     3  
     4  package storage
     5  
     6  import (
     7  	"context"
     8  	"encoding"
     9  	"fmt"
    10  	"hash"
    11  	"path"
    12  	"strconv"
    13  
    14  	storagedriver "github.com/docker/distribution/registry/storage/driver"
    15  	"github.com/sirupsen/logrus"
    16  )
    17  
    18  // resumeDigest attempts to restore the state of the internal hash function
    19  // by loading the most recent saved hash state equal to the current size of the blob.
    20  func (bw *blobWriter) resumeDigest(ctx context.Context) error {
    21  	if !bw.resumableDigestEnabled {
    22  		return errResumableDigestNotAvailable
    23  	}
    24  
    25  	h, ok := bw.digester.Hash().(encoding.BinaryUnmarshaler)
    26  	if !ok {
    27  		return errResumableDigestNotAvailable
    28  	}
    29  
    30  	offset := bw.fileWriter.Size()
    31  	if offset == bw.written {
    32  		// State of digester is already at the requested offset.
    33  		return nil
    34  	}
    35  
    36  	// List hash states from storage backend.
    37  	var hashStateMatch hashStateEntry
    38  	hashStates, err := bw.getStoredHashStates(ctx)
    39  	if err != nil {
    40  		return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err)
    41  	}
    42  
    43  	// Find the highest stored hashState with offset equal to
    44  	// the requested offset.
    45  	for _, hashState := range hashStates {
    46  		if hashState.offset == offset {
    47  			hashStateMatch = hashState
    48  			break // Found an exact offset match.
    49  		}
    50  	}
    51  
    52  	if hashStateMatch.offset == 0 {
    53  		// No need to load any state, just reset the hasher.
    54  		h.(hash.Hash).Reset()
    55  	} else {
    56  		storedState, err := bw.driver.GetContent(ctx, hashStateMatch.path)
    57  		if err != nil {
    58  			return err
    59  		}
    60  
    61  		if err = h.UnmarshalBinary(storedState); err != nil {
    62  			return err
    63  		}
    64  		bw.written = hashStateMatch.offset
    65  	}
    66  
    67  	// Mind the gap.
    68  	if gapLen := offset - bw.written; gapLen > 0 {
    69  		return errResumableDigestNotAvailable
    70  	}
    71  
    72  	return nil
    73  }
    74  
    75  type hashStateEntry struct {
    76  	offset int64
    77  	path   string
    78  }
    79  
    80  // getStoredHashStates returns a slice of hashStateEntries for this upload.
    81  func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry, error) {
    82  	uploadHashStatePathPrefix, err := pathFor(uploadHashStatePathSpec{
    83  		name: bw.blobStore.repository.Named().String(),
    84  		id:   bw.id,
    85  		alg:  bw.digester.Digest().Algorithm(),
    86  		list: true,
    87  	})
    88  
    89  	if err != nil {
    90  		return nil, err
    91  	}
    92  
    93  	paths, err := bw.blobStore.driver.List(ctx, uploadHashStatePathPrefix)
    94  	if err != nil {
    95  		if _, ok := err.(storagedriver.PathNotFoundError); !ok {
    96  			return nil, err
    97  		}
    98  		// Treat PathNotFoundError as no entries.
    99  		paths = nil
   100  	}
   101  
   102  	hashStateEntries := make([]hashStateEntry, 0, len(paths))
   103  
   104  	for _, p := range paths {
   105  		pathSuffix := path.Base(p)
   106  		// The suffix should be the offset.
   107  		offset, err := strconv.ParseInt(pathSuffix, 0, 64)
   108  		if err != nil {
   109  			logrus.Errorf("unable to parse offset from upload state path %q: %s", p, err)
   110  		}
   111  
   112  		hashStateEntries = append(hashStateEntries, hashStateEntry{offset: offset, path: p})
   113  	}
   114  
   115  	return hashStateEntries, nil
   116  }
   117  
   118  func (bw *blobWriter) storeHashState(ctx context.Context) error {
   119  	if !bw.resumableDigestEnabled {
   120  		return errResumableDigestNotAvailable
   121  	}
   122  
   123  	h, ok := bw.digester.Hash().(encoding.BinaryMarshaler)
   124  	if !ok {
   125  		return errResumableDigestNotAvailable
   126  	}
   127  
   128  	state, err := h.MarshalBinary()
   129  	if err != nil {
   130  		return err
   131  	}
   132  
   133  	uploadHashStatePath, err := pathFor(uploadHashStatePathSpec{
   134  		name:   bw.blobStore.repository.Named().String(),
   135  		id:     bw.id,
   136  		alg:    bw.digester.Digest().Algorithm(),
   137  		offset: bw.written,
   138  	})
   139  
   140  	if err != nil {
   141  		return err
   142  	}
   143  
   144  	return bw.driver.PutContent(ctx, uploadHashStatePath, state)
   145  }
   146  

View as plain text