1
18
19 package grpclb
20
21 import (
22 "context"
23 "errors"
24 "fmt"
25 "io"
26 "net"
27 "strconv"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "testing"
32 "time"
33
34 "github.com/google/go-cmp/cmp"
35 "github.com/google/go-cmp/cmp/cmpopts"
36
37 "google.golang.org/grpc"
38 "google.golang.org/grpc/balancer"
39 grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
40 "google.golang.org/grpc/codes"
41 "google.golang.org/grpc/credentials"
42 "google.golang.org/grpc/internal"
43 "google.golang.org/grpc/internal/grpctest"
44 "google.golang.org/grpc/internal/testutils"
45 "google.golang.org/grpc/internal/testutils/pickfirst"
46 "google.golang.org/grpc/internal/testutils/roundrobin"
47 "google.golang.org/grpc/metadata"
48 "google.golang.org/grpc/peer"
49 "google.golang.org/grpc/resolver"
50 "google.golang.org/grpc/resolver/manual"
51 "google.golang.org/grpc/serviceconfig"
52 "google.golang.org/grpc/status"
53 "google.golang.org/protobuf/types/known/durationpb"
54
55 lbgrpc "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
56 lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
57 testgrpc "google.golang.org/grpc/interop/grpc_testing"
58 testpb "google.golang.org/grpc/interop/grpc_testing"
59 )
60
// Names and tokens shared by the fake remote balancer, the test backends and
// the tests in this file.
var (
	// lbServerName is the server name written by the remote balancer's
	// credentials during the handshake and set as ServerName on balancer
	// addresses handed to the resolver.
	lbServerName = "lb.server.com"
	// beServerName is the server name written by the backends' credentials;
	// it is also used as the dial target and as the service name the remote
	// balancer expects in the initial request.
	beServerName = "backends.com"
	// lbToken is the load balance token placed in server lists and checked by
	// testServer.EmptyCall in the "lb-token" metadata.
	lbToken = "iamatoken"

	// fakeName is the host used in the balancer address; fakeNameDialer
	// replaces it with "localhost" when dialing.
	fakeName = "fake.Name"
)
71
// Timeouts and fixed configuration used by the tests in this file.
const (
	// defaultTestTimeout bounds the overall context of each test.
	defaultTestTimeout = 10 * time.Second
	// defaultTestShortTimeout bounds waits for events that are expected NOT to
	// happen (for example, an unexpected ResolveNow call).
	defaultTestShortTimeout = 10 * time.Millisecond
	// testUserAgent is the user agent passed to grpc.WithUserAgent and the
	// prefix the remote balancer is told to expect.
	testUserAgent = "test-user-agent"
	// grpclbConfig is a service config selecting grpclb with default settings.
	grpclbConfig = `{"loadBalancingConfig": [{"grpclb": {}}]}`
)
78
// s is the receiver type for the test methods in this file; Test passes it to
// grpctest.RunSubTests. It embeds grpctest.Tester.
type s struct {
	grpctest.Tester
}
82
83 func Test(t *testing.T) {
84 grpctest.RunSubTests(t, s{})
85 }
86
// serverNameCheckCreds is a test-only transport credential. The server side
// writes sn to the connection; the client side reads it back and compares it
// with the authority it was asked to connect to.
type serverNameCheckCreds struct {
	// mu is held for the duration of ClientHandshake.
	mu sync.Mutex
	// sn is the server name written by ServerHandshake.
	sn string
}
91
92 func (c *serverNameCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
93 if _, err := io.WriteString(rawConn, c.sn); err != nil {
94 fmt.Printf("Failed to write the server name %s to the client %v", c.sn, err)
95 return nil, nil, err
96 }
97 return rawConn, nil, nil
98 }
99 func (c *serverNameCheckCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
100 c.mu.Lock()
101 defer c.mu.Unlock()
102 b := make([]byte, len(authority))
103 errCh := make(chan error, 1)
104 go func() {
105 _, err := rawConn.Read(b)
106 errCh <- err
107 }()
108 select {
109 case err := <-errCh:
110 if err != nil {
111 fmt.Printf("test-creds: failed to read expected authority name from the server: %v\n", err)
112 return nil, nil, err
113 }
114 case <-ctx.Done():
115 return nil, nil, ctx.Err()
116 }
117 if authority != string(b) {
118 fmt.Printf("test-creds: got authority from ClientConn %q, expected by server %q\n", authority, string(b))
119 return nil, nil, errors.New("received unexpected server name")
120 }
121 return rawConn, nil, nil
122 }
123 func (c *serverNameCheckCreds) Info() credentials.ProtocolInfo {
124 return credentials.ProtocolInfo{}
125 }
126 func (c *serverNameCheckCreds) Clone() credentials.TransportCredentials {
127 return &serverNameCheckCreds{}
128 }
// OverrideServerName ignores its argument and always returns nil.
func (c *serverNameCheckCreds) OverrideServerName(s string) error {
	return nil
}
132
133
134
135 func fakeNameDialer(ctx context.Context, addr string) (net.Conn, error) {
136 addr = strings.Replace(addr, fakeName, "localhost", 1)
137 return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
138 }
139
140
141
142
143 func (s *rpcStats) merge(cs *lbpb.ClientStats) {
144 atomic.AddInt64(&s.numCallsStarted, cs.NumCallsStarted)
145 atomic.AddInt64(&s.numCallsFinished, cs.NumCallsFinished)
146 atomic.AddInt64(&s.numCallsFinishedWithClientFailedToSend, cs.NumCallsFinishedWithClientFailedToSend)
147 atomic.AddInt64(&s.numCallsFinishedKnownReceived, cs.NumCallsFinishedKnownReceived)
148 s.mu.Lock()
149 for _, perToken := range cs.CallsFinishedWithDrop {
150 s.numCallsDropped[perToken.LoadBalanceToken] += perToken.NumCalls
151 }
152 s.mu.Unlock()
153 }
154
// atomicEqual reports whether the values behind a and b are equal, reading
// each of them with an atomic load.
func atomicEqual(a, b *int64) bool {
	left := atomic.LoadInt64(a)
	right := atomic.LoadInt64(b)
	return left == right
}
158
159
160
161
162 func (s *rpcStats) equal(o *rpcStats) bool {
163 if !atomicEqual(&s.numCallsStarted, &o.numCallsStarted) {
164 return false
165 }
166 if !atomicEqual(&s.numCallsFinished, &o.numCallsFinished) {
167 return false
168 }
169 if !atomicEqual(&s.numCallsFinishedWithClientFailedToSend, &o.numCallsFinishedWithClientFailedToSend) {
170 return false
171 }
172 if !atomicEqual(&s.numCallsFinishedKnownReceived, &o.numCallsFinishedKnownReceived) {
173 return false
174 }
175 s.mu.Lock()
176 defer s.mu.Unlock()
177 o.mu.Lock()
178 defer o.mu.Unlock()
179 return cmp.Equal(s.numCallsDropped, o.numCallsDropped, cmpopts.EquateEmpty())
180 }
181
182 func (s *rpcStats) String() string {
183 s.mu.Lock()
184 defer s.mu.Unlock()
185 return fmt.Sprintf("Started: %v, Finished: %v, FinishedWithClientFailedToSend: %v, FinishedKnownReceived: %v, Dropped: %v",
186 atomic.LoadInt64(&s.numCallsStarted),
187 atomic.LoadInt64(&s.numCallsFinished),
188 atomic.LoadInt64(&s.numCallsFinishedWithClientFailedToSend),
189 atomic.LoadInt64(&s.numCallsFinishedKnownReceived),
190 s.numCallsDropped)
191 }
192
// remoteBalancer is a fake grpclb LoadBalancer service driven by the tests
// through its channels.
type remoteBalancer struct {
	lbgrpc.UnimplementedLoadBalancerServer
	// sls carries server lists that BalanceLoad forwards to the client.
	sls chan *lbpb.ServerList
	// statsDura is reported to the client as the client stats report interval.
	statsDura time.Duration
	// done is closed by stop.
	done chan struct{}
	// stats accumulates the client stats received on the stream.
	stats *rpcStats
	// statsChan, when non-nil, receives every ClientStats message read from
	// the stream.
	statsChan chan *lbpb.ClientStats
	// fbChan tells BalanceLoad to send a fallback response.
	fbChan chan struct{}
	// balanceLoadCh is signaled by BalanceLoad once the initial request has
	// been validated.
	balanceLoadCh chan struct{}

	// wantUserAgent is the expected prefix of the client's user-agent; an
	// empty value disables the check.
	wantUserAgent string
	// wantServerName is the name expected in the initial request.
	wantServerName string
}
206
207 func newRemoteBalancer(wantUserAgent, wantServerName string, statsChan chan *lbpb.ClientStats) *remoteBalancer {
208 return &remoteBalancer{
209 sls: make(chan *lbpb.ServerList, 1),
210 done: make(chan struct{}),
211 stats: newRPCStats(),
212 statsChan: statsChan,
213 fbChan: make(chan struct{}),
214 balanceLoadCh: make(chan struct{}, 1),
215 wantUserAgent: wantUserAgent,
216 wantServerName: wantServerName,
217 }
218 }
219
// stop closes the server-list channel and then the done channel. It must be
// called at most once, since closing a closed channel panics.
func (b *remoteBalancer) stop() {
	close(b.sls)
	close(b.done)
}
224
// fallbackNow asks BalanceLoad to send a fallback response. fbChan is
// unbuffered, so this blocks until a BalanceLoad stream receives the signal.
func (b *remoteBalancer) fallbackNow() {
	b.fbChan <- struct{}{}
}
228
// updateServerName changes the service name BalanceLoad expects in the
// initial request.
//
// NOTE(review): the field is written without synchronization; confirm callers
// do not race with an in-flight BalanceLoad.
func (b *remoteBalancer) updateServerName(name string) {
	b.wantServerName = name
}
232
233 func (b *remoteBalancer) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServer) error {
234 md, ok := metadata.FromIncomingContext(stream.Context())
235 if !ok {
236 return status.Error(codes.Internal, "failed to receive metadata")
237 }
238 if b.wantUserAgent != "" {
239 if ua := md["user-agent"]; len(ua) == 0 || !strings.HasPrefix(ua[0], b.wantUserAgent) {
240 return status.Errorf(codes.InvalidArgument, "received unexpected user-agent: %v, want prefix %q", ua, b.wantUserAgent)
241 }
242 }
243
244 req, err := stream.Recv()
245 if err != nil {
246 return err
247 }
248 initReq := req.GetInitialRequest()
249 if initReq.Name != b.wantServerName {
250 return status.Errorf(codes.InvalidArgument, "invalid service name: %q, want: %q", initReq.Name, b.wantServerName)
251 }
252 b.balanceLoadCh <- struct{}{}
253 resp := &lbpb.LoadBalanceResponse{
254 LoadBalanceResponseType: &lbpb.LoadBalanceResponse_InitialResponse{
255 InitialResponse: &lbpb.InitialLoadBalanceResponse{
256 ClientStatsReportInterval: &durationpb.Duration{
257 Seconds: int64(b.statsDura.Seconds()),
258 Nanos: int32(b.statsDura.Nanoseconds() - int64(b.statsDura.Seconds())*1e9),
259 },
260 },
261 },
262 }
263 if err := stream.Send(resp); err != nil {
264 return err
265 }
266 go func() {
267 for {
268 req, err := stream.Recv()
269 if err != nil {
270 return
271 }
272 b.stats.merge(req.GetClientStats())
273 if b.statsChan != nil && req.GetClientStats() != nil {
274 b.statsChan <- req.GetClientStats()
275 }
276 }
277 }()
278 for {
279 select {
280 case v := <-b.sls:
281 resp = &lbpb.LoadBalanceResponse{
282 LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{
283 ServerList: v,
284 },
285 }
286 case <-b.fbChan:
287 resp = &lbpb.LoadBalanceResponse{
288 LoadBalanceResponseType: &lbpb.LoadBalanceResponse_FallbackResponse{
289 FallbackResponse: &lbpb.FallbackResponse{},
290 },
291 }
292 case <-stream.Context().Done():
293 return stream.Context().Err()
294 }
295 if err := stream.Send(resp); err != nil {
296 return err
297 }
298 }
299 }
300
// testServer is the TestService implementation run by every test backend.
type testServer struct {
	testgrpc.UnimplementedTestServiceServer

	// addr is the listening address, sent back to the client in the trailer.
	addr string
	// fallback disables the lb-token metadata check in EmptyCall.
	fallback bool
}
307
// testmdkey is the trailer metadata key under which EmptyCall reports the
// backend's address.
const testmdkey = "testmd"
309
310 func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
311 md, ok := metadata.FromIncomingContext(ctx)
312 if !ok {
313 return nil, status.Error(codes.Internal, "failed to receive metadata")
314 }
315 if !s.fallback && (md == nil || len(md["lb-token"]) == 0 || md["lb-token"][0] != lbToken) {
316 return nil, status.Errorf(codes.Internal, "received unexpected metadata: %v", md)
317 }
318 grpc.SetTrailer(ctx, metadata.Pairs(testmdkey, s.addr))
319 return &testpb.Empty{}, nil
320 }
321
// FullDuplexCall returns immediately with a nil error without reading from or
// writing to the stream.
func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
	return nil
}
325
326 func startBackends(t *testing.T, sn string, fallback bool, lis ...net.Listener) (servers []*grpc.Server) {
327 for _, l := range lis {
328 creds := &serverNameCheckCreds{
329 sn: sn,
330 }
331 s := grpc.NewServer(grpc.Creds(creds))
332 testgrpc.RegisterTestServiceServer(s, &testServer{addr: l.Addr().String(), fallback: fallback})
333 servers = append(servers, s)
334 go func(s *grpc.Server, l net.Listener) {
335 s.Serve(l)
336 }(s, l)
337 t.Logf("Started backend server listening on %s", l.Addr().String())
338 }
339 return
340 }
341
342 func stopBackends(servers []*grpc.Server) {
343 for _, s := range servers {
344 s.Stop()
345 }
346 }
347
// testServers bundles everything started by
// startBackendsAndRemoteLoadBalancer.
type testServers struct {
	// lbAddr is the balancer address: fakeName joined with the balancer's port.
	lbAddr string
	// ls is the fake remote balancer service.
	ls *remoteBalancer
	// lb is the gRPC server hosting ls.
	lb *grpc.Server
	// backends are the backend gRPC servers, in the same order as beListeners.
	backends []*grpc.Server
	// beIPs holds the IP of each backend listener.
	beIPs []net.IP
	// bePorts holds the port of each backend listener.
	bePorts []int

	// lbListener is the balancer's listener, wrapped in a restartable listener.
	lbListener net.Listener
	// beListeners are the backends' listeners, each wrapped in a restartable
	// listener.
	beListeners []net.Listener
}
359
360 func startBackendsAndRemoteLoadBalancer(t *testing.T, numberOfBackends int, customUserAgent string, statsChan chan *lbpb.ClientStats) (tss *testServers, cleanup func(), err error) {
361 var (
362 beListeners []net.Listener
363 ls *remoteBalancer
364 lb *grpc.Server
365 beIPs []net.IP
366 bePorts []int
367 )
368 for i := 0; i < numberOfBackends; i++ {
369 beLis, e := net.Listen("tcp", "localhost:0")
370 if e != nil {
371 err = fmt.Errorf("failed to listen %v", err)
372 return
373 }
374 beIPs = append(beIPs, beLis.Addr().(*net.TCPAddr).IP)
375 bePorts = append(bePorts, beLis.Addr().(*net.TCPAddr).Port)
376
377 beListeners = append(beListeners, testutils.NewRestartableListener(beLis))
378 }
379 backends := startBackends(t, beServerName, false, beListeners...)
380
381 lbLis, err := net.Listen("tcp", "localhost:0")
382 if err != nil {
383 err = fmt.Errorf("failed to create the listener for the load balancer %v", err)
384 return
385 }
386 lbLis = testutils.NewRestartableListener(lbLis)
387 lbCreds := &serverNameCheckCreds{
388 sn: lbServerName,
389 }
390 lb = grpc.NewServer(grpc.Creds(lbCreds))
391 ls = newRemoteBalancer(customUserAgent, beServerName, statsChan)
392 lbgrpc.RegisterLoadBalancerServer(lb, ls)
393 go func() {
394 lb.Serve(lbLis)
395 }()
396 t.Logf("Started remote load balancer server listening on %s", lbLis.Addr().String())
397
398 tss = &testServers{
399 lbAddr: net.JoinHostPort(fakeName, strconv.Itoa(lbLis.Addr().(*net.TCPAddr).Port)),
400 ls: ls,
401 lb: lb,
402 backends: backends,
403 beIPs: beIPs,
404 bePorts: bePorts,
405
406 lbListener: lbLis,
407 beListeners: beListeners,
408 }
409 cleanup = func() {
410 defer stopBackends(backends)
411 defer func() {
412 ls.stop()
413 lb.Stop()
414 }()
415 }
416 return
417 }
418
419
420
// TestGRPCLB_Basic starts one backend and a remote balancer that expects
// testUserAgent, points a ClientConn at the balancer through a manual
// resolver, and verifies that an EmptyCall RPC succeeds.
func (s) TestGRPCLB_Basic(t *testing.T) {
	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 1, testUserAgent, nil)
	if err != nil {
		t.Fatalf("failed to create new load balancer: %v", err)
	}
	defer cleanup()

	// Queue a server list containing the single backend for the remote
	// balancer to send.
	tss.ls.sls <- &lbpb.ServerList{
		Servers: []*lbpb.Server{
			{
				IpAddress:        tss.beIPs[0],
				Port:             int32(tss.bePorts[0]),
				LoadBalanceToken: lbToken,
			},
		},
	}

	// Give the manual resolver an initial state holding the grpclb service
	// config and the remote balancer's address.
	r := manual.NewBuilderWithScheme("whatever")
	s := &grpclbstate.State{
		BalancerAddresses: []resolver.Address{
			{
				Addr:       tss.lbAddr,
				ServerName: lbServerName,
			},
		},
	}
	rs := grpclbstate.Set(resolver.State{ServiceConfig: internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(grpclbConfig)}, s)
	r.InitialState(rs)

	// Create the client with the user agent the remote balancer expects.
	dopts := []grpc.DialOption{
		grpc.WithResolvers(r),
		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
		grpc.WithContextDialer(fakeNameDialer),
		grpc.WithUserAgent(testUserAgent),
	}
	cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
	if err != nil {
		t.Fatalf("Failed to dial to the backend %v", err)
	}
	defer cc.Close()

	// A single RPC must succeed.
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	testC := testgrpc.NewTestServiceClient(cc)
	if _, err := testC.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
	}
}
475
476
477
478
479
// TestGRPCLB_Weighted starts two backends and sends server lists in which the
// backends appear a different number of times. For each list it checks, via
// roundrobin.CheckWeightedRoundRobinRPCs, that RPCs are distributed according
// to the addresses in that list.
func (s) TestGRPCLB_Weighted(t *testing.T) {
	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 2, "", nil)
	if err != nil {
		t.Fatalf("failed to create new load balancer: %v", err)
	}
	defer cleanup()

	beServers := []*lbpb.Server{{
		IpAddress:        tss.beIPs[0],
		Port:             int32(tss.bePorts[0]),
		LoadBalanceToken: lbToken,
	}, {
		IpAddress:        tss.beIPs[1],
		Port:             int32(tss.bePorts[1]),
		LoadBalanceToken: lbToken,
	}}

	// Give the manual resolver an initial state holding the grpclb service
	// config and the remote balancer's address.
	r := manual.NewBuilderWithScheme("whatever")
	s := &grpclbstate.State{
		BalancerAddresses: []resolver.Address{
			{
				Addr:       tss.lbAddr,
				ServerName: lbServerName,
			},
		},
	}
	rs := grpclbstate.Set(resolver.State{ServiceConfig: internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(grpclbConfig)}, s)
	r.InitialState(rs)

	// Create the client.
	dopts := []grpc.DialOption{
		grpc.WithResolvers(r),
		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
		grpc.WithContextDialer(fakeNameDialer),
	}
	cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
	if err != nil {
		t.Fatalf("Failed to dial to the backend %v", err)
	}
	defer cc.Close()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// Each sequence lists backend indices in the order they appear in the
	// server list sent to the client; an index may repeat.
	sequences := [][]int{
		{0, 0, 1, 0, 1},
		{0, 0, 0, 1, 1},
	}
	for _, seq := range sequences {
		// Build the server list and the matching list of expected addresses
		// from the sequence.
		var backends []*lbpb.Server
		var wantAddrs []resolver.Address
		for _, s := range seq {
			backends = append(backends, beServers[s])
			wantAddrs = append(wantAddrs, resolver.Address{Addr: tss.beListeners[s].Addr().String()})
		}
		tss.ls.sls <- &lbpb.ServerList{Servers: backends}

		testC := testgrpc.NewTestServiceClient(cc)
		if err := roundrobin.CheckWeightedRoundRobinRPCs(ctx, testC, wantAddrs); err != nil {
			t.Fatal(err)
		}
	}
}
549
550
551
552
553
554
555 func (s) TestGRPCLB_DropRequest(t *testing.T) {
556 tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 2, "", nil)
557 if err != nil {
558 t.Fatalf("failed to create new load balancer: %v", err)
559 }
560 defer cleanup()
561 tss.ls.sls <- &lbpb.ServerList{
562 Servers: []*lbpb.Server{{
563 IpAddress: tss.beIPs[0],
564 Port: int32(tss.bePorts[0]),
565 LoadBalanceToken: lbToken,
566 Drop: false,
567 }, {
568 IpAddress: tss.beIPs[1],
569 Port: int32(tss.bePorts[1]),
570 LoadBalanceToken: lbToken,
571 Drop: false,
572 }, {
573 Drop: true,
574 }},
575 }
576
577
578
579
580 r := manual.NewBuilderWithScheme("whatever")
581 s := &grpclbstate.State{
582 BalancerAddresses: []resolver.Address{
583 {
584 Addr: tss.lbAddr,
585 ServerName: lbServerName,
586 },
587 },
588 }
589 rs := grpclbstate.Set(resolver.State{ServiceConfig: internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(grpclbConfig)}, s)
590 r.InitialState(rs)
591
592
593 dopts := []grpc.DialOption{
594 grpc.WithResolvers(r),
595 grpc.WithTransportCredentials(&serverNameCheckCreds{}),
596 grpc.WithContextDialer(fakeNameDialer),
597 }
598 cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
599 if err != nil {
600 t.Fatalf("Failed to dial to the backend %v", err)
601 }
602 defer cc.Close()
603 testC := testgrpc.NewTestServiceClient(cc)
604
605 var (
606 i int
607 p peer.Peer
608 )
609 const (
610
611
612 sleepEachLoop = time.Millisecond
613 loopCount = int(time.Second / sleepEachLoop)
614 )
615 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
616 defer cancel()
617
618 for i = 0; i < loopCount; i++ {
619 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err == nil {
620 break
621 }
622 time.Sleep(sleepEachLoop)
623 }
624 if i >= loopCount {
625 t.Fatalf("timeout waiting for the first connection to become ready. EmptyCall(_, _) = _, %v, want _, <nil>", err)
626 }
627
628
629
630 for i = 0; i < loopCount; i++ {
631 var temp peer.Peer
632 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&temp)); err == nil {
633 if temp.Addr.(*net.TCPAddr).Port != p.Addr.(*net.TCPAddr).Port {
634 break
635 }
636 }
637 time.Sleep(sleepEachLoop)
638 }
639 if i >= loopCount {
640 t.Fatalf("timeout waiting for the second connection to become ready")
641 }
642
643
644
645 for i = 0; i < loopCount; i++ {
646 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) == codes.Unavailable {
647 break
648 }
649 time.Sleep(sleepEachLoop)
650 }
651 if i >= loopCount {
652 t.Fatalf("timeout waiting for drop. EmptyCall(_, _) = _, %v, want _, <Unavailable>", err)
653 }
654
655 select {
656 case <-ctx.Done():
657 t.Fatal("timed out", ctx.Err())
658 default:
659 }
660 for _, failfast := range []bool{true, false} {
661 for i := 0; i < 3; i++ {
662
663
664
665 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(!failfast)); err != nil {
666 t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
667 }
668
669
670
671 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(!failfast)); err != nil {
672 t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
673 }
674
675
676 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(!failfast)); status.Code(err) != codes.Unavailable {
677 t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.Unavailable)
678 }
679 }
680 }
681
682
683
684
685 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
686 t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
687 }
688
689 tss.backends[0].Stop()
690
691
692
693 time.Sleep(time.Second)
694 for i := 0; i < 3; i++ {
695 var p peer.Peer
696 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
697 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
698 }
699 if want := tss.bePorts[1]; p.Addr.(*net.TCPAddr).Port != want {
700 t.Errorf("got peer: %v, want peer port: %v", p.Addr, want)
701 }
702
703 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.Unavailable {
704 t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.Unavailable)
705 }
706
707 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
708 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
709 }
710 if want := tss.bePorts[1]; p.Addr.(*net.TCPAddr).Port != want {
711 t.Errorf("got peer: %v, want peer port: %v", p.Addr, want)
712 }
713 }
714 }
715
716
717
718
719
// TestGRPCLB_BalancerDisconnects starts two remote balancers, each with its
// own backend, and hands both balancer addresses to the client. It verifies
// that RPCs reach the first balancer's backend and that, after the first
// balancer's server is stopped, RPCs reach the second balancer's backend.
func (s) TestGRPCLB_BalancerDisconnects(t *testing.T) {
	var (
		tests []*testServers
		lbs   []*grpc.Server
	)
	for i := 0; i < 2; i++ {
		tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 1, "", nil)
		if err != nil {
			t.Fatalf("failed to create new load balancer: %v", err)
		}
		// Deferred to the end of the test on purpose: both sets of servers
		// must stay up for the whole test.
		defer cleanup()

		// Each balancer advertises only its own backend.
		tss.ls.sls <- &lbpb.ServerList{
			Servers: []*lbpb.Server{
				{
					IpAddress:        tss.beIPs[0],
					Port:             int32(tss.bePorts[0]),
					LoadBalanceToken: lbToken,
				},
			},
		}

		tests = append(tests, tss)
		lbs = append(lbs, tss.lb)
	}

	// Give the manual resolver an initial state holding the grpclb service
	// config and the addresses of both remote balancers.
	r := manual.NewBuilderWithScheme("whatever")
	s := &grpclbstate.State{
		BalancerAddresses: []resolver.Address{
			{
				Addr:       tests[0].lbAddr,
				ServerName: lbServerName,
			},
			{
				Addr:       tests[1].lbAddr,
				ServerName: lbServerName,
			},
		},
	}
	rs := grpclbstate.Set(resolver.State{ServiceConfig: internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(grpclbConfig)}, s)
	r.InitialState(rs)

	dopts := []grpc.DialOption{
		grpc.WithResolvers(r),
		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
		grpc.WithContextDialer(fakeNameDialer),
	}
	cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
	if err != nil {
		t.Fatalf("Failed to dial to the backend %v", err)
	}
	defer cc.Close()
	testC := testgrpc.NewTestServiceClient(cc)

	// RPCs are expected to reach the backend behind the first balancer.
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: tests[0].beListeners[0].Addr().String()}}); err != nil {
		t.Fatal(err)
	}

	// Stop the first balancer; RPCs are then expected to reach the backend
	// behind the second balancer.
	lbs[0].Stop()
	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: tests[1].beListeners[0].Addr().String()}}); err != nil {
		t.Fatal(err)
	}
}
790
791
792
793
794
795
796
797
798
// TestGRPCLB_Fallback exercises fallback to resolver-provided backends using a
// grpclb builder with a 100ms fallback timeout. It walks through four phases:
//  1. an invalid balancer address: RPCs go to the standalone fallback backend;
//  2. a valid balancer address: RPCs go to the backend behind the balancer;
//  3. balancer and its backend stopped: RPCs go to the fallback backend again;
//  4. balancer and its backend restarted: RPCs go back to the balancer's
//     backend.
func (s) TestGRPCLB_Fallback(t *testing.T) {
	// Register a grpclb builder with a short fallback timeout for this test
	// and restore the default builder afterwards.
	balancer.Register(newLBBuilderWithFallbackTimeout(100 * time.Millisecond))
	defer balancer.Register(newLBBuilder())

	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 1, "", nil)
	if err != nil {
		t.Fatalf("failed to create new load balancer: %v", err)
	}
	defer cleanup()
	sl := &lbpb.ServerList{
		Servers: []*lbpb.Server{
			{
				IpAddress:        tss.beIPs[0],
				Port:             int32(tss.bePorts[0]),
				LoadBalanceToken: lbToken,
			},
		},
	}
	// Queue the server list for the remote balancer to send.
	tss.ls.sls <- sl

	// Start a standalone backend, in fallback mode, to serve as the fallback
	// target.
	beLis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("Failed to listen %v", err)
	}
	defer beLis.Close()
	standaloneBEs := startBackends(t, beServerName, true, beLis)
	defer stopBackends(standaloneBEs)

	r := manual.NewBuilderWithScheme("whatever")
	dopts := []grpc.DialOption{
		grpc.WithResolvers(r),
		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
		grpc.WithContextDialer(fakeNameDialer),
	}
	cc, err := grpc.Dial(r.Scheme()+":///"+beServerName, dopts...)
	if err != nil {
		t.Fatalf("Failed to dial to the backend %v", err)
	}
	defer cc.Close()
	testC := testgrpc.NewTestServiceClient(cc)

	// Push a resolver update with the fallback backend in Addresses and an
	// invalid remote balancer address.
	rs := resolver.State{
		Addresses:     []resolver.Address{{Addr: beLis.Addr().String()}},
		ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig),
	}
	rs = grpclbstate.Set(rs, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: "invalid.address", ServerName: lbServerName}}})
	r.UpdateState(rs)

	// RPCs are expected to reach the fallback backend.
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: beLis.Addr().String()}}); err != nil {
		t.Fatal(err)
	}

	// Push another resolver update, this time with the valid remote balancer
	// address, and wait for the balancer to see a BalanceLoad stream.
	rs = resolver.State{
		ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig),
		Addresses:     []resolver.Address{{Addr: beLis.Addr().String()}},
	}
	rs = grpclbstate.Set(rs, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: tss.lbAddr, ServerName: lbServerName}}})
	r.UpdateState(rs)
	select {
	case <-ctx.Done():
		t.Fatalf("timeout when waiting for BalanceLoad RPC to be called on the remote balancer")
	case <-tss.ls.balanceLoadCh:
	}

	// RPCs are expected to reach the backend behind the remote balancer.
	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: tss.beListeners[0].Addr().String()}}); err != nil {
		t.Fatal(err)
	}

	// Stop the balancer's backend and the balancer itself; RPCs are expected
	// to reach the fallback backend.
	tss.beListeners[0].(*testutils.RestartableListener).Stop()
	tss.lbListener.(*testutils.RestartableListener).Stop()
	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: beLis.Addr().String()}}); err != nil {
		t.Fatal(err)
	}

	// Restart both and resend the server list; RPCs are expected to reach the
	// balancer's backend again.
	tss.beListeners[0].(*testutils.RestartableListener).Restart()
	tss.lbListener.(*testutils.RestartableListener).Restart()
	tss.ls.sls <- sl
	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: tss.beListeners[0].Addr().String()}}); err != nil {
		t.Fatal(err)
	}
}
893
894
895
896
// TestGRPCLB_ExplicitFallback verifies fallback requested by the remote
// balancer itself: RPCs first reach the balancer's backend, move to the
// standalone fallback backend once the balancer sends a fallback response,
// and return to the balancer's backend when a server list is sent again.
func (s) TestGRPCLB_ExplicitFallback(t *testing.T) {
	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 1, "", nil)
	if err != nil {
		t.Fatalf("failed to create new load balancer: %v", err)
	}
	defer cleanup()
	sl := &lbpb.ServerList{
		Servers: []*lbpb.Server{
			{
				IpAddress:        tss.beIPs[0],
				Port:             int32(tss.bePorts[0]),
				LoadBalanceToken: lbToken,
			},
		},
	}
	// Queue the server list for the remote balancer to send.
	tss.ls.sls <- sl

	// Start a standalone backend, in fallback mode, to serve as the fallback
	// target.
	beLis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("Failed to listen %v", err)
	}
	defer beLis.Close()
	standaloneBEs := startBackends(t, beServerName, true, beLis)
	defer stopBackends(standaloneBEs)

	// Give the manual resolver an initial state with the fallback backend in
	// Addresses, the grpclb service config, and the remote balancer's address.
	r := manual.NewBuilderWithScheme("whatever")
	rs := resolver.State{
		Addresses:     []resolver.Address{{Addr: beLis.Addr().String()}},
		ServiceConfig: internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(grpclbConfig),
	}
	rs = grpclbstate.Set(rs, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: tss.lbAddr, ServerName: lbServerName}}})
	r.InitialState(rs)

	dopts := []grpc.DialOption{
		grpc.WithResolvers(r),
		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
		grpc.WithContextDialer(fakeNameDialer),
	}
	cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
	if err != nil {
		t.Fatalf("Failed to dial to the backend %v", err)
	}
	defer cc.Close()
	testC := testgrpc.NewTestServiceClient(cc)

	// RPCs are expected to reach the backend behind the remote balancer.
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: tss.beListeners[0].Addr().String()}}); err != nil {
		t.Fatal(err)
	}

	// Have the balancer send a fallback response; RPCs are expected to reach
	// the fallback backend.
	tss.ls.fallbackNow()
	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: beLis.Addr().String()}}); err != nil {
		t.Fatal(err)
	}

	// Send the server list again; RPCs are expected to return to the
	// balancer's backend.
	tss.ls.sls <- sl
	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: tss.beListeners[0].Addr().String()}}); err != nil {
		t.Fatal(err)
	}
}
966
967
968
969 func (s) TestGRPCLB_FallBackWithNoServerAddress(t *testing.T) {
970 resolveNowCh := testutils.NewChannel()
971 r := manual.NewBuilderWithScheme("whatever")
972 r.ResolveNowCallback = func(resolver.ResolveNowOptions) {
973 ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
974 defer cancel()
975 if err := resolveNowCh.SendContext(ctx, nil); err != nil {
976 t.Error("timeout when attempting to send on resolverNowCh")
977 }
978 }
979
980
981
982 tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 1, "", nil)
983 if err != nil {
984 t.Fatalf("failed to create new load balancer: %v", err)
985 }
986 defer cleanup()
987 sl := &lbpb.ServerList{
988 Servers: []*lbpb.Server{
989 {
990 IpAddress: tss.beIPs[0],
991 Port: int32(tss.bePorts[0]),
992 LoadBalanceToken: lbToken,
993 },
994 },
995 }
996
997
998 beLis, err := net.Listen("tcp", "localhost:0")
999 if err != nil {
1000 t.Fatalf("Failed to listen %v", err)
1001 }
1002 defer beLis.Close()
1003 standaloneBEs := startBackends(t, beServerName, true, beLis)
1004 defer stopBackends(standaloneBEs)
1005
1006 dopts := []grpc.DialOption{
1007 grpc.WithResolvers(r),
1008 grpc.WithTransportCredentials(&serverNameCheckCreds{}),
1009 grpc.WithContextDialer(fakeNameDialer),
1010 }
1011 cc, err := grpc.Dial(r.Scheme()+":///"+beServerName, dopts...)
1012 if err != nil {
1013 t.Fatalf("Failed to dial to the backend %v", err)
1014 }
1015 defer cc.Close()
1016 testC := testgrpc.NewTestServiceClient(cc)
1017
1018 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1019 defer cancel()
1020 for i := 0; i < 2; i++ {
1021
1022
1023 r.UpdateState(resolver.State{
1024 Addresses: []resolver.Address{{Addr: beLis.Addr().String()}},
1025 ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig),
1026 })
1027
1028 sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
1029 defer sCancel()
1030 if _, err := resolveNowCh.Receive(sCtx); err != context.DeadlineExceeded {
1031 t.Fatalf("unexpected resolveNow when grpclb gets no balancer address 1111, %d", i)
1032 }
1033
1034 var p peer.Peer
1035 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
1036 t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, <nil>", err)
1037 }
1038 if p.Addr.String() != beLis.Addr().String() {
1039 t.Fatalf("got peer: %v, want peer: %v", p.Addr, beLis.Addr())
1040 }
1041
1042 sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
1043 defer sCancel()
1044 if _, err := resolveNowCh.Receive(sCtx); err != context.DeadlineExceeded {
1045 t.Errorf("unexpected resolveNow when grpclb gets no balancer address 2222, %d", i)
1046 }
1047
1048 tss.ls.sls <- sl
1049
1050
1051 rs := resolver.State{
1052 Addresses: []resolver.Address{{Addr: beLis.Addr().String()}},
1053 ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig),
1054 }
1055 rs = grpclbstate.Set(rs, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: tss.lbAddr, ServerName: lbServerName}}})
1056 r.UpdateState(rs)
1057
1058 select {
1059 case <-ctx.Done():
1060 t.Fatalf("timeout when waiting for BalanceLoad RPC to be called on the remote balancer")
1061 case <-tss.ls.balanceLoadCh:
1062 }
1063
1064 if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: tss.beListeners[0].Addr().String()}}); err != nil {
1065 t.Fatal(err)
1066 }
1067 }
1068 }
1069
1070
1071
1072
// TestGRPCLB_PickFirst configures grpclb with pick_first as its child policy
// and checks which backend receives RPCs as the remote balancer sends
// different server lists. It then switches to the default grpclb config and
// checks that RPCs are spread round-robin over the advertised backends.
func (s) TestGRPCLB_PickFirst(t *testing.T) {
	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 3, "", nil)
	if err != nil {
		t.Fatalf("failed to create new load balancer: %v", err)
	}
	defer cleanup()

	beServers := []*lbpb.Server{{
		IpAddress:        tss.beIPs[0],
		Port:             int32(tss.bePorts[0]),
		LoadBalanceToken: lbToken,
	}, {
		IpAddress:        tss.beIPs[1],
		Port:             int32(tss.bePorts[1]),
		LoadBalanceToken: lbToken,
	}, {
		IpAddress:        tss.beIPs[2],
		Port:             int32(tss.bePorts[2]),
		LoadBalanceToken: lbToken,
	}}
	beServerAddrs := []resolver.Address{}
	for _, lis := range tss.beListeners {
		beServerAddrs = append(beServerAddrs, resolver.Address{Addr: lis.Addr().String()})
	}

	// Create the client.
	r := manual.NewBuilderWithScheme("whatever")
	dopts := []grpc.DialOption{
		grpc.WithResolvers(r),
		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
		grpc.WithContextDialer(fakeNameDialer),
	}
	cc, err := grpc.Dial(r.Scheme()+":///"+beServerName, dopts...)
	if err != nil {
		t.Fatalf("Failed to dial to the backend %v", err)
	}
	defer cc.Close()

	// Service config selecting grpclb with pick_first as its child policy.
	rs := resolver.State{ServiceConfig: r.CC.ParseServiceConfig(`{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`)}

	// Push the config together with the remote balancer's address.
	r.UpdateState(grpclbstate.Set(rs, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: tss.lbAddr, ServerName: lbServerName}}}))

	// Advertise all three backends; RPCs are expected to reach the first one.
	tss.ls.sls <- &lbpb.ServerList{Servers: beServers[0:3]}
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := pickfirst.CheckRPCsToBackend(ctx, cc, beServerAddrs[0]); err != nil {
		t.Fatal(err)
	}

	// Advertise only the third backend; RPCs are expected to reach it.
	tss.ls.sls <- &lbpb.ServerList{Servers: beServers[2:]}
	if err := pickfirst.CheckRPCsToBackend(ctx, cc, beServerAddrs[2]); err != nil {
		t.Fatal(err)
	}

	// Advertise the second and third backends. The test expects RPCs to keep
	// going to the third backend, the one already in use, even though it is
	// not first in the new list.
	tss.ls.sls <- &lbpb.ServerList{Servers: beServers[1:]}
	if err := pickfirst.CheckRPCsToBackend(ctx, cc, beServerAddrs[2]); err != nil {
		t.Fatal(err)
	}

	// Switch to the default grpclb config, which has no pick_first child
	// policy, and expect RPCs to be spread round-robin over the currently
	// advertised backends.
	s := &grpclbstate.State{
		BalancerAddresses: []resolver.Address{
			{
				Addr:       tss.lbAddr,
				ServerName: lbServerName,
			},
		},
	}
	rs = grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)}, s)
	r.UpdateState(rs)
	testC := testgrpc.NewTestServiceClient(cc)
	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, beServerAddrs[1:]); err != nil {
		t.Fatal(err)
	}

	// Advertise all three backends again and expect round-robin over all of
	// them.
	tss.ls.sls <- &lbpb.ServerList{Servers: beServers[0:3]}
	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, beServerAddrs[0:3]); err != nil {
		t.Fatal(err)
	}
}
1165
1166
1167
1168
1169 func (s) TestGRPCLB_BackendConnectionErrorPropagation(t *testing.T) {
1170 r := manual.NewBuilderWithScheme("whatever")
1171
1172
1173 tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 0, "", nil)
1174 if err != nil {
1175 t.Fatalf("failed to create new load balancer: %v", err)
1176 }
1177 defer cleanup()
1178
1179
1180
1181
1182 beLis, err := net.Listen("tcp", "localhost:0")
1183 if err != nil {
1184 t.Fatalf("Failed to listen %v", err)
1185 }
1186 defer beLis.Close()
1187 standaloneBEs := startBackends(t, "arbitrary.invalid.name", true, beLis)
1188 defer stopBackends(standaloneBEs)
1189
1190 cc, err := grpc.Dial(r.Scheme()+":///"+beServerName,
1191 grpc.WithResolvers(r),
1192 grpc.WithTransportCredentials(&serverNameCheckCreds{}),
1193 grpc.WithContextDialer(fakeNameDialer))
1194 if err != nil {
1195 t.Fatalf("Failed to dial to the backend %v", err)
1196 }
1197 defer cc.Close()
1198 testC := testgrpc.NewTestServiceClient(cc)
1199
1200 rs := resolver.State{
1201 Addresses: []resolver.Address{{Addr: beLis.Addr().String()}},
1202 ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig),
1203 }
1204 rs = grpclbstate.Set(rs, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: tss.lbAddr, ServerName: lbServerName}}})
1205 r.UpdateState(rs)
1206
1207
1208
1209 const expectedErrMsg = "received unexpected server name"
1210 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1211 defer cancel()
1212 var wg sync.WaitGroup
1213 wg.Add(1)
1214 go func() {
1215 tss.ls.fallbackNow()
1216 wg.Done()
1217 }()
1218 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}); err == nil || !strings.Contains(err.Error(), expectedErrMsg) {
1219 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, rpc error containing substring: %q", testC, err, expectedErrMsg)
1220 }
1221 wg.Wait()
1222 }
1223
1224 func testGRPCLBEmptyServerList(t *testing.T, svcfg string) {
1225 tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 1, "", nil)
1226 if err != nil {
1227 t.Fatalf("failed to create new load balancer: %v", err)
1228 }
1229 defer cleanup()
1230
1231 beServers := []*lbpb.Server{{
1232 IpAddress: tss.beIPs[0],
1233 Port: int32(tss.bePorts[0]),
1234 LoadBalanceToken: lbToken,
1235 }}
1236
1237 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1238 defer cancel()
1239 r := manual.NewBuilderWithScheme("whatever")
1240 dopts := []grpc.DialOption{
1241 grpc.WithResolvers(r),
1242 grpc.WithTransportCredentials(&serverNameCheckCreds{}),
1243 grpc.WithContextDialer(fakeNameDialer),
1244 }
1245 cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, dopts...)
1246 if err != nil {
1247 t.Fatalf("Failed to dial to the backend %v", err)
1248 }
1249 defer cc.Close()
1250 testC := testgrpc.NewTestServiceClient(cc)
1251
1252 tss.ls.sls <- &lbpb.ServerList{Servers: beServers}
1253
1254 s := &grpclbstate.State{
1255 BalancerAddresses: []resolver.Address{
1256 {
1257 Addr: tss.lbAddr,
1258 ServerName: lbServerName,
1259 },
1260 },
1261 }
1262 rs := grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(svcfg)}, s)
1263 r.UpdateState(rs)
1264 t.Log("Perform an initial RPC and expect it to succeed...")
1265 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
1266 t.Fatalf("Initial _.EmptyCall(_, _) = _, %v, want _, <nil>", err)
1267 }
1268 t.Log("Now send an empty server list. Wait until we see an RPC failure to make sure the client got it...")
1269 tss.ls.sls <- &lbpb.ServerList{}
1270 gotError := false
1271 for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
1272 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}); err != nil {
1273 gotError = true
1274 break
1275 }
1276 }
1277 if !gotError {
1278 t.Fatalf("Expected to eventually see an RPC fail after the grpclb sends an empty server list, but none did.")
1279 }
1280 t.Log("Now send a non-empty server list. A wait-for-ready RPC should now succeed...")
1281 tss.ls.sls <- &lbpb.ServerList{Servers: beServers}
1282 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
1283 t.Fatalf("Final _.EmptyCall(_, _) = _, %v, want _, <nil>", err)
1284 }
1285 }
1286
1287 func (s) TestGRPCLBEmptyServerListRoundRobin(t *testing.T) {
1288 testGRPCLBEmptyServerList(t, `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}}]}}]}`)
1289 }
1290
1291 func (s) TestGRPCLBEmptyServerListPickFirst(t *testing.T) {
1292 testGRPCLBEmptyServerList(t, `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`)
1293 }
1294
1295 func (s) TestGRPCLBWithTargetNameFieldInConfig(t *testing.T) {
1296 r := manual.NewBuilderWithScheme("whatever")
1297
1298 tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 1, "", nil)
1299 if err != nil {
1300 t.Fatalf("failed to create new load balancer: %v", err)
1301 }
1302 defer cleanup()
1303 sl := &lbpb.ServerList{
1304 Servers: []*lbpb.Server{
1305 {
1306 IpAddress: tss.beIPs[0],
1307 Port: int32(tss.bePorts[0]),
1308 LoadBalanceToken: lbToken,
1309 },
1310 },
1311 }
1312
1313 tss.ls.sls <- sl
1314
1315 cc, err := grpc.Dial(r.Scheme()+":///"+beServerName,
1316 grpc.WithResolvers(r),
1317 grpc.WithTransportCredentials(&serverNameCheckCreds{}),
1318 grpc.WithContextDialer(fakeNameDialer),
1319 grpc.WithUserAgent(testUserAgent))
1320 if err != nil {
1321 t.Fatalf("Failed to dial to the backend %v", err)
1322 }
1323 defer cc.Close()
1324 testC := testgrpc.NewTestServiceClient(cc)
1325
1326
1327
1328
1329 rs := grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)},
1330 &grpclbstate.State{BalancerAddresses: []resolver.Address{{
1331 Addr: tss.lbAddr,
1332 ServerName: lbServerName,
1333 }}})
1334 r.UpdateState(rs)
1335
1336 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1337 defer cancel()
1338 select {
1339 case <-ctx.Done():
1340 t.Fatalf("timeout when waiting for BalanceLoad RPC to be called on the remote balancer")
1341 case <-tss.ls.balanceLoadCh:
1342 }
1343 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}); err != nil {
1344 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
1345 }
1346
1347
1348
1349
1350 const newServerName = "new-server-name"
1351 tss.ls.updateServerName(newServerName)
1352 tss.ls.sls <- sl
1353
1354
1355
1356
1357 lbCfg := fmt.Sprintf(`{"loadBalancingConfig": [{"grpclb": {"serviceName": "%s"}}]}`, newServerName)
1358 s := &grpclbstate.State{
1359 BalancerAddresses: []resolver.Address{
1360 {
1361 Addr: tss.lbAddr,
1362 ServerName: lbServerName,
1363 },
1364 },
1365 }
1366 rs = grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(lbCfg)}, s)
1367 r.UpdateState(rs)
1368 select {
1369 case <-ctx.Done():
1370 t.Fatalf("timeout when waiting for BalanceLoad RPC to be called on the remote balancer")
1371 case <-tss.ls.balanceLoadCh:
1372 }
1373
1374 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}); err != nil {
1375 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
1376 }
1377 }
1378
1379 type failPreRPCCred struct{}
1380
1381 func (failPreRPCCred) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
1382 if strings.Contains(uri[0], failtosendURI) {
1383 return nil, fmt.Errorf("rpc should fail to send")
1384 }
1385 return nil, nil
1386 }
1387
1388 func (failPreRPCCred) RequireTransportSecurity() bool {
1389 return false
1390 }
1391
1392 func checkStats(stats, expected *rpcStats) error {
1393 if !stats.equal(expected) {
1394 return fmt.Errorf("stats not equal: got %+v, want %+v", stats, expected)
1395 }
1396 return nil
1397 }
1398
1399 func runAndCheckStats(t *testing.T, drop bool, statsChan chan *lbpb.ClientStats, runRPCs func(*grpc.ClientConn), statsWant *rpcStats) error {
1400 r := manual.NewBuilderWithScheme("whatever")
1401
1402 tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 1, "", statsChan)
1403 if err != nil {
1404 t.Fatalf("failed to create new load balancer: %v", err)
1405 }
1406 defer cleanup()
1407 servers := []*lbpb.Server{{
1408 IpAddress: tss.beIPs[0],
1409 Port: int32(tss.bePorts[0]),
1410 LoadBalanceToken: lbToken,
1411 }}
1412 if drop {
1413 servers = append(servers, &lbpb.Server{
1414 LoadBalanceToken: lbToken,
1415 Drop: drop,
1416 })
1417 }
1418 tss.ls.sls <- &lbpb.ServerList{Servers: servers}
1419 tss.ls.statsDura = 100 * time.Millisecond
1420 creds := serverNameCheckCreds{}
1421
1422 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
1423 defer cancel()
1424 cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
1425 grpc.WithTransportCredentials(&creds),
1426 grpc.WithPerRPCCredentials(failPreRPCCred{}),
1427 grpc.WithContextDialer(fakeNameDialer))
1428 if err != nil {
1429 t.Fatalf("Failed to dial to the backend %v", err)
1430 }
1431 defer cc.Close()
1432
1433 rstate := resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)}
1434 r.UpdateState(grpclbstate.Set(rstate, &grpclbstate.State{BalancerAddresses: []resolver.Address{{
1435 Addr: tss.lbAddr,
1436 ServerName: lbServerName,
1437 }}}))
1438
1439 runRPCs(cc)
1440 end := time.Now().Add(time.Second)
1441 for time.Now().Before(end) {
1442 if err := checkStats(tss.ls.stats, statsWant); err == nil {
1443 time.Sleep(200 * time.Millisecond)
1444 break
1445 }
1446 }
1447 return checkStats(tss.ls.stats, statsWant)
1448 }
1449
// countRPC is the total number of RPCs each stats test issues: one initial
// RPC followed by countRPC-1 more.
const countRPC = 40

// failtosendURI is the marker that failPreRPCCred looks for in an RPC's URI;
// RPCs whose URI contains it fail before being sent.
const failtosendURI = "failtosend"
1454
1455 func (s) TestGRPCLBStatsUnarySuccess(t *testing.T) {
1456 if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) {
1457 testC := testgrpc.NewTestServiceClient(cc)
1458 ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout)
1459 defer cancel()
1460
1461 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
1462 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
1463 }
1464 for i := 0; i < countRPC-1; i++ {
1465 testC.EmptyCall(ctx, &testpb.Empty{})
1466 }
1467 }, &rpcStats{
1468 numCallsStarted: int64(countRPC),
1469 numCallsFinished: int64(countRPC),
1470 numCallsFinishedKnownReceived: int64(countRPC),
1471 }); err != nil {
1472 t.Fatal(err)
1473 }
1474 }
1475
1476 func (s) TestGRPCLBStatsUnaryDrop(t *testing.T) {
1477 if err := runAndCheckStats(t, true, nil, func(cc *grpc.ClientConn) {
1478 testC := testgrpc.NewTestServiceClient(cc)
1479 ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout)
1480 defer cancel()
1481
1482 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
1483 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
1484 }
1485 for i := 0; i < countRPC-1; i++ {
1486 testC.EmptyCall(ctx, &testpb.Empty{})
1487 }
1488 }, &rpcStats{
1489 numCallsStarted: int64(countRPC),
1490 numCallsFinished: int64(countRPC),
1491 numCallsFinishedKnownReceived: int64(countRPC) / 2,
1492 numCallsDropped: map[string]int64{lbToken: int64(countRPC) / 2},
1493 }); err != nil {
1494 t.Fatal(err)
1495 }
1496 }
1497
1498 func (s) TestGRPCLBStatsUnaryFailedToSend(t *testing.T) {
1499 if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) {
1500 testC := testgrpc.NewTestServiceClient(cc)
1501 ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout)
1502 defer cancel()
1503
1504 if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
1505 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
1506 }
1507 for i := 0; i < countRPC-1; i++ {
1508 cc.Invoke(ctx, failtosendURI, &testpb.Empty{}, nil)
1509 }
1510 }, &rpcStats{
1511 numCallsStarted: int64(countRPC),
1512 numCallsFinished: int64(countRPC),
1513 numCallsFinishedWithClientFailedToSend: int64(countRPC) - 1,
1514 numCallsFinishedKnownReceived: 1,
1515 }); err != nil {
1516 t.Fatal(err)
1517 }
1518 }
1519
1520 func (s) TestGRPCLBStatsStreamingSuccess(t *testing.T) {
1521 if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) {
1522 testC := testgrpc.NewTestServiceClient(cc)
1523 ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout)
1524 defer cancel()
1525
1526 stream, err := testC.FullDuplexCall(ctx, grpc.WaitForReady(true))
1527 if err != nil {
1528 t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
1529 }
1530 for {
1531 if _, err = stream.Recv(); err == io.EOF {
1532 break
1533 }
1534 }
1535 for i := 0; i < countRPC-1; i++ {
1536 stream, err = testC.FullDuplexCall(ctx)
1537 if err == nil {
1538
1539 for {
1540 if _, err = stream.Recv(); err == io.EOF {
1541 break
1542 }
1543 }
1544 }
1545 }
1546 }, &rpcStats{
1547 numCallsStarted: int64(countRPC),
1548 numCallsFinished: int64(countRPC),
1549 numCallsFinishedKnownReceived: int64(countRPC),
1550 }); err != nil {
1551 t.Fatal(err)
1552 }
1553 }
1554
1555 func (s) TestGRPCLBStatsStreamingDrop(t *testing.T) {
1556 if err := runAndCheckStats(t, true, nil, func(cc *grpc.ClientConn) {
1557 testC := testgrpc.NewTestServiceClient(cc)
1558 ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout)
1559 defer cancel()
1560
1561 stream, err := testC.FullDuplexCall(ctx, grpc.WaitForReady(true))
1562 if err != nil {
1563 t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
1564 }
1565 for {
1566 if _, err = stream.Recv(); err == io.EOF {
1567 break
1568 }
1569 }
1570 for i := 0; i < countRPC-1; i++ {
1571 stream, err = testC.FullDuplexCall(ctx)
1572 if err == nil {
1573
1574 for {
1575 if _, err = stream.Recv(); err == io.EOF {
1576 break
1577 }
1578 }
1579 }
1580 }
1581 }, &rpcStats{
1582 numCallsStarted: int64(countRPC),
1583 numCallsFinished: int64(countRPC),
1584 numCallsFinishedKnownReceived: int64(countRPC) / 2,
1585 numCallsDropped: map[string]int64{lbToken: int64(countRPC) / 2},
1586 }); err != nil {
1587 t.Fatal(err)
1588 }
1589 }
1590
1591 func (s) TestGRPCLBStatsStreamingFailedToSend(t *testing.T) {
1592 if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) {
1593 testC := testgrpc.NewTestServiceClient(cc)
1594 ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout)
1595 defer cancel()
1596
1597 stream, err := testC.FullDuplexCall(ctx, grpc.WaitForReady(true))
1598 if err != nil {
1599 t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
1600 }
1601 for {
1602 if _, err = stream.Recv(); err == io.EOF {
1603 break
1604 }
1605 }
1606 for i := 0; i < countRPC-1; i++ {
1607 cc.NewStream(ctx, &grpc.StreamDesc{}, failtosendURI)
1608 }
1609 }, &rpcStats{
1610 numCallsStarted: int64(countRPC),
1611 numCallsFinished: int64(countRPC),
1612 numCallsFinishedWithClientFailedToSend: int64(countRPC) - 1,
1613 numCallsFinishedKnownReceived: 1,
1614 }); err != nil {
1615 t.Fatal(err)
1616 }
1617 }
1618
1619 func (s) TestGRPCLBStatsQuashEmpty(t *testing.T) {
1620 ch := make(chan *lbpb.ClientStats)
1621 defer close(ch)
1622 if err := runAndCheckStats(t, false, ch, func(cc *grpc.ClientConn) {
1623
1624
1625
1626 select {
1627 case st := <-ch:
1628 if !isZeroStats(st) {
1629 t.Errorf("got stats %v; want all zero", st)
1630 }
1631 case <-time.After(5 * time.Second):
1632 t.Errorf("did not get initial stats report after 5 seconds")
1633 return
1634 }
1635
1636 select {
1637 case st := <-ch:
1638 t.Errorf("got unexpected stats report: %v", st)
1639 case <-time.After(500 * time.Millisecond):
1640
1641 }
1642 go func() {
1643 for range ch {
1644 }
1645 }()
1646 }, &rpcStats{
1647 numCallsStarted: 0,
1648 numCallsFinished: 0,
1649 numCallsFinishedKnownReceived: 0,
1650 }); err != nil {
1651 t.Fatal(err)
1652 }
1653 }
1654
View as plain text