...

Source file src/google.golang.org/grpc/balancer/rls/helpers_test.go

Documentation: google.golang.org/grpc/balancer/rls

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package rls
    20  
    21  import (
    22  	"context"
    23  	"strings"
    24  	"testing"
    25  	"time"
    26  
    27  	"google.golang.org/grpc"
    28  	"google.golang.org/grpc/balancer/rls/internal/test/e2e"
    29  	"google.golang.org/grpc/codes"
    30  	"google.golang.org/grpc/internal"
    31  	"google.golang.org/grpc/internal/grpcsync"
    32  	"google.golang.org/grpc/internal/grpctest"
    33  	rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
    34  	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
    35  	"google.golang.org/grpc/internal/stubserver"
    36  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    37  	testpb "google.golang.org/grpc/interop/grpc_testing"
    38  	"google.golang.org/grpc/resolver"
    39  	"google.golang.org/grpc/resolver/manual"
    40  	"google.golang.org/grpc/serviceconfig"
    41  	"google.golang.org/grpc/status"
    42  	"google.golang.org/protobuf/types/known/durationpb"
    43  )
    44  
    45  const (
    46  	defaultTestTimeout      = 5 * time.Second
    47  	defaultTestShortTimeout = 100 * time.Millisecond
    48  )
    49  
    50  type s struct {
    51  	grpctest.Tester
    52  }
    53  
    54  func Test(t *testing.T) {
    55  	grpctest.RunSubTests(t, s{})
    56  }
    57  
    58  // fakeBackoffStrategy is a fake implementation of the backoff.Strategy
    59  // interface, for tests to inject the backoff duration.
    60  type fakeBackoffStrategy struct {
    61  	backoff time.Duration
    62  }
    63  
    64  func (f *fakeBackoffStrategy) Backoff(retries int) time.Duration {
    65  	return f.backoff
    66  }
    67  
    68  // fakeThrottler is a fake implementation of the adaptiveThrottler interface.
    69  type fakeThrottler struct {
    70  	throttleFunc func() bool   // Fake throttler implementation.
    71  	throttleCh   chan struct{} // Invocation of ShouldThrottle signals here.
    72  }
    73  
    74  func (f *fakeThrottler) ShouldThrottle() bool {
    75  	select {
    76  	case <-f.throttleCh:
    77  	default:
    78  	}
    79  	f.throttleCh <- struct{}{}
    80  
    81  	return f.throttleFunc()
    82  }
    83  
    84  func (f *fakeThrottler) RegisterBackendResponse(bool) {}
    85  
    86  // alwaysThrottlingThrottler returns a fake throttler which always throttles.
    87  func alwaysThrottlingThrottler() *fakeThrottler {
    88  	return &fakeThrottler{
    89  		throttleFunc: func() bool { return true },
    90  		throttleCh:   make(chan struct{}, 1),
    91  	}
    92  }
    93  
    94  // neverThrottlingThrottler returns a fake throttler which never throttles.
    95  func neverThrottlingThrottler() *fakeThrottler {
    96  	return &fakeThrottler{
    97  		throttleFunc: func() bool { return false },
    98  		throttleCh:   make(chan struct{}, 1),
    99  	}
   100  }
   101  
   102  // oneTimeAllowingThrottler returns a fake throttler which does not throttle
   103  // requests until the client RPC succeeds, but throttles everything that comes
   104  // after. This is useful for tests which need to set up a valid cache entry
   105  // before testing other cases.
   106  func oneTimeAllowingThrottler(firstRPCDone *grpcsync.Event) *fakeThrottler {
   107  	return &fakeThrottler{
   108  		throttleFunc: firstRPCDone.HasFired,
   109  		throttleCh:   make(chan struct{}, 1),
   110  	}
   111  }
   112  
   113  func overrideAdaptiveThrottler(t *testing.T, f *fakeThrottler) {
   114  	origAdaptiveThrottler := newAdaptiveThrottler
   115  	newAdaptiveThrottler = func() adaptiveThrottler { return f }
   116  	t.Cleanup(func() { newAdaptiveThrottler = origAdaptiveThrottler })
   117  }
   118  
   119  // buildBasicRLSConfig constructs a basic service config for the RLS LB policy
   120  // with header matching rules. This expects the passed child policy name to
   121  // have been registered by the caller.
   122  func buildBasicRLSConfig(childPolicyName, rlsServerAddress string) *e2e.RLSConfig {
   123  	return &e2e.RLSConfig{
   124  		RouteLookupConfig: &rlspb.RouteLookupConfig{
   125  			GrpcKeybuilders: []*rlspb.GrpcKeyBuilder{
   126  				{
   127  					Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}},
   128  					Headers: []*rlspb.NameMatcher{
   129  						{Key: "k1", Names: []string{"n1"}},
   130  						{Key: "k2", Names: []string{"n2"}},
   131  					},
   132  				},
   133  			},
   134  			LookupService:        rlsServerAddress,
   135  			LookupServiceTimeout: durationpb.New(defaultTestTimeout),
   136  			CacheSizeBytes:       1024,
   137  		},
   138  		RouteLookupChannelServiceConfig:  `{"loadBalancingConfig": [{"pick_first": {}}]}`,
   139  		ChildPolicy:                      &internalserviceconfig.BalancerConfig{Name: childPolicyName},
   140  		ChildPolicyConfigTargetFieldName: e2e.RLSChildPolicyTargetNameField,
   141  	}
   142  }
   143  
   144  // buildBasicRLSConfigWithChildPolicy constructs a very basic service config for
   145  // the RLS LB policy. It also registers a test LB policy which is capable of
   146  // being a child of the RLS LB policy.
   147  func buildBasicRLSConfigWithChildPolicy(t *testing.T, childPolicyName, rlsServerAddress string) *e2e.RLSConfig {
   148  	childPolicyName = "test-child-policy" + childPolicyName
   149  	e2e.RegisterRLSChildPolicy(childPolicyName, nil)
   150  	t.Logf("Registered child policy with name %q", childPolicyName)
   151  
   152  	return &e2e.RLSConfig{
   153  		RouteLookupConfig: &rlspb.RouteLookupConfig{
   154  			GrpcKeybuilders:      []*rlspb.GrpcKeyBuilder{{Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}}}},
   155  			LookupService:        rlsServerAddress,
   156  			LookupServiceTimeout: durationpb.New(defaultTestTimeout),
   157  			CacheSizeBytes:       1024,
   158  		},
   159  		RouteLookupChannelServiceConfig:  `{"loadBalancingConfig": [{"pick_first": {}}]}`,
   160  		ChildPolicy:                      &internalserviceconfig.BalancerConfig{Name: childPolicyName},
   161  		ChildPolicyConfigTargetFieldName: e2e.RLSChildPolicyTargetNameField,
   162  	}
   163  }
   164  
   165  // startBackend starts a backend implementing the TestService on a local port.
   166  // It returns a channel for tests to get notified whenever an RPC is invoked on
   167  // the backend. This allows tests to ensure that RPCs reach expected backends.
   168  // Also returns the address of the backend.
   169  func startBackend(t *testing.T, sopts ...grpc.ServerOption) (rpcCh chan struct{}, address string) {
   170  	t.Helper()
   171  
   172  	rpcCh = make(chan struct{}, 1)
   173  	backend := &stubserver.StubServer{
   174  		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
   175  			select {
   176  			case rpcCh <- struct{}{}:
   177  			default:
   178  			}
   179  			return &testpb.Empty{}, nil
   180  		},
   181  	}
   182  	if err := backend.StartServer(sopts...); err != nil {
   183  		t.Fatalf("Failed to start backend: %v", err)
   184  	}
   185  	t.Logf("Started TestService backend at: %q", backend.Address)
   186  	t.Cleanup(func() { backend.Stop() })
   187  	return rpcCh, backend.Address
   188  }
   189  
   190  // startManualResolverWithConfig registers and returns a manual resolver which
   191  // pushes the RLS LB policy's service config on the channel.
   192  func startManualResolverWithConfig(t *testing.T, rlsConfig *e2e.RLSConfig) *manual.Resolver {
   193  	t.Helper()
   194  
   195  	scJSON, err := rlsConfig.ServiceConfigJSON()
   196  	if err != nil {
   197  		t.Fatal(err)
   198  	}
   199  
   200  	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
   201  	r := manual.NewBuilderWithScheme("rls-e2e")
   202  	r.InitialState(resolver.State{ServiceConfig: sc})
   203  	t.Cleanup(r.Close)
   204  	return r
   205  }
   206  
   207  // makeTestRPCAndExpectItToReachBackend is a test helper function which makes
   208  // the EmptyCall RPC on the given ClientConn and verifies that it reaches a
   209  // backend. The latter is accomplished by listening on the provided channel
   210  // which gets pushed to whenever the backend in question gets an RPC.
   211  //
   212  // There are many instances where it can take a while before the attempted RPC
   213  // reaches the expected backend. Examples include, but are not limited to:
   214  //   - control channel is changed in a config update. The RLS LB policy creates a
   215  //     new control channel, and sends a new picker to gRPC. But it takes a while
   216  //     before gRPC actually starts using the new picker.
   217  //   - test is waiting for a cache entry to expire after which we expect a
   218  //     different behavior because we have configured the fake RLS server to return
   219  //     different backends.
   220  //
   221  // Therefore, we do not return an error when the RPC fails. Instead, we wait for
   222  // the context to expire before failing.
   223  func makeTestRPCAndExpectItToReachBackend(ctx context.Context, t *testing.T, cc *grpc.ClientConn, ch chan struct{}) {
   224  	t.Helper()
   225  
   226  	// Drain the backend channel before performing the RPC to remove any
   227  	// notifications from previous RPCs.
   228  	select {
   229  	case <-ch:
   230  	default:
   231  	}
   232  
   233  	for {
   234  		if err := ctx.Err(); err != nil {
   235  			t.Fatalf("Timeout when waiting for RPCs to be routed to the given target: %v", err)
   236  		}
   237  		sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   238  		client := testgrpc.NewTestServiceClient(cc)
   239  		client.EmptyCall(sCtx, &testpb.Empty{})
   240  
   241  		select {
   242  		case <-sCtx.Done():
   243  		case <-ch:
   244  			sCancel()
   245  			return
   246  		}
   247  	}
   248  }
   249  
   250  // makeTestRPCAndVerifyError is a test helper function which makes the EmptyCall
   251  // RPC on the given ClientConn and verifies that the RPC fails with the given
   252  // status code and error.
   253  //
   254  // Similar to makeTestRPCAndExpectItToReachBackend, retries until expected
   255  // outcome is reached or the provided context has expired.
   256  func makeTestRPCAndVerifyError(ctx context.Context, t *testing.T, cc *grpc.ClientConn, wantCode codes.Code, wantErr error) {
   257  	t.Helper()
   258  
   259  	for {
   260  		if err := ctx.Err(); err != nil {
   261  			t.Fatalf("Timeout when waiting for RPCs to fail with given error: %v", err)
   262  		}
   263  		sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   264  		client := testgrpc.NewTestServiceClient(cc)
   265  		_, err := client.EmptyCall(sCtx, &testpb.Empty{})
   266  
   267  		// If the RPC fails with the expected code and expected error message (if
   268  		// one was provided), we return. Else we retry after blocking for a little
   269  		// while to ensure that we don't keep blasting away with RPCs.
   270  		if code := status.Code(err); code == wantCode {
   271  			if wantErr == nil || strings.Contains(err.Error(), wantErr.Error()) {
   272  				sCancel()
   273  				return
   274  			}
   275  		}
   276  		<-sCtx.Done()
   277  	}
   278  }
   279  
   280  // verifyRLSRequest is a test helper which listens on a channel to see if an RLS
   281  // request was received by the fake RLS server. Based on whether the test
   282  // expects a request to be sent out or not, it uses a different timeout.
   283  func verifyRLSRequest(t *testing.T, ch chan struct{}, wantRequest bool) {
   284  	t.Helper()
   285  
   286  	if wantRequest {
   287  		select {
   288  		case <-time.After(defaultTestTimeout):
   289  			t.Fatalf("Timeout when waiting for an RLS request to be sent out")
   290  		case <-ch:
   291  		}
   292  	} else {
   293  		select {
   294  		case <-time.After(defaultTestShortTimeout):
   295  		case <-ch:
   296  			t.Fatalf("RLS request sent out when not expecting one")
   297  		}
   298  	}
   299  }
   300  

View as plain text