1 package storage
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "io"
8 "path"
9 "time"
10
11 "github.com/docker/distribution"
12 dcontext "github.com/docker/distribution/context"
13 storagedriver "github.com/docker/distribution/registry/storage/driver"
14 "github.com/opencontainers/go-digest"
15 "github.com/sirupsen/logrus"
16 )
17
// errResumableDigestNotAvailable is the sentinel error the blob writer
// compares against when resuming or storing digest state. Write, ReadFrom and
// Close ignore it; validateBlob reacts to it by hashing the full content.
var errResumableDigestNotAvailable = errors.New("resumable digest not available")
21
// digestSha256Empty is the sha256 digest of zero bytes of input. moveBlob
// compares against it to recognise a zero-length blob whose upload file was
// never created.
const digestSha256Empty = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
26
27
28
29 type blobWriter struct {
30 ctx context.Context
31 blobStore *linkedBlobStore
32
33 id string
34 startedAt time.Time
35 digester digest.Digester
36 written int64
37
38 fileWriter storagedriver.FileWriter
39 driver storagedriver.StorageDriver
40 path string
41
42 resumableDigestEnabled bool
43 committed bool
44 }
45
46 var _ distribution.BlobWriter = &blobWriter{}
47
48
49 func (bw *blobWriter) ID() string {
50 return bw.id
51 }
52
53 func (bw *blobWriter) StartedAt() time.Time {
54 return bw.startedAt
55 }
56
57
58
59 func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
60 dcontext.GetLogger(ctx).Debug("(*blobWriter).Commit")
61
62 if err := bw.fileWriter.Commit(); err != nil {
63 return distribution.Descriptor{}, err
64 }
65
66 bw.Close()
67 desc.Size = bw.Size()
68
69 canonical, err := bw.validateBlob(ctx, desc)
70 if err != nil {
71 return distribution.Descriptor{}, err
72 }
73
74 if err := bw.moveBlob(ctx, canonical); err != nil {
75 return distribution.Descriptor{}, err
76 }
77
78 if err := bw.blobStore.linkBlob(ctx, canonical, desc.Digest); err != nil {
79 return distribution.Descriptor{}, err
80 }
81
82 if err := bw.removeResources(ctx); err != nil {
83 return distribution.Descriptor{}, err
84 }
85
86 err = bw.blobStore.blobAccessController.SetDescriptor(ctx, canonical.Digest, canonical)
87 if err != nil {
88 return distribution.Descriptor{}, err
89 }
90
91 bw.committed = true
92 return canonical, nil
93 }
94
95
96
97 func (bw *blobWriter) Cancel(ctx context.Context) error {
98 dcontext.GetLogger(ctx).Debug("(*blobWriter).Cancel")
99 if err := bw.fileWriter.Cancel(); err != nil {
100 return err
101 }
102
103 if err := bw.Close(); err != nil {
104 dcontext.GetLogger(ctx).Errorf("error closing blobwriter: %s", err)
105 }
106
107 return bw.removeResources(ctx)
108 }
109
110 func (bw *blobWriter) Size() int64 {
111 return bw.fileWriter.Size()
112 }
113
114 func (bw *blobWriter) Write(p []byte) (int, error) {
115
116
117
118 if err := bw.resumeDigest(bw.blobStore.ctx); err != nil && err != errResumableDigestNotAvailable {
119 return 0, err
120 }
121
122 _, err := bw.fileWriter.Write(p)
123 if err != nil {
124 return 0, err
125 }
126
127 n, err := bw.digester.Hash().Write(p)
128 bw.written += int64(n)
129
130 return n, err
131 }
132
133 func (bw *blobWriter) ReadFrom(r io.Reader) (n int64, err error) {
134
135
136
137 if err := bw.resumeDigest(bw.blobStore.ctx); err != nil && err != errResumableDigestNotAvailable {
138 return 0, err
139 }
140
141
142
143
144 tee := io.TeeReader(r, bw.fileWriter)
145 nn, err := io.Copy(bw.digester.Hash(), tee)
146 bw.written += nn
147
148 return nn, err
149 }
150
151 func (bw *blobWriter) Close() error {
152 if bw.committed {
153 return errors.New("blobwriter close after commit")
154 }
155
156 if err := bw.storeHashState(bw.blobStore.ctx); err != nil && err != errResumableDigestNotAvailable {
157 return err
158 }
159
160 return bw.fileWriter.Close()
161 }
162
163
164
165 func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
166 var (
167 verified, fullHash bool
168 canonical digest.Digest
169 )
170
171 if desc.Digest == "" {
172
173
174 return distribution.Descriptor{}, distribution.ErrBlobInvalidDigest{
175 Reason: fmt.Errorf("cannot validate against empty digest"),
176 }
177 }
178
179 var size int64
180
181
182 if fi, err := bw.driver.Stat(ctx, bw.path); err != nil {
183 switch err := err.(type) {
184 case storagedriver.PathNotFoundError:
185
186
187
188 desc.Size = 0
189 default:
190
191 return distribution.Descriptor{}, err
192 }
193 } else {
194 if fi.IsDir() {
195 return distribution.Descriptor{}, fmt.Errorf("unexpected directory at upload location %q", bw.path)
196 }
197
198 size = fi.Size()
199 }
200
201 if desc.Size > 0 {
202 if desc.Size != size {
203 return distribution.Descriptor{}, distribution.ErrBlobInvalidLength
204 }
205 } else {
206
207
208 desc.Size = size
209 }
210
211
212
213
214 if err := bw.resumeDigest(ctx); err == nil {
215 canonical = bw.digester.Digest()
216
217 if canonical.Algorithm() == desc.Digest.Algorithm() {
218
219
220 verified = desc.Digest == canonical
221 } else {
222
223
224
225 fullHash = true
226 }
227 } else if err == errResumableDigestNotAvailable {
228
229 fullHash = true
230 } else {
231 return distribution.Descriptor{}, err
232 }
233
234 if fullHash {
235
236
237
238
239 if bw.written == size && digest.Canonical == desc.Digest.Algorithm() {
240 canonical = bw.digester.Digest()
241 verified = desc.Digest == canonical
242 }
243
244
245
246
247 if !verified {
248 digester := digest.Canonical.Digester()
249 verifier := desc.Digest.Verifier()
250
251
252 fr, err := newFileReader(ctx, bw.driver, bw.path, desc.Size)
253 if err != nil {
254 return distribution.Descriptor{}, err
255 }
256 defer fr.Close()
257
258 tr := io.TeeReader(fr, digester.Hash())
259
260 if _, err := io.Copy(verifier, tr); err != nil {
261 return distribution.Descriptor{}, err
262 }
263
264 canonical = digester.Digest()
265 verified = verifier.Verified()
266 }
267 }
268
269 if !verified {
270 dcontext.GetLoggerWithFields(ctx,
271 map[interface{}]interface{}{
272 "canonical": canonical,
273 "provided": desc.Digest,
274 }, "canonical", "provided").
275 Errorf("canonical digest does match provided digest")
276 return distribution.Descriptor{}, distribution.ErrBlobInvalidDigest{
277 Digest: desc.Digest,
278 Reason: fmt.Errorf("content does not match digest"),
279 }
280 }
281
282
283 desc.Digest = canonical
284
285 if desc.MediaType == "" {
286 desc.MediaType = "application/octet-stream"
287 }
288
289 return desc, nil
290 }
291
292
293
294
295 func (bw *blobWriter) moveBlob(ctx context.Context, desc distribution.Descriptor) error {
296 blobPath, err := pathFor(blobDataPathSpec{
297 digest: desc.Digest,
298 })
299
300 if err != nil {
301 return err
302 }
303
304
305 if _, err := bw.blobStore.driver.Stat(ctx, blobPath); err != nil {
306 switch err := err.(type) {
307 case storagedriver.PathNotFoundError:
308 break
309 default:
310 return err
311 }
312 } else {
313
314
315
316
317 return nil
318 }
319
320
321
322
323
324 if _, err := bw.blobStore.driver.Stat(ctx, bw.path); err != nil {
325 switch err := err.(type) {
326 case storagedriver.PathNotFoundError:
327
328
329
330
331
332 if desc.Digest == digestSha256Empty {
333 return bw.blobStore.driver.PutContent(ctx, blobPath, []byte{})
334 }
335
336
337 logrus.
338 WithField("upload.id", bw.ID()).
339 WithField("digest", desc.Digest).Warnf("attempted to move zero-length content with non-zero digest")
340 default:
341 return err
342 }
343 }
344
345
346
347 return bw.blobStore.driver.Move(ctx, bw.path, blobPath)
348 }
349
350
351
352
353 func (bw *blobWriter) removeResources(ctx context.Context) error {
354 dataPath, err := pathFor(uploadDataPathSpec{
355 name: bw.blobStore.repository.Named().Name(),
356 id: bw.id,
357 })
358
359 if err != nil {
360 return err
361 }
362
363
364
365 dirPath := path.Dir(dataPath)
366 if err := bw.blobStore.driver.Delete(ctx, dirPath); err != nil {
367 switch err := err.(type) {
368 case storagedriver.PathNotFoundError:
369 break
370 default:
371
372
373
374 dcontext.GetLogger(ctx).Errorf("unable to delete layer upload resources %q: %v", dirPath, err)
375 return err
376 }
377 }
378
379 return nil
380 }
381
382 func (bw *blobWriter) Reader() (io.ReadCloser, error) {
383
384 try := 1
385 for try <= 5 {
386 _, err := bw.driver.Stat(bw.ctx, bw.path)
387 if err == nil {
388 break
389 }
390 switch err.(type) {
391 case storagedriver.PathNotFoundError:
392 dcontext.GetLogger(bw.ctx).Debugf("Nothing found on try %d, sleeping...", try)
393 time.Sleep(1 * time.Second)
394 try++
395 default:
396 return nil, err
397 }
398 }
399
400 readCloser, err := bw.driver.Reader(bw.ctx, bw.path, 0)
401 if err != nil {
402 return nil, err
403 }
404
405 return readCloser, nil
406 }
407
View as plain text