1
18
19 package clustermanager
20
21 import (
22 "context"
23 "fmt"
24 "testing"
25 "time"
26
27 "github.com/google/go-cmp/cmp"
28 "google.golang.org/grpc"
29 "google.golang.org/grpc/balancer"
30 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/connectivity"
32 "google.golang.org/grpc/credentials/insecure"
33 "google.golang.org/grpc/internal/balancer/stub"
34 "google.golang.org/grpc/internal/grpctest"
35 "google.golang.org/grpc/internal/hierarchy"
36 "google.golang.org/grpc/internal/testutils"
37 "google.golang.org/grpc/resolver"
38 "google.golang.org/grpc/status"
39 )
40
41 type s struct {
42 grpctest.Tester
43 }
44
45 func Test(t *testing.T) {
46 grpctest.RunSubTests(t, s{})
47 }
48
49 const (
50 defaultTestTimeout = 5 * time.Second
51 defaultTestShortTimeout = 10 * time.Millisecond
52 testBackendAddrsCount = 12
53 )
54
55 var testBackendAddrStrs []string
56
57 func init() {
58 for i := 0; i < testBackendAddrsCount; i++ {
59 testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
60 }
61 }
62
63 func testPick(t *testing.T, p balancer.Picker, info balancer.PickInfo, wantSC balancer.SubConn, wantErr error) {
64 t.Helper()
65 for i := 0; i < 5; i++ {
66 gotSCSt, err := p.Pick(info)
67 if fmt.Sprint(err) != fmt.Sprint(wantErr) {
68 t.Fatalf("picker.Pick(%+v), got error %v, want %v", info, err, wantErr)
69 }
70 if gotSCSt.SubConn != wantSC {
71 t.Fatalf("picker.Pick(%+v), got %v, want SubConn=%v", info, gotSCSt, wantSC)
72 }
73 }
74 }
75
76 func TestClusterPicks(t *testing.T) {
77 cc := testutils.NewBalancerClientConn(t)
78 builder := balancer.Get(balancerName)
79 parser := builder.(balancer.ConfigParser)
80 bal := builder.Build(cc, balancer.BuildOptions{})
81
82 configJSON1 := `{
83 "children": {
84 "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] },
85 "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] }
86 }
87 }`
88 config1, err := parser.ParseConfig([]byte(configJSON1))
89 if err != nil {
90 t.Fatalf("failed to parse balancer config: %v", err)
91 }
92
93
94 wantAddrs := []resolver.Address{
95 {Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
96 {Addr: testBackendAddrStrs[1], BalancerAttributes: nil},
97 }
98 if err := bal.UpdateClientConnState(balancer.ClientConnState{
99 ResolverState: resolver.State{Addresses: []resolver.Address{
100 hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
101 hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
102 }},
103 BalancerConfig: config1,
104 }); err != nil {
105 t.Fatalf("failed to update ClientConn state: %v", err)
106 }
107
108 m1 := make(map[resolver.Address]balancer.SubConn)
109
110
111 for range wantAddrs {
112 addrs := <-cc.NewSubConnAddrsCh
113 if len(hierarchy.Get(addrs[0])) != 0 {
114 t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes)
115 }
116 sc := <-cc.NewSubConnCh
117
118 addrs[0].BalancerAttributes = nil
119 m1[addrs[0]] = sc
120 sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
121 sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
122 }
123
124 p1 := <-cc.NewPickerCh
125 for _, tt := range []struct {
126 pickInfo balancer.PickInfo
127 wantSC balancer.SubConn
128 wantErr error
129 }{
130 {
131 pickInfo: balancer.PickInfo{
132 Ctx: SetPickedCluster(context.Background(), "cds:cluster_1"),
133 },
134 wantSC: m1[wantAddrs[0]],
135 },
136 {
137 pickInfo: balancer.PickInfo{
138 Ctx: SetPickedCluster(context.Background(), "cds:cluster_2"),
139 },
140 wantSC: m1[wantAddrs[1]],
141 },
142 {
143 pickInfo: balancer.PickInfo{
144 Ctx: SetPickedCluster(context.Background(), "notacluster"),
145 },
146 wantErr: status.Errorf(codes.Unavailable, `unknown cluster selected for RPC: "notacluster"`),
147 },
148 } {
149 testPick(t, p1, tt.pickInfo, tt.wantSC, tt.wantErr)
150 }
151 }
152
153
154
155 func TestConfigUpdateAddCluster(t *testing.T) {
156 cc := testutils.NewBalancerClientConn(t)
157 builder := balancer.Get(balancerName)
158 parser := builder.(balancer.ConfigParser)
159 bal := builder.Build(cc, balancer.BuildOptions{})
160
161 configJSON1 := `{
162 "children": {
163 "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] },
164 "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] }
165 }
166 }`
167 config1, err := parser.ParseConfig([]byte(configJSON1))
168 if err != nil {
169 t.Fatalf("failed to parse balancer config: %v", err)
170 }
171
172
173 wantAddrs := []resolver.Address{
174 {Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
175 {Addr: testBackendAddrStrs[1], BalancerAttributes: nil},
176 }
177 if err := bal.UpdateClientConnState(balancer.ClientConnState{
178 ResolverState: resolver.State{Addresses: []resolver.Address{
179 hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
180 hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
181 }},
182 BalancerConfig: config1,
183 }); err != nil {
184 t.Fatalf("failed to update ClientConn state: %v", err)
185 }
186
187 m1 := make(map[resolver.Address]balancer.SubConn)
188
189
190 for range wantAddrs {
191 addrs := <-cc.NewSubConnAddrsCh
192 if len(hierarchy.Get(addrs[0])) != 0 {
193 t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes)
194 }
195 sc := <-cc.NewSubConnCh
196
197 addrs[0].BalancerAttributes = nil
198 m1[addrs[0]] = sc
199 sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
200 sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
201 }
202
203 p1 := <-cc.NewPickerCh
204 for _, tt := range []struct {
205 pickInfo balancer.PickInfo
206 wantSC balancer.SubConn
207 wantErr error
208 }{
209 {
210 pickInfo: balancer.PickInfo{
211 Ctx: SetPickedCluster(context.Background(), "cds:cluster_1"),
212 },
213 wantSC: m1[wantAddrs[0]],
214 },
215 {
216 pickInfo: balancer.PickInfo{
217 Ctx: SetPickedCluster(context.Background(), "cds:cluster_2"),
218 },
219 wantSC: m1[wantAddrs[1]],
220 },
221 {
222 pickInfo: balancer.PickInfo{
223 Ctx: SetPickedCluster(context.Background(), "cds:notacluster"),
224 },
225 wantErr: status.Errorf(codes.Unavailable, `unknown cluster selected for RPC: "cds:notacluster"`),
226 },
227 } {
228 testPick(t, p1, tt.pickInfo, tt.wantSC, tt.wantErr)
229 }
230
231
232
233 configJSON2 := `{
234 "children": {
235 "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] },
236 "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] },
237 "cds:cluster_3":{ "childPolicy": [{"round_robin":""}] }
238 }
239 }`
240 config2, err := parser.ParseConfig([]byte(configJSON2))
241 if err != nil {
242 t.Fatalf("failed to parse balancer config: %v", err)
243 }
244 wantAddrs = append(wantAddrs, resolver.Address{Addr: testBackendAddrStrs[2], BalancerAttributes: nil})
245 if err := bal.UpdateClientConnState(balancer.ClientConnState{
246 ResolverState: resolver.State{Addresses: []resolver.Address{
247 hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
248 hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
249 hierarchy.Set(wantAddrs[2], []string{"cds:cluster_3"}),
250 }},
251 BalancerConfig: config2,
252 }); err != nil {
253 t.Fatalf("failed to update ClientConn state: %v", err)
254 }
255
256
257 addrs := <-cc.NewSubConnAddrsCh
258 if len(hierarchy.Get(addrs[0])) != 0 {
259 t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes)
260 }
261 sc := <-cc.NewSubConnCh
262
263 addrs[0].BalancerAttributes = nil
264 m1[addrs[0]] = sc
265 sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
266 sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
267
268
269 select {
270 case <-time.After(time.Millisecond * 500):
271 case <-cc.NewSubConnCh:
272 addrs := <-cc.NewSubConnAddrsCh
273 t.Fatalf("unexpected NewSubConn with address %v", addrs)
274 }
275
276 p2 := <-cc.NewPickerCh
277 for _, tt := range []struct {
278 pickInfo balancer.PickInfo
279 wantSC balancer.SubConn
280 wantErr error
281 }{
282 {
283 pickInfo: balancer.PickInfo{
284 Ctx: SetPickedCluster(context.Background(), "cds:cluster_1"),
285 },
286 wantSC: m1[wantAddrs[0]],
287 },
288 {
289 pickInfo: balancer.PickInfo{
290 Ctx: SetPickedCluster(context.Background(), "cds:cluster_2"),
291 },
292 wantSC: m1[wantAddrs[1]],
293 },
294 {
295 pickInfo: balancer.PickInfo{
296 Ctx: SetPickedCluster(context.Background(), "cds:cluster_3"),
297 },
298 wantSC: m1[wantAddrs[2]],
299 },
300 {
301 pickInfo: balancer.PickInfo{
302 Ctx: SetPickedCluster(context.Background(), "cds:notacluster"),
303 },
304 wantErr: status.Errorf(codes.Unavailable, `unknown cluster selected for RPC: "cds:notacluster"`),
305 },
306 } {
307 testPick(t, p2, tt.pickInfo, tt.wantSC, tt.wantErr)
308 }
309 }
310
311
312
313 func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
314 cc := testutils.NewBalancerClientConn(t)
315 builder := balancer.Get(balancerName)
316 parser := builder.(balancer.ConfigParser)
317 bal := builder.Build(cc, balancer.BuildOptions{})
318
319 configJSON1 := `{
320 "children": {
321 "cds:cluster_1":{ "childPolicy": [{"round_robin":""}] },
322 "cds:cluster_2":{ "childPolicy": [{"round_robin":""}] }
323 }
324 }`
325 config1, err := parser.ParseConfig([]byte(configJSON1))
326 if err != nil {
327 t.Fatalf("failed to parse balancer config: %v", err)
328 }
329
330
331 wantAddrs := []resolver.Address{
332 {Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
333 {Addr: testBackendAddrStrs[1], BalancerAttributes: nil},
334 }
335 if err := bal.UpdateClientConnState(balancer.ClientConnState{
336 ResolverState: resolver.State{Addresses: []resolver.Address{
337 hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
338 hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
339 }},
340 BalancerConfig: config1,
341 }); err != nil {
342 t.Fatalf("failed to update ClientConn state: %v", err)
343 }
344
345 m1 := make(map[resolver.Address]balancer.SubConn)
346
347
348 for range wantAddrs {
349 addrs := <-cc.NewSubConnAddrsCh
350 if len(hierarchy.Get(addrs[0])) != 0 {
351 t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes)
352 }
353 sc := <-cc.NewSubConnCh
354
355 addrs[0].BalancerAttributes = nil
356 m1[addrs[0]] = sc
357 sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
358 sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
359 }
360
361 p1 := <-cc.NewPickerCh
362 for _, tt := range []struct {
363 pickInfo balancer.PickInfo
364 wantSC balancer.SubConn
365 wantErr error
366 }{
367 {
368 pickInfo: balancer.PickInfo{
369 Ctx: SetPickedCluster(context.Background(), "cds:cluster_1"),
370 },
371 wantSC: m1[wantAddrs[0]],
372 },
373 {
374 pickInfo: balancer.PickInfo{
375 Ctx: SetPickedCluster(context.Background(), "cds:cluster_2"),
376 },
377 wantSC: m1[wantAddrs[1]],
378 },
379 {
380 pickInfo: balancer.PickInfo{
381 Ctx: SetPickedCluster(context.Background(), "cds:notacluster"),
382 },
383 wantErr: status.Errorf(codes.Unavailable, `unknown cluster selected for RPC: "cds:notacluster"`),
384 },
385 } {
386 testPick(t, p1, tt.pickInfo, tt.wantSC, tt.wantErr)
387 }
388
389
390 configJSON2 := `{}`
391 config2, err := parser.ParseConfig([]byte(configJSON2))
392 if err != nil {
393 t.Fatalf("failed to parse balancer config: %v", err)
394 }
395 if err := bal.UpdateClientConnState(balancer.ClientConnState{
396 BalancerConfig: config2,
397 }); err != nil {
398 t.Fatalf("failed to update ClientConn state: %v", err)
399 }
400
401
402 for range wantAddrs {
403 select {
404 case <-time.After(time.Millisecond * 500):
405 t.Fatalf("timeout waiting for remove subconn")
406 case <-cc.ShutdownSubConnCh:
407 }
408 }
409
410 p2 := <-cc.NewPickerCh
411 for i := 0; i < 5; i++ {
412 gotSCSt, err := p2.Pick(balancer.PickInfo{Ctx: SetPickedCluster(context.Background(), "cds:notacluster")})
413 if fmt.Sprint(err) != status.Errorf(codes.Unavailable, `unknown cluster selected for RPC: "cds:notacluster"`).Error() {
414 t.Fatalf("picker.Pick, got %v, %v, want error %v", gotSCSt, err, `unknown cluster selected for RPC: "cds:notacluster"`)
415 }
416 }
417
418
419 if err := bal.UpdateClientConnState(balancer.ClientConnState{
420 ResolverState: resolver.State{Addresses: []resolver.Address{
421 hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
422 hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
423 }},
424 BalancerConfig: config1,
425 }); err != nil {
426 t.Fatalf("failed to update ClientConn state: %v", err)
427 }
428
429 m2 := make(map[resolver.Address]balancer.SubConn)
430
431
432 for range wantAddrs {
433 addrs := <-cc.NewSubConnAddrsCh
434 if len(hierarchy.Get(addrs[0])) != 0 {
435 t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes)
436 }
437 sc := <-cc.NewSubConnCh
438
439 addrs[0].BalancerAttributes = nil
440 m2[addrs[0]] = sc
441 sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
442 sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
443 }
444
445 p3 := <-cc.NewPickerCh
446 for _, tt := range []struct {
447 pickInfo balancer.PickInfo
448 wantSC balancer.SubConn
449 wantErr error
450 }{
451 {
452 pickInfo: balancer.PickInfo{
453 Ctx: SetPickedCluster(context.Background(), "cds:cluster_1"),
454 },
455 wantSC: m2[wantAddrs[0]],
456 },
457 {
458 pickInfo: balancer.PickInfo{
459 Ctx: SetPickedCluster(context.Background(), "cds:cluster_2"),
460 },
461 wantSC: m2[wantAddrs[1]],
462 },
463 {
464 pickInfo: balancer.PickInfo{
465 Ctx: SetPickedCluster(context.Background(), "cds:notacluster"),
466 },
467 wantErr: status.Errorf(codes.Unavailable, `unknown cluster selected for RPC: "cds:notacluster"`),
468 },
469 } {
470 testPick(t, p3, tt.pickInfo, tt.wantSC, tt.wantErr)
471 }
472 }
473
474 func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) {
475 const (
476 userAgent = "ua"
477 defaultTestTimeout = 1 * time.Second
478 )
479
480
481
482 ccsCh := testutils.NewChannel()
483 bOpts := balancer.BuildOptions{
484 DialCreds: insecure.NewCredentials(),
485 CustomUserAgent: userAgent,
486 }
487 stub.Register(t.Name(), stub.BalancerFuncs{
488 UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
489 if !cmp.Equal(bd.BuildOptions, bOpts) {
490 err := fmt.Errorf("buildOptions in child balancer: %v, want %v", bd, bOpts)
491 ccsCh.Send(err)
492 return err
493 }
494 ccsCh.Send(nil)
495 return nil
496 },
497 })
498
499 cc := testutils.NewBalancerClientConn(t)
500 builder := balancer.Get(balancerName)
501 parser := builder.(balancer.ConfigParser)
502 bal := builder.Build(cc, bOpts)
503
504 configJSON1 := fmt.Sprintf(`{
505 "children": {
506 "cds:cluster_1":{ "childPolicy": [{"%s":""}] }
507 }
508 }`, t.Name())
509 config1, err := parser.ParseConfig([]byte(configJSON1))
510 if err != nil {
511 t.Fatalf("failed to parse balancer config: %v", err)
512 }
513
514 if err := bal.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: config1}); err != nil {
515 t.Fatalf("failed to update ClientConn state: %v", err)
516 }
517 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
518 defer cancel()
519 v, err := ccsCh.Receive(ctx)
520 if err != nil {
521 t.Fatalf("timed out waiting for UpdateClientConnState result: %v", err)
522 }
523 if v != nil {
524 t.Fatal(v)
525 }
526 }
527
528 const initIdleBalancerName = "test-init-Idle-balancer"
529
530 var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
531
532 func init() {
533 stub.Register(initIdleBalancerName, stub.BalancerFuncs{
534 UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
535 sc, err := bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{
536 StateListener: func(state balancer.SubConnState) {
537 err := fmt.Errorf("wrong picker error")
538 if state.ConnectivityState == connectivity.Idle {
539 err = errTestInitIdle
540 }
541 bd.ClientConn.UpdateState(balancer.State{
542 ConnectivityState: state.ConnectivityState,
543 Picker: &testutils.TestConstPicker{Err: err},
544 })
545 },
546 })
547 if err != nil {
548 return err
549 }
550 sc.Connect()
551 return nil
552 },
553 })
554 }
555
556
557
558 func TestInitialIdle(t *testing.T) {
559 cc := testutils.NewBalancerClientConn(t)
560 builder := balancer.Get(balancerName)
561 parser := builder.(balancer.ConfigParser)
562 bal := builder.Build(cc, balancer.BuildOptions{})
563
564 configJSON1 := `{
565 "children": {
566 "cds:cluster_1":{ "childPolicy": [{"test-init-Idle-balancer":""}] }
567 }
568 }`
569 config1, err := parser.ParseConfig([]byte(configJSON1))
570 if err != nil {
571 t.Fatalf("failed to parse balancer config: %v", err)
572 }
573
574
575 wantAddrs := []resolver.Address{
576 {Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
577 }
578 if err := bal.UpdateClientConnState(balancer.ClientConnState{
579 ResolverState: resolver.State{Addresses: []resolver.Address{
580 hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
581 }},
582 BalancerConfig: config1,
583 }); err != nil {
584 t.Fatalf("failed to update ClientConn state: %v", err)
585 }
586
587
588
589 for range wantAddrs {
590 sc := <-cc.NewSubConnCh
591 sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
592 }
593
594 if state1 := <-cc.NewStateCh; state1 != connectivity.Idle {
595 t.Fatalf("Received aggregated state: %v, want Idle", state1)
596 }
597 }
598
599
600
601
602
603
604
605 func TestClusterGracefulSwitch(t *testing.T) {
606 cc := testutils.NewBalancerClientConn(t)
607 builder := balancer.Get(balancerName)
608 parser := builder.(balancer.ConfigParser)
609 bal := builder.Build(cc, balancer.BuildOptions{})
610
611 configJSON1 := `{
612 "children": {
613 "csp:cluster":{ "childPolicy": [{"round_robin":""}] }
614 }
615 }`
616 config1, err := parser.ParseConfig([]byte(configJSON1))
617 if err != nil {
618 t.Fatalf("failed to parse balancer config: %v", err)
619 }
620 wantAddrs := []resolver.Address{
621 {Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
622 {Addr: testBackendAddrStrs[1], BalancerAttributes: nil},
623 }
624 if err := bal.UpdateClientConnState(balancer.ClientConnState{
625 ResolverState: resolver.State{Addresses: []resolver.Address{
626 hierarchy.Set(wantAddrs[0], []string{"csp:cluster"}),
627 }},
628 BalancerConfig: config1,
629 }); err != nil {
630 t.Fatalf("failed to update ClientConn state: %v", err)
631 }
632
633 sc1 := <-cc.NewSubConnCh
634 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
635 sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
636 p1 := <-cc.NewPickerCh
637 pi := balancer.PickInfo{
638 Ctx: SetPickedCluster(context.Background(), "csp:cluster"),
639 }
640 testPick(t, p1, pi, sc1, nil)
641
642 childPolicyName := t.Name()
643 stub.Register(childPolicyName, stub.BalancerFuncs{
644 Init: func(bd *stub.BalancerData) {
645 bd.Data = balancer.Get(grpc.PickFirstBalancerName).Build(bd.ClientConn, bd.BuildOptions)
646 },
647 UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
648 bal := bd.Data.(balancer.Balancer)
649 return bal.UpdateClientConnState(ccs)
650 },
651 })
652
653 configJSON2 := fmt.Sprintf(`{
654 "children": {
655 "csp:cluster":{ "childPolicy": [{"%s":""}] }
656 }
657 }`, childPolicyName)
658 config2, err := parser.ParseConfig([]byte(configJSON2))
659 if err != nil {
660 t.Fatalf("failed to parse balancer config: %v", err)
661 }
662 if err := bal.UpdateClientConnState(balancer.ClientConnState{
663 ResolverState: resolver.State{Addresses: []resolver.Address{
664 hierarchy.Set(wantAddrs[1], []string{"csp:cluster"}),
665 }},
666 BalancerConfig: config2,
667 }); err != nil {
668 t.Fatalf("failed to update ClientConn state: %v", err)
669 }
670 sc2 := <-cc.NewSubConnCh
671
672
673
674 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
675 ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
676 defer cancel()
677 select {
678 case <-cc.NewPickerCh:
679 t.Fatalf("No new picker should have been sent due to the Graceful Switch process not completing")
680 case <-ctx.Done():
681 }
682
683
684
685
686
687 sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
688 p2 := <-cc.NewPickerCh
689 testPick(t, p2, pi, sc2, nil)
690
691
692 ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
693 defer cancel()
694 select {
695 case <-ctx.Done():
696 t.Fatalf("error waiting for sc.Shutdown()")
697 case rsc := <-cc.ShutdownSubConnCh:
698
699
700 if rsc != sc1 {
701 t.Fatalf("Shutdown() got: %v, want %v", rsc, sc1)
702 }
703 }
704 }
705
706
707
708 type tcc struct {
709 *testutils.BalancerClientConn
710 states []balancer.State
711 }
712
713 func (t *tcc) UpdateState(bs balancer.State) {
714 t.states = append(t.states, bs)
715 t.BalancerClientConn.UpdateState(bs)
716 }
717
718 func (s) TestUpdateStatePauses(t *testing.T) {
719 cc := &tcc{BalancerClientConn: testutils.NewBalancerClientConn(t)}
720
721 balFuncs := stub.BalancerFuncs{
722 UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error {
723 bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: nil})
724 bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: nil})
725 return nil
726 },
727 }
728 stub.Register("update_state_balancer", balFuncs)
729
730 builder := balancer.Get(balancerName)
731 parser := builder.(balancer.ConfigParser)
732 bal := builder.Build(cc, balancer.BuildOptions{})
733
734 configJSON1 := `{
735 "children": {
736 "cds:cluster_1":{ "childPolicy": [{"update_state_balancer":""}] }
737 }
738 }`
739 config1, err := parser.ParseConfig([]byte(configJSON1))
740 if err != nil {
741 t.Fatalf("failed to parse balancer config: %v", err)
742 }
743
744
745 wantAddrs := []resolver.Address{
746 {Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
747 }
748 if err := bal.UpdateClientConnState(balancer.ClientConnState{
749 ResolverState: resolver.State{Addresses: []resolver.Address{
750 hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
751 }},
752 BalancerConfig: config1,
753 }); err != nil {
754 t.Fatalf("failed to update ClientConn state: %v", err)
755 }
756
757
758 if len(cc.states) != 1 || cc.states[0].ConnectivityState != connectivity.Ready {
759 t.Fatalf("cc.states = %v; want [connectivity.Ready]", cc.states)
760 }
761 }
762
View as plain text