...

Source file src/github.com/docker/distribution/registry/proxy/proxyblobstore_test.go

Documentation: github.com/docker/distribution/registry/proxy

     1  package proxy
     2  
import (
	"context"
	"io/ioutil"
	"math/rand"
	"net/http"
	"net/http/httptest"
	"os"
	"sync"
	"testing"
	"time"

	"github.com/distribution/reference"
	"github.com/docker/distribution"
	"github.com/docker/distribution/registry/proxy/scheduler"
	"github.com/docker/distribution/registry/storage"
	"github.com/docker/distribution/registry/storage/cache/memory"
	"github.com/docker/distribution/registry/storage/driver/filesystem"
	"github.com/docker/distribution/registry/storage/driver/inmemory"
	"github.com/opencontainers/go-digest"
)
    22  
    23  var sbsMu sync.Mutex
    24  var randSource rand.Rand
    25  
// statsBlobStore wraps a distribution.BlobStore and counts how many times
// each of its methods is invoked. Counts live in stats, keyed by the
// lower-case method name ("put", "get", "create", ...), and are updated
// while holding sbsMu.
type statsBlobStore struct {
	stats map[string]int
	blobs distribution.BlobStore
}
    30  
    31  func (sbs statsBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
    32  	sbsMu.Lock()
    33  	sbs.stats["put"]++
    34  	sbsMu.Unlock()
    35  
    36  	return sbs.blobs.Put(ctx, mediaType, p)
    37  }
    38  
    39  func (sbs statsBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
    40  	sbsMu.Lock()
    41  	sbs.stats["get"]++
    42  	sbsMu.Unlock()
    43  
    44  	return sbs.blobs.Get(ctx, dgst)
    45  }
    46  
    47  func (sbs statsBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
    48  	sbsMu.Lock()
    49  	sbs.stats["create"]++
    50  	sbsMu.Unlock()
    51  
    52  	return sbs.blobs.Create(ctx, options...)
    53  }
    54  
    55  func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
    56  	sbsMu.Lock()
    57  	sbs.stats["resume"]++
    58  	sbsMu.Unlock()
    59  
    60  	return sbs.blobs.Resume(ctx, id)
    61  }
    62  
    63  func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
    64  	sbsMu.Lock()
    65  	sbs.stats["open"]++
    66  	sbsMu.Unlock()
    67  
    68  	return sbs.blobs.Open(ctx, dgst)
    69  }
    70  
    71  func (sbs statsBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
    72  	sbsMu.Lock()
    73  	sbs.stats["serveblob"]++
    74  	sbsMu.Unlock()
    75  
    76  	return sbs.blobs.ServeBlob(ctx, w, r, dgst)
    77  }
    78  
    79  func (sbs statsBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
    80  
    81  	sbsMu.Lock()
    82  	sbs.stats["stat"]++
    83  	sbsMu.Unlock()
    84  
    85  	return sbs.blobs.Stat(ctx, dgst)
    86  }
    87  
    88  func (sbs statsBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
    89  	sbsMu.Lock()
    90  	sbs.stats["delete"]++
    91  	sbsMu.Unlock()
    92  
    93  	return sbs.blobs.Delete(ctx, dgst)
    94  }
    95  
// testEnv bundles the fixtures shared by the proxy blob store tests.
type testEnv struct {
	// numUnique is the number of distinct payloads written by populate.
	numUnique int
	// inRemote holds the descriptors returned by the remote Puts in populate.
	inRemote []distribution.Descriptor
	// store is the proxy blob store under test.
	store proxyBlobStore
	// ctx is the context passed to every store call made by the tests.
	ctx context.Context
}
   102  
   103  func (te *testEnv) LocalStats() *map[string]int {
   104  	sbsMu.Lock()
   105  	ls := te.store.localStore.(statsBlobStore).stats
   106  	sbsMu.Unlock()
   107  	return &ls
   108  }
   109  
   110  func (te *testEnv) RemoteStats() *map[string]int {
   111  	sbsMu.Lock()
   112  	rs := te.store.remoteStore.(statsBlobStore).stats
   113  	sbsMu.Unlock()
   114  	return &rs
   115  }
   116  
   117  // Populate remote store and record the digests
   118  func makeTestEnv(t *testing.T, name string) *testEnv {
   119  	nameRef, err := reference.WithName(name)
   120  	if err != nil {
   121  		t.Fatalf("unable to parse reference: %s", err)
   122  	}
   123  
   124  	ctx := context.Background()
   125  
   126  	truthDir, err := ioutil.TempDir("", "truth")
   127  	if err != nil {
   128  		t.Fatalf("unable to create tempdir: %s", err)
   129  	}
   130  
   131  	cacheDir, err := ioutil.TempDir("", "cache")
   132  	if err != nil {
   133  		t.Fatalf("unable to create tempdir: %s", err)
   134  	}
   135  
   136  	localDriver, err := filesystem.FromParameters(map[string]interface{}{
   137  		"rootdirectory": truthDir,
   138  	})
   139  	if err != nil {
   140  		t.Fatalf("unable to create filesystem driver: %s", err)
   141  	}
   142  
   143  	// todo: create a tempfile area here
   144  	localRegistry, err := storage.NewRegistry(ctx, localDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), storage.EnableRedirect, storage.DisableDigestResumption)
   145  	if err != nil {
   146  		t.Fatalf("error creating registry: %v", err)
   147  	}
   148  	localRepo, err := localRegistry.Repository(ctx, nameRef)
   149  	if err != nil {
   150  		t.Fatalf("unexpected error getting repo: %v", err)
   151  	}
   152  
   153  	cacheDriver, err := filesystem.FromParameters(map[string]interface{}{
   154  		"rootdirectory": cacheDir,
   155  	})
   156  	if err != nil {
   157  		t.Fatalf("unable to create filesystem driver: %s", err)
   158  	}
   159  
   160  	truthRegistry, err := storage.NewRegistry(ctx, cacheDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()))
   161  	if err != nil {
   162  		t.Fatalf("error creating registry: %v", err)
   163  	}
   164  	truthRepo, err := truthRegistry.Repository(ctx, nameRef)
   165  	if err != nil {
   166  		t.Fatalf("unexpected error getting repo: %v", err)
   167  	}
   168  
   169  	truthBlobs := statsBlobStore{
   170  		stats: make(map[string]int),
   171  		blobs: truthRepo.Blobs(ctx),
   172  	}
   173  
   174  	localBlobs := statsBlobStore{
   175  		stats: make(map[string]int),
   176  		blobs: localRepo.Blobs(ctx),
   177  	}
   178  
   179  	s := scheduler.New(ctx, inmemory.New(), "/scheduler-state.json")
   180  
   181  	proxyBlobStore := proxyBlobStore{
   182  		repositoryName: nameRef,
   183  		remoteStore:    truthBlobs,
   184  		localStore:     localBlobs,
   185  		scheduler:      s,
   186  		authChallenger: &mockChallenger{},
   187  	}
   188  
   189  	te := &testEnv{
   190  		store: proxyBlobStore,
   191  		ctx:   ctx,
   192  	}
   193  	return te
   194  }
   195  
   196  func makeBlob(size int) []byte {
   197  	blob := make([]byte, size)
   198  	for i := 0; i < size; i++ {
   199  		blob[i] = byte('A' + randSource.Int()%48)
   200  	}
   201  	return blob
   202  }
   203  
   204  func init() {
   205  	randSource = *rand.New(rand.NewSource(42))
   206  }
   207  
   208  func populate(t *testing.T, te *testEnv, blobCount, size, numUnique int) {
   209  	var inRemote []distribution.Descriptor
   210  
   211  	for i := 0; i < numUnique; i++ {
   212  		bytes := makeBlob(size)
   213  		for j := 0; j < blobCount/numUnique; j++ {
   214  			desc, err := te.store.remoteStore.Put(te.ctx, "", bytes)
   215  			if err != nil {
   216  				t.Fatalf("Put in store")
   217  			}
   218  
   219  			inRemote = append(inRemote, desc)
   220  		}
   221  	}
   222  
   223  	te.inRemote = inRemote
   224  	te.numUnique = numUnique
   225  }
   226  func TestProxyStoreGet(t *testing.T) {
   227  	te := makeTestEnv(t, "foo/bar")
   228  
   229  	localStats := te.LocalStats()
   230  	remoteStats := te.RemoteStats()
   231  
   232  	populate(t, te, 1, 10, 1)
   233  	_, err := te.store.Get(te.ctx, te.inRemote[0].Digest)
   234  	if err != nil {
   235  		t.Fatal(err)
   236  	}
   237  
   238  	if (*localStats)["get"] != 1 && (*localStats)["put"] != 1 {
   239  		t.Errorf("Unexpected local counts")
   240  	}
   241  
   242  	if (*remoteStats)["get"] != 1 {
   243  		t.Errorf("Unexpected remote get count")
   244  	}
   245  
   246  	_, err = te.store.Get(te.ctx, te.inRemote[0].Digest)
   247  	if err != nil {
   248  		t.Fatal(err)
   249  	}
   250  
   251  	if (*localStats)["get"] != 2 && (*localStats)["put"] != 1 {
   252  		t.Errorf("Unexpected local counts")
   253  	}
   254  
   255  	if (*remoteStats)["get"] != 1 {
   256  		t.Errorf("Unexpected remote get count")
   257  	}
   258  
   259  }
   260  
   261  func TestProxyStoreStat(t *testing.T) {
   262  	te := makeTestEnv(t, "foo/bar")
   263  
   264  	remoteBlobCount := 1
   265  	populate(t, te, remoteBlobCount, 10, 1)
   266  
   267  	localStats := te.LocalStats()
   268  	remoteStats := te.RemoteStats()
   269  
   270  	// Stat - touches both stores
   271  	for _, d := range te.inRemote {
   272  		_, err := te.store.Stat(te.ctx, d.Digest)
   273  		if err != nil {
   274  			t.Fatalf("Error stating proxy store")
   275  		}
   276  	}
   277  
   278  	if (*localStats)["stat"] != remoteBlobCount {
   279  		t.Errorf("Unexpected local stat count")
   280  	}
   281  
   282  	if (*remoteStats)["stat"] != remoteBlobCount {
   283  		t.Errorf("Unexpected remote stat count")
   284  	}
   285  
   286  	if te.store.authChallenger.(*mockChallenger).count != len(te.inRemote) {
   287  		t.Fatalf("Unexpected auth challenge count, got %#v", te.store.authChallenger)
   288  	}
   289  
   290  }
   291  
   292  func TestProxyStoreServeHighConcurrency(t *testing.T) {
   293  	te := makeTestEnv(t, "foo/bar")
   294  	blobSize := 200
   295  	blobCount := 10
   296  	numUnique := 1
   297  	populate(t, te, blobCount, blobSize, numUnique)
   298  
   299  	numClients := 16
   300  	testProxyStoreServe(t, te, numClients)
   301  }
   302  
   303  func TestProxyStoreServeMany(t *testing.T) {
   304  	te := makeTestEnv(t, "foo/bar")
   305  	blobSize := 200
   306  	blobCount := 10
   307  	numUnique := 4
   308  	populate(t, te, blobCount, blobSize, numUnique)
   309  
   310  	numClients := 4
   311  	testProxyStoreServe(t, te, numClients)
   312  }
   313  
   314  // todo(richardscothern): blobCount must be smaller than num clients
   315  func TestProxyStoreServeBig(t *testing.T) {
   316  	te := makeTestEnv(t, "foo/bar")
   317  
   318  	blobSize := 2 << 20
   319  	blobCount := 4
   320  	numUnique := 2
   321  	populate(t, te, blobCount, blobSize, numUnique)
   322  
   323  	numClients := 4
   324  	testProxyStoreServe(t, te, numClients)
   325  }
   326  
   327  // testProxyStoreServe will create clients to consume all blobs
   328  // populated in the truth store
   329  func testProxyStoreServe(t *testing.T, te *testEnv, numClients int) {
   330  	localStats := te.LocalStats()
   331  	remoteStats := te.RemoteStats()
   332  
   333  	var wg sync.WaitGroup
   334  
   335  	for i := 0; i < numClients; i++ {
   336  		// Serveblob - pulls through blobs
   337  		wg.Add(1)
   338  		go func() {
   339  			defer wg.Done()
   340  			for _, remoteBlob := range te.inRemote {
   341  				w := httptest.NewRecorder()
   342  				r, err := http.NewRequest("GET", "", nil)
   343  				if err != nil {
   344  					t.Error(err)
   345  					return
   346  				}
   347  
   348  				err = te.store.ServeBlob(te.ctx, w, r, remoteBlob.Digest)
   349  				if err != nil {
   350  					t.Errorf(err.Error())
   351  					return
   352  				}
   353  
   354  				bodyBytes := w.Body.Bytes()
   355  				localDigest := digest.FromBytes(bodyBytes)
   356  				if localDigest != remoteBlob.Digest {
   357  					t.Errorf("Mismatching blob fetch from proxy")
   358  					return
   359  				}
   360  			}
   361  		}()
   362  	}
   363  
   364  	wg.Wait()
   365  	if t.Failed() {
   366  		t.FailNow()
   367  	}
   368  
   369  	remoteBlobCount := len(te.inRemote)
   370  	sbsMu.Lock()
   371  	if (*localStats)["stat"] != remoteBlobCount*numClients && (*localStats)["create"] != te.numUnique {
   372  		sbsMu.Unlock()
   373  		t.Fatal("Expected: stat:", remoteBlobCount*numClients, "create:", remoteBlobCount)
   374  	}
   375  	sbsMu.Unlock()
   376  
   377  	// Wait for any async storage goroutines to finish
   378  	time.Sleep(3 * time.Second)
   379  
   380  	sbsMu.Lock()
   381  	remoteStatCount := (*remoteStats)["stat"]
   382  	remoteOpenCount := (*remoteStats)["open"]
   383  	sbsMu.Unlock()
   384  
   385  	// Serveblob - blobs come from local
   386  	for _, dr := range te.inRemote {
   387  		w := httptest.NewRecorder()
   388  		r, err := http.NewRequest("GET", "", nil)
   389  		if err != nil {
   390  			t.Fatal(err)
   391  		}
   392  
   393  		err = te.store.ServeBlob(te.ctx, w, r, dr.Digest)
   394  		if err != nil {
   395  			t.Fatalf(err.Error())
   396  		}
   397  
   398  		dl := digest.FromBytes(w.Body.Bytes())
   399  		if dl != dr.Digest {
   400  			t.Errorf("Mismatching blob fetch from proxy")
   401  		}
   402  	}
   403  
   404  	remoteStats = te.RemoteStats()
   405  
   406  	// Ensure remote unchanged
   407  	sbsMu.Lock()
   408  	defer sbsMu.Unlock()
   409  	if (*remoteStats)["stat"] != remoteStatCount && (*remoteStats)["open"] != remoteOpenCount {
   410  		t.Fatalf("unexpected remote stats: %#v", remoteStats)
   411  	}
   412  }
   413  

View as plain text