1
18
19 package rls
20
21 import (
22 "context"
23 "strings"
24 "testing"
25 "time"
26
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/balancer/rls/internal/test/e2e"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/internal"
31 "google.golang.org/grpc/internal/grpcsync"
32 "google.golang.org/grpc/internal/grpctest"
33 rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
34 internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
35 "google.golang.org/grpc/internal/stubserver"
36 testgrpc "google.golang.org/grpc/interop/grpc_testing"
37 testpb "google.golang.org/grpc/interop/grpc_testing"
38 "google.golang.org/grpc/resolver"
39 "google.golang.org/grpc/resolver/manual"
40 "google.golang.org/grpc/serviceconfig"
41 "google.golang.org/grpc/status"
42 "google.golang.org/protobuf/types/known/durationpb"
43 )
44
// Timeouts shared by the helpers and tests in this package.
const (
	// defaultTestTimeout bounds operations that are expected to complete.
	defaultTestTimeout = 5 * time.Second
	// defaultTestShortTimeout bounds a single retry attempt, and is the wait
	// used when verifying that an event does not happen.
	defaultTestShortTimeout = 100 * time.Millisecond
)
49
50 type s struct {
51 grpctest.Tester
52 }
53
54 func Test(t *testing.T) {
55 grpctest.RunSubTests(t, s{})
56 }
57
58
59
// fakeBackoffStrategy is a backoff strategy for tests that always reports the
// same, preconfigured delay.
type fakeBackoffStrategy struct {
	// backoff is the duration returned from every call to Backoff.
	backoff time.Duration
}

// Backoff returns the configured duration. The retry count is ignored.
func (f *fakeBackoffStrategy) Backoff(int) time.Duration {
	return f.backoff
}
67
68
// fakeThrottler is a test double that can be installed in place of the
// adaptive throttler (see overrideAdaptiveThrottler). Its throttling decision
// is delegated to throttleFunc, and every decision is announced on throttleCh
// so that tests can observe that a throttling check took place.
type fakeThrottler struct {
	// throttleFunc supplies the value returned by ShouldThrottle.
	throttleFunc func() bool
	// throttleCh holds a pending signal after ShouldThrottle has been called.
	throttleCh chan struct{}
}

// ShouldThrottle records that it was invoked by leaving a signal on
// throttleCh, and then returns the result of throttleFunc.
//
// Any stale signal is dropped first so that a single-slot channel never
// accumulates more than one notification. The subsequent send is
// non-blocking: if another caller refilled the channel between the drain and
// the send, a signal is already pending and blocking here would only stall
// the caller until a test happened to read from the channel.
func (f *fakeThrottler) ShouldThrottle() bool {
	select {
	case <-f.throttleCh:
	default:
	}
	select {
	case f.throttleCh <- struct{}{}:
	default:
	}

	return f.throttleFunc()
}

// RegisterBackendResponse is a no-op; the fake does not track backend
// responses.
func (f *fakeThrottler) RegisterBackendResponse(bool) {}
85
86
87 func alwaysThrottlingThrottler() *fakeThrottler {
88 return &fakeThrottler{
89 throttleFunc: func() bool { return true },
90 throttleCh: make(chan struct{}, 1),
91 }
92 }
93
94
95 func neverThrottlingThrottler() *fakeThrottler {
96 return &fakeThrottler{
97 throttleFunc: func() bool { return false },
98 throttleCh: make(chan struct{}, 1),
99 }
100 }
101
102
103
104
105
106 func oneTimeAllowingThrottler(firstRPCDone *grpcsync.Event) *fakeThrottler {
107 return &fakeThrottler{
108 throttleFunc: firstRPCDone.HasFired,
109 throttleCh: make(chan struct{}, 1),
110 }
111 }
112
113 func overrideAdaptiveThrottler(t *testing.T, f *fakeThrottler) {
114 origAdaptiveThrottler := newAdaptiveThrottler
115 newAdaptiveThrottler = func() adaptiveThrottler { return f }
116 t.Cleanup(func() { newAdaptiveThrottler = origAdaptiveThrottler })
117 }
118
119
120
121
122 func buildBasicRLSConfig(childPolicyName, rlsServerAddress string) *e2e.RLSConfig {
123 return &e2e.RLSConfig{
124 RouteLookupConfig: &rlspb.RouteLookupConfig{
125 GrpcKeybuilders: []*rlspb.GrpcKeyBuilder{
126 {
127 Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}},
128 Headers: []*rlspb.NameMatcher{
129 {Key: "k1", Names: []string{"n1"}},
130 {Key: "k2", Names: []string{"n2"}},
131 },
132 },
133 },
134 LookupService: rlsServerAddress,
135 LookupServiceTimeout: durationpb.New(defaultTestTimeout),
136 CacheSizeBytes: 1024,
137 },
138 RouteLookupChannelServiceConfig: `{"loadBalancingConfig": [{"pick_first": {}}]}`,
139 ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicyName},
140 ChildPolicyConfigTargetFieldName: e2e.RLSChildPolicyTargetNameField,
141 }
142 }
143
144
145
146
147 func buildBasicRLSConfigWithChildPolicy(t *testing.T, childPolicyName, rlsServerAddress string) *e2e.RLSConfig {
148 childPolicyName = "test-child-policy" + childPolicyName
149 e2e.RegisterRLSChildPolicy(childPolicyName, nil)
150 t.Logf("Registered child policy with name %q", childPolicyName)
151
152 return &e2e.RLSConfig{
153 RouteLookupConfig: &rlspb.RouteLookupConfig{
154 GrpcKeybuilders: []*rlspb.GrpcKeyBuilder{{Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}}}},
155 LookupService: rlsServerAddress,
156 LookupServiceTimeout: durationpb.New(defaultTestTimeout),
157 CacheSizeBytes: 1024,
158 },
159 RouteLookupChannelServiceConfig: `{"loadBalancingConfig": [{"pick_first": {}}]}`,
160 ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicyName},
161 ChildPolicyConfigTargetFieldName: e2e.RLSChildPolicyTargetNameField,
162 }
163 }
164
165
166
167
168
169 func startBackend(t *testing.T, sopts ...grpc.ServerOption) (rpcCh chan struct{}, address string) {
170 t.Helper()
171
172 rpcCh = make(chan struct{}, 1)
173 backend := &stubserver.StubServer{
174 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
175 select {
176 case rpcCh <- struct{}{}:
177 default:
178 }
179 return &testpb.Empty{}, nil
180 },
181 }
182 if err := backend.StartServer(sopts...); err != nil {
183 t.Fatalf("Failed to start backend: %v", err)
184 }
185 t.Logf("Started TestService backend at: %q", backend.Address)
186 t.Cleanup(func() { backend.Stop() })
187 return rpcCh, backend.Address
188 }
189
190
191
192 func startManualResolverWithConfig(t *testing.T, rlsConfig *e2e.RLSConfig) *manual.Resolver {
193 t.Helper()
194
195 scJSON, err := rlsConfig.ServiceConfigJSON()
196 if err != nil {
197 t.Fatal(err)
198 }
199
200 sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
201 r := manual.NewBuilderWithScheme("rls-e2e")
202 r.InitialState(resolver.State{ServiceConfig: sc})
203 t.Cleanup(r.Close)
204 return r
205 }
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223 func makeTestRPCAndExpectItToReachBackend(ctx context.Context, t *testing.T, cc *grpc.ClientConn, ch chan struct{}) {
224 t.Helper()
225
226
227
228 select {
229 case <-ch:
230 default:
231 }
232
233 for {
234 if err := ctx.Err(); err != nil {
235 t.Fatalf("Timeout when waiting for RPCs to be routed to the given target: %v", err)
236 }
237 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
238 client := testgrpc.NewTestServiceClient(cc)
239 client.EmptyCall(sCtx, &testpb.Empty{})
240
241 select {
242 case <-sCtx.Done():
243 case <-ch:
244 sCancel()
245 return
246 }
247 }
248 }
249
250
251
252
253
254
255
256 func makeTestRPCAndVerifyError(ctx context.Context, t *testing.T, cc *grpc.ClientConn, wantCode codes.Code, wantErr error) {
257 t.Helper()
258
259 for {
260 if err := ctx.Err(); err != nil {
261 t.Fatalf("Timeout when waiting for RPCs to fail with given error: %v", err)
262 }
263 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
264 client := testgrpc.NewTestServiceClient(cc)
265 _, err := client.EmptyCall(sCtx, &testpb.Empty{})
266
267
268
269
270 if code := status.Code(err); code == wantCode {
271 if wantErr == nil || strings.Contains(err.Error(), wantErr.Error()) {
272 sCancel()
273 return
274 }
275 }
276 <-sCtx.Done()
277 }
278 }
279
280
281
282
283 func verifyRLSRequest(t *testing.T, ch chan struct{}, wantRequest bool) {
284 t.Helper()
285
286 if wantRequest {
287 select {
288 case <-time.After(defaultTestTimeout):
289 t.Fatalf("Timeout when waiting for an RLS request to be sent out")
290 case <-ch:
291 }
292 } else {
293 select {
294 case <-time.After(defaultTestShortTimeout):
295 case <-ch:
296 t.Fatalf("RLS request sent out when not expecting one")
297 }
298 }
299 }
300
View as plain text