...
1
2
3
4 package storage
5
6 import (
7 "context"
8 "encoding"
9 "fmt"
10 "hash"
11 "path"
12 "strconv"
13
14 storagedriver "github.com/docker/distribution/registry/storage/driver"
15 "github.com/sirupsen/logrus"
16 )
17
18
19
20 func (bw *blobWriter) resumeDigest(ctx context.Context) error {
21 if !bw.resumableDigestEnabled {
22 return errResumableDigestNotAvailable
23 }
24
25 h, ok := bw.digester.Hash().(encoding.BinaryUnmarshaler)
26 if !ok {
27 return errResumableDigestNotAvailable
28 }
29
30 offset := bw.fileWriter.Size()
31 if offset == bw.written {
32
33 return nil
34 }
35
36
37 var hashStateMatch hashStateEntry
38 hashStates, err := bw.getStoredHashStates(ctx)
39 if err != nil {
40 return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err)
41 }
42
43
44
45 for _, hashState := range hashStates {
46 if hashState.offset == offset {
47 hashStateMatch = hashState
48 break
49 }
50 }
51
52 if hashStateMatch.offset == 0 {
53
54 h.(hash.Hash).Reset()
55 } else {
56 storedState, err := bw.driver.GetContent(ctx, hashStateMatch.path)
57 if err != nil {
58 return err
59 }
60
61 if err = h.UnmarshalBinary(storedState); err != nil {
62 return err
63 }
64 bw.written = hashStateMatch.offset
65 }
66
67
68 if gapLen := offset - bw.written; gapLen > 0 {
69 return errResumableDigestNotAvailable
70 }
71
72 return nil
73 }
74
// hashStateEntry describes one stored hash state of an upload.
type hashStateEntry struct {
	// offset is the byte offset parsed from the last element of path.
	offset int64
	// path is the storage driver path the hash state was listed under.
	path string
}
79
80
81 func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry, error) {
82 uploadHashStatePathPrefix, err := pathFor(uploadHashStatePathSpec{
83 name: bw.blobStore.repository.Named().String(),
84 id: bw.id,
85 alg: bw.digester.Digest().Algorithm(),
86 list: true,
87 })
88
89 if err != nil {
90 return nil, err
91 }
92
93 paths, err := bw.blobStore.driver.List(ctx, uploadHashStatePathPrefix)
94 if err != nil {
95 if _, ok := err.(storagedriver.PathNotFoundError); !ok {
96 return nil, err
97 }
98
99 paths = nil
100 }
101
102 hashStateEntries := make([]hashStateEntry, 0, len(paths))
103
104 for _, p := range paths {
105 pathSuffix := path.Base(p)
106
107 offset, err := strconv.ParseInt(pathSuffix, 0, 64)
108 if err != nil {
109 logrus.Errorf("unable to parse offset from upload state path %q: %s", p, err)
110 }
111
112 hashStateEntries = append(hashStateEntries, hashStateEntry{offset: offset, path: p})
113 }
114
115 return hashStateEntries, nil
116 }
117
118 func (bw *blobWriter) storeHashState(ctx context.Context) error {
119 if !bw.resumableDigestEnabled {
120 return errResumableDigestNotAvailable
121 }
122
123 h, ok := bw.digester.Hash().(encoding.BinaryMarshaler)
124 if !ok {
125 return errResumableDigestNotAvailable
126 }
127
128 state, err := h.MarshalBinary()
129 if err != nil {
130 return err
131 }
132
133 uploadHashStatePath, err := pathFor(uploadHashStatePathSpec{
134 name: bw.blobStore.repository.Named().String(),
135 id: bw.id,
136 alg: bw.digester.Digest().Algorithm(),
137 offset: bw.written,
138 })
139
140 if err != nil {
141 return err
142 }
143
144 return bw.driver.PutContent(ctx, uploadHashStatePath, state)
145 }
146
View as plain text