...

Source file src/github.com/go-kit/kit/sd/consul/instancer_test.go

Documentation: github.com/go-kit/kit/sd/consul

     1  package consul
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"io"
     7  	"testing"
     8  	"time"
     9  
    10  	consul "github.com/hashicorp/consul/api"
    11  
    12  	"github.com/go-kit/kit/sd"
    13  	"github.com/go-kit/log"
    14  )
    15  
    16  var _ sd.Instancer = (*Instancer)(nil) // API check
    17  
    18  var consulState = []*consul.ServiceEntry{
    19  	{
    20  		Node: &consul.Node{
    21  			Address: "10.0.0.0",
    22  			Node:    "app00.local",
    23  		},
    24  		Service: &consul.AgentService{
    25  			ID:      "search-api-0",
    26  			Port:    8000,
    27  			Service: "search",
    28  			Tags: []string{
    29  				"api",
    30  				"v1",
    31  			},
    32  		},
    33  	},
    34  	{
    35  		Node: &consul.Node{
    36  			Address: "10.0.0.1",
    37  			Node:    "app01.local",
    38  		},
    39  		Service: &consul.AgentService{
    40  			ID:      "search-api-1",
    41  			Port:    8001,
    42  			Service: "search",
    43  			Tags: []string{
    44  				"api",
    45  				"v2",
    46  			},
    47  		},
    48  	},
    49  	{
    50  		Node: &consul.Node{
    51  			Address: "10.0.0.1",
    52  			Node:    "app01.local",
    53  		},
    54  		Service: &consul.AgentService{
    55  			Address: "10.0.0.10",
    56  			ID:      "search-db-0",
    57  			Port:    9000,
    58  			Service: "search",
    59  			Tags: []string{
    60  				"db",
    61  			},
    62  		},
    63  	},
    64  }
    65  
    66  func TestInstancer(t *testing.T) {
    67  	var (
    68  		logger = log.NewNopLogger()
    69  		client = newTestClient(consulState)
    70  	)
    71  
    72  	s := NewInstancer(client, logger, "search", []string{"api"}, true)
    73  	defer s.Stop()
    74  
    75  	state := s.cache.State()
    76  	if want, have := 2, len(state.Instances); want != have {
    77  		t.Errorf("want %d, have %d", want, have)
    78  	}
    79  }
    80  
    81  func TestInstancerNoService(t *testing.T) {
    82  	var (
    83  		logger = log.NewNopLogger()
    84  		client = newTestClient(consulState)
    85  	)
    86  
    87  	s := NewInstancer(client, logger, "feed", []string{}, true)
    88  	defer s.Stop()
    89  
    90  	state := s.cache.State()
    91  	if want, have := 0, len(state.Instances); want != have {
    92  		t.Fatalf("want %d, have %d", want, have)
    93  	}
    94  }
    95  
    96  func TestInstancerWithTags(t *testing.T) {
    97  	var (
    98  		logger = log.NewNopLogger()
    99  		client = newTestClient(consulState)
   100  	)
   101  
   102  	s := NewInstancer(client, logger, "search", []string{"api", "v2"}, true)
   103  	defer s.Stop()
   104  
   105  	state := s.cache.State()
   106  	if want, have := 1, len(state.Instances); want != have {
   107  		t.Fatalf("want %d, have %d", want, have)
   108  	}
   109  }
   110  
   111  func TestInstancerAddressOverride(t *testing.T) {
   112  	s := NewInstancer(newTestClient(consulState), log.NewNopLogger(), "search", []string{"db"}, true)
   113  	defer s.Stop()
   114  
   115  	state := s.cache.State()
   116  	if want, have := 1, len(state.Instances); want != have {
   117  		t.Fatalf("want %d, have %d", want, have)
   118  	}
   119  
   120  	endpoint, closer, err := testFactory(state.Instances[0])
   121  	if err != nil {
   122  		t.Fatal(err)
   123  	}
   124  	if closer != nil {
   125  		defer closer.Close()
   126  	}
   127  
   128  	response, err := endpoint(context.Background(), struct{}{})
   129  	if err != nil {
   130  		t.Fatal(err)
   131  	}
   132  
   133  	if want, have := "10.0.0.10:9000", response.(string); want != have {
   134  		t.Errorf("want %q, have %q", want, have)
   135  	}
   136  }
   137  
   138  type eofTestClient struct {
   139  	client *testClient
   140  	eofSig chan bool
   141  	called chan struct{}
   142  }
   143  
   144  func neweofTestClient(client *testClient, sig chan bool, called chan struct{}) Client {
   145  	return &eofTestClient{client: client, eofSig: sig, called: called}
   146  }
   147  
   148  func (c *eofTestClient) Register(r *consul.AgentServiceRegistration) error {
   149  	return c.client.Register(r)
   150  }
   151  
   152  func (c *eofTestClient) Deregister(r *consul.AgentServiceRegistration) error {
   153  	return c.client.Deregister(r)
   154  }
   155  
   156  func (c *eofTestClient) Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
   157  	c.called <- struct{}{}
   158  	shouldEOF := <-c.eofSig
   159  	if shouldEOF {
   160  		return nil, &consul.QueryMeta{}, io.EOF
   161  	}
   162  	return c.client.Service(service, tag, passingOnly, queryOpts)
   163  }
   164  
   165  func TestInstancerWithEOF(t *testing.T) {
   166  	var (
   167  		sig    = make(chan bool, 1)
   168  		called = make(chan struct{}, 1)
   169  		logger = log.NewNopLogger()
   170  		client = neweofTestClient(newTestClient(consulState), sig, called)
   171  	)
   172  
   173  	sig <- false
   174  	s := NewInstancer(client, logger, "search", []string{"api"}, true)
   175  	defer s.Stop()
   176  
   177  	select {
   178  	case <-called:
   179  	case <-time.Tick(time.Millisecond * 500):
   180  		t.Error("failed, to receive call")
   181  	}
   182  
   183  	state := s.cache.State()
   184  	if want, have := 2, len(state.Instances); want != have {
   185  		t.Errorf("want %d, have %d", want, have)
   186  	}
   187  
   188  	// some error occurred resulting in io.EOF
   189  	sig <- true
   190  
   191  	// Service Called Once
   192  	select {
   193  	case <-called:
   194  	case <-time.Tick(time.Millisecond * 500):
   195  		t.Error("failed, to receive call in time")
   196  	}
   197  
   198  	sig <- false
   199  
   200  	// loop should continue
   201  	select {
   202  	case <-called:
   203  	case <-time.Tick(time.Millisecond * 500):
   204  		t.Error("failed, to receive call in time")
   205  	}
   206  }
   207  
   208  type badIndexTestClient struct {
   209  	client *testClient
   210  	called chan struct{}
   211  }
   212  
   213  func newBadIndexTestClient(client *testClient, called chan struct{}) Client {
   214  	return &badIndexTestClient{client: client, called: called}
   215  }
   216  
   217  func (c *badIndexTestClient) Register(r *consul.AgentServiceRegistration) error {
   218  	return c.client.Register(r)
   219  }
   220  
   221  func (c *badIndexTestClient) Deregister(r *consul.AgentServiceRegistration) error {
   222  	return c.client.Deregister(r)
   223  }
   224  
   225  func (c *badIndexTestClient) Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
   226  	switch {
   227  	case queryOpts.WaitIndex == 0:
   228  		queryOpts.WaitIndex = 100
   229  	case queryOpts.WaitIndex == 100:
   230  		queryOpts.WaitIndex = 99
   231  	default:
   232  	}
   233  	c.called <- struct{}{}
   234  	return c.client.Service(service, tag, passingOnly, queryOpts)
   235  }
   236  
   237  func TestInstancerWithInvalidIndex(t *testing.T) {
   238  	var (
   239  		called = make(chan struct{}, 1)
   240  		logger = log.NewNopLogger()
   241  		client = newBadIndexTestClient(newTestClient(consulState), called)
   242  	)
   243  
   244  	s := NewInstancer(client, logger, "search", []string{"api"}, true)
   245  	defer s.Stop()
   246  
   247  	select {
   248  	case <-called:
   249  	case <-time.Tick(time.Millisecond * 500):
   250  		t.Error("failed, to receive call")
   251  	}
   252  
   253  	state := s.cache.State()
   254  	if want, have := 2, len(state.Instances); want != have {
   255  		t.Errorf("want %d, have %d", want, have)
   256  	}
   257  
   258  	// loop should continue
   259  	select {
   260  	case <-called:
   261  	case <-time.Tick(time.Millisecond * 500):
   262  		t.Error("failed, to receive call in time")
   263  	}
   264  }
   265  
   266  type indexTestClient struct {
   267  	client *testClient
   268  	index  uint64
   269  	errs   chan error
   270  }
   271  
   272  func newIndexTestClient(c *testClient, errs chan error) *indexTestClient {
   273  	return &indexTestClient{
   274  		client: c,
   275  		index:  0,
   276  		errs:   errs,
   277  	}
   278  }
   279  
   280  func (i *indexTestClient) Register(r *consul.AgentServiceRegistration) error {
   281  	return i.client.Register(r)
   282  }
   283  
   284  func (i *indexTestClient) Deregister(r *consul.AgentServiceRegistration) error {
   285  	return i.client.Deregister(r)
   286  }
   287  
   288  func (i *indexTestClient) Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
   289  
   290  	// Assumes this is the first call Service, loop hasn't begun running yet
   291  	if i.index == 0 && queryOpts.WaitIndex == 0 {
   292  		i.index = 100
   293  		entries, meta, err := i.client.Service(service, tag, passingOnly, queryOpts)
   294  		meta.LastIndex = i.index
   295  		return entries, meta, err
   296  	}
   297  
   298  	if queryOpts.WaitIndex < i.index {
   299  		i.errs <- fmt.Errorf("wait index %d is less than or equal to previous value", queryOpts.WaitIndex)
   300  	}
   301  
   302  	entries, meta, err := i.client.Service(service, tag, passingOnly, queryOpts)
   303  	i.index++
   304  	meta.LastIndex = i.index
   305  	return entries, meta, err
   306  }
   307  
   308  func TestInstancerLoopIndex(t *testing.T) {
   309  
   310  	var (
   311  		errs   = make(chan error, 1)
   312  		logger = log.NewNopLogger()
   313  		client = newIndexTestClient(newTestClient(consulState), errs)
   314  	)
   315  
   316  	go func() {
   317  		for err := range errs {
   318  			t.Error(err)
   319  			t.FailNow()
   320  		}
   321  	}()
   322  
   323  	instancer := NewInstancer(client, logger, "search", []string{"api"}, true)
   324  	defer instancer.Stop()
   325  
   326  	time.Sleep(2 * time.Second)
   327  }
   328  

View as plain text