
Source file src/google.golang.org/grpc/balancer/leastrequest/balancer_test.go

Documentation: google.golang.org/grpc/balancer/leastrequest

/*
 *
 * Copyright 2023 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package leastrequest

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/grpctest"
	"google.golang.org/grpc/internal/stubserver"
	testgrpc "google.golang.org/grpc/interop/grpc_testing"
	testpb "google.golang.org/grpc/interop/grpc_testing"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/resolver/manual"
	"google.golang.org/grpc/serviceconfig"
)

const (
	defaultTestTimeout = 5 * time.Second
)

type s struct {
	grpctest.Tester
}

func Test(t *testing.T) {
	grpctest.RunSubTests(t, s{})
}

func (s) TestParseConfig(t *testing.T) {
	parser := bb{}
	tests := []struct {
		name    string
		input   string
		wantCfg serviceconfig.LoadBalancingConfig
		wantErr string
	}{
		{
			name:  "happy-case-default",
			input: `{}`,
			wantCfg: &LBConfig{
				ChoiceCount: 2,
			},
		},
		{
			name:  "happy-case-choice-count-set",
			input: `{"choiceCount": 3}`,
			wantCfg: &LBConfig{
				ChoiceCount: 3,
			},
		},
		{
			name:  "happy-case-choice-count-greater-than-ten",
			input: `{"choiceCount": 11}`,
			wantCfg: &LBConfig{
				ChoiceCount: 10,
			},
		},
		{
			name:    "choice-count-less-than-2",
			input:   `{"choiceCount": 1}`,
			wantErr: "must be >= 2",
		},
		{
			name:    "invalid-json",
			input:   "{{invalidjson{{",
			wantErr: "invalid character",
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			gotCfg, gotErr := parser.ParseConfig(json.RawMessage(test.input))
			// Substring matching makes this tightly coupled to the
			// internalserviceconfig.BalancerConfig error strings. However, it
			// is important to distinguish the different types of error
			// messages possible, because the parser has a few defined buckets
			// of ways it can error out.
			if (gotErr != nil) != (test.wantErr != "") {
				t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
			}
			if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) {
				t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
			}
			if test.wantErr != "" {
				return
			}
			if diff := cmp.Diff(gotCfg, test.wantCfg); diff != "" {
				t.Fatalf("ParseConfig(%v) got unexpected output, diff (-got +want): %v", test.input, diff)
			}
		})
	}
}
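
// The table above pins down the parser's three outcome buckets: malformed
// JSON is rejected, a choiceCount below 2 is rejected, and values above 10
// are silently capped. A minimal sketch of semantics consistent with those
// cases (illustrative only; the real logic lives in this package's
// ParseConfig, and the helper name here is hypothetical):
//
//	func parseChoiceCountSketch(js json.RawMessage) (*LBConfig, error) {
//		cfg := &LBConfig{ChoiceCount: 2} // default when the field is unset
//		if err := json.Unmarshal(js, cfg); err != nil {
//			return nil, err // "invalid character" bucket
//		}
//		if cfg.ChoiceCount < 2 {
//			return nil, fmt.Errorf("choiceCount: %v, must be >= 2", cfg.ChoiceCount)
//		}
//		if cfg.ChoiceCount > 10 {
//			cfg.ChoiceCount = 10 // cap, rather than reject, large values
//		}
//		return cfg, nil
//	}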

// setupBackends spins up three test backends, each listening on a port on
// localhost. Each backend always replies to unary calls with an empty
// response and no error; for streaming calls it blocks until the stream's
// context is done.
func setupBackends(t *testing.T) []string {
	t.Helper()
	const numBackends = 3
	addresses := make([]string, numBackends)
	// Construct and start three working backends.
	for i := 0; i < numBackends; i++ {
		backend := &stubserver.StubServer{
			EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
				return &testpb.Empty{}, nil
			},
			FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
				// Block until the client's context is cancelled, keeping the
				// stream (and thus its outstanding-RPC count) alive.
				<-stream.Context().Done()
				return nil
			},
		}
		if err := backend.StartServer(); err != nil {
			t.Fatalf("Failed to start backend: %v", err)
		}
		t.Logf("Started good TestService backend at: %q", backend.Address)
		t.Cleanup(func() { backend.Stop() })
		addresses[i] = backend.Address
	}
	return addresses
}

// checkRoundRobinRPCs verifies that EmptyCall RPCs on the given ClientConn,
// connected to a server exposing the test.grpc_testing.TestService, are
// roundrobined across the given backend addresses.
//
// Returns a non-nil error if the context deadline expires before RPCs start
// to get roundrobined across the given backends.
func checkRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error {
	wantAddrCount := make(map[string]int)
	for _, addr := range addrs {
		wantAddrCount[addr.Addr]++
	}
	gotAddrCount := make(map[string]int)
	for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
		gotAddrCount = make(map[string]int)
		// Perform 3 iterations.
		var iterations [][]string
		for i := 0; i < 3; i++ {
			iteration := make([]string, len(addrs))
			for c := 0; c < len(addrs); c++ {
				var peer peer.Peer
				// RPCs may fail while connections are still coming up, so
				// ignore the error and record the peer only when one was set;
				// the surrounding loop polls until the distribution
				// converges.
				client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer))
				if peer.Addr != nil {
					iteration[c] = peer.Addr.String()
				}
			}
			iterations = append(iterations, iteration)
		}
		// Ensure the first iteration contains all addresses in addrs.
		for _, addr := range iterations[0] {
			gotAddrCount[addr]++
		}
		if !cmp.Equal(gotAddrCount, wantAddrCount) {
			continue
		}
		// Ensure all three iterations are identical: the same addresses in
		// the same order.
		if !cmp.Equal(iterations[0], iterations[1]) || !cmp.Equal(iterations[0], iterations[2]) {
			continue
		}
		return nil
	}
	return fmt.Errorf("timeout when waiting for roundrobin distribution of RPCs across addresses: %v; got: %v", addrs, gotAddrCount)
}
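
// For three backends A, B, and C, the loop above only returns nil once a
// poll produces something like the following (illustrative values):
//
//	iterations = [][]string{
//		{"A", "B", "C"}, // every address seen exactly once, and
//		{"A", "B", "C"}, // each subsequent pass repeating the same
//		{"A", "B", "C"}, // order, i.e. a stable round robin
//	}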

// TestLeastRequestE2E tests the Least Request LB policy in an e2e style. The
// Least Request balancer is configured as the top level balancer of the
// channel, and is passed three addresses. Eventually, the test creates nine
// streams, which should land on specific backends according to the least
// request algorithm. The randomness in the picker is injected in the test to
// be deterministic, allowing the test to make assertions on the
// distribution.
func (s) TestLeastRequestE2E(t *testing.T) {
	defer func(u func() uint32) {
		grpcranduint32 = u
	}(grpcranduint32)
	var index int
	indexes := []uint32{
		0, 0, 1, 1, 2, 2, // Triggers a round robin distribution.
	}
	grpcranduint32 = func() uint32 {
		ret := indexes[index%len(indexes)]
		index++
		return ret
	}
	addresses := setupBackends(t)

	mr := manual.NewBuilderWithScheme("lr-e2e")
	defer mr.Close()

	// Configure least request as the top level balancer of the channel.
	lrscJSON := `
{
  "loadBalancingConfig": [
    {
      "least_request_experimental": {
        "choiceCount": 2
      }
    }
  ]
}`
	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
	firstThreeAddresses := []resolver.Address{
		{Addr: addresses[0]},
		{Addr: addresses[1]},
		{Addr: addresses[2]},
	}
	mr.InitialState(resolver.State{
		Addresses:     firstThreeAddresses,
		ServiceConfig: sc,
	})

	cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("grpc.NewClient() failed: %v", err)
	}
	defer cc.Close()
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	testServiceClient := testgrpc.NewTestServiceClient(cc)

	// Wait for all 3 backends to be round robined across. This happens
	// because a SubConn transitioning into READY causes a new picker update.
	// Once the picker update with all 3 backends is present, this test can
	// start to make assertions based on those backends.
	if err := checkRoundRobinRPCs(ctx, testServiceClient, firstThreeAddresses); err != nil {
		t.Fatalf("error in expected round robin: %v", err)
	}

	// Map ordering of READY SubConns is non-deterministic. Thus, perform 3
	// RPCs, with the injected randomness selecting each index in turn, to
	// learn which backend address sits at each SubConn index.
	index = 0
	peerAtIndex := make([]string, 3)
	var peer0 peer.Peer
	if _, err := testServiceClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer0)); err != nil {
		t.Fatalf("testServiceClient.EmptyCall failed: %v", err)
	}
	peerAtIndex[0] = peer0.Addr.String()
	if _, err := testServiceClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer0)); err != nil {
		t.Fatalf("testServiceClient.EmptyCall failed: %v", err)
	}
	peerAtIndex[1] = peer0.Addr.String()
	if _, err := testServiceClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer0)); err != nil {
		t.Fatalf("testServiceClient.EmptyCall failed: %v", err)
	}
	peerAtIndex[2] = peer0.Addr.String()

	// Each pair of injected indexes below drives one pick: with choiceCount
	// 2, the picker samples the two indicated SubConns and chooses the one
	// with fewer outstanding RPCs.
	index = 0
	indexes = []uint32{
		0, 0, // Causes the first stream to be on the first address.
		0, 1, // Compares first address (one RPC) to second (no RPCs), so choose second.
		1, 2, // Compares second address (one RPC) to third (no RPCs), so choose third.
		0, 3, // 3 mod 3 == 0, so compares the first address to itself; causes another stream on the first address.
		1, 0, // Compares second address (one RPC) to first (two RPCs), so choose second.
		2, 0, // Compares third address (one RPC) to first (two RPCs), so choose third.
		0, 0, // Causes another stream on the first address.
		2, 2, // Causes a stream on the third address.
		2, 1, // Compares third address (three RPCs) to second (two RPCs), so choose second.
	}
	wantIndexes := []uint32{0, 1, 2, 0, 1, 2, 0, 2, 1}

	// Start streaming RPCs, but do not finish them. Each created stream
	// should be routed based on the least request algorithm and the injected
	// randomness (see the indexes slice above for the exact expectations).
	for _, wantIndex := range wantIndexes {
		stream, err := testServiceClient.FullDuplexCall(ctx)
		if err != nil {
			t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
		}
		p, ok := peer.FromContext(stream.Context())
		if !ok {
			t.Fatalf("testServiceClient.FullDuplexCall has no Peer")
		}
		if p.Addr.String() != peerAtIndex[wantIndex] {
			t.Fatalf("testServiceClient.FullDuplexCall's Peer got: %v, want: %v", p.Addr.String(), peerAtIndex[wantIndex])
		}
	}
}
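
// The choreography above leans on the picker's power-of-choiceCount
// selection. A hedged sketch of that pick loop under the test's injected
// randomness (type and field names here are illustrative, not the
// balancer's actual internals):
//
//	type scSketch struct {
//		addr            string
//		outstandingRPCs int
//	}
//
//	// pickSketch samples choiceCount SubConns and keeps the least loaded.
//	func pickSketch(scs []scSketch, choiceCount uint32) scSketch {
//		best := scs[grpcranduint32()%uint32(len(scs))]
//		for i := uint32(1); i < choiceCount; i++ {
//			candidate := scs[grpcranduint32()%uint32(len(scs))]
//			if candidate.outstandingRPCs < best.outstandingRPCs {
//				best = candidate // fewer in-flight RPCs wins; ties keep the first sample
//			}
//		}
//		return best
//	}
//
// For example, with injected indexes {0, 1} and one RPC outstanding on
// SubConn 0, the sketch picks SubConn 1, matching the expectations encoded
// in wantIndexes.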

// TestLeastRequestPersistsCounts tests that the Least Request Balancer
// persists counts once it gets a new picker update. It first updates the
// Least Request Balancer with two backends, and creates a bunch of streams
// on them. Then, it updates the Least Request Balancer with three backends,
// including the two previous ones. Any created streams should then be
// started on the new backend.
func (s) TestLeastRequestPersistsCounts(t *testing.T) {
	defer func(u func() uint32) {
		grpcranduint32 = u
	}(grpcranduint32)
	var index int
	indexes := []uint32{
		0, 0, 1, 1,
	}
	grpcranduint32 = func() uint32 {
		ret := indexes[index%len(indexes)]
		index++
		return ret
	}
	addresses := setupBackends(t)

	mr := manual.NewBuilderWithScheme("lr-e2e")
	defer mr.Close()

	// Configure least request as the top level balancer of the channel.
	lrscJSON := `
{
  "loadBalancingConfig": [
    {
      "least_request_experimental": {
        "choiceCount": 2
      }
    }
  ]
}`
	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
	firstTwoAddresses := []resolver.Address{
		{Addr: addresses[0]},
		{Addr: addresses[1]},
	}
	mr.InitialState(resolver.State{
		Addresses:     firstTwoAddresses,
		ServiceConfig: sc,
	})

	cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("grpc.NewClient() failed: %v", err)
	}
	defer cc.Close()
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	testServiceClient := testgrpc.NewTestServiceClient(cc)

	// Wait for the two backends to be round robined across. This happens
	// because a SubConn transitioning into READY causes a new picker update.
	// Once the picker update with the two backends is present, this test can
	// start to populate those backends with streams.
	if err := checkRoundRobinRPCs(ctx, testServiceClient, firstTwoAddresses); err != nil {
		t.Fatalf("error in expected round robin: %v", err)
	}

	// Start 50 streaming RPCs, and leave them unfinished for the duration of
	// the test. This will populate the first two addresses with many active
	// RPCs.
	for i := 0; i < 50; i++ {
		_, err := testServiceClient.FullDuplexCall(ctx)
		if err != nil {
			t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
		}
	}

	// Update the least request balancer to choice count 3. Also update the
	// address list, adding a third address. Alongside the injected
	// randomness, this should trigger the least request balancer to search
	// all created SubConns. Thus, since address 3 is the new address and the
	// first two addresses are populated with RPCs, once the picker update
	// with all 3 READY SubConns takes effect, all new streams should be
	// started on address 3.
	index = 0
	indexes = []uint32{
		0, 1, 2, 3, 4, 5,
	}
	lrscJSON = `
{
  "loadBalancingConfig": [
    {
      "least_request_experimental": {
        "choiceCount": 3
      }
    }
  ]
}`
	sc = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
	fullAddresses := []resolver.Address{
		{Addr: addresses[0]},
		{Addr: addresses[1]},
		{Addr: addresses[2]},
	}
	mr.UpdateState(resolver.State{
		Addresses:     fullAddresses,
		ServiceConfig: sc,
	})
	newAddress := fullAddresses[2]
	// Poll for only address 3 to show up. This requires a polling loop
	// because the picker update with all three SubConns doesn't take effect
	// immediately; it needs the third SubConn to become READY.
	if err := checkRoundRobinRPCs(ctx, testServiceClient, []resolver.Address{newAddress}); err != nil {
		t.Fatalf("error in expected round robin: %v", err)
	}

	// Start 25 RPCs, but don't finish them. They should all start on address
	// 3, since the first two addresses both have 25 RPCs (and the randomness
	// injection/choiceCount causes all 3 addresses to be compared on every
	// iteration).
	for i := 0; i < 25; i++ {
		stream, err := testServiceClient.FullDuplexCall(ctx)
		if err != nil {
			t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
		}
		p, ok := peer.FromContext(stream.Context())
		if !ok {
			t.Fatalf("testServiceClient.FullDuplexCall has no Peer")
		}
		if p.Addr.String() != addresses[2] {
			t.Fatalf("testServiceClient.FullDuplexCall's Peer got: %v, want: %v", p.Addr.String(), addresses[2])
		}
	}

	// Now that 25 RPCs are active on each address, the next three RPCs should
	// round robin, since choiceCount is three and the injected random indexes
	// cause it to search all three addresses for the fewest outstanding
	// requests on each iteration.
	wantAddrCount := map[string]int{
		addresses[0]: 1,
		addresses[1]: 1,
		addresses[2]: 1,
	}
	gotAddrCount := make(map[string]int)
	for i := 0; i < len(addresses); i++ {
		stream, err := testServiceClient.FullDuplexCall(ctx)
		if err != nil {
			t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
		}
		p, ok := peer.FromContext(stream.Context())
		if !ok {
			t.Fatalf("testServiceClient.FullDuplexCall has no Peer")
		}
		if p.Addr != nil {
			gotAddrCount[p.Addr.String()]++
		}
	}
	if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" {
		t.Fatalf("addr count (-got, +want): %v", diff)
	}
}
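
// The test above depends on each SubConn's outstanding-RPC count surviving
// picker rebuilds: the 50 streams opened against the first two addresses
// still weigh those SubConns down after the balancer is updated with a
// third. A hedged sketch of the bookkeeping this implies (type and field
// names are illustrative, not the balancer's actual internals):
//
//	type subConnWithCount struct {
//		sc       balancer.SubConn
//		inFlight *atomic.Int32 // shared across picker generations
//	}
//
//	// On each pick the chosen SubConn's counter is incremented, and the
//	// Done callback decrements it once the RPC (or stream) finishes.
//	chosen.inFlight.Add(1)
//	done := func(balancer.DoneInfo) { chosen.inFlight.Add(-1) }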

// TestConcurrentRPCs tests concurrent RPCs on the least request balancer. It
// configures a channel with a least request balancer as the top level
// balancer, and makes RPCs on it concurrently from 100 goroutines (5 RPCs
// each). This makes sure no race conditions happen in this scenario.
func (s) TestConcurrentRPCs(t *testing.T) {
	addresses := setupBackends(t)

	mr := manual.NewBuilderWithScheme("lr-e2e")
	defer mr.Close()

	// Configure least request as the top level balancer of the channel.
	lrscJSON := `
{
  "loadBalancingConfig": [
    {
      "least_request_experimental": {
        "choiceCount": 2
      }
    }
  ]
}`
	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
	firstTwoAddresses := []resolver.Address{
		{Addr: addresses[0]},
		{Addr: addresses[1]},
	}
	mr.InitialState(resolver.State{
		Addresses:     firstTwoAddresses,
		ServiceConfig: sc,
	})

	cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("grpc.NewClient() failed: %v", err)
	}
	defer cc.Close()
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	testServiceClient := testgrpc.NewTestServiceClient(cc)

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 5; j++ {
				// Errors are deliberately ignored; this test exercises the
				// picker's concurrency, not RPC outcomes.
				testServiceClient.EmptyCall(ctx, &testpb.Empty{})
			}
		}()
	}
	wg.Wait()
}
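
// TestConcurrentRPCs is most valuable when run under the race detector,
// e.g.:
//
//	go test -race google.golang.org/grpc/balancer/leastrequest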