1 package proxy
2
3 import (
4 "context"
5 "io"
6 "net/http"
7 "strconv"
8 "sync"
9
10 "github.com/distribution/reference"
11 "github.com/docker/distribution"
12 dcontext "github.com/docker/distribution/context"
13 "github.com/docker/distribution/registry/proxy/scheduler"
14 "github.com/opencontainers/go-digest"
15 )
16
17 type proxyBlobStore struct {
18 localStore distribution.BlobStore
19 remoteStore distribution.BlobService
20 scheduler *scheduler.TTLExpirationScheduler
21 repositoryName reference.Named
22 authChallenger authChallenger
23 }
24
25 var _ distribution.BlobStore = &proxyBlobStore{}
26
27
28 var inflight = make(map[digest.Digest]struct{})
29
30
31 var mu sync.Mutex
32
33 func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, digest digest.Digest) {
34 w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
35 w.Header().Set("Content-Type", mediaType)
36 w.Header().Set("Docker-Content-Digest", digest.String())
37 w.Header().Set("Etag", digest.String())
38 }
39
40 func (pbs *proxyBlobStore) copyContent(ctx context.Context, dgst digest.Digest, writer io.Writer) (distribution.Descriptor, error) {
41 desc, err := pbs.remoteStore.Stat(ctx, dgst)
42 if err != nil {
43 return distribution.Descriptor{}, err
44 }
45
46 if w, ok := writer.(http.ResponseWriter); ok {
47 setResponseHeaders(w, desc.Size, desc.MediaType, dgst)
48 }
49
50 remoteReader, err := pbs.remoteStore.Open(ctx, dgst)
51 if err != nil {
52 return distribution.Descriptor{}, err
53 }
54
55 defer remoteReader.Close()
56
57 _, err = io.CopyN(writer, remoteReader, desc.Size)
58 if err != nil {
59 return distribution.Descriptor{}, err
60 }
61
62 proxyMetrics.BlobPush(uint64(desc.Size))
63
64 return desc, nil
65 }
66
67 func (pbs *proxyBlobStore) serveLocal(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) (bool, error) {
68 localDesc, err := pbs.localStore.Stat(ctx, dgst)
69 if err != nil {
70
71
72 return false, nil
73 }
74
75 if err == nil {
76 proxyMetrics.BlobPush(uint64(localDesc.Size))
77 return true, pbs.localStore.ServeBlob(ctx, w, r, dgst)
78 }
79
80 return false, nil
81
82 }
83
84 func (pbs *proxyBlobStore) storeLocal(ctx context.Context, dgst digest.Digest) error {
85 defer func() {
86 mu.Lock()
87 delete(inflight, dgst)
88 mu.Unlock()
89 }()
90
91 var desc distribution.Descriptor
92 var err error
93 var bw distribution.BlobWriter
94
95 bw, err = pbs.localStore.Create(ctx)
96 if err != nil {
97 return err
98 }
99
100 desc, err = pbs.copyContent(ctx, dgst, bw)
101 if err != nil {
102 return err
103 }
104
105 _, err = bw.Commit(ctx, desc)
106 if err != nil {
107 return err
108 }
109
110 return nil
111 }
112
113 func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
114 served, err := pbs.serveLocal(ctx, w, r, dgst)
115 if err != nil {
116 dcontext.GetLogger(ctx).Errorf("Error serving blob from local storage: %s", err.Error())
117 return err
118 }
119
120 if served {
121 return nil
122 }
123
124 if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
125 return err
126 }
127
128 mu.Lock()
129 _, ok := inflight[dgst]
130 if ok {
131 mu.Unlock()
132 _, err := pbs.copyContent(ctx, dgst, w)
133 return err
134 }
135 inflight[dgst] = struct{}{}
136 mu.Unlock()
137
138 go func(dgst digest.Digest) {
139 if err := pbs.storeLocal(ctx, dgst); err != nil {
140 dcontext.GetLogger(ctx).Errorf("Error committing to storage: %s", err.Error())
141 }
142
143 blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
144 if err != nil {
145 dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
146 return
147 }
148
149 pbs.scheduler.AddBlob(blobRef, repositoryTTL)
150 }(dgst)
151
152 _, err = pbs.copyContent(ctx, dgst, w)
153 if err != nil {
154 return err
155 }
156 return nil
157 }
158
159 func (pbs *proxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
160 desc, err := pbs.localStore.Stat(ctx, dgst)
161 if err == nil {
162 return desc, err
163 }
164
165 if err != distribution.ErrBlobUnknown {
166 return distribution.Descriptor{}, err
167 }
168
169 if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
170 return distribution.Descriptor{}, err
171 }
172
173 return pbs.remoteStore.Stat(ctx, dgst)
174 }
175
176 func (pbs *proxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
177 blob, err := pbs.localStore.Get(ctx, dgst)
178 if err == nil {
179 return blob, nil
180 }
181
182 if err := pbs.authChallenger.tryEstablishChallenges(ctx); err != nil {
183 return []byte{}, err
184 }
185
186 blob, err = pbs.remoteStore.Get(ctx, dgst)
187 if err != nil {
188 return []byte{}, err
189 }
190
191 _, err = pbs.localStore.Put(ctx, "", blob)
192 if err != nil {
193 return []byte{}, err
194 }
195 return blob, nil
196 }
197
198
199 func (pbs *proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
200 return distribution.Descriptor{}, distribution.ErrUnsupported
201 }
202
203 func (pbs *proxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
204 return nil, distribution.ErrUnsupported
205 }
206
207 func (pbs *proxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
208 return nil, distribution.ErrUnsupported
209 }
210
211 func (pbs *proxyBlobStore) Mount(ctx context.Context, sourceRepo reference.Named, dgst digest.Digest) (distribution.Descriptor, error) {
212 return distribution.Descriptor{}, distribution.ErrUnsupported
213 }
214
215 func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
216 return nil, distribution.ErrUnsupported
217 }
218
219 func (pbs *proxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
220 return distribution.ErrUnsupported
221 }
222
View as plain text