...

Source file src/github.com/docker/distribution/registry/proxy/proxyregistry.go

Documentation: github.com/docker/distribution/registry/proxy

     1  package proxy
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"net/http"
     7  	"net/url"
     8  	"sync"
     9  
    10  	"github.com/distribution/reference"
    11  	"github.com/docker/distribution"
    12  	"github.com/docker/distribution/configuration"
    13  	dcontext "github.com/docker/distribution/context"
    14  	"github.com/docker/distribution/registry/client"
    15  	"github.com/docker/distribution/registry/client/auth"
    16  	"github.com/docker/distribution/registry/client/auth/challenge"
    17  	"github.com/docker/distribution/registry/client/transport"
    18  	"github.com/docker/distribution/registry/proxy/scheduler"
    19  	"github.com/docker/distribution/registry/storage"
    20  	"github.com/docker/distribution/registry/storage/driver"
    21  )
    22  
    23  // proxyingRegistry fetches content from a remote registry and caches it locally
    24  type proxyingRegistry struct {
    25  	embedded       distribution.Namespace // provides local registry functionality
    26  	scheduler      *scheduler.TTLExpirationScheduler
    27  	remoteURL      url.URL
    28  	authChallenger authChallenger
    29  }
    30  
    31  // NewRegistryPullThroughCache creates a registry acting as a pull through cache
    32  func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Namespace, driver driver.StorageDriver, config configuration.Proxy) (distribution.Namespace, error) {
    33  	remoteURL, err := url.Parse(config.RemoteURL)
    34  	if err != nil {
    35  		return nil, err
    36  	}
    37  
    38  	v := storage.NewVacuum(ctx, driver)
    39  	s := scheduler.New(ctx, driver, "/scheduler-state.json")
    40  	s.OnBlobExpire(func(ref reference.Reference) error {
    41  		var r reference.Canonical
    42  		var ok bool
    43  		if r, ok = ref.(reference.Canonical); !ok {
    44  			return fmt.Errorf("unexpected reference type : %T", ref)
    45  		}
    46  
    47  		repo, err := registry.Repository(ctx, r)
    48  		if err != nil {
    49  			return err
    50  		}
    51  
    52  		blobs := repo.Blobs(ctx)
    53  
    54  		// Clear the repository reference and descriptor caches
    55  		err = blobs.Delete(ctx, r.Digest())
    56  		if err != nil {
    57  			return err
    58  		}
    59  
    60  		err = v.RemoveBlob(r.Digest().String())
    61  		if err != nil {
    62  			return err
    63  		}
    64  
    65  		return nil
    66  	})
    67  
    68  	s.OnManifestExpire(func(ref reference.Reference) error {
    69  		var r reference.Canonical
    70  		var ok bool
    71  		if r, ok = ref.(reference.Canonical); !ok {
    72  			return fmt.Errorf("unexpected reference type : %T", ref)
    73  		}
    74  
    75  		repo, err := registry.Repository(ctx, r)
    76  		if err != nil {
    77  			return err
    78  		}
    79  
    80  		manifests, err := repo.Manifests(ctx)
    81  		if err != nil {
    82  			return err
    83  		}
    84  		err = manifests.Delete(ctx, r.Digest())
    85  		if err != nil {
    86  			return err
    87  		}
    88  		return nil
    89  	})
    90  
    91  	err = s.Start()
    92  	if err != nil {
    93  		return nil, err
    94  	}
    95  
    96  	cs, err := configureAuth(config.Username, config.Password, config.RemoteURL)
    97  	if err != nil {
    98  		return nil, err
    99  	}
   100  
   101  	return &proxyingRegistry{
   102  		embedded:  registry,
   103  		scheduler: s,
   104  		remoteURL: *remoteURL,
   105  		authChallenger: &remoteAuthChallenger{
   106  			remoteURL: *remoteURL,
   107  			cm:        challenge.NewSimpleManager(),
   108  			cs:        cs,
   109  		},
   110  	}, nil
   111  }
   112  
   113  func (pr *proxyingRegistry) Scope() distribution.Scope {
   114  	return distribution.GlobalScope
   115  }
   116  
   117  func (pr *proxyingRegistry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) {
   118  	return pr.embedded.Repositories(ctx, repos, last)
   119  }
   120  
   121  func (pr *proxyingRegistry) Repository(ctx context.Context, name reference.Named) (distribution.Repository, error) {
   122  	c := pr.authChallenger
   123  
   124  	tkopts := auth.TokenHandlerOptions{
   125  		Transport:   http.DefaultTransport,
   126  		Credentials: c.credentialStore(),
   127  		Scopes: []auth.Scope{
   128  			auth.RepositoryScope{
   129  				Repository: name.Name(),
   130  				Actions:    []string{"pull"},
   131  			},
   132  		},
   133  		Logger: dcontext.GetLogger(ctx),
   134  	}
   135  
   136  	tr := transport.NewTransport(http.DefaultTransport,
   137  		auth.NewAuthorizer(c.challengeManager(),
   138  			auth.NewTokenHandlerWithOptions(tkopts)))
   139  
   140  	localRepo, err := pr.embedded.Repository(ctx, name)
   141  	if err != nil {
   142  		return nil, err
   143  	}
   144  	localManifests, err := localRepo.Manifests(ctx, storage.SkipLayerVerification())
   145  	if err != nil {
   146  		return nil, err
   147  	}
   148  
   149  	remoteRepo, err := client.NewRepository(name, pr.remoteURL.String(), tr)
   150  	if err != nil {
   151  		return nil, err
   152  	}
   153  
   154  	remoteManifests, err := remoteRepo.Manifests(ctx)
   155  	if err != nil {
   156  		return nil, err
   157  	}
   158  
   159  	return &proxiedRepository{
   160  		blobStore: &proxyBlobStore{
   161  			localStore:     localRepo.Blobs(ctx),
   162  			remoteStore:    remoteRepo.Blobs(ctx),
   163  			scheduler:      pr.scheduler,
   164  			repositoryName: name,
   165  			authChallenger: pr.authChallenger,
   166  		},
   167  		manifests: &proxyManifestStore{
   168  			repositoryName:  name,
   169  			localManifests:  localManifests, // Options?
   170  			remoteManifests: remoteManifests,
   171  			ctx:             ctx,
   172  			scheduler:       pr.scheduler,
   173  			authChallenger:  pr.authChallenger,
   174  		},
   175  		name: name,
   176  		tags: &proxyTagService{
   177  			localTags:      localRepo.Tags(ctx),
   178  			remoteTags:     remoteRepo.Tags(ctx),
   179  			authChallenger: pr.authChallenger,
   180  		},
   181  	}, nil
   182  }
   183  
   184  func (pr *proxyingRegistry) Blobs() distribution.BlobEnumerator {
   185  	return pr.embedded.Blobs()
   186  }
   187  
   188  func (pr *proxyingRegistry) BlobStatter() distribution.BlobStatter {
   189  	return pr.embedded.BlobStatter()
   190  }
   191  
   192  // authChallenger encapsulates a request to the upstream to establish credential challenges
   193  type authChallenger interface {
   194  	tryEstablishChallenges(context.Context) error
   195  	challengeManager() challenge.Manager
   196  	credentialStore() auth.CredentialStore
   197  }
   198  
   199  type remoteAuthChallenger struct {
   200  	remoteURL url.URL
   201  	sync.Mutex
   202  	cm challenge.Manager
   203  	cs auth.CredentialStore
   204  }
   205  
   206  func (r *remoteAuthChallenger) credentialStore() auth.CredentialStore {
   207  	return r.cs
   208  }
   209  
   210  func (r *remoteAuthChallenger) challengeManager() challenge.Manager {
   211  	return r.cm
   212  }
   213  
   214  // tryEstablishChallenges will attempt to get a challenge type for the upstream if none currently exist
   215  func (r *remoteAuthChallenger) tryEstablishChallenges(ctx context.Context) error {
   216  	r.Lock()
   217  	defer r.Unlock()
   218  
   219  	remoteURL := r.remoteURL
   220  	remoteURL.Path = "/v2/"
   221  	challenges, err := r.cm.GetChallenges(remoteURL)
   222  	if err != nil {
   223  		return err
   224  	}
   225  
   226  	if len(challenges) > 0 {
   227  		return nil
   228  	}
   229  
   230  	// establish challenge type with upstream
   231  	if err := ping(r.cm, remoteURL.String(), challengeHeader); err != nil {
   232  		return err
   233  	}
   234  
   235  	dcontext.GetLogger(ctx).Infof("Challenge established with upstream : %s %s", remoteURL, r.cm)
   236  	return nil
   237  }
   238  
   239  // proxiedRepository uses proxying blob and manifest services to serve content
   240  // locally, or pulling it through from a remote and caching it locally if it doesn't
   241  // already exist
   242  type proxiedRepository struct {
   243  	blobStore distribution.BlobStore
   244  	manifests distribution.ManifestService
   245  	name      reference.Named
   246  	tags      distribution.TagService
   247  }
   248  
   249  func (pr *proxiedRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
   250  	return pr.manifests, nil
   251  }
   252  
   253  func (pr *proxiedRepository) Blobs(ctx context.Context) distribution.BlobStore {
   254  	return pr.blobStore
   255  }
   256  
   257  func (pr *proxiedRepository) Named() reference.Named {
   258  	return pr.name
   259  }
   260  
   261  func (pr *proxiedRepository) Tags(ctx context.Context) distribution.TagService {
   262  	return pr.tags
   263  }
   264  

View as plain text