...

Source file src/github.com/docker/distribution/registry/proxy/proxyblobstore.go

Documentation: github.com/docker/distribution/registry/proxy

     1  package proxy
     2  
     3  import (
     4  	"context"
     5  	"io"
     6  	"net/http"
     7  	"strconv"
     8  	"sync"
     9  
    10  	"github.com/distribution/reference"
    11  	"github.com/docker/distribution"
    12  	dcontext "github.com/docker/distribution/context"
    13  	"github.com/docker/distribution/registry/proxy/scheduler"
    14  	"github.com/opencontainers/go-digest"
    15  )
    16  
    17  type proxyBlobStore struct {
    18  	localStore     distribution.BlobStore
    19  	remoteStore    distribution.BlobService
    20  	scheduler      *scheduler.TTLExpirationScheduler
    21  	repositoryName reference.Named
    22  	authChallenger authChallenger
    23  }
    24  
    25  var _ distribution.BlobStore = &proxyBlobStore{}
    26  
    27  // inflight tracks currently downloading blobs
    28  var inflight = make(map[digest.Digest]struct{})
    29  
    30  // mu protects inflight
    31  var mu sync.Mutex
    32  
    33  func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) {
    34  	w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
    35  	w.Header().Set("Content-Type", mediaType)
    36  	w.Header().Set("Docker-Content-Digest", digest.String())
    37  	w.Header().Set("Etag", digest.String())
    38  }
    39  
    40  func (pbs *proxyBlobStore) copyContent(ctx context.Context, dgst digest.Digest, writer io.Writer) (distribution.Descriptor, error) {
    41  	desc, err := pbs.remoteStore.Stat(ctx, dgst)
    42  	if err != nil {
    43  		return distribution.Descriptor{}, err
    44  	}
    45  
    46  	if w, ok := writer.(http.ResponseWriter); ok {
    47  		setResponseHeaders(w, desc.Size, desc.MediaType, dgst)
    48  	}
    49  
    50  	remoteReader, err := pbs.remoteStore.Open(ctx, dgst)
    51  	if err != nil {
    52  		return distribution.Descriptor{}, err
    53  	}
    54  
    55  	defer remoteReader.Close()
    56  
    57  	_, err = io.CopyN(writer, remoteReader, desc.Size)
    58  	if err != nil {
    59  		return distribution.Descriptor{}, err
    60  	}
    61  
    62  	proxyMetrics.BlobPush(uint64(desc.Size))
    63  
    64  	return desc, nil
    65  }
    66  
    67  func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) (bool, error) {
    68  	localDesc, err := pbs.localStore.Stat(ctx, dgst)
    69  	if err != nil {
    70  		// Stat can report a zero sized file here if it's checked between creation
    71  		// and population.  Return nil error, and continue
    72  		return false, nil
    73  	}
    74  
    75  	if err == nil {
    76  		proxyMetrics.BlobPush(uint64(localDesc.Size))
    77  		return true, pbs.localStore.ServeBlob(ctx, w, r, dgst)
    78  	}
    79  
    80  	return false, nil
    81  
    82  }
    83  
    84  func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) error {
    85  	defer func() {
    86  		mu.Lock()
    87  		delete(inflight, dgst)
    88  		mu.Unlock()
    89  	}()
    90  
    91  	var desc distribution.Descriptor
    92  	var err error
    93  	var bw distribution.BlobWriter
    94  
    95  	bw, err = pbs.localStore.Create(ctx)
    96  	if err != nil {
    97  		return err
    98  	}
    99  
   100  	desc, err = pbs.copyContent(ctx, dgst, bw)
   101  	if err != nil {
   102  		return err
   103  	}
   104  
   105  	_, err = bw.Commit(ctx, desc)
   106  	if err != nil {
   107  		return err
   108  	}
   109  
   110  	return nil
   111  }
   112  
   113  func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
   114  	served, err := pbs.serveLocal(ctx, w, r, dgst)
   115  	if err != nil {
   116  		dcontext.GetLogger(ctx).Errorf("Error serving blob from local storage: %s", err.Error())
   117  		return err
   118  	}
   119  
   120  	if served {
   121  		return nil
   122  	}
   123  
   124  	if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
   125  		return err
   126  	}
   127  
   128  	mu.Lock()
   129  	_, ok := inflight[dgst]
   130  	if ok {
   131  		mu.Unlock()
   132  		_, err := pbs.copyContent(ctx, dgst, w)
   133  		return err
   134  	}
   135  	inflight[dgst] = struct{}{}
   136  	mu.Unlock()
   137  
   138  	go func(dgst digest.Digest) {
   139  		if err := pbs.storeLocal(ctx, dgst); err != nil {
   140  			dcontext.GetLogger(ctx).Errorf("Error committing to storage: %s", err.Error())
   141  		}
   142  
   143  		blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
   144  		if err != nil {
   145  			dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
   146  			return
   147  		}
   148  
   149  		pbs.scheduler.AddBlob(blobRef, repositoryTTL)
   150  	}(dgst)
   151  
   152  	_, err = pbs.copyContent(ctx, dgst, w)
   153  	if err != nil {
   154  		return err
   155  	}
   156  	return nil
   157  }
   158  
   159  func (pbs *proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
   160  	desc, err := pbs.localStore.Stat(ctx, dgst)
   161  	if err == nil {
   162  		return desc, err
   163  	}
   164  
   165  	if err != distribution.ErrBlobUnknown {
   166  		return distribution.Descriptor{}, err
   167  	}
   168  
   169  	if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
   170  		return distribution.Descriptor{}, err
   171  	}
   172  
   173  	return pbs.remoteStore.Stat(ctx, dgst)
   174  }
   175  
   176  func (pbs *proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
   177  	blob, err := pbs.localStore.Get(ctx, dgst)
   178  	if err == nil {
   179  		return blob, nil
   180  	}
   181  
   182  	if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
   183  		return []byte{}, err
   184  	}
   185  
   186  	blob, err = pbs.remoteStore.Get(ctx, dgst)
   187  	if err != nil {
   188  		return []byte{}, err
   189  	}
   190  
   191  	_, err = pbs.localStore.Put(ctx, "", blob)
   192  	if err != nil {
   193  		return []byte{}, err
   194  	}
   195  	return blob, nil
   196  }
   197  
   198  // Unsupported functions
   199  func (pbs *proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
   200  	return distribution.Descriptor{}, distribution.ErrUnsupported
   201  }
   202  
   203  func (pbs *proxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
   204  	return nil, distribution.ErrUnsupported
   205  }
   206  
   207  func (pbs *proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
   208  	return nil, distribution.ErrUnsupported
   209  }
   210  
   211  func (pbs *proxyBlobStore) Mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest) (distribution.Descriptor, error) {
   212  	return distribution.Descriptor{}, distribution.ErrUnsupported
   213  }
   214  
   215  func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
   216  	return nil, distribution.ErrUnsupported
   217  }
   218  
   219  func (pbs *proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
   220  	return distribution.ErrUnsupported
   221  }
   222  

View as plain text