1
18
19 package test
20
21 import (
22 "context"
23 "fmt"
24 "testing"
25
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/balancer"
28 grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
29 "google.golang.org/grpc/credentials/insecure"
30 "google.golang.org/grpc/internal"
31 "google.golang.org/grpc/internal/balancer/stub"
32 "google.golang.org/grpc/internal/stubserver"
33 "google.golang.org/grpc/internal/testutils/fakegrpclb"
34 "google.golang.org/grpc/internal/testutils/pickfirst"
35 rrutil "google.golang.org/grpc/internal/testutils/roundrobin"
36 "google.golang.org/grpc/resolver"
37 "google.golang.org/grpc/resolver/manual"
38
39 testgrpc "google.golang.org/grpc/interop/grpc_testing"
40 testpb "google.golang.org/grpc/interop/grpc_testing"
41 )
42
43 const (
44 loadBalancedServiceName = "foo.bar.service"
45 loadBalancedServicePort = 443
46 wantGRPCLBTraceDesc = `Channel switches to new LB policy "grpclb"`
47 wantRoundRobinTraceDesc = `Channel switches to new LB policy "round_robin"`
48
49
50
51
52 backendCount = 3
53 )
54
55
56
57
58
59
60
61
62
63
64 func setupBackendsAndFakeGRPCLB(t *testing.T) ([]*stubserver.StubServer, *fakegrpclb.Server, func()) {
65 backends, backendsCleanup := startBackendsForBalancerSwitch(t)
66
67 lbServer, err := fakegrpclb.NewServer(fakegrpclb.ServerParams{
68 LoadBalancedServiceName: loadBalancedServiceName,
69 LoadBalancedServicePort: loadBalancedServicePort,
70 BackendAddresses: []string{backends[0].Address},
71 })
72 if err != nil {
73 t.Fatalf("failed to create fake grpclb server: %v", err)
74 }
75 go func() {
76 if err := lbServer.Serve(); err != nil {
77 t.Errorf("fake grpclb Serve() failed: %v", err)
78 }
79 }()
80
81 return backends, lbServer, func() {
82 backendsCleanup()
83 lbServer.Stop()
84 }
85 }
86
87
88
89
90 func startBackendsForBalancerSwitch(t *testing.T) ([]*stubserver.StubServer, func()) {
91 t.Helper()
92
93 backends := make([]*stubserver.StubServer, backendCount)
94 for i := 0; i < backendCount; i++ {
95 backend := &stubserver.StubServer{
96 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
97 }
98 if err := backend.StartServer(); err != nil {
99 t.Fatalf("Failed to start backend: %v", err)
100 }
101 t.Logf("Started TestService backend at: %q", backend.Address)
102 backends[i] = backend
103 }
104 return backends, func() {
105 for _, b := range backends {
106 b.Stop()
107 }
108 }
109 }
110
111
112
113 func (s) TestBalancerSwitch_Basic(t *testing.T) {
114 backends, cleanup := startBackendsForBalancerSwitch(t)
115 defer cleanup()
116 addrs := stubBackendsToResolverAddrs(backends)
117
118 r := manual.NewBuilderWithScheme("whatever")
119 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
120 if err != nil {
121 t.Fatalf("grpc.Dial() failed: %v", err)
122 }
123 defer cc.Close()
124
125
126
127 r.UpdateState(resolver.State{Addresses: addrs})
128 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
129 defer cancel()
130 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
131 t.Fatal(err)
132 }
133
134
135 r.UpdateState(resolver.State{
136 Addresses: addrs,
137 ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
138 })
139 client := testgrpc.NewTestServiceClient(cc)
140 if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
141 t.Fatal(err)
142 }
143
144
145 r.UpdateState(resolver.State{
146 Addresses: addrs,
147 ServiceConfig: parseServiceConfig(t, r, pickFirstServiceConfig),
148 })
149 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
150 t.Fatal(err)
151 }
152 }
153
154
155
156 func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) {
157 backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
158 defer cleanup()
159
160 addrs := stubBackendsToResolverAddrs(backends)
161 r := manual.NewBuilderWithScheme("whatever")
162 target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
163 cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
164 if err != nil {
165 t.Fatalf("grpc.Dial() failed: %v", err)
166 }
167 defer cc.Close()
168
169
170
171
172
173 grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
174 state := resolver.State{ServiceConfig: grpclbConfig}
175 r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: lbServer.Address()}}}))
176 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
177 defer cancel()
178 client := testgrpc.NewTestServiceClient(cc)
179 if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[0:1]); err != nil {
180 t.Fatal(err)
181 }
182
183
184
185 const nonExistentServer = "non-existent-grpclb-server-address"
186 r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: nonExistentServer}}}))
187 if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
188 t.Fatal(err)
189 }
190
191
192
193
194
195
196 emptyConfig := parseServiceConfig(t, r, `{}`)
197 r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: emptyConfig})
198 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
199 t.Fatal(err)
200 }
201 }
202
203
204
205 func (s) TestBalancerSwitch_pickFirstToGRPCLB(t *testing.T) {
206 backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
207 defer cleanup()
208
209 addrs := stubBackendsToResolverAddrs(backends)
210 r := manual.NewBuilderWithScheme("whatever")
211 target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
212 cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
213 if err != nil {
214 t.Fatalf("grpc.Dial() failed: %v", err)
215 }
216 defer cc.Close()
217
218
219
220 r.UpdateState(resolver.State{Addresses: addrs[1:]})
221 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
222 defer cancel()
223 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
224 t.Fatal(err)
225 }
226
227
228
229
230 grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
231 state := resolver.State{ServiceConfig: grpclbConfig}
232 r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: lbServer.Address()}}}))
233 client := testgrpc.NewTestServiceClient(cc)
234 if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
235 t.Fatal(err)
236 }
237
238
239
240 r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: "nonExistentServer"}}}))
241 if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
242 t.Fatal(err)
243 }
244
245
246 emptyConfig := parseServiceConfig(t, r, `{}`)
247 r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: emptyConfig})
248 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
249 t.Fatal(err)
250 }
251 }
252
253
254
255
256
257
258 func (s) TestBalancerSwitch_RoundRobinToGRPCLB(t *testing.T) {
259 backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
260 defer cleanup()
261
262 addrs := stubBackendsToResolverAddrs(backends)
263 r := manual.NewBuilderWithScheme("whatever")
264 target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
265 cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
266 if err != nil {
267 t.Fatalf("grpc.Dial() failed: %v", err)
268 }
269 defer cc.Close()
270
271
272
273
274
275
276
277
278
279
280
281 scpr := parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`)
282
283
284 r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: scpr})
285 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
286 defer cancel()
287 client := testgrpc.NewTestServiceClient(cc)
288 if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
289 t.Fatal(err)
290 }
291
292
293
294
295
296 grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
297 state := resolver.State{ServiceConfig: grpclbConfig}
298 r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: lbServer.Address()}}}))
299 if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
300 t.Fatal(err)
301 }
302
303
304 r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: scpr})
305 if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
306 t.Fatal(err)
307 }
308 }
309
310
311
312
313
314 func (s) TestBalancerSwitch_grpclbNotRegistered(t *testing.T) {
315
316 grpclbBuilder := balancer.Get("grpclb")
317 internal.BalancerUnregister(grpclbBuilder.Name())
318 defer balancer.Register(grpclbBuilder)
319
320 backends, cleanup := startBackendsForBalancerSwitch(t)
321 defer cleanup()
322 addrs := stubBackendsToResolverAddrs(backends)
323
324 r := manual.NewBuilderWithScheme("whatever")
325 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
326 if err != nil {
327 t.Fatalf("grpc.Dial() failed: %v", err)
328 }
329 defer cc.Close()
330
331
332
333
334
335
336
337 grpclbAddr := []resolver.Address{{Addr: "non-existent-grpclb-server-address"}}
338 grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
339 state := resolver.State{ServiceConfig: grpclbConfig, Addresses: addrs}
340 r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: grpclbAddr}))
341 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
342 defer cancel()
343 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
344 t.Fatal(err)
345 }
346
347
348
349
350 r.UpdateState(resolver.State{
351 Addresses: addrs,
352 ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
353 })
354 client := testgrpc.NewTestServiceClient(cc)
355 if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
356 t.Fatal(err)
357 }
358 }
359
360
361
362
363 func (s) TestBalancerSwitch_OldBalancerCallsShutdownInClose(t *testing.T) {
364
365 scChan := make(chan balancer.SubConn, 1)
366 uccsCalled := make(chan struct{}, 1)
367 stub.Register(t.Name(), stub.BalancerFuncs{
368 UpdateClientConnState: func(data *stub.BalancerData, ccs balancer.ClientConnState) error {
369 sc, err := data.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{})
370 if err != nil {
371 t.Errorf("failed to create subConn: %v", err)
372 }
373 scChan <- sc
374 close(uccsCalled)
375 return nil
376 },
377 Close: func(data *stub.BalancerData) {
378 (<-scChan).Shutdown()
379 },
380 })
381
382 r := manual.NewBuilderWithScheme("whatever")
383 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
384 if err != nil {
385 t.Fatalf("grpc.Dial() failed: %v", err)
386 }
387 defer cc.Close()
388
389
390 scpr := parseServiceConfig(t, r, fmt.Sprintf(`{"loadBalancingPolicy": "%v"}`, t.Name()))
391 r.UpdateState(resolver.State{
392 Addresses: []resolver.Address{{Addr: "dummy-address"}},
393 ServiceConfig: scpr,
394 })
395
396 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
397 defer cancel()
398 select {
399 case <-ctx.Done():
400 t.Fatalf("timeout waiting for UpdateClientConnState to be called: %v", ctx.Err())
401 case <-uccsCalled:
402 }
403
404
405
406
407
408
409
410
411
412
413 done := make(chan struct{})
414 go func() {
415 r.UpdateState(resolver.State{
416 Addresses: []resolver.Address{{Addr: "dummy-address"}},
417 ServiceConfig: parseServiceConfig(t, r, pickFirstServiceConfig),
418 })
419 close(done)
420 }()
421
422 select {
423 case <-ctx.Done():
424 t.Fatalf("timeout waiting for resolver.UpdateState to finish: %v", ctx.Err())
425 case <-done:
426 }
427 }
428
429
430
431
432
433
434
435
436 func (s) TestBalancerSwitch_Graceful(t *testing.T) {
437 backends, cleanup := startBackendsForBalancerSwitch(t)
438 defer cleanup()
439 addrs := stubBackendsToResolverAddrs(backends)
440
441 r := manual.NewBuilderWithScheme("whatever")
442 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
443 if err != nil {
444 t.Fatalf("grpc.Dial() failed: %v", err)
445 }
446 defer cc.Close()
447
448
449 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
450 defer cancel()
451 r.UpdateState(resolver.State{
452 Addresses: addrs[1:],
453 ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
454 })
455 client := testgrpc.NewTestServiceClient(cc)
456 if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
457 t.Fatal(err)
458 }
459
460
461
462
463
464
465
466
467 ccUpdateCh := make(chan struct{})
468 waitToProceed := make(chan struct{})
469 stub.Register(t.Name(), stub.BalancerFuncs{
470 Init: func(bd *stub.BalancerData) {
471 pf := balancer.Get(grpc.PickFirstBalancerName)
472 bd.Data = pf.Build(bd.ClientConn, bd.BuildOptions)
473 },
474 UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
475 bal := bd.Data.(balancer.Balancer)
476 close(ccUpdateCh)
477 go func() {
478 <-waitToProceed
479 bal.UpdateClientConnState(ccs)
480 }()
481 return nil
482 },
483 })
484
485
486
487
488
489 r.UpdateState(resolver.State{
490 Addresses: addrs[:1],
491 ServiceConfig: r.CC.ParseServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%v": {}}]}`, t.Name())),
492 })
493 select {
494 case <-ctx.Done():
495 t.Fatal("Timeout when waiting for a ClientConnState update on the new balancer")
496 case <-ccUpdateCh:
497 }
498 if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
499 t.Fatal(err)
500 }
501
502
503
504
505 close(waitToProceed)
506 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
507 t.Fatal(err)
508 }
509 }
510
View as plain text