...

Source file src/google.golang.org/grpc/balancer/rls/picker_test.go

Documentation: google.golang.org/grpc/balancer/rls

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package rls
    20  
    21  import (
    22  	"context"
    23  	"errors"
    24  	"fmt"
    25  	"testing"
    26  	"time"
    27  
    28  	"google.golang.org/grpc"
    29  	"google.golang.org/grpc/codes"
    30  	"google.golang.org/grpc/credentials/insecure"
    31  	"google.golang.org/grpc/internal/grpcsync"
    32  	"google.golang.org/grpc/internal/stubserver"
    33  	rlstest "google.golang.org/grpc/internal/testutils/rls"
    34  	"google.golang.org/grpc/metadata"
    35  	"google.golang.org/grpc/status"
    36  	"google.golang.org/protobuf/types/known/durationpb"
    37  
    38  	rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
    39  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    40  	testpb "google.golang.org/grpc/interop/grpc_testing"
    41  )
    42  
    43  // TestNoNonEmptyTargetsReturnsError tests the case where the RLS Server returns
    44  // a response with no non-empty targets. This should be treated as a Control
    45  // Plane RPC failure, and thus fail Data Plane RPCs with an Unavailable error
    46  // whose message states that the RLS response's target list does not contain
    47  // any entries.
    48  func (s) TestNoNonEmptyTargetsReturnsError(t *testing.T) {
    49  	// Set up the RLS server to return a response with no targets at all.
    50  	rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
    51  	rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
    52  		return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{}}
    53  	})
    54  
    55  	// Register a manual resolver and push the RLS service config through it.
    56  	rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
    57  	r := startManualResolverWithConfig(t, rlsConfig)
    58  
    59  	// Dial a channel that uses the manual resolver registered above.
    60  	cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
    61  	if err != nil {
    62  		t.Fatalf("grpc.Dial() failed: %v", err)
    63  	}
    64  	defer cc.Close()
    65  
    66  	// Make an RPC and expect it to fail with an error specifying RLS response's
    67  	// target list does not contain any non-empty entries.
    68  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    69  	defer cancel()
    70  	makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errors.New("RLS response's target list does not contain any entries for key"))
    71  
    72  	// Make sure an RLS request is sent out. Even though the RLS Server will
    73  	// return no targets, the request should still hit the server.
    74  	verifyRLSRequest(t, rlsReqCh, true)
    75  }
    76  
    77  // Test verifies that on a data cache miss with no pending request, when the
    78  // RLS request is throttled, the RPC is routed to the configured default target.
    79  func (s) TestPick_DataCacheMiss_NoPendingEntry_ThrottledWithDefaultTarget(t *testing.T) {
    80  	// Start an RLS server and set the throttler to always throttle requests.
    81  	rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
    82  	overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())
    83  
    84  	// Build RLS service config with a default target backed by a test backend.
    85  	rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
    86  	defBackendCh, defBackendAddress := startBackend(t)
    87  	rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
    88  
    89  	// Register a manual resolver and push the RLS service config through it.
    90  	r := startManualResolverWithConfig(t, rlsConfig)
    91  
    92  	cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
    93  	if err != nil {
    94  		t.Fatalf("grpc.Dial() failed: %v", err)
    95  	}
    96  	defer cc.Close()
    97  
    98  	// Make an RPC and ensure it gets routed to the default target.
    99  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   100  	defer cancel()
   101  	makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
   102  
   103  	// Make sure no RLS request is sent out, since the request was throttled.
   104  	verifyRLSRequest(t, rlsReqCh, false)
   105  }
   106  
   107  // Test verifies the scenario where there is no matching entry in the data cache
   108  // and no pending request either, and the ensuing RLS request is throttled.
   109  // There is no default target configured in the service config, so the RPC is
   110  // expected to fail with an RLS throttled error.
   111  func (s) TestPick_DataCacheMiss_NoPendingEntry_ThrottledWithoutDefaultTarget(t *testing.T) {
   112  	// Start an RLS server and set the throttler to always throttle requests.
   113  	rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
   114  	overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())
   115  
   116  	// Build an RLS config without a default target.
   117  	rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
   118  
   119  	// Register a manual resolver and push the RLS service config through it.
   120  	r := startManualResolverWithConfig(t, rlsConfig)
   121  
   122  	// Dial a channel that uses the manual resolver registered above.
   123  	cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
   124  	if err != nil {
   125  		t.Fatalf("grpc.Dial() failed: %v", err)
   126  	}
   127  	defer cc.Close()
   128  
   129  	// Make an RPC and expect it to fail with the RLS throttled error.
   130  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   131  	defer cancel()
   132  	makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errRLSThrottled)
   133  
   134  	// Make sure no RLS request is sent out, since the request was throttled.
   135  	verifyRLSRequest(t, rlsReqCh, false)
   136  }
   137  
   138  // Test verifies the scenario where there is no matching entry in the data cache
   139  // and no pending request either, and the ensuing RLS request is not throttled.
   140  // The RLS response does not contain any backends, so the RPC fails with an
   141  // unavailable error.
   142  func (s) TestPick_DataCacheMiss_NoPendingEntry_NotThrottled(t *testing.T) {
   143  	// Start an RLS server and set the throttler to never throttle requests.
   144  	rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
   145  	overrideAdaptiveThrottler(t, neverThrottlingThrottler())
   146  
   147  	// Build an RLS config without a default target.
   148  	rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
   149  
   150  	// Register a manual resolver and push the RLS service config through it.
   151  	r := startManualResolverWithConfig(t, rlsConfig)
   152  
   153  	// Dial a channel that uses the manual resolver registered above.
   154  	cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
   155  	if err != nil {
   156  		t.Fatalf("grpc.Dial() failed: %v", err)
   157  	}
   158  	defer cc.Close()
   159  
   160  	// Make an RPC and expect it to fail with an unavailable error. We use a
   161  	// smaller timeout to ensure that the test doesn't run very long.
   162  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
   163  	defer cancel()
   164  	makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errors.New("RLS response's target list does not contain any entries for key"))
   165  
   166  	// Make sure an RLS request is sent out.
   167  	verifyRLSRequest(t, rlsReqCh, true)
   168  }
   169  
   170  // Test verifies the scenario where there is no matching entry in the data
   171  // cache, but there is a pending request. So, we expect no RLS request to be
   172  // sent out. The pick should be queued and not delegated to the default target.
   173  func (s) TestPick_DataCacheMiss_PendingEntryExists(t *testing.T) {
   174  	tests := []struct {
   175  		name              string
   176  		withDefaultTarget bool
   177  	}{
   178  		{
   179  			name:              "withDefaultTarget",
   180  			withDefaultTarget: true,
   181  		},
   182  		{
   183  			name:              "withoutDefaultTarget",
   184  			withDefaultTarget: false,
   185  		},
   186  	}
   187  
   188  	for _, test := range tests {
   189  		t.Run(test.name, func(t *testing.T) {
   190  			// A unary interceptor which blocks the RouteLookup RPC on the fake
   191  			// RLS server until the RPC's context is done. The first RPC by the
   192  			// client will cause the LB policy to send out an RLS request. This
   193  			// will also lead to creation of a pending entry, and further RPCs by
   194  			// the client should not result in RLS requests being sent out.
   195  			rlsReqCh := make(chan struct{}, 1)
   196  			interceptor := func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
   197  				rlsReqCh <- struct{}{}
   198  				<-ctx.Done()
   199  				return nil, ctx.Err()
   200  			}
   201  
   202  			// Start an RLS server and set the throttler to never throttle.
   203  			rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
   204  			overrideAdaptiveThrottler(t, neverThrottlingThrottler())
   205  
   206  			// Build RLS service config with an optional default target.
   207  			rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
   208  			if test.withDefaultTarget {
   209  				_, defBackendAddress := startBackend(t)
   210  				rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
   211  			}
   212  
   213  			// Register a manual resolver and push the RLS service config
   214  			// through it.
   215  			r := startManualResolverWithConfig(t, rlsConfig)
   216  
   217  			// Dial a channel that uses the manual resolver registered above.
   218  			cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
   219  			if err != nil {
   220  				t.Fatalf("grpc.Dial() failed: %v", err)
   221  			}
   222  			defer cc.Close()
   223  
   224  			// Make an RPC that results in the RLS request being sent out. And
   225  			// since the RLS server is configured to block on the first request,
   226  			// this RPC will block until its context expires. This ensures that
   227  			// we have a pending cache entry for the duration of the test.
   228  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   229  			defer cancel()
   230  			go func() {
   231  				client := testgrpc.NewTestServiceClient(cc)
   232  				client.EmptyCall(ctx, &testpb.Empty{})
   233  			}()
   234  
   235  			// Make sure an RLS request is sent out.
   236  			verifyRLSRequest(t, rlsReqCh, true)
   237  
   238  			// Make another RPC and expect it to fail with a deadline exceeded error.
   239  			ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
   240  			defer cancel()
   241  			makeTestRPCAndVerifyError(ctx, t, cc, codes.DeadlineExceeded, context.DeadlineExceeded)
   242  
   243  			// Make sure no RLS request is sent out this time around.
   244  			verifyRLSRequest(t, rlsReqCh, false)
   245  		})
   246  	}
   247  }
   248  
   249  // Test verifies the scenario where there is a matching entry in the data cache
   250  // which is valid and there is no pending request. The pick is expected to be
   251  // delegated to the child policy without a new RLS request being sent out.
   252  func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry(t *testing.T) {
   253  	// Start an RLS server and set the throttler to never throttle requests.
   254  	rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
   255  	overrideAdaptiveThrottler(t, neverThrottlingThrottler())
   256  
   257  	// Build the RLS config without a default target.
   258  	rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
   259  
   260  	// Start a test backend, and set up the fake RLS server to return this as a
   261  	// target in the RLS response.
   262  	testBackendCh, testBackendAddress := startBackend(t)
   263  	rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
   264  		return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
   265  	})
   266  
   267  	// Register a manual resolver and push the RLS service config through it.
   268  	r := startManualResolverWithConfig(t, rlsConfig)
   269  
   270  	// Dial a channel that uses the manual resolver registered above.
   271  	cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
   272  	if err != nil {
   273  		t.Fatalf("grpc.Dial() failed: %v", err)
   274  	}
   275  	defer cc.Close()
   276  
   277  	// Make an RPC and ensure it gets routed to the test backend.
   278  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   279  	defer cancel()
   280  	makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
   281  
   282  	// Make sure an RLS request is sent out.
   283  	verifyRLSRequest(t, rlsReqCh, true)
   284  
   285  	// Make another RPC and expect it to find the target in the data cache.
   286  	makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
   287  
   288  	// Make sure no RLS request is sent out this time around.
   289  	verifyRLSRequest(t, rlsReqCh, false)
   290  }
   291  
   292  // Test verifies that header data returned in the RLS response is sent to the
   293  // backend as the `X-Google-RLS-Data` metadata header when the pick is
   294  // delegated to the child policy.
   295  func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry_WithHeaderData(t *testing.T) {
   296  	// Start an RLS server and set the throttler to never throttle requests.
   297  	rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
   298  	overrideAdaptiveThrottler(t, neverThrottlingThrottler())
   299  
   300  	// Build the RLS config without a default target.
   301  	rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
   302  
   303  	// Start a test backend which expects the header data contents sent from the
   304  	// RLS server to be part of RPC metadata as X-Google-RLS-Data header. The
   305  	const headerDataContents = "foo,bar,baz"
   306  	backend := &stubserver.StubServer{
   307  		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
   308  			gotHeaderData := metadata.ValueFromIncomingContext(ctx, "x-google-rls-data")
   309  			if len(gotHeaderData) != 1 || gotHeaderData[0] != headerDataContents {
   310  				return nil, fmt.Errorf("got metadata in `X-Google-RLS-Data` is %v, want %s", gotHeaderData, headerDataContents)
   311  			}
   312  			return &testpb.Empty{}, nil
   313  		},
   314  	}
   315  	if err := backend.StartServer(); err != nil {
   316  		t.Fatalf("Failed to start backend: %v", err)
   317  	}
   318  	t.Logf("Started TestService backend at: %q", backend.Address)
   319  	defer backend.Stop()
   320  
   321  	// Set up the fake RLS server to return the above backend as a target in the
   322  	// RLS response. Also, populate the header data field in the response.
   323  	rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
   324  		return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{
   325  			Targets:    []string{backend.Address},
   326  			HeaderData: headerDataContents,
   327  		}}
   328  	})
   329  
   330  	// Register a manual resolver and push the RLS service config through it.
   331  	r := startManualResolverWithConfig(t, rlsConfig)
   332  
   333  	// Dial a channel that uses the manual resolver registered above.
   334  	cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
   335  	if err != nil {
   336  		t.Fatalf("grpc.Dial() failed: %v", err)
   337  	}
   338  	defer cc.Close()
   339  
   340  	// Make an RPC and ensure it gets routed to the test backend with the header
   341  	// data sent by the RLS server. The backend fails the RPC on a mismatch.
   342  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   343  	defer cancel()
   344  	if _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{}); err != nil {
   345  		t.Fatalf("EmptyCall() RPC: %v", err)
   346  	}
   347  }
   348  
   349  // Test verifies the scenario where there is a matching entry in the data cache
   350  // which is stale and there is no pending request. The pick is expected to be
   351  // delegated to the child policy with a proactive cache refresh.
   352  func (s) TestPick_DataCacheHit_NoPendingEntry_StaleEntry(t *testing.T) {
   353  	// We expect the same pick behavior (i.e. delegated to the child policy) for
   354  	// a proactive refresh whether the control channel is throttled or not.
   355  	tests := []struct {
   356  		name      string
   357  		throttled bool
   358  	}{
   359  		{
   360  			name:      "throttled",
   361  			throttled: true,
   362  		},
   363  		{
   364  			name:      "notThrottled",
   365  			throttled: false,
   366  		},
   367  	}
   368  
   369  	for _, test := range tests {
   370  		t.Run(test.name, func(t *testing.T) {
   371  			// Start an RLS server and set up the throttler appropriately.
   372  			rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
   373  			var throttler *fakeThrottler
   374  			firstRPCDone := grpcsync.NewEvent()
   375  			if test.throttled {
   376  				throttler = oneTimeAllowingThrottler(firstRPCDone)
   377  				overrideAdaptiveThrottler(t, throttler)
   378  			} else {
   379  				throttler = neverThrottlingThrottler()
   380  				overrideAdaptiveThrottler(t, throttler)
   381  			}
   382  
   383  			// Build the RLS config without a default target. Set the stale age
   384  			// to a very low value to force entries to become stale quickly.
   385  			rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
   386  			rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(time.Minute)
   387  			rlsConfig.RouteLookupConfig.StaleAge = durationpb.New(defaultTestShortTimeout)
   388  
   389  			// Start a test backend, and set up the fake RLS server to return
   390  			// this as a target in the RLS response.
   391  			testBackendCh, testBackendAddress := startBackend(t)
   392  			rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
   393  				return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
   394  			})
   395  
   396  			// Register a manual resolver and push the RLS service config
   397  			// through it.
   398  			r := startManualResolverWithConfig(t, rlsConfig)
   399  
   400  			// Dial a channel that uses the manual resolver registered above.
   401  			cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
   402  			if err != nil {
   403  				t.Fatalf("grpc.Dial() failed: %v", err)
   404  			}
   405  			defer cc.Close()
   406  
   407  			// Make an RPC and ensure it gets routed to the test backend.
   408  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   409  			defer cancel()
   410  			makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
   411  
   412  			// Make sure an RLS request is sent out.
   413  			verifyRLSRequest(t, rlsReqCh, true)
   414  			firstRPCDone.Fire()
   415  
   416  			// The cache entry has a large maxAge, but a small staleAge. We keep
   417  			// retrying until the cache entry becomes stale, in which case we expect a
   418  			// proactive cache refresh.
   419  			//
   420  			// If the control channel is not throttled, then we expect an RLS request
   421  			// to be sent out. If the control channel is throttled, we expect the fake
   422  			// throttler's channel to be signalled.
   423  			for {
   424  				// Make another RPC and expect it to find the target in the data cache.
   425  				makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
   426  
   427  				if !test.throttled {
   428  					select {
   429  					case <-time.After(defaultTestShortTimeout):
   430  						// Go back and retry the RPC.
   431  					case <-rlsReqCh:
   432  						return
   433  					}
   434  				} else {
   435  					select {
   436  					case <-time.After(defaultTestShortTimeout):
   437  						// Go back and retry the RPC.
   438  					case <-throttler.throttleCh:
   439  						return
   440  					}
   441  				}
   442  			}
   443  		})
   444  	}
   445  }
   446  
   447  // Test verifies scenarios where there is a matching entry in the data cache
   448  // which has expired and there is no pending request.
   449  func (s) TestPick_DataCacheHit_NoPendingEntry_ExpiredEntry(t *testing.T) {
   450  	tests := []struct {
   451  		name              string
   452  		throttled         bool
   453  		withDefaultTarget bool
   454  	}{
   455  		{
   456  			name:              "throttledWithDefaultTarget",
   457  			throttled:         true,
   458  			withDefaultTarget: true,
   459  		},
   460  		{
   461  			name:              "throttledWithoutDefaultTarget",
   462  			throttled:         true,
   463  			withDefaultTarget: false,
   464  		},
   465  		{
   466  			name:      "notThrottled",
   467  			throttled: false,
   468  		},
   469  	}
   470  
   471  	for _, test := range tests {
   472  		t.Run(test.name, func(t *testing.T) {
   473  			// Start an RLS server and set up the throttler appropriately.
   474  			rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
   475  			var throttler *fakeThrottler
   476  			firstRPCDone := grpcsync.NewEvent()
   477  			if test.throttled {
   478  				throttler = oneTimeAllowingThrottler(firstRPCDone)
   479  				overrideAdaptiveThrottler(t, throttler)
   480  			} else {
   481  				throttler = neverThrottlingThrottler()
   482  				overrideAdaptiveThrottler(t, throttler)
   483  			}
   484  
   485  			// Build the RLS config with a very low value for maxAge. This will
   486  			// ensure that cache entries expire very soon.
   487  			rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
   488  			rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout)
   489  
   490  			// Start a default backend if needed.
   491  			var defBackendCh chan struct{}
   492  			if test.withDefaultTarget {
   493  				var defBackendAddress string
   494  				defBackendCh, defBackendAddress = startBackend(t)
   495  				rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
   496  			}
   497  
   498  			// Start a test backend, and set up the fake RLS server to return
   499  			// this as a target in the RLS response.
   500  			testBackendCh, testBackendAddress := startBackend(t)
   501  			rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
   502  				return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
   503  			})
   504  
   505  			// Register a manual resolver and push the RLS service config
   506  			// through it.
   507  			r := startManualResolverWithConfig(t, rlsConfig)
   508  
   509  			// Dial a channel that uses the manual resolver registered above.
   510  			cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
   511  			if err != nil {
   512  				t.Fatalf("grpc.Dial() failed: %v", err)
   513  			}
   514  			defer cc.Close()
   515  
   516  			// Make an RPC and ensure it gets routed to the test backend.
   517  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   518  			defer cancel()
   519  			makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
   520  
   521  			// Make sure an RLS request is sent out.
   522  			verifyRLSRequest(t, rlsReqCh, true)
   523  			firstRPCDone.Fire()
   524  
   525  			// Keep retrying the RPC until the cache entry expires. Expected behavior
   526  			// is dependent on the scenario being tested.
   527  			switch {
   528  			case test.throttled && test.withDefaultTarget:
   529  				makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
   530  				<-throttler.throttleCh
   531  			case test.throttled && !test.withDefaultTarget:
   532  				makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errRLSThrottled)
   533  				<-throttler.throttleCh
   534  			case !test.throttled:
   535  				for {
   536  					// The backend to which the RPC is routed does not change after the
   537  					// cache entry expires because the control channel is not throttled.
   538  					// So, we need to keep retrying until the cache entry expires, at
   539  					// which point we expect an RLS request to be sent out and the RPC to
   540  					// get routed to the same testBackend.
   541  					makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
   542  					select {
   543  					case <-time.After(defaultTestShortTimeout):
   544  						// Go back and retry the RPC.
   545  					case <-rlsReqCh:
   546  						return
   547  					}
   548  				}
   549  			}
   550  		})
   551  	}
   552  }
   553  
   554  // Test verifies scenarios where there is a matching entry in the data cache
   555  // which has expired and is in backoff and there is no pending request.
   556  func (s) TestPick_DataCacheHit_NoPendingEntry_ExpiredEntryInBackoff(t *testing.T) {
   557  	tests := []struct {
   558  		name              string
   559  		withDefaultTarget bool
   560  	}{
   561  		{
   562  			name:              "withDefaultTarget",
   563  			withDefaultTarget: true,
   564  		},
   565  		{
   566  			name:              "withoutDefaultTarget",
   567  			withDefaultTarget: false,
   568  		},
   569  	}
   570  
   571  	for _, test := range tests {
   572  		t.Run(test.name, func(t *testing.T) {
   573  			// Start an RLS server and set the throttler to never throttle requests.
   574  			rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
   575  			overrideAdaptiveThrottler(t, neverThrottlingThrottler())
   576  
   577  			// Override the backoff strategy to return a large backoff which
   578  			// will make sure the data cache entry remains in backoff for the
   579  			// duration of the test. The original strategy is restored on exit.
   580  			origBackoffStrategy := defaultBackoffStrategy
   581  			defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
   582  			defer func() { defaultBackoffStrategy = origBackoffStrategy }()
   583  
   584  			// Build the RLS config with a very low value for maxAge. This will
   585  			// ensure that cache entries expire very soon.
   586  			rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
   587  			rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout)
   588  
   589  			// Start a default backend if needed.
   590  			var defBackendCh chan struct{}
   591  			if test.withDefaultTarget {
   592  				var defBackendAddress string
   593  				defBackendCh, defBackendAddress = startBackend(t)
   594  				rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
   595  			}
   596  
   597  			// Start a test backend, and set up the fake RLS server to return this as
   598  			// a target in the RLS response.
   599  			testBackendCh, testBackendAddress := startBackend(t)
   600  			rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
   601  				return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
   602  			})
   603  
   604  			// Register a manual resolver and push the RLS service config through it.
   605  			r := startManualResolverWithConfig(t, rlsConfig)
   606  
   607  			// Dial a channel that uses the manual resolver registered above.
   608  			cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
   609  			if err != nil {
   610  				t.Fatalf("grpc.Dial() failed: %v", err)
   611  			}
   612  			defer cc.Close()
   613  
   614  			// Make an RPC and ensure it gets routed to the test backend.
   615  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   616  			defer cancel()
   617  			makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
   618  
   619  			// Make sure an RLS request is sent out.
   620  			verifyRLSRequest(t, rlsReqCh, true)
   621  
   622  			// Set up the fake RLS server to return errors. This will push the cache
   623  			// entry into backoff.
   624  			var rlsLastErr = status.Error(codes.DeadlineExceeded, "last RLS request failed")
   625  			rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
   626  				return &rlstest.RouteLookupResponse{Err: rlsLastErr}
   627  			})
   628  
   629  			// Since the RLS server is now configured to return errors, this will push
   630  			// the cache entry into backoff. The pick will be delegated to the default
   631  			// backend if one exists, and will fail with the error returned by the RLS
   632  			// server otherwise.
   633  			if test.withDefaultTarget {
   634  				makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
   635  			} else {
   636  				makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, rlsLastErr)
   637  			}
   638  		})
   639  	}
   640  }
   641  
   642  // Test verifies scenarios where there is a matching entry in the data cache
   643  // which is stale and there is a pending request.
   644  func (s) TestPick_DataCacheHit_PendingEntryExists_StaleEntry(t *testing.T) {
   645  	tests := []struct {
   646  		name              string
   647  		withDefaultTarget bool
   648  	}{
   649  		{
   650  			name:              "withDefaultTarget",
   651  			withDefaultTarget: true,
   652  		},
   653  		{
   654  			name:              "withoutDefaultTarget",
   655  			withDefaultTarget: false,
   656  		},
   657  	}
   658  
   659  	for _, test := range tests {
   660  		t.Run(test.name, func(t *testing.T) {
   661  			// A unary interceptor which simply calls the underlying handler
   662  			// until the first client RPC is done. We want one client RPC to
   663  			// succeed to ensure that a data cache entry is created. For
   664  			// subsequent client RPCs which result in RLS requests, this
   665  			// interceptor blocks until the test's context expires. And since we
   666  			// configure the RLS LB policy with a really low value for max age,
   667  			// this allows us to simulate the condition where the it has an
   668  			// expired entry and a pending entry in the cache.
   669  			rlsReqCh := make(chan struct{}, 1)
   670  			firstRPCDone := grpcsync.NewEvent()
   671  			interceptor := func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
   672  				select {
   673  				case rlsReqCh <- struct{}{}:
   674  				default:
   675  				}
   676  				if firstRPCDone.HasFired() {
   677  					<-ctx.Done()
   678  					return nil, ctx.Err()
   679  				}
   680  				return handler(ctx, req)
   681  			}
   682  
   683  			// Start an RLS server and set the throttler to never throttle.
   684  			rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
   685  			overrideAdaptiveThrottler(t, neverThrottlingThrottler())
   686  
   687  			// Build RLS service config with an optional default target.
   688  			rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
   689  			if test.withDefaultTarget {
   690  				_, defBackendAddress := startBackend(t)
   691  				rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
   692  			}
   693  
   694  			// Low value for stale age to force entries to become stale quickly.
   695  			rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(time.Minute)
   696  			rlsConfig.RouteLookupConfig.StaleAge = durationpb.New(defaultTestShortTimeout)
   697  
   698  			// Start a test backend, and setup the fake RLS server to return
   699  			// this as a target in the RLS response.
   700  			testBackendCh, testBackendAddress := startBackend(t)
   701  			rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
   702  				return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
   703  			})
   704  
   705  			// Register a manual resolver and push the RLS service config
   706  			// through it.
   707  			r := startManualResolverWithConfig(t, rlsConfig)
   708  
   709  			// Dial the backend.
   710  			cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
   711  			if err != nil {
   712  				t.Fatalf("grpc.Dial() failed: %v", err)
   713  			}
   714  			defer cc.Close()
   715  
   716  			// Make an RPC and ensure it gets routed to the test backend.
   717  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   718  			defer cancel()
   719  			makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
   720  
   721  			// Make sure an RLS request is sent out.
   722  			verifyRLSRequest(t, rlsReqCh, true)
   723  			firstRPCDone.Fire()
   724  
   725  			// The cache entry has a large maxAge, but a small stateAge. We keep
   726  			// retrying until the cache entry becomes stale, in which case we expect a
   727  			// proactive cache refresh.
   728  			for {
   729  				makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
   730  
   731  				select {
   732  				case <-time.After(defaultTestShortTimeout):
   733  					// Go back and retry the RPC.
   734  				case <-rlsReqCh:
   735  					return
   736  				}
   737  			}
   738  		})
   739  	}
   740  }
   741  
   742  // Test verifies scenarios where there is a matching entry in the data cache
   743  // which is expired and there is a pending request.
   744  func (s) TestPick_DataCacheHit_PendingEntryExists_ExpiredEntry(t *testing.T) {
   745  	tests := []struct {
   746  		name              string
   747  		withDefaultTarget bool
   748  	}{
   749  		{
   750  			name:              "withDefaultTarget",
   751  			withDefaultTarget: true,
   752  		},
   753  		{
   754  			name:              "withoutDefaultTarget",
   755  			withDefaultTarget: false,
   756  		},
   757  	}
   758  
   759  	for _, test := range tests {
   760  		t.Run(test.name, func(t *testing.T) {
   761  			// A unary interceptor which simply calls the underlying handler
   762  			// until the first client RPC is done. We want one client RPC to
   763  			// succeed to ensure that a data cache entry is created. For
   764  			// subsequent client RPCs which result in RLS requests, this
   765  			// interceptor blocks until the test's context expires. And since we
   766  			// configure the RLS LB policy with a really low value for max age,
   767  			// this allows us to simulate the condition where the it has an
   768  			// expired entry and a pending entry in the cache.
   769  			rlsReqCh := make(chan struct{}, 1)
   770  			firstRPCDone := grpcsync.NewEvent()
   771  			interceptor := func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
   772  				select {
   773  				case rlsReqCh <- struct{}{}:
   774  				default:
   775  				}
   776  				if firstRPCDone.HasFired() {
   777  					<-ctx.Done()
   778  					return nil, ctx.Err()
   779  				}
   780  				return handler(ctx, req)
   781  			}
   782  
   783  			// Start an RLS server and set the throttler to never throttle.
   784  			rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
   785  			overrideAdaptiveThrottler(t, neverThrottlingThrottler())
   786  
   787  			// Build RLS service config with an optional default target.
   788  			rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
   789  			if test.withDefaultTarget {
   790  				_, defBackendAddress := startBackend(t)
   791  				rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
   792  			}
   793  			// Set a low value for maxAge to ensure cache entries expire soon.
   794  			rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout)
   795  
   796  			// Start a test backend, and setup the fake RLS server to return
   797  			// this as a target in the RLS response.
   798  			testBackendCh, testBackendAddress := startBackend(t)
   799  			rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
   800  				return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
   801  			})
   802  
   803  			// Register a manual resolver and push the RLS service config
   804  			// through it.
   805  			r := startManualResolverWithConfig(t, rlsConfig)
   806  
   807  			// Dial the backend.
   808  			cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
   809  			if err != nil {
   810  				t.Fatalf("grpc.Dial() failed: %v", err)
   811  			}
   812  			defer cc.Close()
   813  
   814  			// Make an RPC and ensure it gets routed to the test backend.
   815  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   816  			defer cancel()
   817  			makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
   818  
   819  			// Make sure an RLS request is sent out.
   820  			verifyRLSRequest(t, rlsReqCh, true)
   821  			firstRPCDone.Fire()
   822  
   823  			// At this point, we have a cache entry with a small maxAge, and the
   824  			// RLS server is configured to block on further RLS requests. As we
   825  			// retry the RPC, at some point the cache entry would expire and
   826  			// force us to send an RLS request which would block on the server,
   827  			// giving us a pending cache entry for the duration of the test.
   828  			go func() {
   829  				for client := testgrpc.NewTestServiceClient(cc); ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
   830  					client.EmptyCall(ctx, &testpb.Empty{})
   831  				}
   832  			}()
   833  			verifyRLSRequest(t, rlsReqCh, true)
   834  
   835  			// Another RPC at this point should find the pending entry and be queued.
   836  			// But since we pass a small deadline, this RPC should fail with a
   837  			// deadline exceeded error since the pending request does not return until
   838  			// the test is done. And since we have a pending entry, we expect no RLS
   839  			// request to be sent out.
   840  			sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   841  			defer sCancel()
   842  			makeTestRPCAndVerifyError(sCtx, t, cc, codes.DeadlineExceeded, context.DeadlineExceeded)
   843  			verifyRLSRequest(t, rlsReqCh, false)
   844  		})
   845  	}
   846  }
   847  
   848  func TestIsFullMethodNameValid(t *testing.T) {
   849  	tests := []struct {
   850  		desc       string
   851  		methodName string
   852  		want       bool
   853  	}{
   854  		{
   855  			desc:       "does not start with a slash",
   856  			methodName: "service/method",
   857  			want:       false,
   858  		},
   859  		{
   860  			desc:       "does not contain a method",
   861  			methodName: "/service",
   862  			want:       false,
   863  		},
   864  		{
   865  			desc:       "path has more elements",
   866  			methodName: "/service/path/to/method",
   867  			want:       false,
   868  		},
   869  		{
   870  			desc:       "valid",
   871  			methodName: "/service/method",
   872  			want:       true,
   873  		},
   874  	}
   875  
   876  	for _, test := range tests {
   877  		t.Run(test.desc, func(t *testing.T) {
   878  			if got := isFullMethodNameValid(test.methodName); got != test.want {
   879  				t.Fatalf("isFullMethodNameValid(%q) = %v, want %v", test.methodName, got, test.want)
   880  			}
   881  		})
   882  	}
   883  }
   884  

View as plain text