1 package proxy
2
3 import (
4 "context"
5 "io/ioutil"
6 "math/rand"
7 "net/http"
8 "net/http/httptest"
9 "sync"
10 "testing"
11 "time"
12
13 "github.com/distribution/reference"
14 "github.com/docker/distribution"
15 "github.com/docker/distribution/registry/proxy/scheduler"
16 "github.com/docker/distribution/registry/storage"
17 "github.com/docker/distribution/registry/storage/cache/memory"
18 "github.com/docker/distribution/registry/storage/driver/filesystem"
19 "github.com/docker/distribution/registry/storage/driver/inmemory"
20 "github.com/opencontainers/go-digest"
21 )
22
23 var sbsMu sync.Mutex
24 var randSource rand.Rand
25
26 type statsBlobStore struct {
27 stats map[string]int
28 blobs distribution.BlobStore
29 }
30
31 func (sbs statsBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
32 sbsMu.Lock()
33 sbs.stats["put"]++
34 sbsMu.Unlock()
35
36 return sbs.blobs.Put(ctx, mediaType, p)
37 }
38
39 func (sbs statsBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
40 sbsMu.Lock()
41 sbs.stats["get"]++
42 sbsMu.Unlock()
43
44 return sbs.blobs.Get(ctx, dgst)
45 }
46
47 func (sbs statsBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
48 sbsMu.Lock()
49 sbs.stats["create"]++
50 sbsMu.Unlock()
51
52 return sbs.blobs.Create(ctx, options...)
53 }
54
55 func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
56 sbsMu.Lock()
57 sbs.stats["resume"]++
58 sbsMu.Unlock()
59
60 return sbs.blobs.Resume(ctx, id)
61 }
62
63 func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
64 sbsMu.Lock()
65 sbs.stats["open"]++
66 sbsMu.Unlock()
67
68 return sbs.blobs.Open(ctx, dgst)
69 }
70
71 func (sbs statsBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
72 sbsMu.Lock()
73 sbs.stats["serveblob"]++
74 sbsMu.Unlock()
75
76 return sbs.blobs.ServeBlob(ctx, w, r, dgst)
77 }
78
79 func (sbs statsBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
80
81 sbsMu.Lock()
82 sbs.stats["stat"]++
83 sbsMu.Unlock()
84
85 return sbs.blobs.Stat(ctx, dgst)
86 }
87
88 func (sbs statsBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
89 sbsMu.Lock()
90 sbs.stats["delete"]++
91 sbsMu.Unlock()
92
93 return sbs.blobs.Delete(ctx, dgst)
94 }
95
96 type testEnv struct {
97 numUnique int
98 inRemote []distribution.Descriptor
99 store proxyBlobStore
100 ctx context.Context
101 }
102
103 func (te *testEnv) LocalStats() *map[string]int {
104 sbsMu.Lock()
105 ls := te.store.localStore.(statsBlobStore).stats
106 sbsMu.Unlock()
107 return &ls
108 }
109
110 func (te *testEnv) RemoteStats() *map[string]int {
111 sbsMu.Lock()
112 rs := te.store.remoteStore.(statsBlobStore).stats
113 sbsMu.Unlock()
114 return &rs
115 }
116
117
118 func makeTestEnv(t *testing.T, name string) *testEnv {
119 nameRef, err := reference.WithName(name)
120 if err != nil {
121 t.Fatalf("unable to parse reference: %s", err)
122 }
123
124 ctx := context.Background()
125
126 truthDir, err := ioutil.TempDir("", "truth")
127 if err != nil {
128 t.Fatalf("unable to create tempdir: %s", err)
129 }
130
131 cacheDir, err := ioutil.TempDir("", "cache")
132 if err != nil {
133 t.Fatalf("unable to create tempdir: %s", err)
134 }
135
136 localDriver, err := filesystem.FromParameters(map[string]interface{}{
137 "rootdirectory": truthDir,
138 })
139 if err != nil {
140 t.Fatalf("unable to create filesystem driver: %s", err)
141 }
142
143
144 localRegistry, err := storage.NewRegistry(ctx, localDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption)
145 if err != nil {
146 t.Fatalf("error creating registry: %v", err)
147 }
148 localRepo, err := localRegistry.Repository(ctx, nameRef)
149 if err != nil {
150 t.Fatalf("unexpected error getting repo: %v", err)
151 }
152
153 cacheDriver, err := filesystem.FromParameters(map[string]interface{}{
154 "rootdirectory": cacheDir,
155 })
156 if err != nil {
157 t.Fatalf("unable to create filesystem driver: %s", err)
158 }
159
160 truthRegistry, err := storage.NewRegistry(ctx, cacheDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()))
161 if err != nil {
162 t.Fatalf("error creating registry: %v", err)
163 }
164 truthRepo, err := truthRegistry.Repository(ctx, nameRef)
165 if err != nil {
166 t.Fatalf("unexpected error getting repo: %v", err)
167 }
168
169 truthBlobs := statsBlobStore{
170 stats: make(map[string]int),
171 blobs: truthRepo.Blobs(ctx),
172 }
173
174 localBlobs := statsBlobStore{
175 stats: make(map[string]int),
176 blobs: localRepo.Blobs(ctx),
177 }
178
179 s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json")
180
181 proxyBlobStore := proxyBlobStore{
182 repositoryName: nameRef,
183 remoteStore: truthBlobs,
184 localStore: localBlobs,
185 scheduler: s,
186 authChallenger: &mockChallenger{},
187 }
188
189 te := &testEnv{
190 store: proxyBlobStore,
191 ctx: ctx,
192 }
193 return te
194 }
195
196 func makeBlob(size int) []byte {
197 blob := make([]byte, size)
198 for i := 0; i < size; i++ {
199 blob[i] = byte('A' + randSource.Int()%48)
200 }
201 return blob
202 }
203
204 func init() {
205 randSource = *rand.New(rand.NewSource(42))
206 }
207
208 func populate(t *testing.T, te *testEnv, blobCount, size, numUnique int) {
209 var inRemote []distribution.Descriptor
210
211 for i := 0; i < numUnique; i++ {
212 bytes := makeBlob(size)
213 for j := 0; j < blobCount/numUnique; j++ {
214 desc, err := te.store.remoteStore.Put(te.ctx, "", bytes)
215 if err != nil {
216 t.Fatalf("Put in store")
217 }
218
219 inRemote = append(inRemote, desc)
220 }
221 }
222
223 te.inRemote = inRemote
224 te.numUnique = numUnique
225 }
226 func TestProxyStoreGet(t *testing.T) {
227 te := makeTestEnv(t, "foo/bar")
228
229 localStats := te.LocalStats()
230 remoteStats := te.RemoteStats()
231
232 populate(t, te, 1, 10, 1)
233 _, err := te.store.Get(te.ctx, te.inRemote[0].Digest)
234 if err != nil {
235 t.Fatal(err)
236 }
237
238 if (*localStats)["get"] != 1 && (*localStats)["put"] != 1 {
239 t.Errorf("Unexpected local counts")
240 }
241
242 if (*remoteStats)["get"] != 1 {
243 t.Errorf("Unexpected remote get count")
244 }
245
246 _, err = te.store.Get(te.ctx, te.inRemote[0].Digest)
247 if err != nil {
248 t.Fatal(err)
249 }
250
251 if (*localStats)["get"] != 2 && (*localStats)["put"] != 1 {
252 t.Errorf("Unexpected local counts")
253 }
254
255 if (*remoteStats)["get"] != 1 {
256 t.Errorf("Unexpected remote get count")
257 }
258
259 }
260
261 func TestProxyStoreStat(t *testing.T) {
262 te := makeTestEnv(t, "foo/bar")
263
264 remoteBlobCount := 1
265 populate(t, te, remoteBlobCount, 10, 1)
266
267 localStats := te.LocalStats()
268 remoteStats := te.RemoteStats()
269
270
271 for _, d := range te.inRemote {
272 _, err := te.store.Stat(te.ctx, d.Digest)
273 if err != nil {
274 t.Fatalf("Error stating proxy store")
275 }
276 }
277
278 if (*localStats)["stat"] != remoteBlobCount {
279 t.Errorf("Unexpected local stat count")
280 }
281
282 if (*remoteStats)["stat"] != remoteBlobCount {
283 t.Errorf("Unexpected remote stat count")
284 }
285
286 if te.store.authChallenger.(*mockChallenger).count != len(te.inRemote) {
287 t.Fatalf("Unexpected auth challenge count, got %#v", te.store.authChallenger)
288 }
289
290 }
291
292 func TestProxyStoreServeHighConcurrency(t *testing.T) {
293 te := makeTestEnv(t, "foo/bar")
294 blobSize := 200
295 blobCount := 10
296 numUnique := 1
297 populate(t, te, blobCount, blobSize, numUnique)
298
299 numClients := 16
300 testProxyStoreServe(t, te, numClients)
301 }
302
303 func TestProxyStoreServeMany(t *testing.T) {
304 te := makeTestEnv(t, "foo/bar")
305 blobSize := 200
306 blobCount := 10
307 numUnique := 4
308 populate(t, te, blobCount, blobSize, numUnique)
309
310 numClients := 4
311 testProxyStoreServe(t, te, numClients)
312 }
313
314
315 func TestProxyStoreServeBig(t *testing.T) {
316 te := makeTestEnv(t, "foo/bar")
317
318 blobSize := 2 << 20
319 blobCount := 4
320 numUnique := 2
321 populate(t, te, blobCount, blobSize, numUnique)
322
323 numClients := 4
324 testProxyStoreServe(t, te, numClients)
325 }
326
327
328
329 func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) {
330 localStats := te.LocalStats()
331 remoteStats := te.RemoteStats()
332
333 var wg sync.WaitGroup
334
335 for i := 0; i < numClients; i++ {
336
337 wg.Add(1)
338 go func() {
339 defer wg.Done()
340 for _, remoteBlob := range te.inRemote {
341 w := httptest.NewRecorder()
342 r, err := http.NewRequest("GET", "", nil)
343 if err != nil {
344 t.Error(err)
345 return
346 }
347
348 err = te.store.ServeBlob(te.ctx, w, r, remoteBlob.Digest)
349 if err != nil {
350 t.Errorf(err.Error())
351 return
352 }
353
354 bodyBytes := w.Body.Bytes()
355 localDigest := digest.FromBytes(bodyBytes)
356 if localDigest != remoteBlob.Digest {
357 t.Errorf("Mismatching blob fetch from proxy")
358 return
359 }
360 }
361 }()
362 }
363
364 wg.Wait()
365 if t.Failed() {
366 t.FailNow()
367 }
368
369 remoteBlobCount := len(te.inRemote)
370 sbsMu.Lock()
371 if (*localStats)["stat"] != remoteBlobCount*numClients && (*localStats)["create"] != te.numUnique {
372 sbsMu.Unlock()
373 t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount)
374 }
375 sbsMu.Unlock()
376
377
378 time.Sleep(3 * time.Second)
379
380 sbsMu.Lock()
381 remoteStatCount := (*remoteStats)["stat"]
382 remoteOpenCount := (*remoteStats)["open"]
383 sbsMu.Unlock()
384
385
386 for _, dr := range te.inRemote {
387 w := httptest.NewRecorder()
388 r, err := http.NewRequest("GET", "", nil)
389 if err != nil {
390 t.Fatal(err)
391 }
392
393 err = te.store.ServeBlob(te.ctx, w, r, dr.Digest)
394 if err != nil {
395 t.Fatalf(err.Error())
396 }
397
398 dl := digest.FromBytes(w.Body.Bytes())
399 if dl != dr.Digest {
400 t.Errorf("Mismatching blob fetch from proxy")
401 }
402 }
403
404 remoteStats = te.RemoteStats()
405
406
407 sbsMu.Lock()
408 defer sbsMu.Unlock()
409 if (*remoteStats)["stat"] != remoteStatCount && (*remoteStats)["open"] != remoteOpenCount {
410 t.Fatalf("unexpected remote stats: %#v", remoteStats)
411 }
412 }
413
View as plain text