1 package proxy
2
3 import (
4 "context"
5 "fmt"
6 "net/http"
7 "net/url"
8 "sync"
9
10 "github.com/distribution/reference"
11 "github.com/docker/distribution"
12 "github.com/docker/distribution/configuration"
13 dcontext "github.com/docker/distribution/context"
14 "github.com/docker/distribution/registry/client"
15 "github.com/docker/distribution/registry/client/auth"
16 "github.com/docker/distribution/registry/client/auth/challenge"
17 "github.com/docker/distribution/registry/client/transport"
18 "github.com/docker/distribution/registry/proxy/scheduler"
19 "github.com/docker/distribution/registry/storage"
20 "github.com/docker/distribution/registry/storage/driver"
21 )
22
23
24 type proxyingRegistry struct {
25 embedded distribution.Namespace
26 scheduler *scheduler.TTLExpirationScheduler
27 remoteURL url.URL
28 authChallenger authChallenger
29 }
30
31
32 func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Namespace, driver driver.StorageDriver, config configuration.Proxy) (distribution.Namespace, error) {
33 remoteURL, err := url.Parse(config.RemoteURL)
34 if err != nil {
35 return nil, err
36 }
37
38 v := storage.NewVacuum(ctx, driver)
39 s := scheduler.New(ctx, driver, "/scheduler-state.json")
40 s.OnBlobExpire(func(ref reference.Reference) error {
41 var r reference.Canonical
42 var ok bool
43 if r, ok = ref.(reference.Canonical); !ok {
44 return fmt.Errorf("unexpected reference type : %T", ref)
45 }
46
47 repo, err := registry.Repository(ctx, r)
48 if err != nil {
49 return err
50 }
51
52 blobs := repo.Blobs(ctx)
53
54
55 err = blobs.Delete(ctx, r.Digest())
56 if err != nil {
57 return err
58 }
59
60 err = v.RemoveBlob(r.Digest().String())
61 if err != nil {
62 return err
63 }
64
65 return nil
66 })
67
68 s.OnManifestExpire(func(ref reference.Reference) error {
69 var r reference.Canonical
70 var ok bool
71 if r, ok = ref.(reference.Canonical); !ok {
72 return fmt.Errorf("unexpected reference type : %T", ref)
73 }
74
75 repo, err := registry.Repository(ctx, r)
76 if err != nil {
77 return err
78 }
79
80 manifests, err := repo.Manifests(ctx)
81 if err != nil {
82 return err
83 }
84 err = manifests.Delete(ctx, r.Digest())
85 if err != nil {
86 return err
87 }
88 return nil
89 })
90
91 err = s.Start()
92 if err != nil {
93 return nil, err
94 }
95
96 cs, err := configureAuth(config.Username, config.Password, config.RemoteURL)
97 if err != nil {
98 return nil, err
99 }
100
101 return &proxyingRegistry{
102 embedded: registry,
103 scheduler: s,
104 remoteURL: *remoteURL,
105 authChallenger: &remoteAuthChallenger{
106 remoteURL: *remoteURL,
107 cm: challenge.NewSimpleManager(),
108 cs: cs,
109 },
110 }, nil
111 }
112
113 func (pr *proxyingRegistry) Scope() distribution.Scope {
114 return distribution.GlobalScope
115 }
116
117 func (pr *proxyingRegistry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) {
118 return pr.embedded.Repositories(ctx, repos, last)
119 }
120
121 func (pr *proxyingRegistry) Repository(ctx context.Context, name reference.Named) (distribution.Repository, error) {
122 c := pr.authChallenger
123
124 tkopts := auth.TokenHandlerOptions{
125 Transport: http.DefaultTransport,
126 Credentials: c.credentialStore(),
127 Scopes: []auth.Scope{
128 auth.RepositoryScope{
129 Repository: name.Name(),
130 Actions: []string{"pull"},
131 },
132 },
133 Logger: dcontext.GetLogger(ctx),
134 }
135
136 tr := transport.NewTransport(http.DefaultTransport,
137 auth.NewAuthorizer(c.challengeManager(),
138 auth.NewTokenHandlerWithOptions(tkopts)))
139
140 localRepo, err := pr.embedded.Repository(ctx, name)
141 if err != nil {
142 return nil, err
143 }
144 localManifests, err := localRepo.Manifests(ctx, storage.SkipLayerVerification())
145 if err != nil {
146 return nil, err
147 }
148
149 remoteRepo, err := client.NewRepository(name, pr.remoteURL.String(), tr)
150 if err != nil {
151 return nil, err
152 }
153
154 remoteManifests, err := remoteRepo.Manifests(ctx)
155 if err != nil {
156 return nil, err
157 }
158
159 return &proxiedRepository{
160 blobStore: &proxyBlobStore{
161 localStore: localRepo.Blobs(ctx),
162 remoteStore: remoteRepo.Blobs(ctx),
163 scheduler: pr.scheduler,
164 repositoryName: name,
165 authChallenger: pr.authChallenger,
166 },
167 manifests: &proxyManifestStore{
168 repositoryName: name,
169 localManifests: localManifests,
170 remoteManifests: remoteManifests,
171 ctx: ctx,
172 scheduler: pr.scheduler,
173 authChallenger: pr.authChallenger,
174 },
175 name: name,
176 tags: &proxyTagService{
177 localTags: localRepo.Tags(ctx),
178 remoteTags: remoteRepo.Tags(ctx),
179 authChallenger: pr.authChallenger,
180 },
181 }, nil
182 }
183
184 func (pr *proxyingRegistry) Blobs() distribution.BlobEnumerator {
185 return pr.embedded.Blobs()
186 }
187
188 func (pr *proxyingRegistry) BlobStatter() distribution.BlobStatter {
189 return pr.embedded.BlobStatter()
190 }
191
192
193 type authChallenger interface {
194 tryEstablishChallenges(context.Context) error
195 challengeManager() challenge.Manager
196 credentialStore() auth.CredentialStore
197 }
198
199 type remoteAuthChallenger struct {
200 remoteURL url.URL
201 sync.Mutex
202 cm challenge.Manager
203 cs auth.CredentialStore
204 }
205
206 func (r *remoteAuthChallenger) credentialStore() auth.CredentialStore {
207 return r.cs
208 }
209
210 func (r *remoteAuthChallenger) challengeManager() challenge.Manager {
211 return r.cm
212 }
213
214
215 func (r *remoteAuthChallenger) tryEstablishChallenges(ctx context.Context) error {
216 r.Lock()
217 defer r.Unlock()
218
219 remoteURL := r.remoteURL
220 remoteURL.Path = "/v2/"
221 challenges, err := r.cm.GetChallenges(remoteURL)
222 if err != nil {
223 return err
224 }
225
226 if len(challenges) > 0 {
227 return nil
228 }
229
230
231 if err := ping(r.cm, remoteURL.String(), challengeHeader); err != nil {
232 return err
233 }
234
235 dcontext.GetLogger(ctx).Infof("Challenge established with upstream : %s %s", remoteURL, r.cm)
236 return nil
237 }
238
239
240
241
242 type proxiedRepository struct {
243 blobStore distribution.BlobStore
244 manifests distribution.ManifestService
245 name reference.Named
246 tags distribution.TagService
247 }
248
249 func (pr *proxiedRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
250 return pr.manifests, nil
251 }
252
253 func (pr *proxiedRepository) Blobs(ctx context.Context) distribution.BlobStore {
254 return pr.blobStore
255 }
256
257 func (pr *proxiedRepository) Named() reference.Named {
258 return pr.name
259 }
260
261 func (pr *proxiedRepository) Tags(ctx context.Context) distribution.TagService {
262 return pr.tags
263 }
264
View as plain text