1 package consul
2
3 import (
4 "context"
5 "fmt"
6 "io"
7 "testing"
8 "time"
9
10 consul "github.com/hashicorp/consul/api"
11
12 "github.com/go-kit/kit/sd"
13 "github.com/go-kit/log"
14 )
15
16 var _ sd.Instancer = (*Instancer)(nil)
17
18 var consulState = []*consul.ServiceEntry{
19 {
20 Node: &consul.Node{
21 Address: "10.0.0.0",
22 Node: "app00.local",
23 },
24 Service: &consul.AgentService{
25 ID: "search-api-0",
26 Port: 8000,
27 Service: "search",
28 Tags: []string{
29 "api",
30 "v1",
31 },
32 },
33 },
34 {
35 Node: &consul.Node{
36 Address: "10.0.0.1",
37 Node: "app01.local",
38 },
39 Service: &consul.AgentService{
40 ID: "search-api-1",
41 Port: 8001,
42 Service: "search",
43 Tags: []string{
44 "api",
45 "v2",
46 },
47 },
48 },
49 {
50 Node: &consul.Node{
51 Address: "10.0.0.1",
52 Node: "app01.local",
53 },
54 Service: &consul.AgentService{
55 Address: "10.0.0.10",
56 ID: "search-db-0",
57 Port: 9000,
58 Service: "search",
59 Tags: []string{
60 "db",
61 },
62 },
63 },
64 }
65
66 func TestInstancer(t *testing.T) {
67 var (
68 logger = log.NewNopLogger()
69 client = newTestClient(consulState)
70 )
71
72 s := NewInstancer(client, logger, "search", []string{"api"}, true)
73 defer s.Stop()
74
75 state := s.cache.State()
76 if want, have := 2, len(state.Instances); want != have {
77 t.Errorf("want %d, have %d", want, have)
78 }
79 }
80
81 func TestInstancerNoService(t *testing.T) {
82 var (
83 logger = log.NewNopLogger()
84 client = newTestClient(consulState)
85 )
86
87 s := NewInstancer(client, logger, "feed", []string{}, true)
88 defer s.Stop()
89
90 state := s.cache.State()
91 if want, have := 0, len(state.Instances); want != have {
92 t.Fatalf("want %d, have %d", want, have)
93 }
94 }
95
96 func TestInstancerWithTags(t *testing.T) {
97 var (
98 logger = log.NewNopLogger()
99 client = newTestClient(consulState)
100 )
101
102 s := NewInstancer(client, logger, "search", []string{"api", "v2"}, true)
103 defer s.Stop()
104
105 state := s.cache.State()
106 if want, have := 1, len(state.Instances); want != have {
107 t.Fatalf("want %d, have %d", want, have)
108 }
109 }
110
111 func TestInstancerAddressOverride(t *testing.T) {
112 s := NewInstancer(newTestClient(consulState), log.NewNopLogger(), "search", []string{"db"}, true)
113 defer s.Stop()
114
115 state := s.cache.State()
116 if want, have := 1, len(state.Instances); want != have {
117 t.Fatalf("want %d, have %d", want, have)
118 }
119
120 endpoint, closer, err := testFactory(state.Instances[0])
121 if err != nil {
122 t.Fatal(err)
123 }
124 if closer != nil {
125 defer closer.Close()
126 }
127
128 response, err := endpoint(context.Background(), struct{}{})
129 if err != nil {
130 t.Fatal(err)
131 }
132
133 if want, have := "10.0.0.10:9000", response.(string); want != have {
134 t.Errorf("want %q, have %q", want, have)
135 }
136 }
137
138 type eofTestClient struct {
139 client *testClient
140 eofSig chan bool
141 called chan struct{}
142 }
143
144 func neweofTestClient(client *testClient, sig chan bool, called chan struct{}) Client {
145 return &eofTestClient{client: client, eofSig: sig, called: called}
146 }
147
148 func (c *eofTestClient) Register(r *consul.AgentServiceRegistration) error {
149 return c.client.Register(r)
150 }
151
152 func (c *eofTestClient) Deregister(r *consul.AgentServiceRegistration) error {
153 return c.client.Deregister(r)
154 }
155
156 func (c *eofTestClient) Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
157 c.called <- struct{}{}
158 shouldEOF := <-c.eofSig
159 if shouldEOF {
160 return nil, &consul.QueryMeta{}, io.EOF
161 }
162 return c.client.Service(service, tag, passingOnly, queryOpts)
163 }
164
165 func TestInstancerWithEOF(t *testing.T) {
166 var (
167 sig = make(chan bool, 1)
168 called = make(chan struct{}, 1)
169 logger = log.NewNopLogger()
170 client = neweofTestClient(newTestClient(consulState), sig, called)
171 )
172
173 sig <- false
174 s := NewInstancer(client, logger, "search", []string{"api"}, true)
175 defer s.Stop()
176
177 select {
178 case <-called:
179 case <-time.Tick(time.Millisecond * 500):
180 t.Error("failed, to receive call")
181 }
182
183 state := s.cache.State()
184 if want, have := 2, len(state.Instances); want != have {
185 t.Errorf("want %d, have %d", want, have)
186 }
187
188
189 sig <- true
190
191
192 select {
193 case <-called:
194 case <-time.Tick(time.Millisecond * 500):
195 t.Error("failed, to receive call in time")
196 }
197
198 sig <- false
199
200
201 select {
202 case <-called:
203 case <-time.Tick(time.Millisecond * 500):
204 t.Error("failed, to receive call in time")
205 }
206 }
207
208 type badIndexTestClient struct {
209 client *testClient
210 called chan struct{}
211 }
212
213 func newBadIndexTestClient(client *testClient, called chan struct{}) Client {
214 return &badIndexTestClient{client: client, called: called}
215 }
216
217 func (c *badIndexTestClient) Register(r *consul.AgentServiceRegistration) error {
218 return c.client.Register(r)
219 }
220
221 func (c *badIndexTestClient) Deregister(r *consul.AgentServiceRegistration) error {
222 return c.client.Deregister(r)
223 }
224
225 func (c *badIndexTestClient) Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
226 switch {
227 case queryOpts.WaitIndex == 0:
228 queryOpts.WaitIndex = 100
229 case queryOpts.WaitIndex == 100:
230 queryOpts.WaitIndex = 99
231 default:
232 }
233 c.called <- struct{}{}
234 return c.client.Service(service, tag, passingOnly, queryOpts)
235 }
236
237 func TestInstancerWithInvalidIndex(t *testing.T) {
238 var (
239 called = make(chan struct{}, 1)
240 logger = log.NewNopLogger()
241 client = newBadIndexTestClient(newTestClient(consulState), called)
242 )
243
244 s := NewInstancer(client, logger, "search", []string{"api"}, true)
245 defer s.Stop()
246
247 select {
248 case <-called:
249 case <-time.Tick(time.Millisecond * 500):
250 t.Error("failed, to receive call")
251 }
252
253 state := s.cache.State()
254 if want, have := 2, len(state.Instances); want != have {
255 t.Errorf("want %d, have %d", want, have)
256 }
257
258
259 select {
260 case <-called:
261 case <-time.Tick(time.Millisecond * 500):
262 t.Error("failed, to receive call in time")
263 }
264 }
265
266 type indexTestClient struct {
267 client *testClient
268 index uint64
269 errs chan error
270 }
271
272 func newIndexTestClient(c *testClient, errs chan error) *indexTestClient {
273 return &indexTestClient{
274 client: c,
275 index: 0,
276 errs: errs,
277 }
278 }
279
280 func (i *indexTestClient) Register(r *consul.AgentServiceRegistration) error {
281 return i.client.Register(r)
282 }
283
284 func (i *indexTestClient) Deregister(r *consul.AgentServiceRegistration) error {
285 return i.client.Deregister(r)
286 }
287
288 func (i *indexTestClient) Service(service, tag string, passingOnly bool, queryOpts *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) {
289
290
291 if i.index == 0 && queryOpts.WaitIndex == 0 {
292 i.index = 100
293 entries, meta, err := i.client.Service(service, tag, passingOnly, queryOpts)
294 meta.LastIndex = i.index
295 return entries, meta, err
296 }
297
298 if queryOpts.WaitIndex < i.index {
299 i.errs <- fmt.Errorf("wait index %d is less than or equal to previous value", queryOpts.WaitIndex)
300 }
301
302 entries, meta, err := i.client.Service(service, tag, passingOnly, queryOpts)
303 i.index++
304 meta.LastIndex = i.index
305 return entries, meta, err
306 }
307
308 func TestInstancerLoopIndex(t *testing.T) {
309
310 var (
311 errs = make(chan error, 1)
312 logger = log.NewNopLogger()
313 client = newIndexTestClient(newTestClient(consulState), errs)
314 )
315
316 go func() {
317 for err := range errs {
318 t.Error(err)
319 t.FailNow()
320 }
321 }()
322
323 instancer := NewInstancer(client, logger, "search", []string{"api"}, true)
324 defer instancer.Stop()
325
326 time.Sleep(2 * time.Second)
327 }
328
View as plain text