1
18
19 package clusterimpl_test
20
21 import (
22 "context"
23 "fmt"
24 "strings"
25 "testing"
26 "time"
27
28 "google.golang.org/grpc"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/credentials/insecure"
31 "google.golang.org/grpc/internal/grpctest"
32 "google.golang.org/grpc/internal/stubserver"
33 "google.golang.org/grpc/internal/testutils"
34 "google.golang.org/grpc/internal/testutils/xds/e2e"
35 "google.golang.org/grpc/status"
36
37 v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
38 v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
39 testgrpc "google.golang.org/grpc/interop/grpc_testing"
40 testpb "google.golang.org/grpc/interop/grpc_testing"
41
42 _ "google.golang.org/grpc/xds"
43 )
44
// Timeouts shared by the tests in this file.
const (
	// defaultTestTimeout bounds the context under which a whole test runs,
	// including management server updates and RPCs.
	defaultTestTimeout = 5 * time.Second
	// defaultTestShortTimeout is used both as the pause between retries and
	// as the window during which an event that must NOT happen is awaited.
	defaultTestShortTimeout = 100 * time.Millisecond
)
49
50 type s struct {
51 grpctest.Tester
52 }
53
54 func Test(t *testing.T) {
55 grpctest.RunSubTests(t, s{})
56 }
57
58
59
60
61
62
63 func (s) TestConfigUpdateWithSameLoadReportingServerConfig(t *testing.T) {
64
65 opts := e2e.ManagementServerOptions{SupportLoadReportingService: true}
66 mgmtServer, nodeID, _, resolver, mgmtServerCleanup := e2e.SetupManagementServer(t, opts)
67 defer mgmtServerCleanup()
68
69
70 server := stubserver.StartTestService(t, nil)
71 defer server.Stop()
72
73
74
75 const serviceName = "my-test-xds-service"
76 resources := e2e.DefaultClientResources(e2e.ResourceParams{
77 DialTarget: serviceName,
78 NodeID: nodeID,
79 Host: "localhost",
80 Port: testutils.ParsePort(t, server.Address),
81 SecLevel: e2e.SecurityLevelNone,
82 })
83 resources.Clusters[0].LrsServer = &v3corepb.ConfigSource{
84 ConfigSourceSpecifier: &v3corepb.ConfigSource_Self{
85 Self: &v3corepb.SelfConfigSource{},
86 },
87 }
88 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
89 defer cancel()
90 if err := mgmtServer.Update(ctx, resources); err != nil {
91 t.Fatal(err)
92 }
93
94
95 cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
96 if err != nil {
97 t.Fatalf("failed to dial local test server: %v", err)
98 }
99 defer cc.Close()
100
101 client := testgrpc.NewTestServiceClient(cc)
102 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
103 t.Fatalf("rpc EmptyCall() failed: %v", err)
104 }
105
106
107 if _, err := mgmtServer.LRSServer.LRSStreamOpenChan.Receive(ctx); err != nil {
108 t.Fatalf("Failure when waiting for an LRS stream to be opened: %v", err)
109 }
110
111
112
113 resources.Endpoints = []*v3endpointpb.ClusterLoadAssignment{
114 e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
115 ClusterName: "endpoints-" + serviceName,
116 Host: "localhost",
117 Localities: []e2e.LocalityOptions{
118 {
119 Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}},
120 Weight: 1,
121 },
122 },
123 DropPercents: map[string]int{"test-drop-everything": 100},
124 }),
125 }
126 if err := mgmtServer.Update(ctx, resources); err != nil {
127 t.Fatal(err)
128 }
129
130
131
132
133 for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
134 _, err := client.EmptyCall(ctx, &testpb.Empty{})
135 if err != nil && status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "RPC is dropped") {
136 break
137 }
138 }
139 if ctx.Err() != nil {
140 t.Fatalf("Timeout when waiting for RPCs to be dropped after config update")
141 }
142
143
144 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
145 defer sCancel()
146 if _, err := mgmtServer.LRSServer.LRSStreamCloseChan.Receive(sCtx); err == nil {
147 t.Fatal("LRS stream closed when expected not to")
148 }
149
150
151 sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
152 defer sCancel()
153 if _, err := mgmtServer.LRSServer.LRSStreamOpenChan.Receive(sCtx); err == nil {
154 t.Fatal("New LRS stream created when expected not to")
155 }
156 }
157
View as plain text