1
16
17 package cdsbalancer
18
19 import (
20 "context"
21 "crypto/tls"
22 "crypto/x509"
23 "encoding/json"
24 "fmt"
25 "os"
26 "strings"
27 "testing"
28 "unsafe"
29
30 "github.com/google/go-cmp/cmp"
31 "google.golang.org/grpc"
32 "google.golang.org/grpc/attributes"
33 "google.golang.org/grpc/balancer"
34 "google.golang.org/grpc/connectivity"
35 "google.golang.org/grpc/credentials"
36 "google.golang.org/grpc/credentials/insecure"
37 "google.golang.org/grpc/credentials/tls/certprovider"
38 "google.golang.org/grpc/credentials/xds"
39 "google.golang.org/grpc/internal"
40 "google.golang.org/grpc/internal/balancer/stub"
41 xdscredsinternal "google.golang.org/grpc/internal/credentials/xds"
42 "google.golang.org/grpc/internal/stubserver"
43 "google.golang.org/grpc/internal/testutils"
44 xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap"
45 "google.golang.org/grpc/internal/testutils/xds/e2e"
46 "google.golang.org/grpc/peer"
47 "google.golang.org/grpc/resolver"
48 "google.golang.org/grpc/resolver/manual"
49 "google.golang.org/grpc/serviceconfig"
50 "google.golang.org/grpc/testdata"
51 "google.golang.org/grpc/xds/internal/xdsclient"
52
53 v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
54 v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
55 v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
56 v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
57 testgrpc "google.golang.org/grpc/interop/grpc_testing"
58 testpb "google.golang.org/grpc/interop/grpc_testing"
59
60 _ "google.golang.org/grpc/credentials/tls/certprovider/pemfile"
61 )
62
63
64
65 type testCCWrapper struct {
66 balancer.ClientConn
67 handshakeInfoCh chan *xdscredsinternal.HandshakeInfo
68 }
69
70
71
72
73
74
75 func (tcc *testCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
76 if len(addrs) != 1 {
77 return nil, fmt.Errorf("NewSubConn got %d addresses, want 1", len(addrs))
78 }
79 getHI := internal.GetXDSHandshakeInfoForTesting.(func(attr *attributes.Attributes) *unsafe.Pointer)
80 hi := getHI(addrs[0].Attributes)
81 if hi == nil {
82 return nil, fmt.Errorf("NewSubConn got address without xDS handshake info")
83 }
84
85 sc, err := tcc.ClientConn.NewSubConn(addrs, opts)
86 select {
87 case tcc.handshakeInfoCh <- (*xdscredsinternal.HandshakeInfo)(*hi):
88 default:
89 }
90 return sc, err
91 }
92
93
94
95
96
97
98
99
100 func registerWrappedCDSPolicyWithNewSubConnOverride(t *testing.T, ch chan *xdscredsinternal.HandshakeInfo) {
101 cdsBuilder := balancer.Get(cdsName)
102 internal.BalancerUnregister(cdsBuilder.Name())
103 var ccWrapper *testCCWrapper
104 stub.Register(cdsBuilder.Name(), stub.BalancerFuncs{
105 Init: func(bd *stub.BalancerData) {
106 ccWrapper = &testCCWrapper{
107 ClientConn: bd.ClientConn,
108 handshakeInfoCh: ch,
109 }
110 bd.Data = cdsBuilder.Build(ccWrapper, bd.BuildOptions)
111 },
112 ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
113 return cdsBuilder.(balancer.ConfigParser).ParseConfig(lbCfg)
114 },
115 UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
116 bal := bd.Data.(balancer.Balancer)
117 return bal.UpdateClientConnState(ccs)
118 },
119 Close: func(bd *stub.BalancerData) {
120 bal := bd.Data.(balancer.Balancer)
121 bal.Close()
122 },
123 })
124 t.Cleanup(func() { balancer.Register(cdsBuilder) })
125 }
126
127
128
129
130
131
132
133
134
135
136
137 func setupForSecurityTests(t *testing.T, bootstrapContents []byte, clientCreds, serverCreds credentials.TransportCredentials) (*grpc.ClientConn, string) {
138 t.Helper()
139
140 xdsClient, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
141 if err != nil {
142 t.Fatalf("Failed to create xDS client: %v", err)
143 }
144 t.Cleanup(xdsClose)
145
146
147
148 r := manual.NewBuilderWithScheme("whatever")
149 jsonSC := fmt.Sprintf(`{
150 "loadBalancingConfig":[{
151 "cds_experimental":{
152 "cluster": "%s"
153 }
154 }]
155 }`, clusterName)
156 scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
157 state := xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient)
158 r.InitialState(state)
159
160
161 cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(clientCreds), grpc.WithResolvers(r))
162 if err != nil {
163 t.Fatalf("Failed to dial local test server: %v", err)
164 }
165 t.Cleanup(func() { cc.Close() })
166
167
168 sOpts := []grpc.ServerOption{}
169 if serverCreds != nil {
170 sOpts = append(sOpts, grpc.Creds(serverCreds))
171 }
172 server := stubserver.StartTestService(t, nil, sOpts...)
173 t.Cleanup(server.Stop)
174
175 return cc, server.Address
176 }
177
178
179
180
181 func xdsClientCredsWithInsecureFallback(t *testing.T) credentials.TransportCredentials {
182 t.Helper()
183
184 xdsCreds, err := xds.NewClientCredentials(xds.ClientOptions{FallbackCreds: insecure.NewCredentials()})
185 if err != nil {
186 t.Fatalf("Failed to create xDS credentials: %v", err)
187 }
188 return xdsCreds
189 }
190
191
192
193
194
195 func tlsServerCreds(t *testing.T) credentials.TransportCredentials {
196 t.Helper()
197
198 cert, err := tls.LoadX509KeyPair(testdata.Path("x509/server1_cert.pem"), testdata.Path("x509/server1_key.pem"))
199 if err != nil {
200 t.Fatalf("Failed to load server cert and key: %v", err)
201
202 }
203 pemData, err := os.ReadFile(testdata.Path("x509/client_ca_cert.pem"))
204 if err != nil {
205 t.Fatalf("Failed to read client CA cert: %v", err)
206 }
207 roots := x509.NewCertPool()
208 roots.AppendCertsFromPEM(pemData)
209 cfg := &tls.Config{
210 Certificates: []tls.Certificate{cert},
211 ClientCAs: roots,
212 }
213 return credentials.NewTLS(cfg)
214 }
215
216
217
218 func verifySecurityInformationFromPeer(t *testing.T, pr *peer.Peer, wantSecLevel e2e.SecurityLevel) {
219
220
221
222
223
224
225
226
227
228
229 t.Helper()
230
231 switch wantSecLevel {
232 case e2e.SecurityLevelNone:
233 if pr.AuthInfo.AuthType() != "insecure" {
234 t.Fatalf("AuthType() is %s, want insecure", pr.AuthInfo.AuthType())
235 }
236 case e2e.SecurityLevelMTLS:
237 ai, ok := pr.AuthInfo.(credentials.TLSInfo)
238 if !ok {
239 t.Fatalf("AuthInfo type is %T, want %T", pr.AuthInfo, credentials.TLSInfo{})
240 }
241 if len(ai.State.PeerCertificates) != 1 {
242 t.Fatalf("Number of peer certificates is %d, want 1", len(ai.State.PeerCertificates))
243 }
244 cert := ai.State.PeerCertificates[0]
245 const wantCommonName = "test-server1"
246 if cn := cert.Subject.CommonName; cn != wantCommonName {
247 t.Fatalf("Common name in peer certificate is %s, want %s", cn, wantCommonName)
248 }
249 }
250 }
251
252
253
254
255
256 func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) {
257
258
259 handshakeInfoCh := make(chan *xdscredsinternal.HandshakeInfo, 1)
260 registerWrappedCDSPolicyWithNewSubConnOverride(t, handshakeInfoCh)
261
262
263 mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
264 t.Cleanup(cleanup)
265
266
267
268 cc, serverAddress := setupForSecurityTests(t, bootstrapContents, insecure.NewCredentials(), nil)
269
270
271
272 resources := e2e.UpdateOptions{
273 NodeID: nodeID,
274 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)},
275 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})},
276 SkipValidation: true,
277 }
278 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
279 defer cancel()
280 if err := mgmtServer.Update(ctx, resources); err != nil {
281 t.Fatal(err)
282 }
283
284
285 client := testgrpc.NewTestServiceClient(cc)
286 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
287 t.Fatalf("EmptyCall() failed: %v", err)
288 }
289
290
291 var gotHI *xdscredsinternal.HandshakeInfo
292 select {
293 case gotHI = <-handshakeInfoCh:
294 case <-ctx.Done():
295 t.Fatal("Timeout when waiting to read handshake info passed to NewSubConn")
296 }
297 wantHI := xdscredsinternal.NewHandshakeInfo(nil, nil, nil, false)
298 if !cmp.Equal(gotHI, wantHI) {
299 t.Fatalf("NewSubConn got handshake info %+v, want %+v", gotHI, wantHI)
300 }
301 }
302
303
304
305
306
307 func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) {
308
309
310 handshakeInfoCh := make(chan *xdscredsinternal.HandshakeInfo, 1)
311 registerWrappedCDSPolicyWithNewSubConnOverride(t, handshakeInfoCh)
312
313
314 mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
315 t.Cleanup(cleanup)
316
317
318
319 cc, serverAddress := setupForSecurityTests(t, bootstrapContents, xdsClientCredsWithInsecureFallback(t), nil)
320
321
322
323 resources := e2e.UpdateOptions{
324 NodeID: nodeID,
325 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
326 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})},
327 SkipValidation: true,
328 }
329 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
330 defer cancel()
331 if err := mgmtServer.Update(ctx, resources); err != nil {
332 t.Fatal(err)
333 }
334
335
336 client := testgrpc.NewTestServiceClient(cc)
337 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
338 t.Fatalf("EmptyCall() failed: %v", err)
339 }
340
341
342 var gotHI *xdscredsinternal.HandshakeInfo
343 select {
344 case gotHI = <-handshakeInfoCh:
345 case <-ctx.Done():
346 t.Fatal("Timeout when waiting to read handshake info passed to NewSubConn")
347 }
348 wantHI := xdscredsinternal.NewHandshakeInfo(nil, nil, nil, false)
349 if !cmp.Equal(gotHI, wantHI) {
350 t.Fatalf("NewSubConn got handshake info %+v, want %+v", gotHI, wantHI)
351 }
352 if !gotHI.UseFallbackCreds() {
353 t.Fatal("NewSubConn got handshake info that does not specify the use of fallback creds")
354 }
355 }
356
357
358
359
360 func (s) TestSecurityConfigNotFoundInBootstrap(t *testing.T) {
361
362 mgmtServer, nodeID, _, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
363 t.Cleanup(cleanup)
364
365
366
367
368 bootstrapContents, err := xdsbootstrap.Contents(xdsbootstrap.Options{
369 NodeID: nodeID,
370 ServerURI: mgmtServer.Address,
371 ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate,
372 })
373 if err != nil {
374 t.Fatalf("Failed to create bootstrap configuration: %v", err)
375 }
376
377
378 cc, _ := setupForSecurityTests(t, bootstrapContents, xdsClientCredsWithInsecureFallback(t), nil)
379
380
381
382 resources := e2e.UpdateOptions{
383 NodeID: nodeID,
384 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)},
385 SkipValidation: true,
386 }
387 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
388 defer cancel()
389 if err := mgmtServer.Update(ctx, resources); err != nil {
390 t.Fatal(err)
391 }
392
393 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
394 }
395
396
397
398 type errCertProviderBuilder struct{}
399
400 const errCertProviderName = "err-cert-provider"
401
402 func (e errCertProviderBuilder) ParseConfig(any) (*certprovider.BuildableConfig, error) {
403
404
405 bc := certprovider.NewBuildableConfig(errCertProviderName, nil, func(certprovider.BuildOptions) certprovider.Provider { return nil })
406 return bc, nil
407 }
408
409 func (e errCertProviderBuilder) Name() string {
410 return errCertProviderName
411 }
412
413 func init() {
414 certprovider.Register(errCertProviderBuilder{})
415 }
416
417
418
419
420 func (s) TestCertproviderStoreError(t *testing.T) {
421 mgmtServer, nodeID, _, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
422 t.Cleanup(cleanup)
423
424
425
426
427 providerCfg := json.RawMessage(fmt.Sprintf(`{
428 "plugin_name": "%s",
429 "config": {}
430 }`, errCertProviderName))
431 bootstrapContents, err := xdsbootstrap.Contents(xdsbootstrap.Options{
432 NodeID: nodeID,
433 ServerURI: mgmtServer.Address,
434 CertificateProviders: map[string]json.RawMessage{e2e.ClientSideCertProviderInstance: providerCfg},
435 ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate,
436 })
437 if err != nil {
438 t.Fatalf("Failed to create bootstrap configuration: %v", err)
439 }
440
441
442 cc, _ := setupForSecurityTests(t, bootstrapContents, xdsClientCredsWithInsecureFallback(t), nil)
443
444
445
446 resources := e2e.UpdateOptions{
447 NodeID: nodeID,
448 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)},
449 SkipValidation: true,
450 }
451 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
452 defer cancel()
453 if err := mgmtServer.Update(ctx, resources); err != nil {
454 t.Fatal(err)
455 }
456
457 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
458 }
459
460
461
462
463
464 func (s) TestGoodSecurityConfig(t *testing.T) {
465
466 mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
467 t.Cleanup(cleanup)
468
469
470
471 cc, serverAddress := setupForSecurityTests(t, bootstrapContents, xdsClientCredsWithInsecureFallback(t), tlsServerCreds(t))
472
473
474
475 resources := e2e.UpdateOptions{
476 NodeID: nodeID,
477 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)},
478 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})},
479 SkipValidation: true,
480 }
481 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
482 defer cancel()
483 if err := mgmtServer.Update(ctx, resources); err != nil {
484 t.Fatal(err)
485 }
486
487
488 client := testgrpc.NewTestServiceClient(cc)
489 peer := &peer.Peer{}
490 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(peer)); err != nil {
491 t.Fatalf("EmptyCall() failed: %v", err)
492 }
493 verifySecurityInformationFromPeer(t, peer, e2e.SecurityLevelMTLS)
494 }
495
496
497
498
499
500
501
502
503 func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) {
504
505 mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
506 t.Cleanup(cleanup)
507
508
509
510 cc, serverAddress := setupForSecurityTests(t, bootstrapContents, xdsClientCredsWithInsecureFallback(t), tlsServerCreds(t))
511
512
513
514
515 cluster := e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)
516 cluster.TransportSocket = &v3corepb.TransportSocket{
517 Name: "envoy.transport_sockets.tls",
518 ConfigType: &v3corepb.TransportSocket_TypedConfig{
519 TypedConfig: testutils.MarshalAny(t, &v3tlspb.UpstreamTlsContext{
520 CommonTlsContext: &v3tlspb.CommonTlsContext{
521 ValidationContextType: &v3tlspb.CommonTlsContext_ValidationContextCertificateProviderInstance{
522 ValidationContextCertificateProviderInstance: &v3tlspb.CommonTlsContext_CertificateProviderInstance{
523 InstanceName: "unknown-certificate-provider-instance",
524 },
525 },
526 },
527 }),
528 },
529 }
530 resources := e2e.UpdateOptions{
531 NodeID: nodeID,
532 Clusters: []*v3clusterpb.Cluster{cluster},
533 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})},
534 SkipValidation: true,
535 }
536 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
537 defer cancel()
538 if err := mgmtServer.Update(ctx, resources); err != nil {
539 t.Fatal(err)
540 }
541
542 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
543
544
545
546
547 resources = e2e.UpdateOptions{
548 NodeID: nodeID,
549 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)},
550 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})},
551 SkipValidation: true,
552 }
553 if err := mgmtServer.Update(ctx, resources); err != nil {
554 t.Fatal(err)
555 }
556
557
558 client := testgrpc.NewTestServiceClient(cc)
559 peer := &peer.Peer{}
560 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(peer)); err != nil {
561 t.Fatalf("EmptyCall() failed: %v", err)
562 }
563 verifySecurityInformationFromPeer(t, peer, e2e.SecurityLevelMTLS)
564 }
565
566
567
568
569
570
571
572 func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) {
573
574 mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
575 t.Cleanup(cleanup)
576
577
578
579 cc, serverAddress := setupForSecurityTests(t, bootstrapContents, xdsClientCredsWithInsecureFallback(t), tlsServerCreds(t))
580
581
582
583 resources := e2e.UpdateOptions{
584 NodeID: nodeID,
585 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)},
586 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})},
587 SkipValidation: true,
588 }
589 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
590 defer cancel()
591 if err := mgmtServer.Update(ctx, resources); err != nil {
592 t.Fatal(err)
593 }
594
595
596 client := testgrpc.NewTestServiceClient(cc)
597 peer := &peer.Peer{}
598 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(peer)); err != nil {
599 t.Fatalf("EmptyCall() failed: %v", err)
600 }
601 verifySecurityInformationFromPeer(t, peer, e2e.SecurityLevelMTLS)
602
603
604 insecureServer := stubserver.StartTestService(t, nil)
605 t.Cleanup(insecureServer.Stop)
606
607
608
609
610 resources = e2e.UpdateOptions{
611 NodeID: nodeID,
612 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
613 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, insecureServer.Address)})},
614 SkipValidation: true,
615 }
616 if err := mgmtServer.Update(ctx, resources); err != nil {
617 t.Fatal(err)
618 }
619
620
621
622 for ctx.Err() == nil {
623 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(peer)); err != nil {
624 t.Logf("EmptyCall() failed: %v", err)
625 }
626 if peer.Addr.String() == insecureServer.Address {
627 break
628 }
629 }
630 if ctx.Err() != nil {
631 t.Fatal("Timed out when waiting for connection to switch to second backend")
632 }
633 verifySecurityInformationFromPeer(t, peer, e2e.SecurityLevelNone)
634 }
635
636
637
638
639
640
641
642
643 func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) {
644
645
646
647 _, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t)
648
649
650 mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
651 t.Cleanup(cleanup)
652
653
654
655 cc, serverAddress := setupForSecurityTests(t, bootstrapContents, xdsClientCredsWithInsecureFallback(t), tlsServerCreds(t))
656
657
658
659 resources := e2e.UpdateOptions{
660 NodeID: nodeID,
661 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelMTLS)},
662 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})},
663 SkipValidation: true,
664 }
665 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
666 defer cancel()
667 if err := mgmtServer.Update(ctx, resources); err != nil {
668 t.Fatal(err)
669 }
670
671
672 client := testgrpc.NewTestServiceClient(cc)
673 peer := &peer.Peer{}
674 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(peer)); err != nil {
675 t.Fatalf("EmptyCall() failed: %v", err)
676 }
677 verifySecurityInformationFromPeer(t, peer, e2e.SecurityLevelMTLS)
678
679
680
681
682 cluster := e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)
683 cluster.TransportSocket = &v3corepb.TransportSocket{
684 Name: "envoy.transport_sockets.tls",
685 ConfigType: &v3corepb.TransportSocket_TypedConfig{
686 TypedConfig: testutils.MarshalAny(t, &v3tlspb.UpstreamTlsContext{
687 CommonTlsContext: &v3tlspb.CommonTlsContext{
688 ValidationContextType: &v3tlspb.CommonTlsContext_ValidationContextCertificateProviderInstance{
689 ValidationContextCertificateProviderInstance: &v3tlspb.CommonTlsContext_CertificateProviderInstance{
690 InstanceName: "unknown-certificate-provider-instance",
691 },
692 },
693 },
694 }),
695 },
696 }
697 resources = e2e.UpdateOptions{
698 NodeID: nodeID,
699 Clusters: []*v3clusterpb.Cluster{cluster},
700 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, serverAddress)})},
701 SkipValidation: true,
702 }
703 if err := mgmtServer.Update(ctx, resources); err != nil {
704 t.Fatal(err)
705 }
706
707 const wantNACKErr = "instance name \"unknown-certificate-provider-instance\" missing in bootstrap configuration"
708 select {
709 case err := <-resolverErrCh:
710 if !strings.Contains(err.Error(), wantNACKErr) {
711 t.Fatalf("Child policy got resolver error: %v, want err: %v", err, wantNACKErr)
712 }
713 case <-ctx.Done():
714 t.Fatal("Timeout when waiting for resolver error to be pushed to the child policy")
715 }
716
717
718 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
719 t.Fatalf("EmptyCall() failed: %v", err)
720 }
721 verifySecurityInformationFromPeer(t, peer, e2e.SecurityLevelMTLS)
722 }
723
View as plain text