1 package storage
2
3 import (
4 "context"
5 "fmt"
6 "net/http"
7 "path"
8 "time"
9
10 "github.com/distribution/reference"
11 "github.com/docker/distribution"
12 dcontext "github.com/docker/distribution/context"
13 "github.com/docker/distribution/registry/storage/driver"
14 "github.com/docker/distribution/uuid"
15 "github.com/opencontainers/go-digest"
16 )
17
18
19
20 type linkPathFunc func(name string, dgst digest.Digest) (string, error)
21
22
23
24
25 type linkedBlobStore struct {
26 *blobStore
27 registry *registry
28 blobServer distribution.BlobServer
29 blobAccessController distribution.BlobDescriptorService
30 repository distribution.Repository
31 ctx context.Context
32 deleteEnabled bool
33 resumableDigestEnabled bool
34
35
36
37
38
39
40
41 linkPathFns []linkPathFunc
42
43
44 linkDirectoryPathSpec pathSpec
45 }
46
47 var _ distribution.BlobStore = &linkedBlobStore{}
48
49 func (lbs *linkedBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
50 return lbs.blobAccessController.Stat(ctx, dgst)
51 }
52
53 func (lbs *linkedBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
54 canonical, err := lbs.Stat(ctx, dgst)
55 if err != nil {
56 return nil, err
57 }
58
59 return lbs.blobStore.Get(ctx, canonical.Digest)
60 }
61
62 func (lbs *linkedBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
63 canonical, err := lbs.Stat(ctx, dgst)
64 if err != nil {
65 return nil, err
66 }
67
68 return lbs.blobStore.Open(ctx, canonical.Digest)
69 }
70
71 func (lbs *linkedBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
72 canonical, err := lbs.Stat(ctx, dgst)
73 if err != nil {
74 return err
75 }
76
77 if canonical.MediaType != "" {
78
79 w.Header().Set("Content-Type", canonical.MediaType)
80 }
81
82 return lbs.blobServer.ServeBlob(ctx, w, r, canonical.Digest)
83 }
84
85 func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
86 dgst := digest.FromBytes(p)
87
88 desc, err := lbs.blobStore.Put(ctx, mediaType, p)
89 if err != nil {
90 dcontext.GetLogger(ctx).Errorf("error putting into main store: %v", err)
91 return distribution.Descriptor{}, err
92 }
93
94 if err := lbs.blobAccessController.SetDescriptor(ctx, dgst, desc); err != nil {
95 return distribution.Descriptor{}, err
96 }
97
98
99
100
101
102 return desc, lbs.linkBlob(ctx, desc)
103 }
104
105 type optionFunc func(interface{}) error
106
107 func (f optionFunc) Apply(v interface{}) error {
108 return f(v)
109 }
110
111
112
113 func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption {
114 return optionFunc(func(v interface{}) error {
115 opts, ok := v.(*distribution.CreateOptions)
116 if !ok {
117 return fmt.Errorf("unexpected options type: %T", v)
118 }
119
120 opts.Mount.ShouldMount = true
121 opts.Mount.From = ref
122
123 return nil
124 })
125 }
126
127
128 func (lbs *linkedBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
129 dcontext.GetLogger(ctx).Debug("(*linkedBlobStore).Writer")
130
131 var opts distribution.CreateOptions
132
133 for _, option := range options {
134 err := option.Apply(&opts)
135 if err != nil {
136 return nil, err
137 }
138 }
139
140 if opts.Mount.ShouldMount {
141 desc, err := lbs.mount(ctx, opts.Mount.From, opts.Mount.From.Digest(), opts.Mount.Stat)
142 if err == nil {
143
144 return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc}
145 }
146 }
147
148 uuid := uuid.Generate().String()
149 startedAt := time.Now().UTC()
150
151 path, err := pathFor(uploadDataPathSpec{
152 name: lbs.repository.Named().Name(),
153 id: uuid,
154 })
155
156 if err != nil {
157 return nil, err
158 }
159
160 startedAtPath, err := pathFor(uploadStartedAtPathSpec{
161 name: lbs.repository.Named().Name(),
162 id: uuid,
163 })
164
165 if err != nil {
166 return nil, err
167 }
168
169
170 if err := lbs.blobStore.driver.PutContent(ctx, startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
171 return nil, err
172 }
173
174 return lbs.newBlobUpload(ctx, uuid, path, startedAt, false)
175 }
176
177 func (lbs *linkedBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
178 dcontext.GetLogger(ctx).Debug("(*linkedBlobStore).Resume")
179
180 startedAtPath, err := pathFor(uploadStartedAtPathSpec{
181 name: lbs.repository.Named().Name(),
182 id: id,
183 })
184
185 if err != nil {
186 return nil, err
187 }
188
189 startedAtBytes, err := lbs.blobStore.driver.GetContent(ctx, startedAtPath)
190 if err != nil {
191 switch err := err.(type) {
192 case driver.PathNotFoundError:
193 return nil, distribution.ErrBlobUploadUnknown
194 default:
195 return nil, err
196 }
197 }
198
199 startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes))
200 if err != nil {
201 return nil, err
202 }
203
204 path, err := pathFor(uploadDataPathSpec{
205 name: lbs.repository.Named().Name(),
206 id: id,
207 })
208
209 if err != nil {
210 return nil, err
211 }
212
213 return lbs.newBlobUpload(ctx, id, path, startedAt, true)
214 }
215
216 func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
217 if !lbs.deleteEnabled {
218 return distribution.ErrUnsupported
219 }
220
221
222 _, err := lbs.blobAccessController.Stat(ctx, dgst)
223 if err != nil {
224 return err
225 }
226
227 err = lbs.blobAccessController.Clear(ctx, dgst)
228 if err != nil {
229 return err
230 }
231
232 return nil
233 }
234
235 func (lbs *linkedBlobStore) Enumerate(ctx context.Context, ingestor func(digest.Digest) error) error {
236 rootPath, err := pathFor(lbs.linkDirectoryPathSpec)
237 if err != nil {
238 return err
239 }
240 return lbs.driver.Walk(ctx, rootPath, func(fileInfo driver.FileInfo) error {
241
242 if fileInfo.IsDir() {
243 return nil
244 }
245 filePath := fileInfo.Path()
246
247
248 _, fileName := path.Split(filePath)
249 if fileName != "link" {
250 return nil
251 }
252
253
254 digest, err := lbs.blobStore.readlink(ctx, filePath)
255 if err != nil {
256 return err
257 }
258
259
260 _, err = lbs.Stat(ctx, digest)
261 if err != nil {
262
263 if err == distribution.ErrBlobUnknown {
264 return nil
265 }
266 return err
267 }
268
269 err = ingestor(digest)
270 if err != nil {
271 return err
272 }
273
274 return nil
275 })
276 }
277
278 func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest, sourceStat *distribution.Descriptor) (distribution.Descriptor, error) {
279 var stat distribution.Descriptor
280 if sourceStat == nil {
281
282 repo, err := lbs.registry.Repository(ctx, sourceRepo)
283 if err != nil {
284 return distribution.Descriptor{}, err
285 }
286 stat, err = repo.Blobs(ctx).Stat(ctx, dgst)
287 if err != nil {
288 return distribution.Descriptor{}, err
289 }
290 } else {
291
292 stat = *sourceStat
293 }
294
295 desc := distribution.Descriptor{
296 Size: stat.Size,
297
298
299
300
301 MediaType: "application/octet-stream",
302 Digest: dgst,
303 }
304 return desc, lbs.linkBlob(ctx, desc)
305 }
306
307
308 func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time, append bool) (distribution.BlobWriter, error) {
309 fw, err := lbs.driver.Writer(ctx, path, append)
310 if err != nil {
311 return nil, err
312 }
313
314 bw := &blobWriter{
315 ctx: ctx,
316 blobStore: lbs,
317 id: uuid,
318 startedAt: startedAt,
319 digester: digest.Canonical.Digester(),
320 fileWriter: fw,
321 driver: lbs.driver,
322 path: path,
323 resumableDigestEnabled: lbs.resumableDigestEnabled,
324 }
325
326 return bw, nil
327 }
328
329
330
331 func (lbs *linkedBlobStore) linkBlob(ctx context.Context, canonical distribution.Descriptor, aliases ...digest.Digest) error {
332 dgsts := append([]digest.Digest{canonical.Digest}, aliases...)
333
334
335
336
337
338
339 seenDigests := make(map[digest.Digest]struct{}, len(dgsts))
340
341
342 linkPathFn := lbs.linkPathFns[0]
343
344 for _, dgst := range dgsts {
345 if _, seen := seenDigests[dgst]; seen {
346 continue
347 }
348 seenDigests[dgst] = struct{}{}
349
350 blobLinkPath, err := linkPathFn(lbs.repository.Named().Name(), dgst)
351 if err != nil {
352 return err
353 }
354
355 if err := lbs.blobStore.link(ctx, blobLinkPath, canonical.Digest); err != nil {
356 return err
357 }
358 }
359
360 return nil
361 }
362
363 type linkedBlobStatter struct {
364 *blobStore
365 repository distribution.Repository
366
367
368
369
370
371
372
373 linkPathFns []linkPathFunc
374 }
375
376 var _ distribution.BlobDescriptorService = &linkedBlobStatter{}
377
378 func (lbs *linkedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
379 var (
380 found bool
381 target digest.Digest
382 )
383
384
385
386 for _, linkPathFn := range lbs.linkPathFns {
387 var err error
388 target, err = lbs.resolveWithLinkFunc(ctx, dgst, linkPathFn)
389
390 if err == nil {
391 found = true
392 break
393 }
394
395 switch err := err.(type) {
396 case driver.PathNotFoundError:
397
398 default:
399 return distribution.Descriptor{}, err
400 }
401 }
402
403 if !found {
404 return distribution.Descriptor{}, distribution.ErrBlobUnknown
405 }
406
407 if target != dgst {
408
409 dcontext.GetLogger(ctx).Warnf("looking up blob with canonical target: %v -> %v", dgst, target)
410 }
411
412
413
414
415 return lbs.blobStore.statter.Stat(ctx, target)
416 }
417
418 func (lbs *linkedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) (err error) {
419
420 for _, linkPathFn := range lbs.linkPathFns {
421 blobLinkPath, err := linkPathFn(lbs.repository.Named().Name(), dgst)
422 if err != nil {
423 return err
424 }
425
426 err = lbs.blobStore.driver.Delete(ctx, blobLinkPath)
427 if err != nil {
428 switch err := err.(type) {
429 case driver.PathNotFoundError:
430 continue
431 default:
432 return err
433 }
434 }
435 }
436
437 return nil
438 }
439
440
441
442
443 func (lbs *linkedBlobStatter) resolveWithLinkFunc(ctx context.Context, dgst digest.Digest, linkPathFn linkPathFunc) (digest.Digest, error) {
444 blobLinkPath, err := linkPathFn(lbs.repository.Named().Name(), dgst)
445 if err != nil {
446 return "", err
447 }
448
449 return lbs.blobStore.readlink(ctx, blobLinkPath)
450 }
451
452 func (lbs *linkedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
453
454 return nil
455 }
456
457
458 func blobLinkPath(name string, dgst digest.Digest) (string, error) {
459 return pathFor(layerLinkPathSpec{name: name, digest: dgst})
460 }
461
462
463 func manifestRevisionLinkPath(name string, dgst digest.Digest) (string, error) {
464 return pathFor(manifestRevisionLinkPathSpec{name: name, revision: dgst})
465 }
466
View as plain text