...

Source file src/google.golang.org/grpc/balancer/grpclb/grpclb_test.go

Documentation: google.golang.org/grpc/balancer/grpclb

     1  /*
     2   *
     3   * Copyright 2016 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package grpclb
    20  
    21  import (
    22  	"context"
    23  	"errors"
    24  	"fmt"
    25  	"io"
    26  	"net"
    27  	"strconv"
    28  	"strings"
    29  	"sync"
    30  	"sync/atomic"
    31  	"testing"
    32  	"time"
    33  
    34  	"github.com/google/go-cmp/cmp"
    35  	"github.com/google/go-cmp/cmp/cmpopts"
    36  
    37  	"google.golang.org/grpc"
    38  	"google.golang.org/grpc/balancer"
    39  	grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
    40  	"google.golang.org/grpc/codes"
    41  	"google.golang.org/grpc/credentials"
    42  	"google.golang.org/grpc/internal"
    43  	"google.golang.org/grpc/internal/grpctest"
    44  	"google.golang.org/grpc/internal/testutils"
    45  	"google.golang.org/grpc/internal/testutils/pickfirst"
    46  	"google.golang.org/grpc/internal/testutils/roundrobin"
    47  	"google.golang.org/grpc/metadata"
    48  	"google.golang.org/grpc/peer"
    49  	"google.golang.org/grpc/resolver"
    50  	"google.golang.org/grpc/resolver/manual"
    51  	"google.golang.org/grpc/serviceconfig"
    52  	"google.golang.org/grpc/status"
    53  	"google.golang.org/protobuf/types/known/durationpb"
    54  
    55  	lbgrpc "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
    56  	lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
    57  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    58  	testpb "google.golang.org/grpc/interop/grpc_testing"
    59  )
    60  
// Names and tokens shared by the fake remote balancer, the test backends and
// the test cases in this file.
var (
	// lbServerName is the server name the remote balancer's test credentials
	// write to the client during the handshake.
	lbServerName = "lb.server.com"
	// beServerName is the server name used by the backends' test credentials;
	// it is also the endpoint of the dial target used by the tests and the
	// name the remote balancer expects in the initial request.
	beServerName = "backends.com"
	// lbToken is the load balance token attached to every backend in the
	// server lists; non-fallback backends reject RPCs that do not carry it in
	// the "lb-token" metadata.
	lbToken = "iamatoken"

	// fakeName is used as the host of the remote balancer address handed to
	// the client. fakeNameDialer replaces fakeName with localhost when
	// dialing, which tests that the custom dialer is passed on to grpclb.
	fakeName = "fake.Name"
)
    71  
const (
	// defaultTestTimeout bounds the contexts used for RPCs in the tests.
	defaultTestTimeout = 10 * time.Second
	// defaultTestShortTimeout is a much shorter timeout.
	// NOTE(review): not referenced in this part of the file; presumably used
	// by tests that wait for something NOT to happen - confirm.
	defaultTestShortTimeout = 10 * time.Millisecond
	// testUserAgent is the user agent set on the client and expected (as a
	// prefix) by the remote balancer in TestGRPCLB_Basic.
	testUserAgent = "test-user-agent"
	// grpclbConfig is a service config JSON that selects grpclb as the load
	// balancing policy.
	grpclbConfig = `{"loadBalancingConfig": [{"grpclb": {}}]}`
)
    78  
// s is the receiver type for the test methods in this file. It embeds
// grpctest.Tester and is passed to grpctest.RunSubTests by Test.
type s struct {
	grpctest.Tester
}
    82  
    83  func Test(t *testing.T) {
    84  	grpctest.RunSubTests(t, s{})
    85  }
    86  
// serverNameCheckCreds is a test-only transport credentials implementation.
// On the server side it writes sn to the raw connection; on the client side it
// reads bytes from the connection and compares them with the authority it was
// asked to handshake with.
type serverNameCheckCreds struct {
	// mu is held for the whole duration of ClientHandshake.
	mu sync.Mutex
	// sn is the server name written to the client by ServerHandshake.
	sn string
}
    91  
    92  func (c *serverNameCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
    93  	if _, err := io.WriteString(rawConn, c.sn); err != nil {
    94  		fmt.Printf("Failed to write the server name %s to the client %v", c.sn, err)
    95  		return nil, nil, err
    96  	}
    97  	return rawConn, nil, nil
    98  }
    99  func (c *serverNameCheckCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
   100  	c.mu.Lock()
   101  	defer c.mu.Unlock()
   102  	b := make([]byte, len(authority))
   103  	errCh := make(chan error, 1)
   104  	go func() {
   105  		_, err := rawConn.Read(b)
   106  		errCh <- err
   107  	}()
   108  	select {
   109  	case err := <-errCh:
   110  		if err != nil {
   111  			fmt.Printf("test-creds: failed to read expected authority name from the server: %v\n", err)
   112  			return nil, nil, err
   113  		}
   114  	case <-ctx.Done():
   115  		return nil, nil, ctx.Err()
   116  	}
   117  	if authority != string(b) {
   118  		fmt.Printf("test-creds: got authority from ClientConn %q, expected by server %q\n", authority, string(b))
   119  		return nil, nil, errors.New("received unexpected server name")
   120  	}
   121  	return rawConn, nil, nil
   122  }
   123  func (c *serverNameCheckCreds) Info() credentials.ProtocolInfo {
   124  	return credentials.ProtocolInfo{}
   125  }
   126  func (c *serverNameCheckCreds) Clone() credentials.TransportCredentials {
   127  	return &serverNameCheckCreds{}
   128  }
// OverrideServerName is a no-op: the given name is ignored and nil is always
// returned.
func (c *serverNameCheckCreds) OverrideServerName(s string) error {
	return nil
}
   132  
   133  // fakeNameDialer replaces fakeName with localhost when dialing.
   134  // This will test that custom dialer is passed from Dial to grpclb.
   135  func fakeNameDialer(ctx context.Context, addr string) (net.Conn, error) {
   136  	addr = strings.Replace(addr, fakeName, "localhost", 1)
   137  	return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
   138  }
   139  
   140  // merge merges the new client stats into current stats.
   141  //
   142  // It's a test-only method. rpcStats is defined in grpclb_picker.
   143  func (s *rpcStats) merge(cs *lbpb.ClientStats) {
   144  	atomic.AddInt64(&s.numCallsStarted, cs.NumCallsStarted)
   145  	atomic.AddInt64(&s.numCallsFinished, cs.NumCallsFinished)
   146  	atomic.AddInt64(&s.numCallsFinishedWithClientFailedToSend, cs.NumCallsFinishedWithClientFailedToSend)
   147  	atomic.AddInt64(&s.numCallsFinishedKnownReceived, cs.NumCallsFinishedKnownReceived)
   148  	s.mu.Lock()
   149  	for _, perToken := range cs.CallsFinishedWithDrop {
   150  		s.numCallsDropped[perToken.LoadBalanceToken] += perToken.NumCalls
   151  	}
   152  	s.mu.Unlock()
   153  }
   154  
   155  func atomicEqual(a, b *int64) bool {
   156  	return atomic.LoadInt64(a) == atomic.LoadInt64(b)
   157  }
   158  
   159  // equal compares two rpcStats.
   160  //
   161  // It's a test-only method. rpcStats is defined in grpclb_picker.
   162  func (s *rpcStats) equal(o *rpcStats) bool {
   163  	if !atomicEqual(&s.numCallsStarted, &o.numCallsStarted) {
   164  		return false
   165  	}
   166  	if !atomicEqual(&s.numCallsFinished, &o.numCallsFinished) {
   167  		return false
   168  	}
   169  	if !atomicEqual(&s.numCallsFinishedWithClientFailedToSend, &o.numCallsFinishedWithClientFailedToSend) {
   170  		return false
   171  	}
   172  	if !atomicEqual(&s.numCallsFinishedKnownReceived, &o.numCallsFinishedKnownReceived) {
   173  		return false
   174  	}
   175  	s.mu.Lock()
   176  	defer s.mu.Unlock()
   177  	o.mu.Lock()
   178  	defer o.mu.Unlock()
   179  	return cmp.Equal(s.numCallsDropped, o.numCallsDropped, cmpopts.EquateEmpty())
   180  }
   181  
   182  func (s *rpcStats) String() string {
   183  	s.mu.Lock()
   184  	defer s.mu.Unlock()
   185  	return fmt.Sprintf("Started: %v, Finished: %v, FinishedWithClientFailedToSend: %v, FinishedKnownReceived: %v, Dropped: %v",
   186  		atomic.LoadInt64(&s.numCallsStarted),
   187  		atomic.LoadInt64(&s.numCallsFinished),
   188  		atomic.LoadInt64(&s.numCallsFinishedWithClientFailedToSend),
   189  		atomic.LoadInt64(&s.numCallsFinishedKnownReceived),
   190  		s.numCallsDropped)
   191  }
   192  
// remoteBalancer is a fake grpclb remote balancer used by the tests. Tests
// drive it by pushing server lists on sls and by calling fallbackNow; it
// records the client stats it receives.
type remoteBalancer struct {
	lbgrpc.UnimplementedLoadBalancerServer
	// sls carries the server lists that BalanceLoad sends to the client.
	sls           chan *lbpb.ServerList
	// statsDura is the client stats report interval advertised in the initial
	// response.
	statsDura     time.Duration
	// done is closed by stop.
	done          chan struct{}
	// stats accumulates the client stats received in BalanceLoad.
	stats         *rpcStats
	// statsChan, when non-nil, additionally receives every client stats
	// message that BalanceLoad gets from the client.
	statsChan     chan *lbpb.ClientStats
	// fbChan is signalled by fallbackNow; BalanceLoad answers each signal by
	// sending a fallback response to the client.
	fbChan        chan struct{}
	balanceLoadCh chan struct{} // notify successful invocation of BalanceLoad

	wantUserAgent  string // expected user-agent in metadata of BalanceLoad
	wantServerName string // expected server name in InitialLoadBalanceRequest
}
   206  
   207  func newRemoteBalancer(wantUserAgent, wantServerName string, statsChan chan *lbpb.ClientStats) *remoteBalancer {
   208  	return &remoteBalancer{
   209  		sls:            make(chan *lbpb.ServerList, 1),
   210  		done:           make(chan struct{}),
   211  		stats:          newRPCStats(),
   212  		statsChan:      statsChan,
   213  		fbChan:         make(chan struct{}),
   214  		balanceLoadCh:  make(chan struct{}, 1),
   215  		wantUserAgent:  wantUserAgent,
   216  		wantServerName: wantServerName,
   217  	}
   218  }
   219  
// stop closes the server list channel and the done channel. Since closing an
// already closed channel panics, it must be called at most once.
func (b *remoteBalancer) stop() {
	close(b.sls)
	close(b.done)
}
   224  
// fallbackNow asks the running BalanceLoad stream to send a fallback response
// to the client. fbChan is unbuffered, so this blocks until BalanceLoad
// receives the signal.
func (b *remoteBalancer) fallbackNow() {
	b.fbChan <- struct{}{}
}
   228  
// updateServerName changes the server name that BalanceLoad expects in the
// initial request. The field is written without synchronization.
func (b *remoteBalancer) updateServerName(name string) {
	b.wantServerName = name
}
   232  
   233  func (b *remoteBalancer) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServer) error {
   234  	md, ok := metadata.FromIncomingContext(stream.Context())
   235  	if !ok {
   236  		return status.Error(codes.Internal, "failed to receive metadata")
   237  	}
   238  	if b.wantUserAgent != "" {
   239  		if ua := md["user-agent"]; len(ua) == 0 || !strings.HasPrefix(ua[0], b.wantUserAgent) {
   240  			return status.Errorf(codes.InvalidArgument, "received unexpected user-agent: %v, want prefix %q", ua, b.wantUserAgent)
   241  		}
   242  	}
   243  
   244  	req, err := stream.Recv()
   245  	if err != nil {
   246  		return err
   247  	}
   248  	initReq := req.GetInitialRequest()
   249  	if initReq.Name != b.wantServerName {
   250  		return status.Errorf(codes.InvalidArgument, "invalid service name: %q, want: %q", initReq.Name, b.wantServerName)
   251  	}
   252  	b.balanceLoadCh <- struct{}{}
   253  	resp := &lbpb.LoadBalanceResponse{
   254  		LoadBalanceResponseType: &lbpb.LoadBalanceResponse_InitialResponse{
   255  			InitialResponse: &lbpb.InitialLoadBalanceResponse{
   256  				ClientStatsReportInterval: &durationpb.Duration{
   257  					Seconds: int64(b.statsDura.Seconds()),
   258  					Nanos:   int32(b.statsDura.Nanoseconds() - int64(b.statsDura.Seconds())*1e9),
   259  				},
   260  			},
   261  		},
   262  	}
   263  	if err := stream.Send(resp); err != nil {
   264  		return err
   265  	}
   266  	go func() {
   267  		for {
   268  			req, err := stream.Recv()
   269  			if err != nil {
   270  				return
   271  			}
   272  			b.stats.merge(req.GetClientStats())
   273  			if b.statsChan != nil && req.GetClientStats() != nil {
   274  				b.statsChan <- req.GetClientStats()
   275  			}
   276  		}
   277  	}()
   278  	for {
   279  		select {
   280  		case v := <-b.sls:
   281  			resp = &lbpb.LoadBalanceResponse{
   282  				LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{
   283  					ServerList: v,
   284  				},
   285  			}
   286  		case <-b.fbChan:
   287  			resp = &lbpb.LoadBalanceResponse{
   288  				LoadBalanceResponseType: &lbpb.LoadBalanceResponse_FallbackResponse{
   289  					FallbackResponse: &lbpb.FallbackResponse{},
   290  				},
   291  			}
   292  		case <-stream.Context().Done():
   293  			return stream.Context().Err()
   294  		}
   295  		if err := stream.Send(resp); err != nil {
   296  			return err
   297  		}
   298  	}
   299  }
   300  
// testServer is the test backend service. Only EmptyCall and FullDuplexCall
// are overridden; everything else comes from the embedded unimplemented
// server.
type testServer struct {
	testgrpc.UnimplementedTestServiceServer

	// addr is returned to the client in the trailer of EmptyCall, under the
	// testmdkey key.
	addr     string
	// fallback, when true, makes EmptyCall skip the lb-token metadata check.
	fallback bool
}
   307  
// testmdkey is the trailer metadata key under which EmptyCall reports the
// address of the backend that served the RPC.
const testmdkey = "testmd"
   309  
   310  func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
   311  	md, ok := metadata.FromIncomingContext(ctx)
   312  	if !ok {
   313  		return nil, status.Error(codes.Internal, "failed to receive metadata")
   314  	}
   315  	if !s.fallback && (md == nil || len(md["lb-token"]) == 0 || md["lb-token"][0] != lbToken) {
   316  		return nil, status.Errorf(codes.Internal, "received unexpected metadata: %v", md)
   317  	}
   318  	grpc.SetTrailer(ctx, metadata.Pairs(testmdkey, s.addr))
   319  	return &testpb.Empty{}, nil
   320  }
   321  
// FullDuplexCall ends the stream immediately with a nil error, without
// reading from or writing to it.
func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
	return nil
}
   325  
   326  func startBackends(t *testing.T, sn string, fallback bool, lis ...net.Listener) (servers []*grpc.Server) {
   327  	for _, l := range lis {
   328  		creds := &serverNameCheckCreds{
   329  			sn: sn,
   330  		}
   331  		s := grpc.NewServer(grpc.Creds(creds))
   332  		testgrpc.RegisterTestServiceServer(s, &testServer{addr: l.Addr().String(), fallback: fallback})
   333  		servers = append(servers, s)
   334  		go func(s *grpc.Server, l net.Listener) {
   335  			s.Serve(l)
   336  		}(s, l)
   337  		t.Logf("Started backend server listening on %s", l.Addr().String())
   338  	}
   339  	return
   340  }
   341  
   342  func stopBackends(servers []*grpc.Server) {
   343  	for _, s := range servers {
   344  		s.Stop()
   345  	}
   346  }
   347  
// testServers bundles everything started by
// startBackendsAndRemoteLoadBalancer: the fake remote balancer, its gRPC
// server, the backends, and their listeners and addresses.
type testServers struct {
	// lbAddr is the remote balancer address to hand to the client: fakeName
	// as the host plus the balancer listener's port.
	lbAddr   string
	// ls is the fake remote balancer service implementation.
	ls       *remoteBalancer
	// lb is the gRPC server on which ls is registered.
	lb       *grpc.Server
	// backends are the started backend servers; beIPs[i] and bePorts[i] are
	// the IP and port backend i listens on.
	backends []*grpc.Server
	beIPs    []net.IP
	bePorts  []int

	// lbListener and beListeners are the restartable listeners that wrap the
	// balancer's and the backends' TCP listeners.
	lbListener  net.Listener
	beListeners []net.Listener
}
   359  
   360  func startBackendsAndRemoteLoadBalancer(t *testing.T, numberOfBackends int, customUserAgent string, statsChan chan *lbpb.ClientStats) (tss *testServers, cleanup func(), err error) {
   361  	var (
   362  		beListeners []net.Listener
   363  		ls          *remoteBalancer
   364  		lb          *grpc.Server
   365  		beIPs       []net.IP
   366  		bePorts     []int
   367  	)
   368  	for i := 0; i < numberOfBackends; i++ {
   369  		beLis, e := net.Listen("tcp", "localhost:0")
   370  		if e != nil {
   371  			err = fmt.Errorf("failed to listen %v", err)
   372  			return
   373  		}
   374  		beIPs = append(beIPs, beLis.Addr().(*net.TCPAddr).IP)
   375  		bePorts = append(bePorts, beLis.Addr().(*net.TCPAddr).Port)
   376  
   377  		beListeners = append(beListeners, testutils.NewRestartableListener(beLis))
   378  	}
   379  	backends := startBackends(t, beServerName, false, beListeners...)
   380  
   381  	lbLis, err := net.Listen("tcp", "localhost:0")
   382  	if err != nil {
   383  		err = fmt.Errorf("failed to create the listener for the load balancer %v", err)
   384  		return
   385  	}
   386  	lbLis = testutils.NewRestartableListener(lbLis)
   387  	lbCreds := &serverNameCheckCreds{
   388  		sn: lbServerName,
   389  	}
   390  	lb = grpc.NewServer(grpc.Creds(lbCreds))
   391  	ls = newRemoteBalancer(customUserAgent, beServerName, statsChan)
   392  	lbgrpc.RegisterLoadBalancerServer(lb, ls)
   393  	go func() {
   394  		lb.Serve(lbLis)
   395  	}()
   396  	t.Logf("Started remote load balancer server listening on %s", lbLis.Addr().String())
   397  
   398  	tss = &testServers{
   399  		lbAddr:   net.JoinHostPort(fakeName, strconv.Itoa(lbLis.Addr().(*net.TCPAddr).Port)),
   400  		ls:       ls,
   401  		lb:       lb,
   402  		backends: backends,
   403  		beIPs:    beIPs,
   404  		bePorts:  bePorts,
   405  
   406  		lbListener:  lbLis,
   407  		beListeners: beListeners,
   408  	}
   409  	cleanup = func() {
   410  		defer stopBackends(backends)
   411  		defer func() {
   412  			ls.stop()
   413  			lb.Stop()
   414  		}()
   415  	}
   416  	return
   417  }
   418  
   419  // TestGRPCLB_Basic tests the basic case of a channel being configured with
   420  // grpclb as the load balancing policy.
   421  func (s) TestGRPCLB_Basic(t *testing.T) {
   422  	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 1, testUserAgent, nil)
   423  	if err != nil {
   424  		t.Fatalf("failed to create new load balancer: %v", err)
   425  	}
   426  	defer cleanup()
   427  
   428  	// Push the test backend address to the remote balancer.
   429  	tss.ls.sls <- &lbpb.ServerList{
   430  		Servers: []*lbpb.Server{
   431  			{
   432  				IpAddress:        tss.beIPs[0],
   433  				Port:             int32(tss.bePorts[0]),
   434  				LoadBalanceToken: lbToken,
   435  			},
   436  		},
   437  	}
   438  
   439  	// Configure the manual resolver with an initial state containing a service
   440  	// config with grpclb as the load balancing policy and the remote balancer
   441  	// address specified via attributes.
   442  	r := manual.NewBuilderWithScheme("whatever")
   443  	s := &grpclbstate.State{
   444  		BalancerAddresses: []resolver.Address{
   445  			{
   446  				Addr:       tss.lbAddr,
   447  				ServerName: lbServerName,
   448  			},
   449  		},
   450  	}
   451  	rs := grpclbstate.Set(resolver.State{ServiceConfig: internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(grpclbConfig)}, s)
   452  	r.InitialState(rs)
   453  
   454  	// Connect to the test backend.
   455  	dopts := []grpc.DialOption{
   456  		grpc.WithResolvers(r),
   457  		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
   458  		grpc.WithContextDialer(fakeNameDialer),
   459  		grpc.WithUserAgent(testUserAgent),
   460  	}
   461  	cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
   462  	if err != nil {
   463  		t.Fatalf("Failed to dial to the backend %v", err)
   464  	}
   465  	defer cc.Close()
   466  
   467  	// Make one successful RPC.
   468  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   469  	defer cancel()
   470  	testC := testgrpc.NewTestServiceClient(cc)
   471  	if _, err := testC.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   472  		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
   473  	}
   474  }
   475  
   476  // TestGRPCLB_Weighted tests weighted roundrobin. The remote balancer is
   477  // configured to send a response with duplicate backend addresses (to simulate
   478  // weights) to the grpclb client. The test verifies that RPCs are weighted
   479  // roundrobin-ed across these backends.
   480  func (s) TestGRPCLB_Weighted(t *testing.T) {
   481  	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 2, "", nil)
   482  	if err != nil {
   483  		t.Fatalf("failed to create new load balancer: %v", err)
   484  	}
   485  	defer cleanup()
   486  
   487  	beServers := []*lbpb.Server{{
   488  		IpAddress:        tss.beIPs[0],
   489  		Port:             int32(tss.bePorts[0]),
   490  		LoadBalanceToken: lbToken,
   491  	}, {
   492  		IpAddress:        tss.beIPs[1],
   493  		Port:             int32(tss.bePorts[1]),
   494  		LoadBalanceToken: lbToken,
   495  	}}
   496  
   497  	// Configure the manual resolver with an initial state containing a service
   498  	// config with grpclb as the load balancing policy and the remote balancer
   499  	// address specified via attributes.
   500  	r := manual.NewBuilderWithScheme("whatever")
   501  	s := &grpclbstate.State{
   502  		BalancerAddresses: []resolver.Address{
   503  			{
   504  				Addr:       tss.lbAddr,
   505  				ServerName: lbServerName,
   506  			},
   507  		},
   508  	}
   509  	rs := grpclbstate.Set(resolver.State{ServiceConfig: internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(grpclbConfig)}, s)
   510  	r.InitialState(rs)
   511  
   512  	// Connect to test backends.
   513  	dopts := []grpc.DialOption{
   514  		grpc.WithResolvers(r),
   515  		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
   516  		grpc.WithContextDialer(fakeNameDialer),
   517  	}
   518  	cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
   519  	if err != nil {
   520  		t.Fatalf("Failed to dial to the backend %v", err)
   521  	}
   522  	defer cc.Close()
   523  
   524  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   525  	defer cancel()
   526  	// Sequence represents the sequence of backends to be returned from the
   527  	// remote load balancer.
   528  	sequences := [][]int{
   529  		{0, 0, 1, 0, 1},
   530  		{0, 0, 0, 1, 1},
   531  	}
   532  	for _, seq := range sequences {
   533  		// Push the configured sequence of backend to the remote balancer, and
   534  		// compute the expected addresses to which RPCs should be routed.
   535  		var backends []*lbpb.Server
   536  		var wantAddrs []resolver.Address
   537  		for _, s := range seq {
   538  			backends = append(backends, beServers[s])
   539  			wantAddrs = append(wantAddrs, resolver.Address{Addr: tss.beListeners[s].Addr().String()})
   540  		}
   541  		tss.ls.sls <- &lbpb.ServerList{Servers: backends}
   542  
   543  		testC := testgrpc.NewTestServiceClient(cc)
   544  		if err := roundrobin.CheckWeightedRoundRobinRPCs(ctx, testC, wantAddrs); err != nil {
   545  			t.Fatal(err)
   546  		}
   547  	}
   548  }
   549  
   550  // TestGRPCLB_DropRequest tests grpclb support for dropping requests based on
   551  // configuration received from the remote balancer.
   552  //
   553  // TODO: Rewrite this test to verify drop behavior using the
   554  // ClientStats.CallsFinishedWithDrop field instead.
   555  func (s) TestGRPCLB_DropRequest(t *testing.T) {
   556  	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 2, "", nil)
   557  	if err != nil {
   558  		t.Fatalf("failed to create new load balancer: %v", err)
   559  	}
   560  	defer cleanup()
   561  	tss.ls.sls <- &lbpb.ServerList{
   562  		Servers: []*lbpb.Server{{
   563  			IpAddress:        tss.beIPs[0],
   564  			Port:             int32(tss.bePorts[0]),
   565  			LoadBalanceToken: lbToken,
   566  			Drop:             false,
   567  		}, {
   568  			IpAddress:        tss.beIPs[1],
   569  			Port:             int32(tss.bePorts[1]),
   570  			LoadBalanceToken: lbToken,
   571  			Drop:             false,
   572  		}, {
   573  			Drop: true,
   574  		}},
   575  	}
   576  
   577  	// Configure the manual resolver with an initial state containing a service
   578  	// config with grpclb as the load balancing policy and the remote balancer
   579  	// address specified via attributes.
   580  	r := manual.NewBuilderWithScheme("whatever")
   581  	s := &grpclbstate.State{
   582  		BalancerAddresses: []resolver.Address{
   583  			{
   584  				Addr:       tss.lbAddr,
   585  				ServerName: lbServerName,
   586  			},
   587  		},
   588  	}
   589  	rs := grpclbstate.Set(resolver.State{ServiceConfig: internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(grpclbConfig)}, s)
   590  	r.InitialState(rs)
   591  
   592  	// Connect to test backends.
   593  	dopts := []grpc.DialOption{
   594  		grpc.WithResolvers(r),
   595  		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
   596  		grpc.WithContextDialer(fakeNameDialer),
   597  	}
   598  	cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
   599  	if err != nil {
   600  		t.Fatalf("Failed to dial to the backend %v", err)
   601  	}
   602  	defer cc.Close()
   603  	testC := testgrpc.NewTestServiceClient(cc)
   604  
   605  	var (
   606  		i int
   607  		p peer.Peer
   608  	)
   609  	const (
   610  		// Poll to wait for something to happen. Total timeout 1 second. Sleep 1
   611  		// ms each loop, and do at most 1000 loops.
   612  		sleepEachLoop = time.Millisecond
   613  		loopCount     = int(time.Second / sleepEachLoop)
   614  	)
   615  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   616  	defer cancel()
   617  	// Make a non-fail-fast RPC and wait for it to succeed.
   618  	for i = 0; i < loopCount; i++ {
   619  		if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err == nil {
   620  			break
   621  		}
   622  		time.Sleep(sleepEachLoop)
   623  	}
   624  	if i >= loopCount {
   625  		t.Fatalf("timeout waiting for the first connection to become ready. EmptyCall(_, _) = _, %v, want _, <nil>", err)
   626  	}
   627  
   628  	// Make RPCs until the peer is different. So we know both connections are
   629  	// READY.
   630  	for i = 0; i < loopCount; i++ {
   631  		var temp peer.Peer
   632  		if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&temp)); err == nil {
   633  			if temp.Addr.(*net.TCPAddr).Port != p.Addr.(*net.TCPAddr).Port {
   634  				break
   635  			}
   636  		}
   637  		time.Sleep(sleepEachLoop)
   638  	}
   639  	if i >= loopCount {
   640  		t.Fatalf("timeout waiting for the second connection to become ready")
   641  	}
   642  
   643  	// More RPCs until drop happens. So we know the picker index, and the
   644  	// expected behavior of following RPCs.
   645  	for i = 0; i < loopCount; i++ {
   646  		if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) == codes.Unavailable {
   647  			break
   648  		}
   649  		time.Sleep(sleepEachLoop)
   650  	}
   651  	if i >= loopCount {
   652  		t.Fatalf("timeout waiting for drop. EmptyCall(_, _) = _, %v, want _, <Unavailable>", err)
   653  	}
   654  
   655  	select {
   656  	case <-ctx.Done():
   657  		t.Fatal("timed out", ctx.Err())
   658  	default:
   659  	}
   660  	for _, failfast := range []bool{true, false} {
   661  		for i := 0; i < 3; i++ {
   662  			// 1st RPCs pick the first item in server list. They should succeed
   663  			// since they choose the non-drop-request backend according to the
   664  			// round robin policy.
   665  			if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(!failfast)); err != nil {
   666  				t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
   667  			}
   668  			// 2nd RPCs pick the second item in server list. They should succeed
   669  			// since they choose the non-drop-request backend according to the
   670  			// round robin policy.
   671  			if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(!failfast)); err != nil {
   672  				t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
   673  			}
   674  			// 3rd RPCs should fail, because they pick last item in server list,
   675  			// with Drop set to true.
   676  			if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(!failfast)); status.Code(err) != codes.Unavailable {
   677  				t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.Unavailable)
   678  			}
   679  		}
   680  	}
   681  
   682  	// Make one more RPC to move the picker index one step further, so it's not
   683  	// 0. The following RPCs will test that drop index is not reset. If picker
   684  	// index is at 0, we cannot tell whether it's reset or not.
   685  	if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   686  		t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
   687  	}
   688  
   689  	tss.backends[0].Stop()
   690  	// This last pick was backend 0. Closing backend 0 doesn't reset drop index
   691  	// (for level 1 picking), so the following picks will be (backend1, drop,
   692  	// backend1), instead of (backend, backend, drop) if drop index was reset.
   693  	time.Sleep(time.Second)
   694  	for i := 0; i < 3; i++ {
   695  		var p peer.Peer
   696  		if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
   697  			t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
   698  		}
   699  		if want := tss.bePorts[1]; p.Addr.(*net.TCPAddr).Port != want {
   700  			t.Errorf("got peer: %v, want peer port: %v", p.Addr, want)
   701  		}
   702  
   703  		if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.Unavailable {
   704  			t.Errorf("%v.EmptyCall(_, _) = _, %v, want _, %s", testC, err, codes.Unavailable)
   705  		}
   706  
   707  		if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
   708  			t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
   709  		}
   710  		if want := tss.bePorts[1]; p.Addr.(*net.TCPAddr).Port != want {
   711  			t.Errorf("got peer: %v, want peer port: %v", p.Addr, want)
   712  		}
   713  	}
   714  }
   715  
   716  // TestGRPCLB_BalancerDisconnects tests the case where the remote balancer in
   717  // use disconnects. The test verifies that grpclb connects to the next remote
   718  // balancer address specified in attributes, and RPCs get routed to the backends
   719  // returned by the new balancer.
   720  func (s) TestGRPCLB_BalancerDisconnects(t *testing.T) {
   721  	var (
   722  		tests []*testServers
   723  		lbs   []*grpc.Server
   724  	)
   725  	for i := 0; i < 2; i++ {
   726  		tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 1, "", nil)
   727  		if err != nil {
   728  			t.Fatalf("failed to create new load balancer: %v", err)
   729  		}
   730  		defer cleanup()
   731  
   732  		tss.ls.sls <- &lbpb.ServerList{
   733  			Servers: []*lbpb.Server{
   734  				{
   735  					IpAddress:        tss.beIPs[0],
   736  					Port:             int32(tss.bePorts[0]),
   737  					LoadBalanceToken: lbToken,
   738  				},
   739  			},
   740  		}
   741  
   742  		tests = append(tests, tss)
   743  		lbs = append(lbs, tss.lb)
   744  	}
   745  
   746  	// Configure the manual resolver with an initial state containing a service
   747  	// config with grpclb as the load balancing policy and the remote balancer
   748  	// addresses specified via attributes.
   749  	r := manual.NewBuilderWithScheme("whatever")
   750  	s := &grpclbstate.State{
   751  		BalancerAddresses: []resolver.Address{
   752  			{
   753  				Addr:       tests[0].lbAddr,
   754  				ServerName: lbServerName,
   755  			},
   756  			{
   757  				Addr:       tests[1].lbAddr,
   758  				ServerName: lbServerName,
   759  			},
   760  		},
   761  	}
   762  	rs := grpclbstate.Set(resolver.State{ServiceConfig: internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(grpclbConfig)}, s)
   763  	r.InitialState(rs)
   764  
   765  	dopts := []grpc.DialOption{
   766  		grpc.WithResolvers(r),
   767  		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
   768  		grpc.WithContextDialer(fakeNameDialer),
   769  	}
   770  	cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
   771  	if err != nil {
   772  		t.Fatalf("Failed to dial to the backend %v", err)
   773  	}
   774  	defer cc.Close()
   775  	testC := testgrpc.NewTestServiceClient(cc)
   776  
   777  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   778  	defer cancel()
   779  	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: tests[0].beListeners[0].Addr().String()}}); err != nil {
   780  		t.Fatal(err)
   781  	}
   782  
   783  	// Stop balancer[0], balancer[1] should be used by grpclb.
   784  	// Check peer address to see if that happened.
   785  	lbs[0].Stop()
   786  	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: tests[1].beListeners[0].Addr().String()}}); err != nil {
   787  		t.Fatal(err)
   788  	}
   789  }
   790  
   791  // TestGRPCLB_Fallback tests the following fallback scenarios:
   792  //   - when the remote balancer address specified in attributes is invalid, the
   793  //     test verifies that RPCs are routed to the fallback backend.
   794  //   - when the remote balancer address specified in attributes is changed to a
   795  //     valid one, the test verifies that RPCs are routed to the backend returned
   796  //     by the remote balancer.
   797  //   - when the configured remote balancer goes down, the test verifies that
   798  //     RPCs are routed to the fallback backend.
   799  func (s) TestGRPCLB_Fallback(t *testing.T) {
   800  	balancer.Register(newLBBuilderWithFallbackTimeout(100 * time.Millisecond))
   801  	defer balancer.Register(newLBBuilder())
   802  
   803  	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 1, "", nil)
   804  	if err != nil {
   805  		t.Fatalf("failed to create new load balancer: %v", err)
   806  	}
   807  	defer cleanup()
   808  	sl := &lbpb.ServerList{
   809  		Servers: []*lbpb.Server{
   810  			{
   811  				IpAddress:        tss.beIPs[0],
   812  				Port:             int32(tss.bePorts[0]),
   813  				LoadBalanceToken: lbToken,
   814  			},
   815  		},
   816  	}
   817  	// Push the backend address to the remote balancer.
   818  	tss.ls.sls <- sl
   819  
   820  	// Start a standalone backend for fallback.
   821  	beLis, err := net.Listen("tcp", "localhost:0")
   822  	if err != nil {
   823  		t.Fatalf("Failed to listen %v", err)
   824  	}
   825  	defer beLis.Close()
   826  	standaloneBEs := startBackends(t, beServerName, true, beLis)
   827  	defer stopBackends(standaloneBEs)
   828  
   829  	r := manual.NewBuilderWithScheme("whatever")
   830  	dopts := []grpc.DialOption{
   831  		grpc.WithResolvers(r),
   832  		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
   833  		grpc.WithContextDialer(fakeNameDialer),
   834  	}
   835  	cc, err := grpc.Dial(r.Scheme()+":///"+beServerName, dopts...)
   836  	if err != nil {
   837  		t.Fatalf("Failed to dial to the backend %v", err)
   838  	}
   839  	defer cc.Close()
   840  	testC := testgrpc.NewTestServiceClient(cc)
   841  
   842  	// Push an update to the resolver with fallback backend address stored in
   843  	// the `Addresses` field and an invalid remote balancer address stored in
   844  	// attributes, which will cause fallback behavior to be invoked.
   845  	rs := resolver.State{
   846  		Addresses:     []resolver.Address{{Addr: beLis.Addr().String()}},
   847  		ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig),
   848  	}
   849  	rs = grpclbstate.Set(rs, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: "invalid.address", ServerName: lbServerName}}})
   850  	r.UpdateState(rs)
   851  
   852  	// Make an RPC and verify that it got routed to the fallback backend.
   853  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   854  	defer cancel()
   855  	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: beLis.Addr().String()}}); err != nil {
   856  		t.Fatal(err)
   857  	}
   858  
   859  	// Push another update to the resolver, this time with a valid balancer
   860  	// address in the attributes field.
   861  	rs = resolver.State{
   862  		ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig),
   863  		Addresses:     []resolver.Address{{Addr: beLis.Addr().String()}},
   864  	}
   865  	rs = grpclbstate.Set(rs, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: tss.lbAddr, ServerName: lbServerName}}})
   866  	r.UpdateState(rs)
   867  	select {
   868  	case <-ctx.Done():
   869  		t.Fatalf("timeout when waiting for BalanceLoad RPC to be called on the remote balancer")
   870  	case <-tss.ls.balanceLoadCh:
   871  	}
   872  
   873  	// Wait for RPCs to get routed to the backend behind the remote balancer.
   874  	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: tss.beListeners[0].Addr().String()}}); err != nil {
   875  		t.Fatal(err)
   876  	}
   877  
   878  	// Close backend and remote balancer connections, should use fallback.
   879  	tss.beListeners[0].(*testutils.RestartableListener).Stop()
   880  	tss.lbListener.(*testutils.RestartableListener).Stop()
   881  	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: beLis.Addr().String()}}); err != nil {
   882  		t.Fatal(err)
   883  	}
   884  
   885  	// Restart backend and remote balancer, should not use fallback backend.
   886  	tss.beListeners[0].(*testutils.RestartableListener).Restart()
   887  	tss.lbListener.(*testutils.RestartableListener).Restart()
   888  	tss.ls.sls <- sl
   889  	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: tss.beListeners[0].Addr().String()}}); err != nil {
   890  		t.Fatal(err)
   891  	}
   892  }
   893  
   894  // TestGRPCLB_ExplicitFallback tests the case where the remote balancer sends an
   895  // explicit fallback signal to the grpclb client, and the test verifies that
   896  // RPCs are routed to the fallback backend.
   897  func (s) TestGRPCLB_ExplicitFallback(t *testing.T) {
   898  	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 1, "", nil)
   899  	if err != nil {
   900  		t.Fatalf("failed to create new load balancer: %v", err)
   901  	}
   902  	defer cleanup()
   903  	sl := &lbpb.ServerList{
   904  		Servers: []*lbpb.Server{
   905  			{
   906  				IpAddress:        tss.beIPs[0],
   907  				Port:             int32(tss.bePorts[0]),
   908  				LoadBalanceToken: lbToken,
   909  			},
   910  		},
   911  	}
   912  	// Push the backend address to the remote balancer.
   913  	tss.ls.sls <- sl
   914  
   915  	// Start a standalone backend for fallback.
   916  	beLis, err := net.Listen("tcp", "localhost:0")
   917  	if err != nil {
   918  		t.Fatalf("Failed to listen %v", err)
   919  	}
   920  	defer beLis.Close()
   921  	standaloneBEs := startBackends(t, beServerName, true, beLis)
   922  	defer stopBackends(standaloneBEs)
   923  
   924  	// Configure the manual resolver with an initial state containing a service
   925  	// config with grpclb as the load balancing policy and the address of the
   926  	// fallback backend. The remote balancer address is specified via
   927  	// attributes.
   928  	r := manual.NewBuilderWithScheme("whatever")
   929  	rs := resolver.State{
   930  		Addresses:     []resolver.Address{{Addr: beLis.Addr().String()}},
   931  		ServiceConfig: internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(grpclbConfig),
   932  	}
   933  	rs = grpclbstate.Set(rs, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: tss.lbAddr, ServerName: lbServerName}}})
   934  	r.InitialState(rs)
   935  
   936  	dopts := []grpc.DialOption{
   937  		grpc.WithResolvers(r),
   938  		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
   939  		grpc.WithContextDialer(fakeNameDialer),
   940  	}
   941  	cc, err := grpc.NewClient(r.Scheme()+":///"+beServerName, dopts...)
   942  	if err != nil {
   943  		t.Fatalf("Failed to dial to the backend %v", err)
   944  	}
   945  	defer cc.Close()
   946  	testC := testgrpc.NewTestServiceClient(cc)
   947  
   948  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   949  	defer cancel()
   950  	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: tss.beListeners[0].Addr().String()}}); err != nil {
   951  		t.Fatal(err)
   952  	}
   953  
   954  	// Send fallback signal from remote balancer; should use fallback.
   955  	tss.ls.fallbackNow()
   956  	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: beLis.Addr().String()}}); err != nil {
   957  		t.Fatal(err)
   958  	}
   959  
   960  	// Send another server list; should use backends again.
   961  	tss.ls.sls <- sl
   962  	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: tss.beListeners[0].Addr().String()}}); err != nil {
   963  		t.Fatal(err)
   964  	}
   965  }
   966  
   967  // TestGRPCLB_FallBackWithNoServerAddress tests the fallback case where no
   968  // backend addresses are returned by the remote balancer.
   969  func (s) TestGRPCLB_FallBackWithNoServerAddress(t *testing.T) {
   970  	resolveNowCh := testutils.NewChannel()
   971  	r := manual.NewBuilderWithScheme("whatever")
   972  	r.ResolveNowCallback = func(resolver.ResolveNowOptions) {
   973  		ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
   974  		defer cancel()
   975  		if err := resolveNowCh.SendContext(ctx, nil); err != nil {
   976  			t.Error("timeout when attempting to send on resolverNowCh")
   977  		}
   978  	}
   979  
   980  	// Start a remote balancer and a backend. Don't push the backend address to
   981  	// the remote balancer yet.
   982  	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 1, "", nil)
   983  	if err != nil {
   984  		t.Fatalf("failed to create new load balancer: %v", err)
   985  	}
   986  	defer cleanup()
   987  	sl := &lbpb.ServerList{
   988  		Servers: []*lbpb.Server{
   989  			{
   990  				IpAddress:        tss.beIPs[0],
   991  				Port:             int32(tss.bePorts[0]),
   992  				LoadBalanceToken: lbToken,
   993  			},
   994  		},
   995  	}
   996  
   997  	// Start a standalone backend for fallback.
   998  	beLis, err := net.Listen("tcp", "localhost:0")
   999  	if err != nil {
  1000  		t.Fatalf("Failed to listen %v", err)
  1001  	}
  1002  	defer beLis.Close()
  1003  	standaloneBEs := startBackends(t, beServerName, true, beLis)
  1004  	defer stopBackends(standaloneBEs)
  1005  
  1006  	dopts := []grpc.DialOption{
  1007  		grpc.WithResolvers(r),
  1008  		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
  1009  		grpc.WithContextDialer(fakeNameDialer),
  1010  	}
  1011  	cc, err := grpc.Dial(r.Scheme()+":///"+beServerName, dopts...)
  1012  	if err != nil {
  1013  		t.Fatalf("Failed to dial to the backend %v", err)
  1014  	}
  1015  	defer cc.Close()
  1016  	testC := testgrpc.NewTestServiceClient(cc)
  1017  
  1018  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1019  	defer cancel()
  1020  	for i := 0; i < 2; i++ {
  1021  		// Send an update with only backend address. grpclb should enter
  1022  		// fallback and use the fallback backend.
  1023  		r.UpdateState(resolver.State{
  1024  			Addresses:     []resolver.Address{{Addr: beLis.Addr().String()}},
  1025  			ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig),
  1026  		})
  1027  
  1028  		sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
  1029  		defer sCancel()
  1030  		if _, err := resolveNowCh.Receive(sCtx); err != context.DeadlineExceeded {
  1031  			t.Fatalf("unexpected resolveNow when grpclb gets no balancer address 1111, %d", i)
  1032  		}
  1033  
  1034  		var p peer.Peer
  1035  		if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
  1036  			t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, <nil>", err)
  1037  		}
  1038  		if p.Addr.String() != beLis.Addr().String() {
  1039  			t.Fatalf("got peer: %v, want peer: %v", p.Addr, beLis.Addr())
  1040  		}
  1041  
  1042  		sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
  1043  		defer sCancel()
  1044  		if _, err := resolveNowCh.Receive(sCtx); err != context.DeadlineExceeded {
  1045  			t.Errorf("unexpected resolveNow when grpclb gets no balancer address 2222, %d", i)
  1046  		}
  1047  
  1048  		tss.ls.sls <- sl
  1049  		// Send an update with balancer address. The backends behind grpclb should
  1050  		// be used.
  1051  		rs := resolver.State{
  1052  			Addresses:     []resolver.Address{{Addr: beLis.Addr().String()}},
  1053  			ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig),
  1054  		}
  1055  		rs = grpclbstate.Set(rs, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: tss.lbAddr, ServerName: lbServerName}}})
  1056  		r.UpdateState(rs)
  1057  
  1058  		select {
  1059  		case <-ctx.Done():
  1060  			t.Fatalf("timeout when waiting for BalanceLoad RPC to be called on the remote balancer")
  1061  		case <-tss.ls.balanceLoadCh:
  1062  		}
  1063  
  1064  		if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, []resolver.Address{{Addr: tss.beListeners[0].Addr().String()}}); err != nil {
  1065  			t.Fatal(err)
  1066  		}
  1067  	}
  1068  }
  1069  
  1070  // TestGRPCLB_PickFirst configures grpclb with pick_first as the child policy.
  1071  // The test changes the list of backend addresses returned by the remote
  1072  // balancer and verifies that RPCs are sent to the first address returned.
  1073  func (s) TestGRPCLB_PickFirst(t *testing.T) {
  1074  	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 3, "", nil)
  1075  	if err != nil {
  1076  		t.Fatalf("failed to create new load balancer: %v", err)
  1077  	}
  1078  	defer cleanup()
  1079  
  1080  	beServers := []*lbpb.Server{{
  1081  		IpAddress:        tss.beIPs[0],
  1082  		Port:             int32(tss.bePorts[0]),
  1083  		LoadBalanceToken: lbToken,
  1084  	}, {
  1085  		IpAddress:        tss.beIPs[1],
  1086  		Port:             int32(tss.bePorts[1]),
  1087  		LoadBalanceToken: lbToken,
  1088  	}, {
  1089  		IpAddress:        tss.beIPs[2],
  1090  		Port:             int32(tss.bePorts[2]),
  1091  		LoadBalanceToken: lbToken,
  1092  	}}
  1093  	beServerAddrs := []resolver.Address{}
  1094  	for _, lis := range tss.beListeners {
  1095  		beServerAddrs = append(beServerAddrs, resolver.Address{Addr: lis.Addr().String()})
  1096  	}
  1097  
  1098  	// Connect to the test backends.
  1099  	r := manual.NewBuilderWithScheme("whatever")
  1100  	dopts := []grpc.DialOption{
  1101  		grpc.WithResolvers(r),
  1102  		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
  1103  		grpc.WithContextDialer(fakeNameDialer),
  1104  	}
  1105  	cc, err := grpc.Dial(r.Scheme()+":///"+beServerName, dopts...)
  1106  	if err != nil {
  1107  		t.Fatalf("Failed to dial to the backend %v", err)
  1108  	}
  1109  	defer cc.Close()
  1110  
  1111  	// Push a service config with grpclb as the load balancing policy and
  1112  	// configure pick_first as its child policy.
  1113  	rs := resolver.State{ServiceConfig: r.CC.ParseServiceConfig(`{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`)}
  1114  
  1115  	// Push a resolver update with the remote balancer address specified via
  1116  	// attributes.
  1117  	r.UpdateState(grpclbstate.Set(rs, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: tss.lbAddr, ServerName: lbServerName}}}))
  1118  
  1119  	// Push all three backend addresses to the remote balancer, and verify that
  1120  	// RPCs are routed to the first backend.
  1121  	tss.ls.sls <- &lbpb.ServerList{Servers: beServers[0:3]}
  1122  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1123  	defer cancel()
  1124  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, beServerAddrs[0]); err != nil {
  1125  		t.Fatal(err)
  1126  	}
  1127  
  1128  	// Update the address list with the remote balancer and verify pick_first
  1129  	// behavior based on the new backends.
  1130  	tss.ls.sls <- &lbpb.ServerList{Servers: beServers[2:]}
  1131  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, beServerAddrs[2]); err != nil {
  1132  		t.Fatal(err)
  1133  	}
  1134  
  1135  	// Update the address list with the remote balancer and verify pick_first
  1136  	// behavior based on the new backends. Since the currently connected backend
  1137  	// is in the new list (even though it is not the first one on the list),
  1138  	// pick_first will continue to use it.
  1139  	tss.ls.sls <- &lbpb.ServerList{Servers: beServers[1:]}
  1140  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, beServerAddrs[2]); err != nil {
  1141  		t.Fatal(err)
  1142  	}
  1143  
  1144  	// Switch child policy to roundrobin.
  1145  	s := &grpclbstate.State{
  1146  		BalancerAddresses: []resolver.Address{
  1147  			{
  1148  				Addr:       tss.lbAddr,
  1149  				ServerName: lbServerName,
  1150  			},
  1151  		},
  1152  	}
  1153  	rs = grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)}, s)
  1154  	r.UpdateState(rs)
  1155  	testC := testgrpc.NewTestServiceClient(cc)
  1156  	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, beServerAddrs[1:]); err != nil {
  1157  		t.Fatal(err)
  1158  	}
  1159  
  1160  	tss.ls.sls <- &lbpb.ServerList{Servers: beServers[0:3]}
  1161  	if err := roundrobin.CheckRoundRobinRPCs(ctx, testC, beServerAddrs[0:3]); err != nil {
  1162  		t.Fatal(err)
  1163  	}
  1164  }
  1165  
  1166  // TestGRPCLB_BackendConnectionErrorPropagation tests the case where grpclb
  1167  // falls back to a backend which returns an error and the test verifies that the
  1168  // error is propagated to the RPC.
  1169  func (s) TestGRPCLB_BackendConnectionErrorPropagation(t *testing.T) {
  1170  	r := manual.NewBuilderWithScheme("whatever")
  1171  
  1172  	// Start up an LB which will tells the client to fall back right away.
  1173  	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 0, "", nil)
  1174  	if err != nil {
  1175  		t.Fatalf("failed to create new load balancer: %v", err)
  1176  	}
  1177  	defer cleanup()
  1178  
  1179  	// Start a standalone backend, to be used during fallback. The creds
  1180  	// are intentionally misconfigured in order to simulate failure of a
  1181  	// security handshake.
  1182  	beLis, err := net.Listen("tcp", "localhost:0")
  1183  	if err != nil {
  1184  		t.Fatalf("Failed to listen %v", err)
  1185  	}
  1186  	defer beLis.Close()
  1187  	standaloneBEs := startBackends(t, "arbitrary.invalid.name", true, beLis)
  1188  	defer stopBackends(standaloneBEs)
  1189  
  1190  	cc, err := grpc.Dial(r.Scheme()+":///"+beServerName,
  1191  		grpc.WithResolvers(r),
  1192  		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
  1193  		grpc.WithContextDialer(fakeNameDialer))
  1194  	if err != nil {
  1195  		t.Fatalf("Failed to dial to the backend %v", err)
  1196  	}
  1197  	defer cc.Close()
  1198  	testC := testgrpc.NewTestServiceClient(cc)
  1199  
  1200  	rs := resolver.State{
  1201  		Addresses:     []resolver.Address{{Addr: beLis.Addr().String()}},
  1202  		ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig),
  1203  	}
  1204  	rs = grpclbstate.Set(rs, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: tss.lbAddr, ServerName: lbServerName}}})
  1205  	r.UpdateState(rs)
  1206  
  1207  	// If https://github.com/grpc/grpc-go/blob/65cabd74d8e18d7347fecd414fa8d83a00035f5f/balancer/grpclb/grpclb_test.go#L103
  1208  	// changes, then expectedErrMsg may need to be updated.
  1209  	const expectedErrMsg = "received unexpected server name"
  1210  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1211  	defer cancel()
  1212  	var wg sync.WaitGroup
  1213  	wg.Add(1)
  1214  	go func() {
  1215  		tss.ls.fallbackNow()
  1216  		wg.Done()
  1217  	}()
  1218  	if _, err := testC.EmptyCall(ctx, &testpb.Empty{}); err == nil || !strings.Contains(err.Error(), expectedErrMsg) {
  1219  		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, rpc error containing substring: %q", testC, err, expectedErrMsg)
  1220  	}
  1221  	wg.Wait()
  1222  }
  1223  
  1224  func testGRPCLBEmptyServerList(t *testing.T, svcfg string) {
  1225  	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 1, "", nil)
  1226  	if err != nil {
  1227  		t.Fatalf("failed to create new load balancer: %v", err)
  1228  	}
  1229  	defer cleanup()
  1230  
  1231  	beServers := []*lbpb.Server{{
  1232  		IpAddress:        tss.beIPs[0],
  1233  		Port:             int32(tss.bePorts[0]),
  1234  		LoadBalanceToken: lbToken,
  1235  	}}
  1236  
  1237  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1238  	defer cancel()
  1239  	r := manual.NewBuilderWithScheme("whatever")
  1240  	dopts := []grpc.DialOption{
  1241  		grpc.WithResolvers(r),
  1242  		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
  1243  		grpc.WithContextDialer(fakeNameDialer),
  1244  	}
  1245  	cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, dopts...)
  1246  	if err != nil {
  1247  		t.Fatalf("Failed to dial to the backend %v", err)
  1248  	}
  1249  	defer cc.Close()
  1250  	testC := testgrpc.NewTestServiceClient(cc)
  1251  
  1252  	tss.ls.sls <- &lbpb.ServerList{Servers: beServers}
  1253  
  1254  	s := &grpclbstate.State{
  1255  		BalancerAddresses: []resolver.Address{
  1256  			{
  1257  				Addr:       tss.lbAddr,
  1258  				ServerName: lbServerName,
  1259  			},
  1260  		},
  1261  	}
  1262  	rs := grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(svcfg)}, s)
  1263  	r.UpdateState(rs)
  1264  	t.Log("Perform an initial RPC and expect it to succeed...")
  1265  	if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
  1266  		t.Fatalf("Initial _.EmptyCall(_, _) = _, %v, want _, <nil>", err)
  1267  	}
  1268  	t.Log("Now send an empty server list. Wait until we see an RPC failure to make sure the client got it...")
  1269  	tss.ls.sls <- &lbpb.ServerList{}
  1270  	gotError := false
  1271  	for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
  1272  		if _, err := testC.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  1273  			gotError = true
  1274  			break
  1275  		}
  1276  	}
  1277  	if !gotError {
  1278  		t.Fatalf("Expected to eventually see an RPC fail after the grpclb sends an empty server list, but none did.")
  1279  	}
  1280  	t.Log("Now send a non-empty server list. A wait-for-ready RPC should now succeed...")
  1281  	tss.ls.sls <- &lbpb.ServerList{Servers: beServers}
  1282  	if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
  1283  		t.Fatalf("Final _.EmptyCall(_, _) = _, %v, want _, <nil>", err)
  1284  	}
  1285  }
  1286  
  1287  func (s) TestGRPCLBEmptyServerListRoundRobin(t *testing.T) {
  1288  	testGRPCLBEmptyServerList(t, `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}}]}}]}`)
  1289  }
  1290  
  1291  func (s) TestGRPCLBEmptyServerListPickFirst(t *testing.T) {
  1292  	testGRPCLBEmptyServerList(t, `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`)
  1293  }
  1294  
  1295  func (s) TestGRPCLBWithTargetNameFieldInConfig(t *testing.T) {
  1296  	r := manual.NewBuilderWithScheme("whatever")
  1297  
  1298  	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 1, "", nil)
  1299  	if err != nil {
  1300  		t.Fatalf("failed to create new load balancer: %v", err)
  1301  	}
  1302  	defer cleanup()
  1303  	sl := &lbpb.ServerList{
  1304  		Servers: []*lbpb.Server{
  1305  			{
  1306  				IpAddress:        tss.beIPs[0],
  1307  				Port:             int32(tss.bePorts[0]),
  1308  				LoadBalanceToken: lbToken,
  1309  			},
  1310  		},
  1311  	}
  1312  	// Push the backend address to the remote balancer.
  1313  	tss.ls.sls <- sl
  1314  
  1315  	cc, err := grpc.Dial(r.Scheme()+":///"+beServerName,
  1316  		grpc.WithResolvers(r),
  1317  		grpc.WithTransportCredentials(&serverNameCheckCreds{}),
  1318  		grpc.WithContextDialer(fakeNameDialer),
  1319  		grpc.WithUserAgent(testUserAgent))
  1320  	if err != nil {
  1321  		t.Fatalf("Failed to dial to the backend %v", err)
  1322  	}
  1323  	defer cc.Close()
  1324  	testC := testgrpc.NewTestServiceClient(cc)
  1325  
  1326  	// Push a resolver update with grpclb configuration which does not contain the
  1327  	// target_name field. Our fake remote balancer is configured to always
  1328  	// expect `beServerName` as the server name in the initial request.
  1329  	rs := grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)},
  1330  		&grpclbstate.State{BalancerAddresses: []resolver.Address{{
  1331  			Addr:       tss.lbAddr,
  1332  			ServerName: lbServerName,
  1333  		}}})
  1334  	r.UpdateState(rs)
  1335  
  1336  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1337  	defer cancel()
  1338  	select {
  1339  	case <-ctx.Done():
  1340  		t.Fatalf("timeout when waiting for BalanceLoad RPC to be called on the remote balancer")
  1341  	case <-tss.ls.balanceLoadCh:
  1342  	}
  1343  	if _, err := testC.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  1344  		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  1345  	}
  1346  
  1347  	// When the value of target_field changes, grpclb will recreate the stream
  1348  	// to the remote balancer. So, we need to update the fake remote balancer to
  1349  	// expect a new server name in the initial request.
  1350  	const newServerName = "new-server-name"
  1351  	tss.ls.updateServerName(newServerName)
  1352  	tss.ls.sls <- sl
  1353  
  1354  	// Push the resolver update with target_field changed.
  1355  	// Push a resolver update with grpclb configuration containing the
  1356  	// target_name field. Our fake remote balancer has been updated above to expect the newServerName in the initial request.
  1357  	lbCfg := fmt.Sprintf(`{"loadBalancingConfig": [{"grpclb": {"serviceName": "%s"}}]}`, newServerName)
  1358  	s := &grpclbstate.State{
  1359  		BalancerAddresses: []resolver.Address{
  1360  			{
  1361  				Addr:       tss.lbAddr,
  1362  				ServerName: lbServerName,
  1363  			},
  1364  		},
  1365  	}
  1366  	rs = grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(lbCfg)}, s)
  1367  	r.UpdateState(rs)
  1368  	select {
  1369  	case <-ctx.Done():
  1370  		t.Fatalf("timeout when waiting for BalanceLoad RPC to be called on the remote balancer")
  1371  	case <-tss.ls.balanceLoadCh:
  1372  	}
  1373  
  1374  	if _, err := testC.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  1375  		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  1376  	}
  1377  }
  1378  
  1379  type failPreRPCCred struct{}
  1380  
  1381  func (failPreRPCCred) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
  1382  	if strings.Contains(uri[0], failtosendURI) {
  1383  		return nil, fmt.Errorf("rpc should fail to send")
  1384  	}
  1385  	return nil, nil
  1386  }
  1387  
  1388  func (failPreRPCCred) RequireTransportSecurity() bool {
  1389  	return false
  1390  }
  1391  
  1392  func checkStats(stats, expected *rpcStats) error {
  1393  	if !stats.equal(expected) {
  1394  		return fmt.Errorf("stats not equal: got %+v, want %+v", stats, expected)
  1395  	}
  1396  	return nil
  1397  }
  1398  
  1399  func runAndCheckStats(t *testing.T, drop bool, statsChan chan *lbpb.ClientStats, runRPCs func(*grpc.ClientConn), statsWant *rpcStats) error {
  1400  	r := manual.NewBuilderWithScheme("whatever")
  1401  
  1402  	tss, cleanup, err := startBackendsAndRemoteLoadBalancer(t, 1, "", statsChan)
  1403  	if err != nil {
  1404  		t.Fatalf("failed to create new load balancer: %v", err)
  1405  	}
  1406  	defer cleanup()
  1407  	servers := []*lbpb.Server{{
  1408  		IpAddress:        tss.beIPs[0],
  1409  		Port:             int32(tss.bePorts[0]),
  1410  		LoadBalanceToken: lbToken,
  1411  	}}
  1412  	if drop {
  1413  		servers = append(servers, &lbpb.Server{
  1414  			LoadBalanceToken: lbToken,
  1415  			Drop:             drop,
  1416  		})
  1417  	}
  1418  	tss.ls.sls <- &lbpb.ServerList{Servers: servers}
  1419  	tss.ls.statsDura = 100 * time.Millisecond
  1420  	creds := serverNameCheckCreds{}
  1421  
  1422  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  1423  	defer cancel()
  1424  	cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
  1425  		grpc.WithTransportCredentials(&creds),
  1426  		grpc.WithPerRPCCredentials(failPreRPCCred{}),
  1427  		grpc.WithContextDialer(fakeNameDialer))
  1428  	if err != nil {
  1429  		t.Fatalf("Failed to dial to the backend %v", err)
  1430  	}
  1431  	defer cc.Close()
  1432  
  1433  	rstate := resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)}
  1434  	r.UpdateState(grpclbstate.Set(rstate, &grpclbstate.State{BalancerAddresses: []resolver.Address{{
  1435  		Addr:       tss.lbAddr,
  1436  		ServerName: lbServerName,
  1437  	}}}))
  1438  
  1439  	runRPCs(cc)
  1440  	end := time.Now().Add(time.Second)
  1441  	for time.Now().Before(end) {
  1442  		if err := checkStats(tss.ls.stats, statsWant); err == nil {
  1443  			time.Sleep(200 * time.Millisecond) // sleep for two intervals to make sure no new stats are reported.
  1444  			break
  1445  		}
  1446  	}
  1447  	return checkStats(tss.ls.stats, statsWant)
  1448  }
  1449  
  1450  const (
  1451  	countRPC      = 40
  1452  	failtosendURI = "failtosend"
  1453  )
  1454  
  1455  func (s) TestGRPCLBStatsUnarySuccess(t *testing.T) {
  1456  	if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) {
  1457  		testC := testgrpc.NewTestServiceClient(cc)
  1458  		ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout)
  1459  		defer cancel()
  1460  		// The first non-failfast RPC succeeds, all connections are up.
  1461  		if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
  1462  			t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  1463  		}
  1464  		for i := 0; i < countRPC-1; i++ {
  1465  			testC.EmptyCall(ctx, &testpb.Empty{})
  1466  		}
  1467  	}, &rpcStats{
  1468  		numCallsStarted:               int64(countRPC),
  1469  		numCallsFinished:              int64(countRPC),
  1470  		numCallsFinishedKnownReceived: int64(countRPC),
  1471  	}); err != nil {
  1472  		t.Fatal(err)
  1473  	}
  1474  }
  1475  
  1476  func (s) TestGRPCLBStatsUnaryDrop(t *testing.T) {
  1477  	if err := runAndCheckStats(t, true, nil, func(cc *grpc.ClientConn) {
  1478  		testC := testgrpc.NewTestServiceClient(cc)
  1479  		ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout)
  1480  		defer cancel()
  1481  		// The first non-failfast RPC succeeds, all connections are up.
  1482  		if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
  1483  			t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  1484  		}
  1485  		for i := 0; i < countRPC-1; i++ {
  1486  			testC.EmptyCall(ctx, &testpb.Empty{})
  1487  		}
  1488  	}, &rpcStats{
  1489  		numCallsStarted:               int64(countRPC),
  1490  		numCallsFinished:              int64(countRPC),
  1491  		numCallsFinishedKnownReceived: int64(countRPC) / 2,
  1492  		numCallsDropped:               map[string]int64{lbToken: int64(countRPC) / 2},
  1493  	}); err != nil {
  1494  		t.Fatal(err)
  1495  	}
  1496  }
  1497  
  1498  func (s) TestGRPCLBStatsUnaryFailedToSend(t *testing.T) {
  1499  	if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) {
  1500  		testC := testgrpc.NewTestServiceClient(cc)
  1501  		ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout)
  1502  		defer cancel()
  1503  		// The first non-failfast RPC succeeds, all connections are up.
  1504  		if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
  1505  			t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
  1506  		}
  1507  		for i := 0; i < countRPC-1; i++ {
  1508  			cc.Invoke(ctx, failtosendURI, &testpb.Empty{}, nil)
  1509  		}
  1510  	}, &rpcStats{
  1511  		numCallsStarted:                        int64(countRPC),
  1512  		numCallsFinished:                       int64(countRPC),
  1513  		numCallsFinishedWithClientFailedToSend: int64(countRPC) - 1,
  1514  		numCallsFinishedKnownReceived:          1,
  1515  	}); err != nil {
  1516  		t.Fatal(err)
  1517  	}
  1518  }
  1519  
  1520  func (s) TestGRPCLBStatsStreamingSuccess(t *testing.T) {
  1521  	if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) {
  1522  		testC := testgrpc.NewTestServiceClient(cc)
  1523  		ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout)
  1524  		defer cancel()
  1525  		// The first non-failfast RPC succeeds, all connections are up.
  1526  		stream, err := testC.FullDuplexCall(ctx, grpc.WaitForReady(true))
  1527  		if err != nil {
  1528  			t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
  1529  		}
  1530  		for {
  1531  			if _, err = stream.Recv(); err == io.EOF {
  1532  				break
  1533  			}
  1534  		}
  1535  		for i := 0; i < countRPC-1; i++ {
  1536  			stream, err = testC.FullDuplexCall(ctx)
  1537  			if err == nil {
  1538  				// Wait for stream to end if err is nil.
  1539  				for {
  1540  					if _, err = stream.Recv(); err == io.EOF {
  1541  						break
  1542  					}
  1543  				}
  1544  			}
  1545  		}
  1546  	}, &rpcStats{
  1547  		numCallsStarted:               int64(countRPC),
  1548  		numCallsFinished:              int64(countRPC),
  1549  		numCallsFinishedKnownReceived: int64(countRPC),
  1550  	}); err != nil {
  1551  		t.Fatal(err)
  1552  	}
  1553  }
  1554  
  1555  func (s) TestGRPCLBStatsStreamingDrop(t *testing.T) {
  1556  	if err := runAndCheckStats(t, true, nil, func(cc *grpc.ClientConn) {
  1557  		testC := testgrpc.NewTestServiceClient(cc)
  1558  		ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout)
  1559  		defer cancel()
  1560  		// The first non-failfast RPC succeeds, all connections are up.
  1561  		stream, err := testC.FullDuplexCall(ctx, grpc.WaitForReady(true))
  1562  		if err != nil {
  1563  			t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
  1564  		}
  1565  		for {
  1566  			if _, err = stream.Recv(); err == io.EOF {
  1567  				break
  1568  			}
  1569  		}
  1570  		for i := 0; i < countRPC-1; i++ {
  1571  			stream, err = testC.FullDuplexCall(ctx)
  1572  			if err == nil {
  1573  				// Wait for stream to end if err is nil.
  1574  				for {
  1575  					if _, err = stream.Recv(); err == io.EOF {
  1576  						break
  1577  					}
  1578  				}
  1579  			}
  1580  		}
  1581  	}, &rpcStats{
  1582  		numCallsStarted:               int64(countRPC),
  1583  		numCallsFinished:              int64(countRPC),
  1584  		numCallsFinishedKnownReceived: int64(countRPC) / 2,
  1585  		numCallsDropped:               map[string]int64{lbToken: int64(countRPC) / 2},
  1586  	}); err != nil {
  1587  		t.Fatal(err)
  1588  	}
  1589  }
  1590  
  1591  func (s) TestGRPCLBStatsStreamingFailedToSend(t *testing.T) {
  1592  	if err := runAndCheckStats(t, false, nil, func(cc *grpc.ClientConn) {
  1593  		testC := testgrpc.NewTestServiceClient(cc)
  1594  		ctx, cancel := context.WithTimeout(context.Background(), defaultFallbackTimeout)
  1595  		defer cancel()
  1596  		// The first non-failfast RPC succeeds, all connections are up.
  1597  		stream, err := testC.FullDuplexCall(ctx, grpc.WaitForReady(true))
  1598  		if err != nil {
  1599  			t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
  1600  		}
  1601  		for {
  1602  			if _, err = stream.Recv(); err == io.EOF {
  1603  				break
  1604  			}
  1605  		}
  1606  		for i := 0; i < countRPC-1; i++ {
  1607  			cc.NewStream(ctx, &grpc.StreamDesc{}, failtosendURI)
  1608  		}
  1609  	}, &rpcStats{
  1610  		numCallsStarted:                        int64(countRPC),
  1611  		numCallsFinished:                       int64(countRPC),
  1612  		numCallsFinishedWithClientFailedToSend: int64(countRPC) - 1,
  1613  		numCallsFinishedKnownReceived:          1,
  1614  	}); err != nil {
  1615  		t.Fatal(err)
  1616  	}
  1617  }
  1618  
  1619  func (s) TestGRPCLBStatsQuashEmpty(t *testing.T) {
  1620  	ch := make(chan *lbpb.ClientStats)
  1621  	defer close(ch)
  1622  	if err := runAndCheckStats(t, false, ch, func(cc *grpc.ClientConn) {
  1623  		// Perform no RPCs; wait for load reports to start, which should be
  1624  		// zero, then expect no other load report within 5x the update
  1625  		// interval.
  1626  		select {
  1627  		case st := <-ch:
  1628  			if !isZeroStats(st) {
  1629  				t.Errorf("got stats %v; want all zero", st)
  1630  			}
  1631  		case <-time.After(5 * time.Second):
  1632  			t.Errorf("did not get initial stats report after 5 seconds")
  1633  			return
  1634  		}
  1635  
  1636  		select {
  1637  		case st := <-ch:
  1638  			t.Errorf("got unexpected stats report: %v", st)
  1639  		case <-time.After(500 * time.Millisecond):
  1640  			// Success.
  1641  		}
  1642  		go func() {
  1643  			for range ch { // Drain statsChan until it is closed.
  1644  			}
  1645  		}()
  1646  	}, &rpcStats{
  1647  		numCallsStarted:               0,
  1648  		numCallsFinished:              0,
  1649  		numCallsFinishedKnownReceived: 0,
  1650  	}); err != nil {
  1651  		t.Fatal(err)
  1652  	}
  1653  }
  1654  

View as plain text