1
18
19 package ringhash
20
21 import (
22 "context"
23 "fmt"
24 "testing"
25 "time"
26
27 "github.com/google/go-cmp/cmp"
28 "github.com/google/go-cmp/cmp/cmpopts"
29 "google.golang.org/grpc/attributes"
30 "google.golang.org/grpc/balancer"
31 "google.golang.org/grpc/balancer/weightedroundrobin"
32 "google.golang.org/grpc/connectivity"
33 "google.golang.org/grpc/internal/grpctest"
34 "google.golang.org/grpc/internal/testutils"
35 "google.golang.org/grpc/resolver"
36 "google.golang.org/grpc/xds/internal"
37 )
38
39 var (
40 cmpOpts = cmp.Options{
41 cmp.AllowUnexported(testutils.TestSubConn{}, ringEntry{}, subConn{}),
42 cmpopts.IgnoreFields(subConn{}, "mu"),
43 cmpopts.IgnoreFields(testutils.TestSubConn{}, "connectCalled"),
44 }
45 )
46
const (
	// testBackendAddrsCount is how many synthetic backend addresses init
	// generates for the tests.
	testBackendAddrsCount = 12

	// defaultTestShortTimeout is a brief wait, used chiefly to confirm that
	// an event does not happen.
	defaultTestShortTimeout = 10 * time.Millisecond
	// defaultTestTimeout is the upper bound tests use when waiting for an
	// event that is expected to happen.
	defaultTestTimeout = 10 * time.Second
)
53
54 var (
55 testBackendAddrStrs []string
56 testConfig = &LBConfig{MinRingSize: 1, MaxRingSize: 10}
57 )
58
59 func init() {
60 for i := 0; i < testBackendAddrsCount; i++ {
61 testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
62 }
63 }
64
65 func ctxWithHash(h uint64) context.Context {
66 return SetRequestHash(context.Background(), h)
67 }
68
69
70 func setupTest(t *testing.T, addrs []resolver.Address) (*testutils.BalancerClientConn, balancer.Balancer, balancer.Picker) {
71 t.Helper()
72 cc := testutils.NewBalancerClientConn(t)
73 builder := balancer.Get(Name)
74 b := builder.Build(cc, balancer.BuildOptions{})
75 if b == nil {
76 t.Fatalf("builder.Build(%s) failed and returned nil", Name)
77 }
78 if err := b.UpdateClientConnState(balancer.ClientConnState{
79 ResolverState: resolver.State{Addresses: addrs},
80 BalancerConfig: testConfig,
81 }); err != nil {
82 t.Fatalf("UpdateClientConnState returned err: %v", err)
83 }
84
85 for _, addr := range addrs {
86 addr1 := <-cc.NewSubConnAddrsCh
87 if want := []resolver.Address{addr}; !cmp.Equal(addr1, want, cmp.AllowUnexported(attributes.Attributes{})) {
88 t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr1, want, cmp.AllowUnexported(attributes.Attributes{})))
89 }
90 sc1 := <-cc.NewSubConnCh
91
92 select {
93 case <-sc1.ConnectCh:
94 t.Errorf("unexpected Connect() from SubConn %v", sc1)
95 case <-time.After(defaultTestShortTimeout):
96 }
97 }
98
99
100 p1 := <-cc.NewPickerCh
101 return cc, b, p1
102 }
103
104 type s struct {
105 grpctest.Tester
106 }
107
108 func Test(t *testing.T) {
109 grpctest.RunSubTests(t, s{})
110 }
111
112
113
114
115
116 func (s) TestUpdateClientConnState_NewRingSize(t *testing.T) {
117 origMinRingSize, origMaxRingSize := 1, 10
118 newMinRingSize, newMaxRingSize := 20, 100
119
120 addrs := []resolver.Address{{Addr: testBackendAddrStrs[0]}}
121 cc, b, p1 := setupTest(t, addrs)
122 ring1 := p1.(*picker).ring
123 if ringSize := len(ring1.items); ringSize < origMinRingSize || ringSize > origMaxRingSize {
124 t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, origMinRingSize, origMaxRingSize)
125 }
126
127 if err := b.UpdateClientConnState(balancer.ClientConnState{
128 ResolverState: resolver.State{Addresses: addrs},
129 BalancerConfig: &LBConfig{MinRingSize: uint64(newMinRingSize), MaxRingSize: uint64(newMaxRingSize)},
130 }); err != nil {
131 t.Fatalf("UpdateClientConnState returned err: %v", err)
132 }
133
134 var ring2 *ring
135 select {
136 case <-time.After(defaultTestTimeout):
137 t.Fatal("Timeout when waiting for a picker update after a configuration update")
138 case p2 := <-cc.NewPickerCh:
139 ring2 = p2.(*picker).ring
140 }
141 if ringSize := len(ring2.items); ringSize < newMinRingSize || ringSize > newMaxRingSize {
142 t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, newMinRingSize, newMaxRingSize)
143 }
144 }
145
146 func (s) TestOneSubConn(t *testing.T) {
147 wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0]}
148 cc, _, p0 := setupTest(t, []resolver.Address{wantAddr1})
149 ring0 := p0.(*picker).ring
150
151 firstHash := ring0.items[0].hash
152
153 testHash := firstHash - 1
154
155
156 if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
157 t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
158 }
159 sc0 := ring0.items[0].sc.sc.(*testutils.TestSubConn)
160 select {
161 case <-sc0.ConnectCh:
162 case <-time.After(defaultTestTimeout):
163 t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
164 }
165
166
167 sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
168 sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
169
170
171 p1 := <-cc.NewPickerCh
172 for i := 0; i < 5; i++ {
173 gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
174 if gotSCSt.SubConn != sc0 {
175 t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
176 }
177 }
178 }
179
180
181
182
183 func (s) TestThreeSubConnsAffinity(t *testing.T) {
184 wantAddrs := []resolver.Address{
185 {Addr: testBackendAddrStrs[0]},
186 {Addr: testBackendAddrStrs[1]},
187 {Addr: testBackendAddrStrs[2]},
188 }
189 cc, _, p0 := setupTest(t, wantAddrs)
190
191
192 ring0 := p0.(*picker).ring
193
194 firstHash := ring0.items[0].hash
195
196 testHash := firstHash + 1
197
198
199 if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
200 t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
201 }
202
203 sc0 := ring0.items[1].sc.sc.(*testutils.TestSubConn)
204 select {
205 case <-sc0.ConnectCh:
206 case <-time.After(defaultTestTimeout):
207 t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
208 }
209
210
211 sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
212 sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
213 p1 := <-cc.NewPickerCh
214 for i := 0; i < 5; i++ {
215 gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
216 if gotSCSt.SubConn != sc0 {
217 t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
218 }
219 }
220
221
222 sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
223 p2 := <-cc.NewPickerCh
224
225
226 if _, err := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
227 t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
228 }
229
230
231 sc1 := ring0.items[2].sc.sc.(*testutils.TestSubConn)
232 select {
233 case <-sc1.ConnectCh:
234 case <-time.After(defaultTestTimeout):
235 t.Errorf("timeout waiting for Connect() from SubConn %v", sc1)
236 }
237
238
239 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
240 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
241
242 p3 := <-cc.NewPickerCh
243 for i := 0; i < 5; i++ {
244 gotSCSt, _ := p3.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
245 if gotSCSt.SubConn != sc1 {
246 t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
247 }
248 }
249
250
251 sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
252
253
254 select {
255 case <-sc0.ConnectCh:
256 case <-time.After(defaultTestTimeout):
257 t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
258 }
259
260
261
262 sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
263 sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
264 p4 := <-cc.NewPickerCh
265 for i := 0; i < 5; i++ {
266 gotSCSt, _ := p4.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
267 if gotSCSt.SubConn != sc0 {
268 t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
269 }
270 }
271 }
272
273
274
275
276 func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) {
277 wantAddrs := []resolver.Address{
278 {Addr: testBackendAddrStrs[0]},
279 {Addr: testBackendAddrStrs[1]},
280 {Addr: testBackendAddrStrs[2]},
281 }
282 cc, _, p0 := setupTest(t, wantAddrs)
283
284
285 ring0 := p0.(*picker).ring
286
287 firstHash := ring0.items[0].hash
288
289 testHash := firstHash + 1
290
291
292 if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
293 t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
294 }
295 sc0 := ring0.items[1].sc.sc.(*testutils.TestSubConn)
296 select {
297 case <-sc0.ConnectCh:
298 case <-time.After(defaultTestTimeout):
299 t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
300 }
301
302
303 sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
304 sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
305
306
307 p1 := <-cc.NewPickerCh
308 for i := 0; i < 5; i++ {
309 gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
310 if gotSCSt.SubConn != sc0 {
311 t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
312 }
313 }
314
315 secondHash := ring0.items[1].hash
316
317 testHash2 := secondHash + 1
318 if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash2)}); err != balancer.ErrNoSubConnAvailable {
319 t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
320 }
321 sc1 := ring0.items[2].sc.sc.(*testutils.TestSubConn)
322 select {
323 case <-sc1.ConnectCh:
324 case <-time.After(defaultTestTimeout):
325 t.Errorf("timeout waiting for Connect() from SubConn %v", sc1)
326 }
327 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
328 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
329
330
331 p2 := <-cc.NewPickerCh
332 for i := 0; i < 5; i++ {
333 gotSCSt, _ := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash2)})
334 if gotSCSt.SubConn != sc1 {
335 t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
336 }
337 }
338
339 for i := 0; i < 5; i++ {
340 gotSCSt, _ := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
341 if gotSCSt.SubConn != sc0 {
342 t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
343 }
344 }
345 }
346
347 func (s) TestAddrWeightChange(t *testing.T) {
348 wantAddrs := []resolver.Address{
349 {Addr: testBackendAddrStrs[0]},
350 {Addr: testBackendAddrStrs[1]},
351 {Addr: testBackendAddrStrs[2]},
352 }
353 cc, b, p0 := setupTest(t, wantAddrs)
354 ring0 := p0.(*picker).ring
355
356 if err := b.UpdateClientConnState(balancer.ClientConnState{
357 ResolverState: resolver.State{Addresses: wantAddrs},
358 BalancerConfig: testConfig,
359 }); err != nil {
360 t.Fatalf("UpdateClientConnState returned err: %v", err)
361 }
362 select {
363 case <-cc.NewPickerCh:
364 t.Fatalf("unexpected picker after UpdateClientConn with the same addresses")
365 case <-time.After(defaultTestShortTimeout):
366 }
367
368
369 if err := b.UpdateClientConnState(balancer.ClientConnState{
370 ResolverState: resolver.State{Addresses: []resolver.Address{
371 {Addr: testBackendAddrStrs[0]},
372 {Addr: testBackendAddrStrs[1]},
373 }},
374 BalancerConfig: testConfig,
375 }); err != nil {
376 t.Fatalf("UpdateClientConnState returned err: %v", err)
377 }
378 var p1 balancer.Picker
379 select {
380 case p1 = <-cc.NewPickerCh:
381 case <-time.After(defaultTestTimeout):
382 t.Fatalf("timeout waiting for picker after UpdateClientConn with different addresses")
383 }
384 ring1 := p1.(*picker).ring
385 if ring1 == ring0 {
386 t.Fatalf("new picker after removing address has the same ring as before, want different")
387 }
388
389
390 if err := b.UpdateClientConnState(balancer.ClientConnState{
391 ResolverState: resolver.State{Addresses: []resolver.Address{
392 {Addr: testBackendAddrStrs[0]},
393 weightedroundrobin.SetAddrInfo(
394 resolver.Address{Addr: testBackendAddrStrs[1]},
395 weightedroundrobin.AddrInfo{Weight: 2}),
396 }},
397 BalancerConfig: testConfig,
398 }); err != nil {
399 t.Fatalf("UpdateClientConnState returned err: %v", err)
400 }
401 var p2 balancer.Picker
402 select {
403 case p2 = <-cc.NewPickerCh:
404 case <-time.After(defaultTestTimeout):
405 t.Fatalf("timeout waiting for picker after UpdateClientConn with different addresses")
406 }
407 if p2.(*picker).ring == ring1 {
408 t.Fatalf("new picker after changing address weight has the same ring as before, want different")
409 }
410 }
411
412
413
414
415
416 func (s) TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) {
417 wantAddrs := []resolver.Address{
418 {Addr: testBackendAddrStrs[0]},
419 {Addr: testBackendAddrStrs[1]},
420 {Addr: testBackendAddrStrs[2]},
421 }
422 _, _, p0 := setupTest(t, wantAddrs)
423 ring0 := p0.(*picker).ring
424
425
426
427 p0.Pick(balancer.PickInfo{Ctx: context.Background()})
428
429
430 sc0 := ring0.items[0].sc.sc.(*testutils.TestSubConn)
431 sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
432 sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
433
434
435
436 sc1 := ring0.items[1].sc.sc.(*testutils.TestSubConn)
437 select {
438 case <-sc1.ConnectCh:
439 case <-time.After(defaultTestShortTimeout):
440 t.Fatalf("timeout waiting for Connect() from SubConn %v", sc1)
441 }
442
443
444 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
445 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
446
447
448 sc2 := ring0.items[2].sc.sc.(*testutils.TestSubConn)
449 select {
450 case <-sc2.ConnectCh:
451 case <-time.After(defaultTestShortTimeout):
452 t.Fatalf("timeout waiting for Connect() from SubConn %v", sc2)
453 }
454
455
456 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
457 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
458
459
460 select {
461 case <-sc0.ConnectCh:
462 case <-time.After(defaultTestShortTimeout):
463 t.Fatalf("timeout waiting for Connect() from SubConn %v", sc0)
464 }
465
466
467 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
468 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
469
470
471
472 select {
473 case <-sc0.ConnectCh:
474 t.Fatalf("unexpected Connect() from SubConn %v", sc0)
475 case <-sc1.ConnectCh:
476 t.Fatalf("unexpected Connect() from SubConn %v", sc1)
477 case <-sc2.ConnectCh:
478 t.Fatalf("unexpected Connect() from SubConn %v", sc2)
479 case <-time.After(defaultTestShortTimeout):
480 }
481 }
482
483 func (s) TestConnectivityStateEvaluatorRecordTransition(t *testing.T) {
484 tests := []struct {
485 name string
486 from, to []connectivity.State
487 want connectivity.State
488 }{
489 {
490 name: "one ready",
491 from: []connectivity.State{connectivity.Idle},
492 to: []connectivity.State{connectivity.Ready},
493 want: connectivity.Ready,
494 },
495 {
496 name: "one connecting",
497 from: []connectivity.State{connectivity.Idle},
498 to: []connectivity.State{connectivity.Connecting},
499 want: connectivity.Connecting,
500 },
501 {
502 name: "one ready one transient failure",
503 from: []connectivity.State{connectivity.Idle, connectivity.Idle},
504 to: []connectivity.State{connectivity.Ready, connectivity.TransientFailure},
505 want: connectivity.Ready,
506 },
507 {
508 name: "one connecting one transient failure",
509 from: []connectivity.State{connectivity.Idle, connectivity.Idle},
510 to: []connectivity.State{connectivity.Connecting, connectivity.TransientFailure},
511 want: connectivity.Connecting,
512 },
513 {
514 name: "one connecting two transient failure",
515 from: []connectivity.State{connectivity.Idle, connectivity.Idle, connectivity.Idle},
516 to: []connectivity.State{connectivity.Connecting, connectivity.TransientFailure, connectivity.TransientFailure},
517 want: connectivity.TransientFailure,
518 },
519 }
520 for _, tt := range tests {
521 t.Run(tt.name, func(t *testing.T) {
522 cse := &connectivityStateEvaluator{}
523 var got connectivity.State
524 for i, fff := range tt.from {
525 ttt := tt.to[i]
526 got = cse.recordTransition(fff, ttt)
527 }
528 if got != tt.want {
529 t.Errorf("recordTransition() = %v, want %v", got, tt.want)
530 }
531 })
532 }
533 }
534
535
536
537
538
539
540 func (s) TestAddrBalancerAttributesChange(t *testing.T) {
541 addrs1 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
542 cc, b, _ := setupTest(t, addrs1)
543
544 addrs2 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
545 if err := b.UpdateClientConnState(balancer.ClientConnState{
546 ResolverState: resolver.State{Addresses: addrs2},
547 BalancerConfig: testConfig,
548 }); err != nil {
549 t.Fatalf("UpdateClientConnState returned err: %v", err)
550 }
551 select {
552 case <-cc.NewSubConnCh:
553 t.Fatal("new subConn created for an update with the same addresses")
554 case <-time.After(defaultTestShortTimeout):
555 }
556 }
557
View as plain text