1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package v2discovery
16
17 import (
18 "context"
19 "errors"
20 "math"
21 "math/rand"
22 "net/http"
23 "net/url"
24 "reflect"
25 "sort"
26 "strconv"
27 "testing"
28 "time"
29
30 "go.etcd.io/etcd/client/pkg/v3/types"
31 "go.etcd.io/etcd/client/v2"
32
33 "github.com/jonboulle/clockwork"
34 "go.uber.org/zap"
35 )
36
// maxRetryInTest is the number of retry rounds the tests drive: it bounds how
// many times the helper goroutines advance the fake clock, and
// TestRetryFailure installs it as the retry limit.
const maxRetryInTest = 3
40
41 func TestNewProxyFuncUnset(t *testing.T) {
42 pf, err := newProxyFunc(zap.NewExample(), "")
43 if pf != nil {
44 t.Fatal("unexpected non-nil proxyFunc")
45 }
46 if err != nil {
47 t.Fatalf("unexpected non-nil err: %v", err)
48 }
49 }
50
51 func TestNewProxyFuncBad(t *testing.T) {
52 tests := []string{
53 "%%",
54 "http://foo.com/%1",
55 }
56 for i, in := range tests {
57 pf, err := newProxyFunc(zap.NewExample(), in)
58 if pf != nil {
59 t.Errorf("#%d: unexpected non-nil proxyFunc", i)
60 }
61 if err == nil {
62 t.Errorf("#%d: unexpected nil err", i)
63 }
64 }
65 }
66
67 func TestNewProxyFunc(t *testing.T) {
68 tests := map[string]string{
69 "bar.com": "http://bar.com",
70 "http://disco.foo.bar": "http://disco.foo.bar",
71 }
72 for in, w := range tests {
73 pf, err := newProxyFunc(zap.NewExample(), in)
74 if pf == nil {
75 t.Errorf("%s: unexpected nil proxyFunc", in)
76 continue
77 }
78 if err != nil {
79 t.Errorf("%s: unexpected non-nil err: %v", in, err)
80 continue
81 }
82 g, err := pf(&http.Request{})
83 if err != nil {
84 t.Errorf("%s: unexpected non-nil err: %v", in, err)
85 }
86 if g.String() != w {
87 t.Errorf("%s: proxyURL=%q, want %q", in, g, w)
88 }
89
90 }
91 }
92
93 func TestCheckCluster(t *testing.T) {
94 cluster := "/prefix/1000"
95 self := "/1000/1"
96
97 tests := []struct {
98 nodes []*client.Node
99 index uint64
100 werr error
101 wsize int
102 }{
103 {
104
105 []*client.Node{
106 {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
107 {Key: "/1000/_config/"},
108 {Key: self, CreatedIndex: 2},
109 {Key: "/1000/2", CreatedIndex: 3},
110 {Key: "/1000/3", CreatedIndex: 4},
111 {Key: "/1000/4", CreatedIndex: 5},
112 },
113 5,
114 nil,
115 3,
116 },
117 {
118
119 []*client.Node{
120 {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
121 {Key: "/1000/_config/"},
122 {Key: "/1000/2", CreatedIndex: 2},
123 {Key: "/1000/3", CreatedIndex: 3},
124 {Key: self, CreatedIndex: 4},
125 {Key: "/1000/4", CreatedIndex: 5},
126 },
127 5,
128 nil,
129 3,
130 },
131 {
132
133 []*client.Node{
134 {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
135 {Key: "/1000/_config/"},
136 {Key: "/1000/2", CreatedIndex: 2},
137 {Key: "/1000/3", CreatedIndex: 3},
138 {Key: "/1000/4", CreatedIndex: 4},
139 {Key: self, CreatedIndex: 5},
140 },
141 5,
142 ErrFullCluster,
143 3,
144 },
145 {
146
147 []*client.Node{
148 {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
149 {Key: "/1000/_config/"},
150 {Key: "/1000/2", CreatedIndex: 2},
151 {Key: "/1000/3", CreatedIndex: 3},
152 },
153 3,
154 nil,
155 3,
156 },
157 {
158 []*client.Node{
159 {Key: "/1000/_config/size", Value: "3", CreatedIndex: 1},
160 {Key: "/1000/_config/"},
161 {Key: "/1000/2", CreatedIndex: 2},
162 {Key: "/1000/3", CreatedIndex: 3},
163 {Key: "/1000/4", CreatedIndex: 4},
164 },
165 3,
166 ErrFullCluster,
167 3,
168 },
169 {
170
171 []*client.Node{
172 {Key: "/1000/_config/size", Value: "bad", CreatedIndex: 1},
173 },
174 0,
175 ErrBadSizeKey,
176 0,
177 },
178 {
179
180 []*client.Node{},
181 0,
182 ErrSizeNotFound,
183 0,
184 },
185 }
186
187 for i, tt := range tests {
188 var rs []*client.Response
189 if len(tt.nodes) > 0 {
190 rs = append(rs, &client.Response{Node: tt.nodes[0], Index: tt.index})
191 rs = append(rs, &client.Response{
192 Node: &client.Node{
193 Key: cluster,
194 Nodes: tt.nodes[1:],
195 },
196 Index: tt.index,
197 })
198 }
199 c := &clientWithResp{rs: rs}
200 dBase := newTestDiscovery(cluster, 1, c)
201
202 cRetry := &clientWithRetry{failTimes: 3}
203 cRetry.rs = rs
204 fc := clockwork.NewFakeClock()
205 dRetry := newTestDiscoveryWithClock(cluster, 1, cRetry, fc)
206
207 for _, d := range []*discovery{dBase, dRetry} {
208 go func() {
209 for i := uint(1); i <= maxRetryInTest; i++ {
210 fc.BlockUntil(1)
211 fc.Advance(time.Second * (0x1 << i))
212 }
213 }()
214 ns, size, index, err := d.checkCluster()
215 if err != tt.werr {
216 t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
217 }
218 if reflect.DeepEqual(ns, tt.nodes) {
219 t.Errorf("#%d: nodes = %v, want %v", i, ns, tt.nodes)
220 }
221 if size != uint64(tt.wsize) {
222 t.Errorf("#%d: size = %v, want %d", i, size, tt.wsize)
223 }
224 if index != tt.index {
225 t.Errorf("#%d: index = %v, want %d", i, index, tt.index)
226 }
227 }
228 }
229 }
230
231 func TestWaitNodes(t *testing.T) {
232 all := []*client.Node{
233 0: {Key: "/1000/1", CreatedIndex: 2},
234 1: {Key: "/1000/2", CreatedIndex: 3},
235 2: {Key: "/1000/3", CreatedIndex: 4},
236 }
237
238 tests := []struct {
239 nodes []*client.Node
240 rs []*client.Response
241 }{
242 {
243 all,
244 []*client.Response{},
245 },
246 {
247 all[:1],
248 []*client.Response{
249 {Node: &client.Node{Key: "/1000/2", CreatedIndex: 3}},
250 {Node: &client.Node{Key: "/1000/3", CreatedIndex: 4}},
251 },
252 },
253 {
254 all[:2],
255 []*client.Response{
256 {Node: &client.Node{Key: "/1000/3", CreatedIndex: 4}},
257 },
258 },
259 {
260 append(all, &client.Node{Key: "/1000/4", CreatedIndex: 5}),
261 []*client.Response{
262 {Node: &client.Node{Key: "/1000/3", CreatedIndex: 4}},
263 },
264 },
265 }
266
267 for i, tt := range tests {
268
269 c := &clientWithResp{rs: nil, w: &watcherWithResp{rs: tt.rs}}
270 dBase := newTestDiscovery("1000", 1, c)
271
272
273 var retryScanResp []*client.Response
274 if len(tt.nodes) > 0 {
275 retryScanResp = append(retryScanResp, &client.Response{
276 Node: &client.Node{
277 Key: "1000",
278 Value: strconv.Itoa(3),
279 },
280 })
281 retryScanResp = append(retryScanResp, &client.Response{
282 Node: &client.Node{
283 Nodes: tt.nodes,
284 },
285 })
286 }
287 cRetry := &clientWithResp{
288 rs: retryScanResp,
289 w: &watcherWithRetry{rs: tt.rs, failTimes: 2},
290 }
291 fc := clockwork.NewFakeClock()
292 dRetry := newTestDiscoveryWithClock("1000", 1, cRetry, fc)
293
294 for _, d := range []*discovery{dBase, dRetry} {
295 go func() {
296 for i := uint(1); i <= maxRetryInTest; i++ {
297 fc.BlockUntil(1)
298 fc.Advance(time.Second * (0x1 << i))
299 }
300 }()
301 g, err := d.waitNodes(tt.nodes, uint64(3), 0)
302 if err != nil {
303 t.Errorf("#%d: err = %v, want %v", i, err, nil)
304 }
305 if !reflect.DeepEqual(g, all) {
306 t.Errorf("#%d: all = %v, want %v", i, g, all)
307 }
308 }
309 }
310 }
311
312 func TestCreateSelf(t *testing.T) {
313 rs := []*client.Response{{Node: &client.Node{Key: "1000/1", CreatedIndex: 2}}}
314
315 w := &watcherWithResp{rs: rs}
316 errw := &watcherWithErr{err: errors.New("watch err")}
317
318 c := &clientWithResp{rs: rs, w: w}
319 errc := &clientWithErr{err: errors.New("create err"), w: w}
320 errdupc := &clientWithErr{err: client.Error{Code: client.ErrorCodeNodeExist}}
321 errwc := &clientWithResp{rs: rs, w: errw}
322
323 tests := []struct {
324 c client.KeysAPI
325 werr error
326 }{
327
328 {c, nil},
329
330 {errc, errc.err},
331
332 {errwc, errw.err},
333
334 {errdupc, ErrDuplicateID},
335 }
336
337 for i, tt := range tests {
338 d := newTestDiscovery("1000", 1, tt.c)
339 if err := d.createSelf(""); err != tt.werr {
340 t.Errorf("#%d: err = %v, want %v", i, err, nil)
341 }
342 }
343 }
344
345 func TestNodesToCluster(t *testing.T) {
346 tests := []struct {
347 nodes []*client.Node
348 size uint64
349 wcluster string
350 werr error
351 }{
352 {
353 []*client.Node{
354 0: {Key: "/1000/1", Value: "1=http://1.1.1.1:2380", CreatedIndex: 1},
355 1: {Key: "/1000/2", Value: "2=http://2.2.2.2:2380", CreatedIndex: 2},
356 2: {Key: "/1000/3", Value: "3=http://3.3.3.3:2380", CreatedIndex: 3},
357 },
358 3,
359 "1=http://1.1.1.1:2380,2=http://2.2.2.2:2380,3=http://3.3.3.3:2380",
360 nil,
361 },
362 {
363 []*client.Node{
364 0: {Key: "/1000/1", Value: "1=http://1.1.1.1:2380", CreatedIndex: 1},
365 1: {Key: "/1000/2", Value: "2=http://2.2.2.2:2380", CreatedIndex: 2},
366 2: {Key: "/1000/3", Value: "2=http://3.3.3.3:2380", CreatedIndex: 3},
367 },
368 3,
369 "1=http://1.1.1.1:2380,2=http://2.2.2.2:2380,2=http://3.3.3.3:2380",
370 ErrDuplicateName,
371 },
372 {
373 []*client.Node{
374 0: {Key: "/1000/1", Value: "1=1.1.1.1:2380", CreatedIndex: 1},
375 1: {Key: "/1000/2", Value: "2=http://2.2.2.2:2380", CreatedIndex: 2},
376 2: {Key: "/1000/3", Value: "2=http://3.3.3.3:2380", CreatedIndex: 3},
377 },
378 3,
379 "1=1.1.1.1:2380,2=http://2.2.2.2:2380,2=http://3.3.3.3:2380",
380 ErrInvalidURL,
381 },
382 }
383
384 for i, tt := range tests {
385 cluster, err := nodesToCluster(tt.nodes, tt.size)
386 if err != tt.werr {
387 t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
388 }
389 if !reflect.DeepEqual(cluster, tt.wcluster) {
390 t.Errorf("#%d: cluster = %v, want %v", i, cluster, tt.wcluster)
391 }
392 }
393 }
394
395 func TestSortableNodes(t *testing.T) {
396 ns := []*client.Node{
397 0: {CreatedIndex: 5},
398 1: {CreatedIndex: 1},
399 2: {CreatedIndex: 3},
400 3: {CreatedIndex: 4},
401 }
402
403 for i := 0; i < 10000; i++ {
404 ns = append(ns, &client.Node{CreatedIndex: uint64(rand.Int31())})
405 }
406 sns := sortableNodes{ns}
407 sort.Sort(sns)
408 var cis []int
409 for _, n := range sns.Nodes {
410 cis = append(cis, int(n.CreatedIndex))
411 }
412 if !sort.IntsAreSorted(cis) {
413 t.Errorf("isSorted = %v, want %v", sort.IntsAreSorted(cis), true)
414 }
415 cis = make([]int, 0)
416 for _, n := range ns {
417 cis = append(cis, int(n.CreatedIndex))
418 }
419 if !sort.IntsAreSorted(cis) {
420 t.Errorf("isSorted = %v, want %v", sort.IntsAreSorted(cis), true)
421 }
422 }
423
424 func TestRetryFailure(t *testing.T) {
425 nRetries = maxRetryInTest
426 defer func() { nRetries = math.MaxUint32 }()
427
428 cluster := "1000"
429 c := &clientWithRetry{failTimes: 4}
430 fc := clockwork.NewFakeClock()
431 d := newTestDiscoveryWithClock(cluster, 1, c, fc)
432 go func() {
433 for i := uint(1); i <= maxRetryInTest; i++ {
434 fc.BlockUntil(1)
435 fc.Advance(time.Second * (0x1 << i))
436 }
437 }()
438 if _, _, _, err := d.checkCluster(); err != ErrTooManyRetries {
439 t.Errorf("err = %v, want %v", err, ErrTooManyRetries)
440 }
441 }
442
443 type clientWithResp struct {
444 rs []*client.Response
445 w client.Watcher
446 client.KeysAPI
447 }
448
449 func (c *clientWithResp) Create(ctx context.Context, key string, value string) (*client.Response, error) {
450 if len(c.rs) == 0 {
451 return &client.Response{}, nil
452 }
453 r := c.rs[0]
454 c.rs = c.rs[1:]
455 return r, nil
456 }
457
458 func (c *clientWithResp) Get(ctx context.Context, key string, opts *client.GetOptions) (*client.Response, error) {
459 if len(c.rs) == 0 {
460 return &client.Response{}, &client.Error{Code: client.ErrorCodeKeyNotFound}
461 }
462 r := c.rs[0]
463 c.rs = append(c.rs[1:], r)
464 return r, nil
465 }
466
467 func (c *clientWithResp) Watcher(key string, opts *client.WatcherOptions) client.Watcher {
468 return c.w
469 }
470
471 type clientWithErr struct {
472 err error
473 w client.Watcher
474 client.KeysAPI
475 }
476
477 func (c *clientWithErr) Create(ctx context.Context, key string, value string) (*client.Response, error) {
478 return &client.Response{}, c.err
479 }
480
481 func (c *clientWithErr) Get(ctx context.Context, key string, opts *client.GetOptions) (*client.Response, error) {
482 return &client.Response{}, c.err
483 }
484
485 func (c *clientWithErr) Watcher(key string, opts *client.WatcherOptions) client.Watcher {
486 return c.w
487 }
488
489 type watcherWithResp struct {
490 client.KeysAPI
491 rs []*client.Response
492 }
493
494 func (w *watcherWithResp) Next(context.Context) (*client.Response, error) {
495 if len(w.rs) == 0 {
496 return &client.Response{}, nil
497 }
498 r := w.rs[0]
499 w.rs = w.rs[1:]
500 return r, nil
501 }
502
503 type watcherWithErr struct {
504 err error
505 }
506
507 func (w *watcherWithErr) Next(context.Context) (*client.Response, error) {
508 return &client.Response{}, w.err
509 }
510
511
512 type clientWithRetry struct {
513 clientWithResp
514 failCount int
515 failTimes int
516 }
517
518 func (c *clientWithRetry) Create(ctx context.Context, key string, value string) (*client.Response, error) {
519 if c.failCount < c.failTimes {
520 c.failCount++
521 return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
522 }
523 return c.clientWithResp.Create(ctx, key, value)
524 }
525
526 func (c *clientWithRetry) Get(ctx context.Context, key string, opts *client.GetOptions) (*client.Response, error) {
527 if c.failCount < c.failTimes {
528 c.failCount++
529 return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
530 }
531 return c.clientWithResp.Get(ctx, key, opts)
532 }
533
534
535 type watcherWithRetry struct {
536 rs []*client.Response
537 failCount int
538 failTimes int
539 }
540
541 func (w *watcherWithRetry) Next(context.Context) (*client.Response, error) {
542 if w.failCount < w.failTimes {
543 w.failCount++
544 return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
545 }
546 if len(w.rs) == 0 {
547 return &client.Response{}, nil
548 }
549 r := w.rs[0]
550 w.rs = w.rs[1:]
551 return r, nil
552 }
553
554 func newTestDiscovery(cluster string, id types.ID, c client.KeysAPI) *discovery {
555 return &discovery{
556 lg: zap.NewExample(),
557 cluster: cluster,
558 id: id,
559 c: c,
560 url: &url.URL{Scheme: "http", Host: "test.com"},
561 }
562 }
563
564 func newTestDiscoveryWithClock(cluster string, id types.ID, c client.KeysAPI, clock clockwork.Clock) *discovery {
565 return &discovery{
566 lg: zap.NewExample(),
567 cluster: cluster,
568 id: id,
569 c: c,
570 url: &url.URL{Scheme: "http", Host: "test.com"},
571 clock: clock,
572 }
573 }
574
View as plain text