1
18
19 package weightedtarget
20
21 import (
22 "context"
23 "encoding/json"
24 "errors"
25 "fmt"
26 "strings"
27 "testing"
28 "time"
29
30 "github.com/google/go-cmp/cmp"
31 "google.golang.org/grpc/attributes"
32 "google.golang.org/grpc/balancer"
33 "google.golang.org/grpc/balancer/roundrobin"
34 "google.golang.org/grpc/connectivity"
35 "google.golang.org/grpc/internal/balancer/stub"
36 "google.golang.org/grpc/internal/grpctest"
37 "google.golang.org/grpc/internal/hierarchy"
38 "google.golang.org/grpc/internal/testutils"
39 "google.golang.org/grpc/resolver"
40 "google.golang.org/grpc/serviceconfig"
41 )
42
43 const (
44 defaultTestTimeout = 5 * time.Second
45 )
46
47 type s struct {
48 grpctest.Tester
49 }
50
51 func Test(t *testing.T) {
52 grpctest.RunSubTests(t, s{})
53 }
54
55 type testConfigBalancerBuilder struct {
56 balancer.Builder
57 }
58
59 func newTestConfigBalancerBuilder() *testConfigBalancerBuilder {
60 return &testConfigBalancerBuilder{
61 Builder: balancer.Get(roundrobin.Name),
62 }
63 }
64
65
66
67 func pickAndCheckError(want error) func(balancer.Picker) error {
68 const rpcCount = 5
69 return func(p balancer.Picker) error {
70 for i := 0; i < rpcCount; i++ {
71 if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), want.Error()) {
72 return fmt.Errorf("picker.Pick() returned error: %v, want: %v", err, want)
73 }
74 }
75 return nil
76 }
77 }
78
79 func (t *testConfigBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
80 rr := t.Builder.Build(cc, opts)
81 return &testConfigBalancer{
82 Balancer: rr,
83 }
84 }
85
86 const testConfigBalancerName = "test_config_balancer"
87
88 func (t *testConfigBalancerBuilder) Name() string {
89 return testConfigBalancerName
90 }
91
92 type stringBalancerConfig struct {
93 serviceconfig.LoadBalancingConfig
94 configStr string
95 }
96
97 func (t *testConfigBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
98 var cfg string
99 if err := json.Unmarshal(c, &cfg); err != nil {
100 return nil, fmt.Errorf("failed to unmarshal config in %q: %v", testConfigBalancerName, err)
101 }
102 return stringBalancerConfig{configStr: cfg}, nil
103 }
104
105
106
107 type testConfigBalancer struct {
108 balancer.Balancer
109 }
110
111
112
113 type configKey struct{}
114
115 func setConfigKey(addr resolver.Address, config string) resolver.Address {
116 addr.Attributes = addr.Attributes.WithValue(configKey{}, config)
117 return addr
118 }
119
120 func getConfigKey(attr *attributes.Attributes) (string, bool) {
121 v := attr.Value(configKey{})
122 name, ok := v.(string)
123 return name, ok
124 }
125
126 func (b *testConfigBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
127 c, ok := s.BalancerConfig.(stringBalancerConfig)
128 if !ok {
129 return fmt.Errorf("unexpected balancer config with type %T", s.BalancerConfig)
130 }
131
132 addrsWithAttr := make([]resolver.Address, len(s.ResolverState.Addresses))
133 for i, addr := range s.ResolverState.Addresses {
134 addrsWithAttr[i] = setConfigKey(addr, c.configStr)
135 }
136 s.BalancerConfig = nil
137 s.ResolverState.Addresses = addrsWithAttr
138 return b.Balancer.UpdateClientConnState(s)
139 }
140
141 func (b *testConfigBalancer) Close() {
142 b.Balancer.Close()
143 }
144
145 var (
146 wtbBuilder balancer.Builder
147 wtbParser balancer.ConfigParser
148 testBackendAddrStrs []string
149 )
150
151 const testBackendAddrsCount = 12
152
153 func init() {
154 balancer.Register(newTestConfigBalancerBuilder())
155 for i := 0; i < testBackendAddrsCount; i++ {
156 testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
157 }
158 wtbBuilder = balancer.Get(Name)
159 wtbParser = wtbBuilder.(balancer.ConfigParser)
160
161 NewRandomWRR = testutils.NewTestWRR
162 }
163
164
165
166
167
168
169 func (s) TestWeightedTarget(t *testing.T) {
170 cc := testutils.NewBalancerClientConn(t)
171 wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
172 defer wtb.Close()
173
174
175 config1, err := wtbParser.ParseConfig([]byte(`
176 {
177 "targets": {
178 "cluster_1": {
179 "weight":1,
180 "childPolicy": [{"round_robin": ""}]
181 }
182 }
183 }`))
184 if err != nil {
185 t.Fatalf("failed to parse balancer config: %v", err)
186 }
187
188
189 addr1 := resolver.Address{Addr: testBackendAddrStrs[1], Attributes: nil}
190 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
191 ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr1, []string{"cluster_1"})}},
192 BalancerConfig: config1,
193 }); err != nil {
194 t.Fatalf("failed to update ClientConn state: %v", err)
195 }
196 verifyAddressInNewSubConn(t, cc, addr1)
197
198
199 sc1 := <-cc.NewSubConnCh
200 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
201 <-cc.NewPickerCh
202 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
203 p := <-cc.NewPickerCh
204
205
206 for i := 0; i < 5; i++ {
207 gotSCSt, _ := p.Pick(balancer.PickInfo{})
208 if gotSCSt.SubConn != sc1 {
209 t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
210 }
211 }
212
213
214
215
216 config2, err := wtbParser.ParseConfig([]byte(`
217 {
218 "targets": {
219 "cluster_2": {
220 "weight":1,
221 "childPolicy": [{"test_config_balancer": "cluster_2"}]
222 }
223 }
224 }`))
225 if err != nil {
226 t.Fatalf("failed to parse balancer config: %v", err)
227 }
228
229
230 addr2 := resolver.Address{Addr: testBackendAddrStrs[2], Attributes: nil}
231 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
232 ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr2, []string{"cluster_2"})}},
233 BalancerConfig: config2,
234 }); err != nil {
235 t.Fatalf("failed to update ClientConn state: %v", err)
236 }
237
238
239
240 verifyAddressInNewSubConn(t, cc, setConfigKey(addr2, "cluster_2"))
241
242
243 scShutdown := <-cc.ShutdownSubConnCh
244 if scShutdown != sc1 {
245 t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
246 }
247 scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
248
249 sc2 := <-cc.NewSubConnCh
250 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
251 <-cc.NewPickerCh
252 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
253 p = <-cc.NewPickerCh
254
255
256 for i := 0; i < 5; i++ {
257 gotSCSt, _ := p.Pick(balancer.PickInfo{})
258 if gotSCSt.SubConn != sc2 {
259 t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2)
260 }
261 }
262
263
264 config3, err := wtbParser.ParseConfig([]byte(`
265 {
266 "targets": {
267 "cluster_2": {
268 "weight":1,
269 "childPolicy": [{"round_robin": ""}]
270 }
271 }
272 }`))
273 if err != nil {
274 t.Fatalf("failed to parse balancer config: %v", err)
275 }
276
277
278 addr3 := resolver.Address{Addr: testBackendAddrStrs[3], Attributes: nil}
279 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
280 ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr3, []string{"cluster_2"})}},
281 BalancerConfig: config3,
282 }); err != nil {
283 t.Fatalf("failed to update ClientConn state: %v", err)
284 }
285 verifyAddressInNewSubConn(t, cc, addr3)
286
287
288 scShutdown = <-cc.ShutdownSubConnCh
289 if scShutdown != sc2 {
290 t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
291 }
292 scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
293
294
295 sc3 := <-cc.NewSubConnCh
296 sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
297 <-cc.NewPickerCh
298 sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
299 p = <-cc.NewPickerCh
300
301
302 for i := 0; i < 5; i++ {
303 gotSCSt, _ := p.Pick(balancer.PickInfo{})
304 if gotSCSt.SubConn != sc3 {
305 t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3)
306 }
307 }
308
309
310
311 emptyConfig, err := wtbParser.ParseConfig([]byte(`{}`))
312 if err != nil {
313 t.Fatalf("Failed to parse balancer config: %v", err)
314 }
315 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
316 ResolverState: resolver.State{},
317 BalancerConfig: emptyConfig,
318 }); err != nil {
319 t.Fatalf("Failed to update ClientConn state: %v", err)
320 }
321
322 state := <-cc.NewStateCh
323 if state != connectivity.TransientFailure {
324 t.Fatalf("Empty target update should have triggered a TF state update, got: %v", state)
325 }
326 }
327
328
329
330
331 func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) {
332 cc := testutils.NewBalancerClientConn(t)
333 wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
334 defer wtb.Close()
335
336
337 config, err := wtbParser.ParseConfig([]byte(`
338 {
339 "targets": {
340 "cluster_1": {
341 "weight":1,
342 "childPolicy": [{"round_robin": ""}]
343 }
344 }
345 }`))
346 if err != nil {
347 t.Fatalf("failed to parse balancer config: %v", err)
348 }
349
350
351 addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
352 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
353 ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr1, []string{"cluster_1"})}},
354 BalancerConfig: config,
355 }); err != nil {
356 t.Fatalf("failed to update ClientConn state: %v", err)
357 }
358 verifyAddressInNewSubConn(t, cc, addr1)
359
360
361 sc1 := <-cc.NewSubConnCh
362 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
363 <-cc.NewPickerCh
364 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
365 p := <-cc.NewPickerCh
366
367
368 for i := 0; i < 5; i++ {
369 gotSCSt, _ := p.Pick(balancer.PickInfo{})
370 if gotSCSt.SubConn != sc1 {
371 t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
372 }
373 }
374
375
376 addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
377 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
378 ResolverState: resolver.State{Addresses: []resolver.Address{
379 hierarchy.Set(addr1, []string{"cluster_1"}),
380 hierarchy.Set(addr2, []string{"cluster_1"}),
381 }},
382 BalancerConfig: config,
383 }); err != nil {
384 t.Fatalf("failed to update ClientConn state: %v", err)
385 }
386 verifyAddressInNewSubConn(t, cc, addr2)
387
388
389 sc2 := <-cc.NewSubConnCh
390
391 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
392 <-cc.NewPickerCh
393 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
394 p = <-cc.NewPickerCh
395
396
397 want := []balancer.SubConn{sc1, sc2}
398 if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
399 t.Fatalf("want %v, got %v", want, err)
400 }
401
402
403 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
404 ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr2, []string{"cluster_1"})}},
405 BalancerConfig: config,
406 }); err != nil {
407 t.Fatalf("failed to update ClientConn state: %v", err)
408 }
409
410
411 scShutdown := <-cc.ShutdownSubConnCh
412 if scShutdown != sc1 {
413 t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
414 }
415 scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
416 p = <-cc.NewPickerCh
417
418
419 for i := 0; i < 5; i++ {
420 gotSC, _ := p.Pick(balancer.PickInfo{})
421 if gotSC.SubConn != sc2 {
422 t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSC, sc2)
423 }
424 }
425 }
426
427
428
429 func (s) TestWeightedTarget_TwoSubBalancers_OneBackend(t *testing.T) {
430 cc := testutils.NewBalancerClientConn(t)
431 wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
432 defer wtb.Close()
433
434
435 config, err := wtbParser.ParseConfig([]byte(`
436 {
437 "targets": {
438 "cluster_1": {
439 "weight":1,
440 "childPolicy": [{"test_config_balancer": "cluster_1"}]
441 },
442 "cluster_2": {
443 "weight":1,
444 "childPolicy": [{"test_config_balancer": "cluster_2"}]
445 }
446 }
447 }`))
448 if err != nil {
449 t.Fatalf("failed to parse balancer config: %v", err)
450 }
451
452
453 addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
454 addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
455 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
456 ResolverState: resolver.State{Addresses: []resolver.Address{
457 hierarchy.Set(addr1, []string{"cluster_1"}),
458 hierarchy.Set(addr2, []string{"cluster_2"}),
459 }},
460 BalancerConfig: config,
461 }); err != nil {
462 t.Fatalf("failed to update ClientConn state: %v", err)
463 }
464
465 scs := waitForNewSubConns(t, cc, 2)
466 verifySubConnAddrs(t, scs, map[string][]resolver.Address{
467 "cluster_1": {addr1},
468 "cluster_2": {addr2},
469 })
470
471
472 sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn)
473 sc2 := scs["cluster_2"][0].sc.(*testutils.TestSubConn)
474
475
476 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
477 <-cc.NewPickerCh
478 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
479 <-cc.NewPickerCh
480 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
481 <-cc.NewPickerCh
482 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
483 p := <-cc.NewPickerCh
484
485
486 want := []balancer.SubConn{sc1, sc2}
487 if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
488 t.Fatalf("want %v, got %v", want, err)
489 }
490 }
491
492
493
494
495 func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
496 cc := testutils.NewBalancerClientConn(t)
497 wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
498 defer wtb.Close()
499
500
501 config, err := wtbParser.ParseConfig([]byte(`
502 {
503 "targets": {
504 "cluster_1": {
505 "weight":1,
506 "childPolicy": [{"test_config_balancer": "cluster_1"}]
507 },
508 "cluster_2": {
509 "weight":1,
510 "childPolicy": [{"test_config_balancer": "cluster_2"}]
511 }
512 }
513 }`))
514 if err != nil {
515 t.Fatalf("failed to parse balancer config: %v", err)
516 }
517
518
519 addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
520 addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
521 addr3 := resolver.Address{Addr: testBackendAddrStrs[3]}
522 addr4 := resolver.Address{Addr: testBackendAddrStrs[4]}
523 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
524 ResolverState: resolver.State{Addresses: []resolver.Address{
525 hierarchy.Set(addr1, []string{"cluster_1"}),
526 hierarchy.Set(addr2, []string{"cluster_1"}),
527 hierarchy.Set(addr3, []string{"cluster_2"}),
528 hierarchy.Set(addr4, []string{"cluster_2"}),
529 }},
530 BalancerConfig: config,
531 }); err != nil {
532 t.Fatalf("failed to update ClientConn state: %v", err)
533 }
534
535 scs := waitForNewSubConns(t, cc, 4)
536 verifySubConnAddrs(t, scs, map[string][]resolver.Address{
537 "cluster_1": {addr1, addr2},
538 "cluster_2": {addr3, addr4},
539 })
540
541
542 sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn)
543 sc2 := scs["cluster_1"][1].sc.(*testutils.TestSubConn)
544 sc3 := scs["cluster_2"][0].sc.(*testutils.TestSubConn)
545 sc4 := scs["cluster_2"][1].sc.(*testutils.TestSubConn)
546
547
548 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
549 <-cc.NewPickerCh
550 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
551 <-cc.NewPickerCh
552 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
553 <-cc.NewPickerCh
554 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
555 <-cc.NewPickerCh
556 sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
557 <-cc.NewPickerCh
558 sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
559 <-cc.NewPickerCh
560 sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
561 <-cc.NewPickerCh
562 sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
563 p := <-cc.NewPickerCh
564
565
566
567 want := []balancer.SubConn{sc1, sc2, sc3, sc4}
568 if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
569 t.Fatalf("want %v, got %v", want, err)
570 }
571
572
573 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
574 p = <-cc.NewPickerCh
575 want = []balancer.SubConn{sc1, sc1, sc3, sc4}
576 if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
577 t.Fatalf("want %v, got %v", want, err)
578 }
579
580
581 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
582 ResolverState: resolver.State{Addresses: []resolver.Address{
583 hierarchy.Set(addr1, []string{"cluster_1"}),
584 hierarchy.Set(addr2, []string{"cluster_1"}),
585 hierarchy.Set(addr4, []string{"cluster_2"}),
586 }},
587 BalancerConfig: config,
588 }); err != nil {
589 t.Fatalf("failed to update ClientConn state: %v", err)
590 }
591 scShutdown := <-cc.ShutdownSubConnCh
592 if scShutdown != sc3 {
593 t.Fatalf("ShutdownSubConn, want %v, got %v", sc3, scShutdown)
594 }
595 scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
596 p = <-cc.NewPickerCh
597 want = []balancer.SubConn{sc1, sc4}
598 if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
599 t.Fatalf("want %v, got %v", want, err)
600 }
601
602
603 wantSubConnErr := errors.New("subConn connection error")
604 sc1.UpdateState(balancer.SubConnState{
605 ConnectivityState: connectivity.TransientFailure,
606 ConnectionError: wantSubConnErr,
607 })
608 p = <-cc.NewPickerCh
609 want = []balancer.SubConn{sc4}
610 if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
611 t.Fatalf("want %v, got %v", want, err)
612 }
613
614
615 sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
616 p = <-cc.NewPickerCh
617 for i := 0; i < 5; i++ {
618 if _, err := p.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable {
619 t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err)
620 }
621 }
622
623
624 sc4.UpdateState(balancer.SubConnState{
625 ConnectivityState: connectivity.TransientFailure,
626 ConnectionError: wantSubConnErr,
627 })
628
629 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
630 defer cancel()
631 if err := cc.WaitForPicker(ctx, pickAndCheckError(wantSubConnErr)); err != nil {
632 t.Fatal(err)
633 }
634 }
635
636
637
638
639 func (s) TestWeightedTarget_TwoSubBalancers_DifferentWeight_MoreBackends(t *testing.T) {
640 cc := testutils.NewBalancerClientConn(t)
641 wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
642 defer wtb.Close()
643
644
645 config, err := wtbParser.ParseConfig([]byte(`
646 {
647 "targets": {
648 "cluster_1": {
649 "weight": 2,
650 "childPolicy": [{"test_config_balancer": "cluster_1"}]
651 },
652 "cluster_2": {
653 "weight": 1,
654 "childPolicy": [{"test_config_balancer": "cluster_2"}]
655 }
656 }
657 }`))
658 if err != nil {
659 t.Fatalf("failed to parse balancer config: %v", err)
660 }
661
662
663 addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
664 addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
665 addr3 := resolver.Address{Addr: testBackendAddrStrs[3]}
666 addr4 := resolver.Address{Addr: testBackendAddrStrs[4]}
667 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
668 ResolverState: resolver.State{Addresses: []resolver.Address{
669 hierarchy.Set(addr1, []string{"cluster_1"}),
670 hierarchy.Set(addr2, []string{"cluster_1"}),
671 hierarchy.Set(addr3, []string{"cluster_2"}),
672 hierarchy.Set(addr4, []string{"cluster_2"}),
673 }},
674 BalancerConfig: config,
675 }); err != nil {
676 t.Fatalf("failed to update ClientConn state: %v", err)
677 }
678
679 scs := waitForNewSubConns(t, cc, 4)
680 verifySubConnAddrs(t, scs, map[string][]resolver.Address{
681 "cluster_1": {addr1, addr2},
682 "cluster_2": {addr3, addr4},
683 })
684
685
686 sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn)
687 sc2 := scs["cluster_1"][1].sc.(*testutils.TestSubConn)
688 sc3 := scs["cluster_2"][0].sc.(*testutils.TestSubConn)
689 sc4 := scs["cluster_2"][1].sc.(*testutils.TestSubConn)
690
691
692 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
693 <-cc.NewPickerCh
694 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
695 <-cc.NewPickerCh
696 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
697 <-cc.NewPickerCh
698 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
699 <-cc.NewPickerCh
700 sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
701 <-cc.NewPickerCh
702 sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
703 <-cc.NewPickerCh
704 sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
705 <-cc.NewPickerCh
706 sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
707 p := <-cc.NewPickerCh
708
709
710
711 want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4}
712 if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
713 t.Fatalf("want %v, got %v", want, err)
714 }
715 }
716
717
718
719
720 func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
721 cc := testutils.NewBalancerClientConn(t)
722 wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
723 defer wtb.Close()
724
725
726 config, err := wtbParser.ParseConfig([]byte(`
727 {
728 "targets": {
729 "cluster_1": {
730 "weight": 1,
731 "childPolicy": [{"test_config_balancer": "cluster_1"}]
732 },
733 "cluster_2": {
734 "weight": 1,
735 "childPolicy": [{"test_config_balancer": "cluster_2"}]
736 },
737 "cluster_3": {
738 "weight": 1,
739 "childPolicy": [{"test_config_balancer": "cluster_3"}]
740 }
741 }
742 }`))
743 if err != nil {
744 t.Fatalf("failed to parse balancer config: %v", err)
745 }
746
747
748 addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
749 addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
750 addr3 := resolver.Address{Addr: testBackendAddrStrs[3]}
751 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
752 ResolverState: resolver.State{Addresses: []resolver.Address{
753 hierarchy.Set(addr1, []string{"cluster_1"}),
754 hierarchy.Set(addr2, []string{"cluster_2"}),
755 hierarchy.Set(addr3, []string{"cluster_3"}),
756 }},
757 BalancerConfig: config,
758 }); err != nil {
759 t.Fatalf("failed to update ClientConn state: %v", err)
760 }
761
762 scs := waitForNewSubConns(t, cc, 3)
763 verifySubConnAddrs(t, scs, map[string][]resolver.Address{
764 "cluster_1": {addr1},
765 "cluster_2": {addr2},
766 "cluster_3": {addr3},
767 })
768
769
770 sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn)
771 sc2 := scs["cluster_2"][0].sc.(*testutils.TestSubConn)
772 sc3 := scs["cluster_3"][0].sc.(*testutils.TestSubConn)
773
774
775 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
776 <-cc.NewPickerCh
777 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
778 <-cc.NewPickerCh
779 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
780 <-cc.NewPickerCh
781 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
782 <-cc.NewPickerCh
783 sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
784 <-cc.NewPickerCh
785 sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
786 p := <-cc.NewPickerCh
787
788 want := []balancer.SubConn{sc1, sc2, sc3}
789 if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
790 t.Fatalf("want %v, got %v", want, err)
791 }
792
793
794 config, err = wtbParser.ParseConfig([]byte(`
795 {
796 "targets": {
797 "cluster_1": {
798 "weight": 1,
799 "childPolicy": [{"test_config_balancer": "cluster_1"}]
800 },
801 "cluster_3": {
802 "weight": 1,
803 "childPolicy": [{"test_config_balancer": "cluster_3"}]
804 }
805 }
806 }`))
807 if err != nil {
808 t.Fatalf("failed to parse balancer config: %v", err)
809 }
810 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
811 ResolverState: resolver.State{Addresses: []resolver.Address{
812 hierarchy.Set(addr1, []string{"cluster_1"}),
813 hierarchy.Set(addr3, []string{"cluster_3"}),
814 }},
815 BalancerConfig: config,
816 }); err != nil {
817 t.Fatalf("failed to update ClientConn state: %v", err)
818 }
819
820
821
822 p = <-cc.NewPickerCh
823
824 scShutdown := <-cc.ShutdownSubConnCh
825 if scShutdown != sc2 {
826 t.Fatalf("ShutdownSubConn, want %v, got %v", sc2, scShutdown)
827 }
828 want = []balancer.SubConn{sc1, sc3}
829 if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
830 t.Fatalf("want %v, got %v", want, err)
831 }
832
833
834 wantSubConnErr := errors.New("subConn connection error")
835 sc3.UpdateState(balancer.SubConnState{
836 ConnectivityState: connectivity.TransientFailure,
837 ConnectionError: wantSubConnErr,
838 })
839 <-cc.NewPickerCh
840
841
842 config, err = wtbParser.ParseConfig([]byte(`
843 {
844 "targets": {
845 "cluster_3": {
846 "weight": 1,
847 "childPolicy": [{"test_config_balancer": "cluster_3"}]
848 }
849 }
850 }`))
851 if err != nil {
852 t.Fatalf("failed to parse balancer config: %v", err)
853 }
854 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
855 ResolverState: resolver.State{Addresses: []resolver.Address{
856 hierarchy.Set(addr3, []string{"cluster_3"}),
857 }},
858 BalancerConfig: config,
859 }); err != nil {
860 t.Fatalf("failed to update ClientConn state: %v", err)
861 }
862
863
864
865
866 scShutdown = <-cc.ShutdownSubConnCh
867 if scShutdown != sc1 {
868 t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
869 }
870
871 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
872 defer cancel()
873 if err := cc.WaitForPicker(ctx, pickAndCheckError(wantSubConnErr)); err != nil {
874 t.Fatal(err)
875 }
876 }
877
878
879
880
881 func (s) TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends(t *testing.T) {
882 cc := testutils.NewBalancerClientConn(t)
883 wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
884 defer wtb.Close()
885
886
887 config, err := wtbParser.ParseConfig([]byte(`
888 {
889 "targets": {
890 "cluster_1": {
891 "weight": 2,
892 "childPolicy": [{"test_config_balancer": "cluster_1"}]
893 },
894 "cluster_2": {
895 "weight": 1,
896 "childPolicy": [{"test_config_balancer": "cluster_2"}]
897 }
898 }
899 }`))
900 if err != nil {
901 t.Fatalf("failed to parse balancer config: %v", err)
902 }
903
904
905 addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
906 addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
907 addr3 := resolver.Address{Addr: testBackendAddrStrs[3]}
908 addr4 := resolver.Address{Addr: testBackendAddrStrs[4]}
909 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
910 ResolverState: resolver.State{Addresses: []resolver.Address{
911 hierarchy.Set(addr1, []string{"cluster_1"}),
912 hierarchy.Set(addr2, []string{"cluster_1"}),
913 hierarchy.Set(addr3, []string{"cluster_2"}),
914 hierarchy.Set(addr4, []string{"cluster_2"}),
915 }},
916 BalancerConfig: config,
917 }); err != nil {
918 t.Fatalf("failed to update ClientConn state: %v", err)
919 }
920
921 scs := waitForNewSubConns(t, cc, 4)
922 verifySubConnAddrs(t, scs, map[string][]resolver.Address{
923 "cluster_1": {addr1, addr2},
924 "cluster_2": {addr3, addr4},
925 })
926
927
928 sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn)
929 sc2 := scs["cluster_1"][1].sc.(*testutils.TestSubConn)
930 sc3 := scs["cluster_2"][0].sc.(*testutils.TestSubConn)
931 sc4 := scs["cluster_2"][1].sc.(*testutils.TestSubConn)
932
933
934 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
935 <-cc.NewPickerCh
936 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
937 <-cc.NewPickerCh
938 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
939 <-cc.NewPickerCh
940 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
941 <-cc.NewPickerCh
942 sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
943 <-cc.NewPickerCh
944 sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
945 <-cc.NewPickerCh
946 sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
947 <-cc.NewPickerCh
948 sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
949 p := <-cc.NewPickerCh
950
951
952
953 want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4}
954 if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
955 t.Fatalf("want %v, got %v", want, err)
956 }
957
958
959 config, err = wtbParser.ParseConfig([]byte(`
960 {
961 "targets": {
962 "cluster_1": {
963 "weight": 3,
964 "childPolicy": [{"test_config_balancer": "cluster_1"}]
965 },
966 "cluster_2": {
967 "weight": 1,
968 "childPolicy": [{"test_config_balancer": "cluster_2"}]
969 }
970 }
971 }`))
972 if err != nil {
973 t.Fatalf("failed to parse balancer config: %v", err)
974 }
975 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
976 ResolverState: resolver.State{Addresses: []resolver.Address{
977 hierarchy.Set(addr1, []string{"cluster_1"}),
978 hierarchy.Set(addr2, []string{"cluster_1"}),
979 hierarchy.Set(addr3, []string{"cluster_2"}),
980 hierarchy.Set(addr4, []string{"cluster_2"}),
981 }},
982 BalancerConfig: config,
983 }); err != nil {
984 t.Fatalf("failed to update ClientConn state: %v", err)
985 }
986
987
988 p = <-cc.NewPickerCh
989 want = []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc2, sc3, sc4}
990 if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
991 t.Fatalf("want %v, got %v", want, err)
992 }
993 }
994
995
996
997
998
999 func (s) TestWeightedTarget_InitOneSubBalancerTransientFailure(t *testing.T) {
1000 cc := testutils.NewBalancerClientConn(t)
1001 wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
1002 defer wtb.Close()
1003
1004
1005 config, err := wtbParser.ParseConfig([]byte(`
1006 {
1007 "targets": {
1008 "cluster_1": {
1009 "weight":1,
1010 "childPolicy": [{"test_config_balancer": "cluster_1"}]
1011 },
1012 "cluster_2": {
1013 "weight":1,
1014 "childPolicy": [{"test_config_balancer": "cluster_2"}]
1015 }
1016 }
1017 }`))
1018 if err != nil {
1019 t.Fatalf("failed to parse balancer config: %v", err)
1020 }
1021
1022
1023 addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
1024 addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
1025 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
1026 ResolverState: resolver.State{Addresses: []resolver.Address{
1027 hierarchy.Set(addr1, []string{"cluster_1"}),
1028 hierarchy.Set(addr2, []string{"cluster_2"}),
1029 }},
1030 BalancerConfig: config,
1031 }); err != nil {
1032 t.Fatalf("failed to update ClientConn state: %v", err)
1033 }
1034
1035 scs := waitForNewSubConns(t, cc, 2)
1036 verifySubConnAddrs(t, scs, map[string][]resolver.Address{
1037 "cluster_1": {addr1},
1038 "cluster_2": {addr2},
1039 })
1040
1041
1042 sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn)
1043 _ = scs["cluster_2"][0].sc
1044
1045
1046
1047 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
1048
1049 p := <-cc.NewPickerCh
1050 for i := 0; i < 5; i++ {
1051 r, err := p.Pick(balancer.PickInfo{})
1052 if err != balancer.ErrNoSubConnAvailable {
1053 t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrNoSubConnAvailable, r, err)
1054 }
1055 }
1056 }
1057
1058
1059
1060
1061 func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *testing.T) {
1062 cc := testutils.NewBalancerClientConn(t)
1063 wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
1064 defer wtb.Close()
1065
1066
1067 config, err := wtbParser.ParseConfig([]byte(`
1068 {
1069 "targets": {
1070 "cluster_1": {
1071 "weight":1,
1072 "childPolicy": [{"test_config_balancer": "cluster_1"}]
1073 },
1074 "cluster_2": {
1075 "weight":1,
1076 "childPolicy": [{"test_config_balancer": "cluster_2"}]
1077 }
1078 }
1079 }`))
1080 if err != nil {
1081 t.Fatalf("failed to parse balancer config: %v", err)
1082 }
1083
1084
1085 addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
1086 addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
1087 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
1088 ResolverState: resolver.State{Addresses: []resolver.Address{
1089 hierarchy.Set(addr1, []string{"cluster_1"}),
1090 hierarchy.Set(addr2, []string{"cluster_2"}),
1091 }},
1092 BalancerConfig: config,
1093 }); err != nil {
1094 t.Fatalf("failed to update ClientConn state: %v", err)
1095 }
1096
1097 scs := waitForNewSubConns(t, cc, 2)
1098 verifySubConnAddrs(t, scs, map[string][]resolver.Address{
1099 "cluster_1": {addr1},
1100 "cluster_2": {addr2},
1101 })
1102
1103
1104 sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn)
1105 sc2 := scs["cluster_2"][0].sc.(*testutils.TestSubConn)
1106
1107
1108
1109 wantSubConnErr := errors.New("subConn connection error")
1110 sc1.UpdateState(balancer.SubConnState{
1111 ConnectivityState: connectivity.TransientFailure,
1112 ConnectionError: wantSubConnErr,
1113 })
1114 <-cc.NewPickerCh
1115 sc2.UpdateState(balancer.SubConnState{
1116 ConnectivityState: connectivity.TransientFailure,
1117 ConnectionError: wantSubConnErr,
1118 })
1119 p := <-cc.NewPickerCh
1120
1121 for i := 0; i < 5; i++ {
1122 if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), wantSubConnErr.Error()) {
1123 t.Fatalf("picker.Pick() returned error: %v, want: %v", err, wantSubConnErr)
1124 }
1125 }
1126
1127
1128 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
1129 select {
1130 case <-time.After(100 * time.Millisecond):
1131 case <-cc.NewPickerCh:
1132 t.Fatal("received new picker from the LB policy when expecting none")
1133 }
1134
1135 for i := 0; i < 5; i++ {
1136 if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), wantSubConnErr.Error()) {
1137 t.Fatalf("picker.Pick() returned error: %v, want: %v", err, wantSubConnErr)
1138 }
1139 }
1140 }
1141
1142
1143
1144 func verifyAddressInNewSubConn(t *testing.T, cc *testutils.BalancerClientConn, addr resolver.Address) {
1145 t.Helper()
1146
1147 gotAddr := <-cc.NewSubConnAddrsCh
1148 wantAddr := []resolver.Address{hierarchy.Set(addr, []string{})}
1149 if diff := cmp.Diff(gotAddr, wantAddr, cmp.AllowUnexported(attributes.Attributes{})); diff != "" {
1150 t.Fatalf("got unexpected new subconn addrs: %v", diff)
1151 }
1152 }
1153
1154
1155 type subConnWithAddr struct {
1156 sc balancer.SubConn
1157 addr resolver.Address
1158 }
1159
1160
1161
1162
1163
1164
1165
1166 func waitForNewSubConns(t *testing.T, cc *testutils.BalancerClientConn, num int) map[string][]subConnWithAddr {
1167 t.Helper()
1168
1169 scs := make(map[string][]subConnWithAddr)
1170 for i := 0; i < num; i++ {
1171 addrs := <-cc.NewSubConnAddrsCh
1172 if len(addrs) != 1 {
1173 t.Fatalf("received subConns with %d addresses, want 1", len(addrs))
1174 }
1175 cfg, ok := getConfigKey(addrs[0].Attributes)
1176 if !ok {
1177 t.Fatalf("received subConn address %v contains no attribute for balancer config", addrs[0])
1178 }
1179 sc := <-cc.NewSubConnCh
1180 scWithAddr := subConnWithAddr{sc: sc, addr: addrs[0]}
1181 scs[cfg] = append(scs[cfg], scWithAddr)
1182 }
1183 return scs
1184 }
1185
1186 func verifySubConnAddrs(t *testing.T, scs map[string][]subConnWithAddr, wantSubConnAddrs map[string][]resolver.Address) {
1187 t.Helper()
1188
1189 if len(scs) != len(wantSubConnAddrs) {
1190 t.Fatalf("got new subConns %+v, want %v", scs, wantSubConnAddrs)
1191 }
1192 for cfg, scsWithAddr := range scs {
1193 if len(scsWithAddr) != len(wantSubConnAddrs[cfg]) {
1194 t.Fatalf("got new subConns %+v, want %v", scs, wantSubConnAddrs)
1195 }
1196 wantAddrs := wantSubConnAddrs[cfg]
1197 for i, scWithAddr := range scsWithAddr {
1198 if diff := cmp.Diff(wantAddrs[i].Addr, scWithAddr.addr.Addr); diff != "" {
1199 t.Fatalf("got unexpected new subconn addrs: %v", diff)
1200 }
1201 }
1202 }
1203 }
1204
1205 const initIdleBalancerName = "test-init-Idle-balancer"
1206
1207 var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
1208
1209 func init() {
1210 stub.Register(initIdleBalancerName, stub.BalancerFuncs{
1211 UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
1212 sc, err := bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{
1213 StateListener: func(state balancer.SubConnState) {
1214 err := fmt.Errorf("wrong picker error")
1215 if state.ConnectivityState == connectivity.Idle {
1216 err = errTestInitIdle
1217 }
1218 bd.ClientConn.UpdateState(balancer.State{
1219 ConnectivityState: state.ConnectivityState,
1220 Picker: &testutils.TestConstPicker{Err: err},
1221 })
1222 },
1223 })
1224 if err != nil {
1225 return err
1226 }
1227 sc.Connect()
1228 return nil
1229 },
1230 })
1231 }
1232
1233
1234
1235 func (s) TestInitialIdle(t *testing.T) {
1236 cc := testutils.NewBalancerClientConn(t)
1237 wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
1238 defer wtb.Close()
1239
1240 config, err := wtbParser.ParseConfig([]byte(`
1241 {
1242 "targets": {
1243 "cluster_1": {
1244 "weight":1,
1245 "childPolicy": [{"test-init-Idle-balancer": ""}]
1246 }
1247 }
1248 }`))
1249 if err != nil {
1250 t.Fatalf("failed to parse balancer config: %v", err)
1251 }
1252
1253
1254 addrs := []resolver.Address{{Addr: testBackendAddrStrs[0], Attributes: nil}}
1255 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
1256 ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addrs[0], []string{"cds:cluster_1"})}},
1257 BalancerConfig: config,
1258 }); err != nil {
1259 t.Fatalf("failed to update ClientConn state: %v", err)
1260 }
1261
1262
1263
1264 for range addrs {
1265 sc := <-cc.NewSubConnCh
1266 sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
1267 }
1268
1269 if state := <-cc.NewStateCh; state != connectivity.Idle {
1270 t.Fatalf("Received aggregated state: %v, want Idle", state)
1271 }
1272 }
1273
1274
1275
1276 func (s) TestIgnoreSubBalancerStateTransitions(t *testing.T) {
1277 cc := &tcc{BalancerClientConn: testutils.NewBalancerClientConn(t)}
1278
1279 wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
1280 defer wtb.Close()
1281
1282 config, err := wtbParser.ParseConfig([]byte(`
1283 {
1284 "targets": {
1285 "cluster_1": {
1286 "weight":1,
1287 "childPolicy": [{"round_robin": ""}]
1288 }
1289 }
1290 }`))
1291 if err != nil {
1292 t.Fatalf("failed to parse balancer config: %v", err)
1293 }
1294
1295
1296 addr := resolver.Address{Addr: testBackendAddrStrs[0], Attributes: nil}
1297 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
1298 ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr, []string{"cluster_1"})}},
1299 BalancerConfig: config,
1300 }); err != nil {
1301 t.Fatalf("failed to update ClientConn state: %v", err)
1302 }
1303
1304 sc := <-cc.NewSubConnCh
1305 sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
1306 sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
1307
1308
1309 if len(cc.states) != 2 || cc.states[0].ConnectivityState != connectivity.Connecting || cc.states[1].ConnectivityState != connectivity.TransientFailure {
1310 t.Fatalf("cc.states = %v; want [Connecting, TransientFailure]", cc.states)
1311 }
1312 }
1313
1314
1315
1316 type tcc struct {
1317 *testutils.BalancerClientConn
1318 states []balancer.State
1319 }
1320
1321 func (t *tcc) UpdateState(bs balancer.State) {
1322 t.states = append(t.states, bs)
1323 t.BalancerClientConn.UpdateState(bs)
1324 }
1325
1326 func (s) TestUpdateStatePauses(t *testing.T) {
1327 cc := &tcc{BalancerClientConn: testutils.NewBalancerClientConn(t)}
1328
1329 balFuncs := stub.BalancerFuncs{
1330 UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error {
1331 bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: nil})
1332 bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: nil})
1333 return nil
1334 },
1335 }
1336 stub.Register("update_state_balancer", balFuncs)
1337
1338 wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
1339 defer wtb.Close()
1340
1341 config, err := wtbParser.ParseConfig([]byte(`
1342 {
1343 "targets": {
1344 "cluster_1": {
1345 "weight":1,
1346 "childPolicy": [{"update_state_balancer": ""}]
1347 }
1348 }
1349 }`))
1350 if err != nil {
1351 t.Fatalf("failed to parse balancer config: %v", err)
1352 }
1353
1354
1355 addrs := []resolver.Address{{Addr: testBackendAddrStrs[0], Attributes: nil}}
1356 if err := wtb.UpdateClientConnState(balancer.ClientConnState{
1357 ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addrs[0], []string{"cds:cluster_1"})}},
1358 BalancerConfig: config,
1359 }); err != nil {
1360 t.Fatalf("failed to update ClientConn state: %v", err)
1361 }
1362
1363
1364 if len(cc.states) != 1 || cc.states[0].ConnectivityState != connectivity.Ready {
1365 t.Fatalf("cc.states = %v; want [connectivity.Ready]", cc.states)
1366 }
1367 }
1368
View as plain text