...
1
18
19 package test
20
21 import (
22 "context"
23 "errors"
24 "fmt"
25 "testing"
26
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/balancer"
29 "google.golang.org/grpc/balancer/base"
30 "google.golang.org/grpc/connectivity"
31 "google.golang.org/grpc/internal/balancer/stub"
32 "google.golang.org/grpc/internal/stubserver"
33 "google.golang.org/grpc/internal/testutils"
34 testpb "google.golang.org/grpc/interop/grpc_testing"
35 "google.golang.org/grpc/resolver"
36 )
37
38 type tsccPicker struct {
39 sc balancer.SubConn
40 }
41
42 func (p *tsccPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
43 return balancer.PickResult{SubConn: p.sc}, nil
44 }
45
46
47
48 func (s) TestSubConnEmpty(t *testing.T) {
49 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
50 defer cancel()
51
52
53
54 var sc balancer.SubConn
55
56
57
58
59 bal := stub.BalancerFuncs{
60 UpdateClientConnState: func(d *stub.BalancerData, ccs balancer.ClientConnState) error {
61 if sc == nil {
62 var err error
63 sc, err = d.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{
64 StateListener: func(state balancer.SubConnState) {
65 switch state.ConnectivityState {
66 case connectivity.Ready:
67 d.ClientConn.UpdateState(balancer.State{
68 ConnectivityState: connectivity.Ready,
69 Picker: &tsccPicker{sc: sc},
70 })
71 case connectivity.TransientFailure:
72 d.ClientConn.UpdateState(balancer.State{
73 ConnectivityState: connectivity.TransientFailure,
74 Picker: base.NewErrPicker(fmt.Errorf("error connecting: %v", state.ConnectionError)),
75 })
76 }
77 },
78 })
79 if err != nil {
80 t.Errorf("error creating initial subconn: %v", err)
81 }
82 } else {
83 d.ClientConn.UpdateAddresses(sc, ccs.ResolverState.Addresses)
84 }
85 sc.Connect()
86
87 if len(ccs.ResolverState.Addresses) == 0 {
88 d.ClientConn.UpdateState(balancer.State{
89 ConnectivityState: connectivity.TransientFailure,
90 Picker: base.NewErrPicker(errors.New("no addresses")),
91 })
92 } else {
93 d.ClientConn.UpdateState(balancer.State{
94 ConnectivityState: connectivity.Connecting,
95 Picker: &tsccPicker{sc: sc},
96 })
97 }
98 return nil
99 },
100 }
101 stub.Register("tscc", bal)
102
103
104 ss := &stubserver.StubServer{
105 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
106 return &testpb.Empty{}, nil
107 },
108 }
109 if err := ss.Start(nil, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"tscc":{}}]}`)); err != nil {
110 t.Fatalf("Error starting server: %v", err)
111 }
112 defer ss.Stop()
113 if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
114 t.Fatalf("EmptyCall failed: %v", err)
115 }
116
117 t.Log("Removing addresses from resolver and SubConn")
118 ss.R.UpdateState(resolver.State{Addresses: []resolver.Address{}})
119 testutils.AwaitState(ctx, t, ss.CC, connectivity.TransientFailure)
120
121 t.Log("Re-adding addresses to resolver and SubConn")
122 ss.R.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.Address}}})
123 if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
124 t.Fatalf("EmptyCall failed: %v", err)
125 }
126 }
127
View as plain text