1
18
19 package xds
20
21 import (
22 "context"
23 "encoding/json"
24 "errors"
25 "fmt"
26 "net"
27 "reflect"
28 "strconv"
29 "strings"
30 "sync"
31 "testing"
32 "time"
33
34 "github.com/google/go-cmp/cmp"
35 "github.com/google/uuid"
36 "google.golang.org/grpc"
37 "google.golang.org/grpc/connectivity"
38 "google.golang.org/grpc/credentials/insecure"
39 "google.golang.org/grpc/credentials/tls/certprovider"
40 "google.golang.org/grpc/credentials/xds"
41 "google.golang.org/grpc/internal/grpctest"
42 "google.golang.org/grpc/internal/testutils"
43 "google.golang.org/grpc/internal/testutils/xds/bootstrap"
44 "google.golang.org/grpc/internal/testutils/xds/e2e"
45 "google.golang.org/grpc/xds/internal/xdsclient"
46 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
47
48 v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
49 v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
50
51 _ "google.golang.org/grpc/xds/internal/httpfilter/router"
52 )
53
// Timeouts and addresses shared by the tests in this file.
const (
	// nonExistentManagementServer is the server URI placed in bootstrap
	// configurations by tests that do not start a management server.
	nonExistentManagementServer = "non-existent-management-server"
	// defaultTestTimeout bounds waits for events that are expected to happen.
	defaultTestTimeout time.Duration = 5 * time.Second
	// defaultTestShortTimeout bounds waits for events that are expected NOT to
	// happen.
	defaultTestShortTimeout time.Duration = 10 * time.Millisecond
)
59
60 type s struct {
61 grpctest.Tester
62 }
63
64 func Test(t *testing.T) {
65 grpctest.RunSubTests(t, s{})
66 }
67
68 type fakeGRPCServer struct {
69 done chan struct{}
70 registerServiceCh *testutils.Channel
71 serveCh *testutils.Channel
72 }
73
74 func (f *fakeGRPCServer) RegisterService(*grpc.ServiceDesc, any) {
75 f.registerServiceCh.Send(nil)
76 }
77
78 func (f *fakeGRPCServer) Serve(lis net.Listener) error {
79 f.serveCh.Send(nil)
80 <-f.done
81 lis.Close()
82 return nil
83 }
84
85 func (f *fakeGRPCServer) Stop() {
86 close(f.done)
87 }
88 func (f *fakeGRPCServer) GracefulStop() {
89 close(f.done)
90 }
91
92 func (f *fakeGRPCServer) GetServiceInfo() map[string]grpc.ServiceInfo {
93 panic("implement me")
94 }
95
96 func newFakeGRPCServer() *fakeGRPCServer {
97 return &fakeGRPCServer{
98 done: make(chan struct{}),
99 registerServiceCh: testutils.NewChannel(),
100 serveCh: testutils.NewChannel(),
101 }
102 }
103
104 func generateBootstrapContents(t *testing.T, nodeID, serverURI string) []byte {
105 t.Helper()
106
107 bs, err := e2e.DefaultBootstrapContents(nodeID, serverURI)
108 if err != nil {
109 t.Fatal(err)
110 }
111 return bs
112 }
113
114 func (s) TestNewServer_Success(t *testing.T) {
115 xdsCreds, err := xds.NewServerCredentials(xds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
116 if err != nil {
117 t.Fatalf("failed to create xds server credentials: %v", err)
118 }
119
120 tests := []struct {
121 desc string
122 serverOpts []grpc.ServerOption
123 wantXDSCredsInUse bool
124 }{
125 {
126 desc: "without_xds_creds",
127 serverOpts: []grpc.ServerOption{
128 grpc.Creds(insecure.NewCredentials()),
129 BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), nonExistentManagementServer)),
130 },
131 },
132 {
133 desc: "with_xds_creds",
134 serverOpts: []grpc.ServerOption{
135 grpc.Creds(xdsCreds),
136 BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), nonExistentManagementServer)),
137 },
138 wantXDSCredsInUse: true,
139 },
140 }
141
142 for _, test := range tests {
143 t.Run(test.desc, func(t *testing.T) {
144
145
146 wantServerOpts := len(test.serverOpts) + 2
147
148 origNewGRPCServer := newGRPCServer
149 newGRPCServer = func(opts ...grpc.ServerOption) grpcServer {
150 if got := len(opts); got != wantServerOpts {
151 t.Fatalf("%d ServerOptions passed to grpc.Server, want %d", got, wantServerOpts)
152 }
153
154 if !reflect.DeepEqual(opts[2:], test.serverOpts) {
155 t.Fatalf("got ServerOptions %v, want %v", opts[2:], test.serverOpts)
156 }
157 return grpc.NewServer(opts...)
158 }
159 defer func() {
160 newGRPCServer = origNewGRPCServer
161 }()
162
163 s, err := NewGRPCServer(test.serverOpts...)
164 if err != nil {
165 t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
166 }
167 defer s.Stop()
168 })
169 }
170 }
171
172 func (s) TestNewServer_Failure(t *testing.T) {
173 xdsCreds, err := xds.NewServerCredentials(xds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
174 if err != nil {
175 t.Fatalf("failed to create xds server credentials: %v", err)
176 }
177
178 tests := []struct {
179 desc string
180 serverOpts []grpc.ServerOption
181 wantErr string
182 }{
183 {
184 desc: "bootstrap env var not set",
185 serverOpts: []grpc.ServerOption{grpc.Creds(xdsCreds)},
186 wantErr: "bootstrap env vars are unspecified",
187 },
188 {
189 desc: "empty bootstrap config",
190 serverOpts: []grpc.ServerOption{
191 grpc.Creds(xdsCreds),
192 BootstrapContentsForTesting([]byte(`{}`)),
193 },
194 wantErr: "xDS client creation failed",
195 },
196 {
197 desc: "server_listener_resource_name_template is missing",
198 serverOpts: []grpc.ServerOption{
199 grpc.Creds(xdsCreds),
200 func() grpc.ServerOption {
201 bs, err := bootstrap.Contents(bootstrap.Options{
202 NodeID: uuid.New().String(),
203 ServerURI: nonExistentManagementServer,
204 CertificateProviders: map[string]json.RawMessage{
205 "cert-provider-instance": json.RawMessage("{}"),
206 },
207 })
208 if err != nil {
209 t.Errorf("Failed to create bootstrap configuration: %v", err)
210 }
211 return BootstrapContentsForTesting(bs)
212 }(),
213 },
214 wantErr: "missing server_listener_resource_name_template in the bootstrap configuration",
215 },
216 }
217
218 for _, test := range tests {
219 t.Run(test.desc, func(t *testing.T) {
220 s, err := NewGRPCServer(test.serverOpts...)
221 if err == nil {
222 s.Stop()
223 t.Fatal("NewGRPCServer() succeeded when expected to fail")
224 }
225 if !strings.Contains(err.Error(), test.wantErr) {
226 t.Fatalf("NewGRPCServer() failed with error: %v, want: %s", err, test.wantErr)
227 }
228 })
229 }
230 }
231
232 func (s) TestRegisterService(t *testing.T) {
233 fs := newFakeGRPCServer()
234
235 origNewGRPCServer := newGRPCServer
236 newGRPCServer = func(opts ...grpc.ServerOption) grpcServer { return fs }
237 defer func() { newGRPCServer = origNewGRPCServer }()
238
239 s, err := NewGRPCServer(BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), "non-existent-management-server")))
240 if err != nil {
241 t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
242 }
243 defer s.Stop()
244
245 s.RegisterService(&grpc.ServiceDesc{}, nil)
246 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
247 defer cancel()
248 if _, err := fs.registerServiceCh.Receive(ctx); err != nil {
249 t.Fatalf("Timeout when expecting RegisterService() to called on grpc.Server: %v", err)
250 }
251 }
252
253 const (
254 fakeProvider1Name = "fake-certificate-provider-1"
255 fakeProvider2Name = "fake-certificate-provider-2"
256 )
257
258 var (
259 fpb1, fpb2 *fakeProviderBuilder
260 fakeProvider1Config json.RawMessage
261 fakeProvider2Config json.RawMessage
262 )
263
264 func init() {
265 fpb1 = &fakeProviderBuilder{
266 name: fakeProvider1Name,
267 buildCh: testutils.NewChannel(),
268 }
269 fpb2 = &fakeProviderBuilder{
270 name: fakeProvider2Name,
271 buildCh: testutils.NewChannel(),
272 }
273 certprovider.Register(fpb1)
274 certprovider.Register(fpb2)
275
276 fakeProvider1Config = json.RawMessage(fmt.Sprintf(`{
277 "plugin_name": "%s",
278 "config": "my fake config 1"
279 }`, fakeProvider1Name))
280 fakeProvider2Config = json.RawMessage(fmt.Sprintf(`{
281 "plugin_name": "%s",
282 "config": "my fake config 2"
283 }`, fakeProvider2Name))
284 }
285
286
287
288 type fakeProviderBuilder struct {
289 name string
290 buildCh *testutils.Channel
291 }
292
293 func (b *fakeProviderBuilder) ParseConfig(cfg any) (*certprovider.BuildableConfig, error) {
294 var config string
295 if err := json.Unmarshal(cfg.(json.RawMessage), &config); err != nil {
296 return nil, fmt.Errorf("providerBuilder %s failed to unmarshal config: %v", b.name, cfg)
297 }
298 return certprovider.NewBuildableConfig(b.name, []byte(config), func(certprovider.BuildOptions) certprovider.Provider {
299 b.buildCh.Send(nil)
300 return &fakeProvider{
301 Distributor: certprovider.NewDistributor(),
302 config: config,
303 }
304 }), nil
305 }
306
307 func (b *fakeProviderBuilder) Name() string {
308 return b.name
309 }
310
311
312
313 type fakeProvider struct {
314 *certprovider.Distributor
315 config string
316 }
317
318
319 func (p *fakeProvider) Close() {
320 p.Distributor.Stop()
321 }
322
323 func verifyCertProviderNotCreated() error {
324 sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
325 defer sCancel()
326 if _, err := fpb1.buildCh.Receive(sCtx); err != context.DeadlineExceeded {
327 return errors.New("certificate provider created when no xDS creds were specified")
328 }
329 sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
330 defer sCancel()
331 if _, err := fpb2.buildCh.Receive(sCtx); err != context.DeadlineExceeded {
332 return errors.New("certificate provider created when no xDS creds were specified")
333 }
334 return nil
335 }
336
// hostPortFromListener splits the address of lis into its host and numeric
// port. The test is failed immediately if the address is not of the form
// host:port or if the port is not a valid unsigned 16-bit number.
func hostPortFromListener(t *testing.T, lis net.Listener) (string, uint32) {
	t.Helper()

	addr := lis.Addr().String()
	host, p, err := net.SplitHostPort(addr)
	if err != nil {
		t.Fatalf("net.SplitHostPort(%s) failed: %v", addr, err)
	}
	// Parse as unsigned so that a negative value is rejected here instead of
	// wrapping around in the conversion to uint32 below.
	port, err := strconv.ParseUint(p, 10, 16)
	if err != nil {
		t.Fatalf("strconv.ParseUint(%s, 10, 16) failed: %v", p, err)
	}
	return host, uint32(port)
}
350
351
352
353
354
355 func (s) TestServeSuccess(t *testing.T) {
356
357
358 ldsRequestCh := make(chan []string, 1)
359 mgmtServer, nodeID, bootstrapContents, _, cancel := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
360 OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
361 if req.GetTypeUrl() == version.V3ListenerURL {
362 select {
363 case ldsRequestCh <- req.GetResourceNames():
364 default:
365 }
366 }
367 return nil
368 },
369 })
370 defer cancel()
371
372
373
374 fs := newFakeGRPCServer()
375 origNewGRPCServer := newGRPCServer
376 newGRPCServer = func(opts ...grpc.ServerOption) grpcServer { return fs }
377 defer func() { newGRPCServer = origNewGRPCServer }()
378
379
380
381 modeChangeCh := testutils.NewChannel()
382 modeChangeOption := ServingModeCallback(func(addr net.Addr, args ServingModeChangeArgs) {
383 t.Logf("Server mode change callback invoked for listener %q with mode %q and error %v", addr.String(), args.Mode, args.Err)
384 modeChangeCh.Send(args.Mode)
385 })
386 server, err := NewGRPCServer(modeChangeOption, BootstrapContentsForTesting(bootstrapContents))
387 if err != nil {
388 t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
389 }
390 defer server.Stop()
391
392
393 lis, err := testutils.LocalTCPListener()
394 if err != nil {
395 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
396 }
397 go func() {
398 if err := server.Serve(lis); err != nil {
399 t.Error(err)
400 }
401 }()
402
403
404 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
405 defer cancel()
406 var gotNames []string
407 select {
408 case gotNames = <-ldsRequestCh:
409 case <-ctx.Done():
410 t.Fatalf("Timeout when waiting for an LDS request to be sent out")
411 }
412 wantNames := []string{strings.Replace(e2e.ServerListenerResourceNameTemplate, "%s", lis.Addr().String(), -1)}
413 if !cmp.Equal(gotNames, wantNames) {
414 t.Fatalf("LDS watch registered for names %v, want %v", gotNames, wantNames)
415 }
416
417
418 host, port := hostPortFromListener(t, lis)
419 resources := e2e.UpdateOptions{
420 NodeID: nodeID,
421 Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")},
422 }
423 if err := mgmtServer.Update(ctx, resources); err != nil {
424 t.Fatal(err)
425 }
426
427
428 v, err := modeChangeCh.Receive(ctx)
429 if err != nil {
430 t.Fatalf("Timeout when waiting for serving mode to change: %v", err)
431 }
432 if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeServing {
433 t.Fatalf("Serving mode is %q, want %q", mode, connectivity.ServingModeServing)
434 }
435
436
437 if _, err := fs.serveCh.Receive(ctx); err != nil {
438 t.Fatalf("Timeout when waiting for Serve() to be invoked on the grpc.Server")
439 }
440
441
442
443
444 resources.Listeners[0].ListenerFilters = []*v3listenerpb.ListenerFilter{{Name: "foo"}}
445 if err := mgmtServer.Update(ctx, resources); err != nil {
446 t.Fatal(err)
447 }
448
449
450
451 sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
452 defer sCancel()
453 if v, err := modeChangeCh.Receive(sCtx); err != context.DeadlineExceeded {
454 t.Fatalf("Unexpected change in serving mode. New mode is %v", v.(connectivity.ServingMode))
455 }
456
457
458
459
460 resources.Listeners = nil
461 if err := mgmtServer.Update(ctx, resources); err != nil {
462 t.Fatal(err)
463 }
464 v, err = modeChangeCh.Receive(ctx)
465 if err != nil {
466 t.Fatalf("Timeout when waiting for serving mode to change: %v", err)
467 }
468 if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeNotServing {
469 t.Fatalf("Serving mode is %q, want %q", mode, connectivity.ServingModeNotServing)
470 }
471 }
472
473
474
475 func (s) TestNewServer_ClientCreationFailure(t *testing.T) {
476 origNewXDSClient := newXDSClient
477 newXDSClient = func() (xdsclient.XDSClient, func(), error) {
478 return nil, nil, errors.New("xdsClient creation failed")
479 }
480 defer func() { newXDSClient = origNewXDSClient }()
481
482 if _, err := NewGRPCServer(); err == nil {
483 t.Fatal("NewGRPCServer() succeeded when expected to fail")
484 }
485 }
486
487
488
489
490 func (s) TestHandleListenerUpdate_NoXDSCreds(t *testing.T) {
491 mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
492 if err != nil {
493 t.Fatalf("Failed to start xDS management server: %v", err)
494 }
495 defer mgmtServer.Stop()
496
497
498
499
500 nodeID := uuid.NewString()
501 bootstrapContents, err := bootstrap.Contents(bootstrap.Options{
502 NodeID: nodeID,
503 ServerURI: mgmtServer.Address,
504 CertificateProviders: map[string]json.RawMessage{
505 e2e.ServerSideCertProviderInstance: fakeProvider1Config,
506 e2e.ClientSideCertProviderInstance: fakeProvider2Config,
507 },
508 ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate,
509 })
510 if err != nil {
511 t.Fatalf("Failed to create bootstrap configuration: %v", err)
512 }
513
514
515
516
517 modeChangeCh := testutils.NewChannel()
518 modeChangeOption := ServingModeCallback(func(addr net.Addr, args ServingModeChangeArgs) {
519 t.Logf("Server mode change callback invoked for listener %q with mode %q and error %v", addr.String(), args.Mode, args.Err)
520 modeChangeCh.Send(args.Mode)
521 })
522 server, err := NewGRPCServer(modeChangeOption, BootstrapContentsForTesting(bootstrapContents))
523 if err != nil {
524 t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
525 }
526 defer server.Stop()
527
528
529 lis, err := testutils.LocalTCPListener()
530 if err != nil {
531 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
532 }
533 go func() {
534 if err := server.Serve(lis); err != nil {
535 t.Error(err)
536 }
537 }()
538
539
540
541 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
542 defer cancel()
543 host, port := hostPortFromListener(t, lis)
544 resources := e2e.UpdateOptions{
545 NodeID: nodeID,
546 Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS, "routeName")},
547 }
548 if err := mgmtServer.Update(ctx, resources); err != nil {
549 t.Fatal(err)
550 }
551
552
553 v, err := modeChangeCh.Receive(ctx)
554 if err != nil {
555 t.Fatalf("Timeout when waiting for serving mode to change: %v", err)
556 }
557 if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeServing {
558 t.Fatalf("Serving mode is %q, want %q", mode, connectivity.ServingModeServing)
559 }
560
561
562 if err := verifyCertProviderNotCreated(); err != nil {
563 t.Fatal(err)
564 }
565 }
566
567
568
569
570 func (s) TestHandleListenerUpdate_ErrorUpdate(t *testing.T) {
571
572
573 ldsRequestCh := make(chan []string, 1)
574 mgmtServer, nodeID, _, _, cancel := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
575 OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
576 if req.GetTypeUrl() == version.V3ListenerURL {
577 select {
578 case ldsRequestCh <- req.GetResourceNames():
579 default:
580 }
581 }
582 return nil
583 },
584 })
585 defer cancel()
586
587
588
589
590 bootstrapContents, err := bootstrap.Contents(bootstrap.Options{
591 NodeID: nodeID,
592 ServerURI: mgmtServer.Address,
593 CertificateProviders: map[string]json.RawMessage{
594 e2e.ServerSideCertProviderInstance: fakeProvider1Config,
595 e2e.ClientSideCertProviderInstance: fakeProvider2Config,
596 },
597 ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate,
598 })
599 if err != nil {
600 t.Fatalf("Failed to create bootstrap configuration: %v", err)
601 }
602
603
604
605
606 modeChangeCh := testutils.NewChannel()
607 modeChangeOption := ServingModeCallback(func(addr net.Addr, args ServingModeChangeArgs) {
608 t.Logf("Server mode change callback invoked for listener %q with mode %q and error %v", addr.String(), args.Mode, args.Err)
609 modeChangeCh.Send(args.Mode)
610 })
611 server, err := NewGRPCServer(modeChangeOption, BootstrapContentsForTesting(bootstrapContents))
612 if err != nil {
613 t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
614 }
615 defer server.Stop()
616
617
618 lis, err := testutils.LocalTCPListener()
619 if err != nil {
620 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
621 }
622 go server.Serve(lis)
623
624
625
626
627 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
628 defer cancel()
629 host, port := hostPortFromListener(t, lis)
630 listener := e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS, "routeName")
631 listener.ListenerFilters = []*v3listenerpb.ListenerFilter{{Name: "foo"}}
632 resources := e2e.UpdateOptions{
633 NodeID: nodeID,
634 Listeners: []*v3listenerpb.Listener{listener},
635 }
636 if err := mgmtServer.Update(ctx, resources); err != nil {
637 t.Fatal(err)
638 }
639
640
641 var gotNames []string
642 select {
643 case gotNames = <-ldsRequestCh:
644 case <-ctx.Done():
645 t.Fatalf("Timeout when waiting for an LDS request to be sent out")
646 }
647 wantNames := []string{strings.Replace(e2e.ServerListenerResourceNameTemplate, "%s", lis.Addr().String(), -1)}
648 if !cmp.Equal(gotNames, wantNames) {
649 t.Fatalf("LDS watch registered for names %v, want %v", gotNames, wantNames)
650 }
651
652
653 if err := verifyCertProviderNotCreated(); err != nil {
654 t.Fatal(err)
655 }
656
657
658
659
660
661
662 sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
663 defer sCancel()
664 if _, err := modeChangeCh.Receive(sCtx); err != context.DeadlineExceeded {
665 t.Fatal("Serving mode changed received when none expected")
666 }
667 }
668
669
670
671 func (s) TestServeReturnsErrorAfterClose(t *testing.T) {
672 server, err := NewGRPCServer(BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), nonExistentManagementServer)))
673 if err != nil {
674 t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
675 }
676
677 lis, err := testutils.LocalTCPListener()
678 if err != nil {
679 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
680 }
681 server.Stop()
682 err = server.Serve(lis)
683 if err == nil || !strings.Contains(err.Error(), grpc.ErrServerStopped.Error()) {
684 t.Fatalf("server erorred with wrong error, want: %v, got :%v", grpc.ErrServerStopped, err)
685 }
686 }
687
688
689
690 func (s) TestServeAndCloseDoNotRace(t *testing.T) {
691 lis, err := testutils.LocalTCPListener()
692 if err != nil {
693 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
694 }
695
696 wg := sync.WaitGroup{}
697 for i := 0; i < 100; i++ {
698 server, err := NewGRPCServer(BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), nonExistentManagementServer)))
699 if err != nil {
700 t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
701 }
702 wg.Add(1)
703 go func() {
704 server.Serve(lis)
705 wg.Done()
706 }()
707 wg.Add(1)
708 go func() {
709 server.Stop()
710 wg.Done()
711 }()
712 }
713 wg.Wait()
714 }
715
View as plain text