1
18
19 package grpc_test
20
21 import (
22 "context"
23 "errors"
24 "fmt"
25 "runtime"
26 "strings"
27 "testing"
28 "time"
29
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/balancer"
32 "google.golang.org/grpc/connectivity"
33 "google.golang.org/grpc/credentials/insecure"
34 "google.golang.org/grpc/internal"
35 "google.golang.org/grpc/internal/balancer/stub"
36 "google.golang.org/grpc/internal/channelz"
37 "google.golang.org/grpc/resolver"
38 "google.golang.org/grpc/resolver/manual"
39 )
40
41
42
43
44
45
46
47 func (s) TestResolverBalancerInteraction(t *testing.T) {
48 name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
49 fmt.Println(name)
50 bf := stub.BalancerFuncs{
51 UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
52 bd.ClientConn.ResolveNow(resolver.ResolveNowOptions{})
53 return nil
54 },
55 }
56 stub.Register(name, bf)
57
58 rb := manual.NewBuilderWithScheme(name)
59 rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) {
60 sc := cc.ParseServiceConfig(`{"loadBalancingConfig": [{"` + name + `":{}}]}`)
61 cc.UpdateState(resolver.State{
62 Addresses: []resolver.Address{{Addr: "test"}},
63 ServiceConfig: sc,
64 })
65 }
66 rnCh := make(chan struct{})
67 rb.ResolveNowCallback = func(resolver.ResolveNowOptions) { close(rnCh) }
68 resolver.Register(rb)
69
70 cc, err := grpc.Dial(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials()))
71 if err != nil {
72 t.Fatalf("grpc.Dial error: %v", err)
73 }
74 defer cc.Close()
75 select {
76 case <-rnCh:
77 case <-time.After(defaultTestTimeout):
78 t.Fatalf("timed out waiting for resolver.ResolveNow")
79 }
80 }
81
82 type resolverBuilderWithErr struct {
83 resolver.Resolver
84 errCh <-chan error
85 scheme string
86 }
87
88 func (b *resolverBuilderWithErr) Build(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
89 if err := <-b.errCh; err != nil {
90 return nil, err
91 }
92 return b, nil
93 }
94
95 func (b *resolverBuilderWithErr) Scheme() string {
96 return b.scheme
97 }
98
99 func (b *resolverBuilderWithErr) Close() {}
100
101
102
103
104
105
106 func (s) TestResolverBuildFailure(t *testing.T) {
107 enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
108 name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
109 resErrCh := make(chan error, 1)
110 resolver.Register(&resolverBuilderWithErr{errCh: resErrCh, scheme: name})
111
112 resErrCh <- nil
113 cc, err := grpc.Dial(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials()))
114 if err != nil {
115 t.Fatalf("grpc.Dial error: %v", err)
116 }
117 defer cc.Close()
118 enterIdle(cc)
119 const errStr = "test error from resolver builder"
120 t.Log("pushing res err")
121 resErrCh <- errors.New(errStr)
122 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
123 defer cancel()
124 if err := cc.Invoke(ctx, "/a/b", nil, nil); err == nil || !strings.Contains(err.Error(), errStr) {
125 t.Fatalf("Invoke = %v; want %v", err, errStr)
126 }
127 }
128
129
130
131
132 func (s) TestEnterIdleDuringResolverUpdateState(t *testing.T) {
133 enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
134 name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
135
136
137 rb := manual.NewBuilderWithScheme(name)
138 var cancel context.CancelFunc
139 rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) {
140 var ctx context.Context
141 ctx, cancel = context.WithCancel(context.Background())
142 go func() {
143 for ctx.Err() == nil {
144 cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}})
145 }
146 }()
147 }
148 rb.CloseCallback = func() {
149 cancel()
150 }
151 resolver.Register(rb)
152
153 cc, err := grpc.NewClient(name+":///", grpc.WithTransportCredentials(insecure.NewCredentials()))
154 if err != nil {
155 t.Fatalf("grpc.NewClient error: %v", err)
156 }
157 defer cc.Close()
158
159
160 for i := 0; i < 2000; i++ {
161
162
163 p := time.AfterFunc(time.Second, func() {
164 buf := make([]byte, 8192)
165 buf = buf[0:runtime.Stack(buf, true)]
166 t.Error("Timed out waiting for enterIdle")
167 panic(fmt.Sprint("Stack trace:\n", string(buf)))
168 })
169 enterIdle(cc)
170 p.Stop()
171 cc.Connect()
172 }
173 }
174
175
176
177 func (s) TestEnterIdleDuringBalancerUpdateState(t *testing.T) {
178 enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
179 name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
180
181
182
183 bf := stub.BalancerFuncs{
184 UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
185 go func() {
186 bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready})
187 }()
188 return nil
189 },
190 }
191 stub.Register(name, bf)
192
193 rb := manual.NewBuilderWithScheme(name)
194 rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) {
195 cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}})
196 }
197 resolver.Register(rb)
198
199 cc, err := grpc.NewClient(
200 name+":///",
201 grpc.WithTransportCredentials(insecure.NewCredentials()),
202 grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"`+name+`":{}}]}`))
203 if err != nil {
204 t.Fatalf("grpc.NewClient error: %v", err)
205 }
206 defer cc.Close()
207
208
209 for i := 0; i < 2000; i++ {
210 enterIdle(cc)
211 if got, want := cc.GetState(), connectivity.Idle; got != want {
212 t.Fatalf("cc state = %v; want %v", got, want)
213 }
214 cc.Connect()
215 }
216 }
217
218
219
220 func (s) TestEnterIdleDuringBalancerNewSubConn(t *testing.T) {
221 channelz.TurnOn()
222 defer internal.ChannelzTurnOffForTesting()
223 enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
224 name := strings.Replace(strings.ToLower(t.Name()), "/", "", -1)
225
226
227
228 bf := stub.BalancerFuncs{
229 UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
230 go func() {
231 bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "test"}}, balancer.NewSubConnOptions{})
232 }()
233 return nil
234 },
235 }
236 stub.Register(name, bf)
237
238 rb := manual.NewBuilderWithScheme(name)
239 rb.BuildCallback = func(_ resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) {
240 cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "test"}}})
241 }
242 resolver.Register(rb)
243
244 cc, err := grpc.NewClient(
245 name+":///",
246 grpc.WithTransportCredentials(insecure.NewCredentials()),
247 grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"`+name+`":{}}]}`))
248 if err != nil {
249 t.Fatalf("grpc.NewClient error: %v", err)
250 }
251 defer cc.Close()
252
253
254 for i := 0; i < 2000; i++ {
255 enterIdle(cc)
256 tcs, _ := channelz.GetTopChannels(0, 0)
257 if len(tcs) != 1 {
258 t.Fatalf("Found channels: %v; expected 1 entry", tcs)
259 }
260 if got := tcs[0].SubChans(); len(got) != 0 {
261 t.Fatalf("Found subchannels: %v; expected 0 entries", got)
262 }
263 cc.Connect()
264 }
265 }
266
View as plain text