1
18
19 package rls
20
21 import (
22 "context"
23 "errors"
24 "fmt"
25 "testing"
26 "time"
27
28 "google.golang.org/grpc"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/credentials/insecure"
31 "google.golang.org/grpc/internal/grpcsync"
32 "google.golang.org/grpc/internal/stubserver"
33 rlstest "google.golang.org/grpc/internal/testutils/rls"
34 "google.golang.org/grpc/metadata"
35 "google.golang.org/grpc/status"
36 "google.golang.org/protobuf/types/known/durationpb"
37
38 rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
39 testgrpc "google.golang.org/grpc/interop/grpc_testing"
40 testpb "google.golang.org/grpc/interop/grpc_testing"
41 )
42
43
44
45
46
47
48 func (s) TestNoNonEmptyTargetsReturnsError(t *testing.T) {
49
50 rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
51 rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
52 return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{}}
53 })
54
55
56 rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
57 r := startManualResolverWithConfig(t, rlsConfig)
58
59
60 cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
61 if err != nil {
62 t.Fatalf("grpc.Dial() failed: %v", err)
63 }
64 defer cc.Close()
65
66
67
68 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
69 defer cancel()
70 makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errors.New("RLS response's target list does not contain any entries for key"))
71
72
73
74 verifyRLSRequest(t, rlsReqCh, true)
75 }
76
77
78
79 func (s) TestPick_DataCacheMiss_NoPendingEntry_ThrottledWithDefaultTarget(t *testing.T) {
80
81 rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
82 overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())
83
84
85 rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
86 defBackendCh, defBackendAddress := startBackend(t)
87 rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
88
89
90 r := startManualResolverWithConfig(t, rlsConfig)
91
92 cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
93 if err != nil {
94 t.Fatalf("grpc.Dial() failed: %v", err)
95 }
96 defer cc.Close()
97
98
99 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
100 defer cancel()
101 makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
102
103
104 verifyRLSRequest(t, rlsReqCh, false)
105 }
106
107
108
109
110
111 func (s) TestPick_DataCacheMiss_NoPendingEntry_ThrottledWithoutDefaultTarget(t *testing.T) {
112
113 rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
114 overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())
115
116
117 rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
118
119
120 r := startManualResolverWithConfig(t, rlsConfig)
121
122
123 cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
124 if err != nil {
125 t.Fatalf("grpc.Dial() failed: %v", err)
126 }
127 defer cc.Close()
128
129
130 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
131 defer cancel()
132 makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errRLSThrottled)
133
134
135 verifyRLSRequest(t, rlsReqCh, false)
136 }
137
138
139
140
141
142 func (s) TestPick_DataCacheMiss_NoPendingEntry_NotThrottled(t *testing.T) {
143
144 rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
145 overrideAdaptiveThrottler(t, neverThrottlingThrottler())
146
147
148 rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
149
150
151 r := startManualResolverWithConfig(t, rlsConfig)
152
153
154 cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
155 if err != nil {
156 t.Fatalf("grpc.Dial() failed: %v", err)
157 }
158 defer cc.Close()
159
160
161
162 ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
163 defer cancel()
164 makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errors.New("RLS response's target list does not contain any entries for key"))
165
166
167 verifyRLSRequest(t, rlsReqCh, true)
168 }
169
170
171
172
173 func (s) TestPick_DataCacheMiss_PendingEntryExists(t *testing.T) {
174 tests := []struct {
175 name string
176 withDefaultTarget bool
177 }{
178 {
179 name: "withDefaultTarget",
180 withDefaultTarget: true,
181 },
182 {
183 name: "withoutDefaultTarget",
184 withDefaultTarget: false,
185 },
186 }
187
188 for _, test := range tests {
189 t.Run(test.name, func(t *testing.T) {
190
191
192
193
194
195 rlsReqCh := make(chan struct{}, 1)
196 interceptor := func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
197 rlsReqCh <- struct{}{}
198 <-ctx.Done()
199 return nil, ctx.Err()
200 }
201
202
203 rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
204 overrideAdaptiveThrottler(t, neverThrottlingThrottler())
205
206
207 rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
208 if test.withDefaultTarget {
209 _, defBackendAddress := startBackend(t)
210 rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
211 }
212
213
214
215 r := startManualResolverWithConfig(t, rlsConfig)
216
217
218 cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
219 if err != nil {
220 t.Fatalf("grpc.Dial() failed: %v", err)
221 }
222 defer cc.Close()
223
224
225
226
227
228 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
229 defer cancel()
230 go func() {
231 client := testgrpc.NewTestServiceClient(cc)
232 client.EmptyCall(ctx, &testpb.Empty{})
233 }()
234
235
236 verifyRLSRequest(t, rlsReqCh, true)
237
238
239 ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
240 defer cancel()
241 makeTestRPCAndVerifyError(ctx, t, cc, codes.DeadlineExceeded, context.DeadlineExceeded)
242
243
244 verifyRLSRequest(t, rlsReqCh, false)
245 })
246 }
247 }
248
249
250
251
252 func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry(t *testing.T) {
253
254 rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
255 overrideAdaptiveThrottler(t, neverThrottlingThrottler())
256
257
258 rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
259
260
261
262 testBackendCh, testBackendAddress := startBackend(t)
263 rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
264 return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
265 })
266
267
268 r := startManualResolverWithConfig(t, rlsConfig)
269
270
271 cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
272 if err != nil {
273 t.Fatalf("grpc.Dial() failed: %v", err)
274 }
275 defer cc.Close()
276
277
278 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
279 defer cancel()
280 makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
281
282
283 verifyRLSRequest(t, rlsReqCh, true)
284
285
286 makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
287
288
289 verifyRLSRequest(t, rlsReqCh, false)
290 }
291
292
293
294
295 func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry_WithHeaderData(t *testing.T) {
296
297 rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
298 overrideAdaptiveThrottler(t, neverThrottlingThrottler())
299
300
301 rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
302
303
304
305 const headerDataContents = "foo,bar,baz"
306 backend := &stubserver.StubServer{
307 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
308 gotHeaderData := metadata.ValueFromIncomingContext(ctx, "x-google-rls-data")
309 if len(gotHeaderData) != 1 || gotHeaderData[0] != headerDataContents {
310 return nil, fmt.Errorf("got metadata in `X-Google-RLS-Data` is %v, want %s", gotHeaderData, headerDataContents)
311 }
312 return &testpb.Empty{}, nil
313 },
314 }
315 if err := backend.StartServer(); err != nil {
316 t.Fatalf("Failed to start backend: %v", err)
317 }
318 t.Logf("Started TestService backend at: %q", backend.Address)
319 defer backend.Stop()
320
321
322
323 rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
324 return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{
325 Targets: []string{backend.Address},
326 HeaderData: headerDataContents,
327 }}
328 })
329
330
331 r := startManualResolverWithConfig(t, rlsConfig)
332
333
334 cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
335 if err != nil {
336 t.Fatalf("grpc.Dial() failed: %v", err)
337 }
338 defer cc.Close()
339
340
341
342 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
343 defer cancel()
344 if _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{}); err != nil {
345 t.Fatalf("EmptyCall() RPC: %v", err)
346 }
347 }
348
349
350
351
352 func (s) TestPick_DataCacheHit_NoPendingEntry_StaleEntry(t *testing.T) {
353
354
355 tests := []struct {
356 name string
357 throttled bool
358 }{
359 {
360 name: "throttled",
361 throttled: true,
362 },
363 {
364 name: "notThrottled",
365 throttled: false,
366 },
367 }
368
369 for _, test := range tests {
370 t.Run(test.name, func(t *testing.T) {
371
372 rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
373 var throttler *fakeThrottler
374 firstRPCDone := grpcsync.NewEvent()
375 if test.throttled {
376 throttler = oneTimeAllowingThrottler(firstRPCDone)
377 overrideAdaptiveThrottler(t, throttler)
378 } else {
379 throttler = neverThrottlingThrottler()
380 overrideAdaptiveThrottler(t, throttler)
381 }
382
383
384
385 rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
386 rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(time.Minute)
387 rlsConfig.RouteLookupConfig.StaleAge = durationpb.New(defaultTestShortTimeout)
388
389
390
391 testBackendCh, testBackendAddress := startBackend(t)
392 rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
393 return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
394 })
395
396
397
398 r := startManualResolverWithConfig(t, rlsConfig)
399
400
401 cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
402 if err != nil {
403 t.Fatalf("grpc.Dial() failed: %v", err)
404 }
405 defer cc.Close()
406
407
408 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
409 defer cancel()
410 makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
411
412
413 verifyRLSRequest(t, rlsReqCh, true)
414 firstRPCDone.Fire()
415
416
417
418
419
420
421
422
423 for {
424
425 makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
426
427 if !test.throttled {
428 select {
429 case <-time.After(defaultTestShortTimeout):
430
431 case <-rlsReqCh:
432 return
433 }
434 } else {
435 select {
436 case <-time.After(defaultTestShortTimeout):
437
438 case <-throttler.throttleCh:
439 return
440 }
441 }
442 }
443 })
444 }
445 }
446
447
448
449 func (s) TestPick_DataCacheHit_NoPendingEntry_ExpiredEntry(t *testing.T) {
450 tests := []struct {
451 name string
452 throttled bool
453 withDefaultTarget bool
454 }{
455 {
456 name: "throttledWithDefaultTarget",
457 throttled: true,
458 withDefaultTarget: true,
459 },
460 {
461 name: "throttledWithoutDefaultTarget",
462 throttled: true,
463 withDefaultTarget: false,
464 },
465 {
466 name: "notThrottled",
467 throttled: false,
468 },
469 }
470
471 for _, test := range tests {
472 t.Run(test.name, func(t *testing.T) {
473
474 rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
475 var throttler *fakeThrottler
476 firstRPCDone := grpcsync.NewEvent()
477 if test.throttled {
478 throttler = oneTimeAllowingThrottler(firstRPCDone)
479 overrideAdaptiveThrottler(t, throttler)
480 } else {
481 throttler = neverThrottlingThrottler()
482 overrideAdaptiveThrottler(t, throttler)
483 }
484
485
486
487 rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
488 rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout)
489
490
491 var defBackendCh chan struct{}
492 if test.withDefaultTarget {
493 var defBackendAddress string
494 defBackendCh, defBackendAddress = startBackend(t)
495 rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
496 }
497
498
499
500 testBackendCh, testBackendAddress := startBackend(t)
501 rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
502 return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
503 })
504
505
506
507 r := startManualResolverWithConfig(t, rlsConfig)
508
509
510 cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
511 if err != nil {
512 t.Fatalf("grpc.Dial() failed: %v", err)
513 }
514 defer cc.Close()
515
516
517 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
518 defer cancel()
519 makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
520
521
522 verifyRLSRequest(t, rlsReqCh, true)
523 firstRPCDone.Fire()
524
525
526
527 switch {
528 case test.throttled && test.withDefaultTarget:
529 makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
530 <-throttler.throttleCh
531 case test.throttled && !test.withDefaultTarget:
532 makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errRLSThrottled)
533 <-throttler.throttleCh
534 case !test.throttled:
535 for {
536
537
538
539
540
541 makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
542 select {
543 case <-time.After(defaultTestShortTimeout):
544
545 case <-rlsReqCh:
546 return
547 }
548 }
549 }
550 })
551 }
552 }
553
554
555
556 func (s) TestPick_DataCacheHit_NoPendingEntry_ExpiredEntryInBackoff(t *testing.T) {
557 tests := []struct {
558 name string
559 withDefaultTarget bool
560 }{
561 {
562 name: "withDefaultTarget",
563 withDefaultTarget: true,
564 },
565 {
566 name: "withoutDefaultTarget",
567 withDefaultTarget: false,
568 },
569 }
570
571 for _, test := range tests {
572 t.Run(test.name, func(t *testing.T) {
573
574 rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
575 overrideAdaptiveThrottler(t, neverThrottlingThrottler())
576
577
578
579
580 origBackoffStrategy := defaultBackoffStrategy
581 defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
582 defer func() { defaultBackoffStrategy = origBackoffStrategy }()
583
584
585
586 rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
587 rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout)
588
589
590 var defBackendCh chan struct{}
591 if test.withDefaultTarget {
592 var defBackendAddress string
593 defBackendCh, defBackendAddress = startBackend(t)
594 rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
595 }
596
597
598
599 testBackendCh, testBackendAddress := startBackend(t)
600 rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
601 return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
602 })
603
604
605 r := startManualResolverWithConfig(t, rlsConfig)
606
607
608 cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
609 if err != nil {
610 t.Fatalf("grpc.Dial() failed: %v", err)
611 }
612 defer cc.Close()
613
614
615 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
616 defer cancel()
617 makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
618
619
620 verifyRLSRequest(t, rlsReqCh, true)
621
622
623
624 var rlsLastErr = status.Error(codes.DeadlineExceeded, "last RLS request failed")
625 rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
626 return &rlstest.RouteLookupResponse{Err: rlsLastErr}
627 })
628
629
630
631
632
633 if test.withDefaultTarget {
634 makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
635 } else {
636 makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, rlsLastErr)
637 }
638 })
639 }
640 }
641
642
643
644 func (s) TestPick_DataCacheHit_PendingEntryExists_StaleEntry(t *testing.T) {
645 tests := []struct {
646 name string
647 withDefaultTarget bool
648 }{
649 {
650 name: "withDefaultTarget",
651 withDefaultTarget: true,
652 },
653 {
654 name: "withoutDefaultTarget",
655 withDefaultTarget: false,
656 },
657 }
658
659 for _, test := range tests {
660 t.Run(test.name, func(t *testing.T) {
661
662
663
664
665
666
667
668
669 rlsReqCh := make(chan struct{}, 1)
670 firstRPCDone := grpcsync.NewEvent()
671 interceptor := func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
672 select {
673 case rlsReqCh <- struct{}{}:
674 default:
675 }
676 if firstRPCDone.HasFired() {
677 <-ctx.Done()
678 return nil, ctx.Err()
679 }
680 return handler(ctx, req)
681 }
682
683
684 rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
685 overrideAdaptiveThrottler(t, neverThrottlingThrottler())
686
687
688 rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
689 if test.withDefaultTarget {
690 _, defBackendAddress := startBackend(t)
691 rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
692 }
693
694
695 rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(time.Minute)
696 rlsConfig.RouteLookupConfig.StaleAge = durationpb.New(defaultTestShortTimeout)
697
698
699
700 testBackendCh, testBackendAddress := startBackend(t)
701 rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
702 return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
703 })
704
705
706
707 r := startManualResolverWithConfig(t, rlsConfig)
708
709
710 cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
711 if err != nil {
712 t.Fatalf("grpc.Dial() failed: %v", err)
713 }
714 defer cc.Close()
715
716
717 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
718 defer cancel()
719 makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
720
721
722 verifyRLSRequest(t, rlsReqCh, true)
723 firstRPCDone.Fire()
724
725
726
727
728 for {
729 makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
730
731 select {
732 case <-time.After(defaultTestShortTimeout):
733
734 case <-rlsReqCh:
735 return
736 }
737 }
738 })
739 }
740 }
741
742
743
744 func (s) TestPick_DataCacheHit_PendingEntryExists_ExpiredEntry(t *testing.T) {
745 tests := []struct {
746 name string
747 withDefaultTarget bool
748 }{
749 {
750 name: "withDefaultTarget",
751 withDefaultTarget: true,
752 },
753 {
754 name: "withoutDefaultTarget",
755 withDefaultTarget: false,
756 },
757 }
758
759 for _, test := range tests {
760 t.Run(test.name, func(t *testing.T) {
761
762
763
764
765
766
767
768
769 rlsReqCh := make(chan struct{}, 1)
770 firstRPCDone := grpcsync.NewEvent()
771 interceptor := func(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
772 select {
773 case rlsReqCh <- struct{}{}:
774 default:
775 }
776 if firstRPCDone.HasFired() {
777 <-ctx.Done()
778 return nil, ctx.Err()
779 }
780 return handler(ctx, req)
781 }
782
783
784 rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
785 overrideAdaptiveThrottler(t, neverThrottlingThrottler())
786
787
788 rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
789 if test.withDefaultTarget {
790 _, defBackendAddress := startBackend(t)
791 rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
792 }
793
794 rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout)
795
796
797
798 testBackendCh, testBackendAddress := startBackend(t)
799 rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
800 return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
801 })
802
803
804
805 r := startManualResolverWithConfig(t, rlsConfig)
806
807
808 cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
809 if err != nil {
810 t.Fatalf("grpc.Dial() failed: %v", err)
811 }
812 defer cc.Close()
813
814
815 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
816 defer cancel()
817 makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
818
819
820 verifyRLSRequest(t, rlsReqCh, true)
821 firstRPCDone.Fire()
822
823
824
825
826
827
828 go func() {
829 for client := testgrpc.NewTestServiceClient(cc); ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
830 client.EmptyCall(ctx, &testpb.Empty{})
831 }
832 }()
833 verifyRLSRequest(t, rlsReqCh, true)
834
835
836
837
838
839
840 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
841 defer sCancel()
842 makeTestRPCAndVerifyError(sCtx, t, cc, codes.DeadlineExceeded, context.DeadlineExceeded)
843 verifyRLSRequest(t, rlsReqCh, false)
844 })
845 }
846 }
847
848 func TestIsFullMethodNameValid(t *testing.T) {
849 tests := []struct {
850 desc string
851 methodName string
852 want bool
853 }{
854 {
855 desc: "does not start with a slash",
856 methodName: "service/method",
857 want: false,
858 },
859 {
860 desc: "does not contain a method",
861 methodName: "/service",
862 want: false,
863 },
864 {
865 desc: "path has more elements",
866 methodName: "/service/path/to/method",
867 want: false,
868 },
869 {
870 desc: "valid",
871 methodName: "/service/method",
872 want: true,
873 },
874 }
875
876 for _, test := range tests {
877 t.Run(test.desc, func(t *testing.T) {
878 if got := isFullMethodNameValid(test.methodName); got != test.want {
879 t.Fatalf("isFullMethodNameValid(%q) = %v, want %v", test.methodName, got, test.want)
880 }
881 })
882 }
883 }
884
View as plain text