...

Source file src/github.com/go-kit/kit/sd/endpoint_cache_test.go

Documentation: github.com/go-kit/kit/sd

     1  package sd
     2  
     3  import (
     4  	"errors"
     5  	"io"
     6  	"testing"
     7  	"time"
     8  
     9  	"github.com/go-kit/kit/endpoint"
    10  	"github.com/go-kit/log"
    11  )
    12  
    13  func TestEndpointCache(t *testing.T) {
    14  	var (
    15  		ca    = make(closer)
    16  		cb    = make(closer)
    17  		c     = map[string]io.Closer{"a": ca, "b": cb}
    18  		f     = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
    19  		cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{})
    20  	)
    21  
    22  	// Populate
    23  	cache.Update(Event{Instances: []string{"a", "b"}})
    24  	select {
    25  	case <-ca:
    26  		t.Errorf("endpoint a closed, not good")
    27  	case <-cb:
    28  		t.Errorf("endpoint b closed, not good")
    29  	case <-time.After(time.Millisecond):
    30  		t.Logf("no closures yet, good")
    31  	}
    32  	assertEndpointsLen(t, cache, 2)
    33  
    34  	// Duplicate, should be no-op
    35  	cache.Update(Event{Instances: []string{"a", "b"}})
    36  	select {
    37  	case <-ca:
    38  		t.Errorf("endpoint a closed, not good")
    39  	case <-cb:
    40  		t.Errorf("endpoint b closed, not good")
    41  	case <-time.After(time.Millisecond):
    42  		t.Logf("no closures yet, good")
    43  	}
    44  	assertEndpointsLen(t, cache, 2)
    45  
    46  	// Error, should continue returning old endpoints
    47  	cache.Update(Event{Err: errors.New("sd error")})
    48  	select {
    49  	case <-ca:
    50  		t.Errorf("endpoint a closed, not good")
    51  	case <-cb:
    52  		t.Errorf("endpoint b closed, not good")
    53  	case <-time.After(time.Millisecond):
    54  		t.Logf("no closures yet, good")
    55  	}
    56  	assertEndpointsLen(t, cache, 2)
    57  
    58  	// Delete b
    59  	go cache.Update(Event{Instances: []string{"a"}})
    60  	select {
    61  	case <-ca:
    62  		t.Errorf("endpoint a closed, not good")
    63  	case <-cb:
    64  		t.Logf("endpoint b closed, good")
    65  	case <-time.After(time.Second):
    66  		t.Errorf("didn't close the deleted instance in time")
    67  	}
    68  	assertEndpointsLen(t, cache, 1)
    69  
    70  	// Delete a
    71  	go cache.Update(Event{Instances: []string{}})
    72  	select {
    73  	// case <-cb: will succeed, as it's closed
    74  	case <-ca:
    75  		t.Logf("endpoint a closed, good")
    76  	case <-time.After(time.Second):
    77  		t.Errorf("didn't close the deleted instance in time")
    78  	}
    79  	assertEndpointsLen(t, cache, 0)
    80  }
    81  
    82  func TestEndpointCacheErrorAndTimeout(t *testing.T) {
    83  	var (
    84  		ca      = make(closer)
    85  		cb      = make(closer)
    86  		c       = map[string]io.Closer{"a": ca, "b": cb}
    87  		f       = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
    88  		timeOut = 100 * time.Millisecond
    89  		cache   = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{
    90  			invalidateOnError: true,
    91  			invalidateTimeout: timeOut,
    92  		})
    93  	)
    94  
    95  	timeNow := time.Now()
    96  	cache.timeNow = func() time.Time { return timeNow }
    97  
    98  	// Populate
    99  	cache.Update(Event{Instances: []string{"a"}})
   100  	select {
   101  	case <-ca:
   102  		t.Errorf("endpoint a closed, not good")
   103  	case <-time.After(time.Millisecond):
   104  		t.Logf("no closures yet, good")
   105  	}
   106  	assertEndpointsLen(t, cache, 1)
   107  
   108  	// Send error, keep time still.
   109  	cache.Update(Event{Err: errors.New("sd error")})
   110  	select {
   111  	case <-ca:
   112  		t.Errorf("endpoint a closed, not good")
   113  	case <-time.After(time.Millisecond):
   114  		t.Logf("no closures yet, good")
   115  	}
   116  	assertEndpointsLen(t, cache, 1)
   117  
   118  	// Move the time, but less than the timeout
   119  	timeNow = timeNow.Add(timeOut / 2)
   120  	assertEndpointsLen(t, cache, 1)
   121  	select {
   122  	case <-ca:
   123  		t.Errorf("endpoint a closed, not good")
   124  	case <-time.After(time.Millisecond):
   125  		t.Logf("no closures yet, good")
   126  	}
   127  
   128  	// Move the time past the timeout
   129  	timeNow = timeNow.Add(timeOut)
   130  	assertEndpointsError(t, cache, "sd error")
   131  	select {
   132  	case <-ca:
   133  		t.Logf("endpoint a closed, good")
   134  	case <-time.After(time.Millisecond):
   135  		t.Errorf("didn't close the deleted instance in time")
   136  	}
   137  
   138  	// Send another error
   139  	cache.Update(Event{Err: errors.New("another sd error")})
   140  	assertEndpointsError(t, cache, "sd error") // expect original error
   141  }
   142  
   143  func TestBadFactory(t *testing.T) {
   144  	cache := newEndpointCache(func(string) (endpoint.Endpoint, io.Closer, error) {
   145  		return nil, nil, errors.New("bad factory")
   146  	}, log.NewNopLogger(), endpointerOptions{})
   147  
   148  	cache.Update(Event{Instances: []string{"foo:1234", "bar:5678"}})
   149  	assertEndpointsLen(t, cache, 0)
   150  }
   151  
   152  func assertEndpointsLen(t *testing.T, cache *endpointCache, l int) {
   153  	endpoints, err := cache.Endpoints()
   154  	if err != nil {
   155  		t.Errorf("unexpected error %v", err)
   156  		return
   157  	}
   158  	if want, have := l, len(endpoints); want != have {
   159  		t.Errorf("want %d, have %d", want, have)
   160  	}
   161  }
   162  
   163  func assertEndpointsError(t *testing.T, cache *endpointCache, wantErr string) {
   164  	endpoints, err := cache.Endpoints()
   165  	if err == nil {
   166  		t.Errorf("expecting error, not good")
   167  		return
   168  	}
   169  	if want, have := wantErr, err.Error(); want != have {
   170  		t.Errorf("want %s, have %s", want, have)
   171  		return
   172  	}
   173  	if want, have := 0, len(endpoints); want != have {
   174  		t.Errorf("want %d, have %d", want, have)
   175  	}
   176  }
   177  
   178  type closer chan struct{}
   179  
   180  func (c closer) Close() error { close(c); return nil }
   181  

View as plain text