1
16
17 package e2e_test
18
19 import (
20 "context"
21 "encoding/json"
22 "fmt"
23 "strings"
24 "testing"
25 "time"
26
27 "github.com/google/go-cmp/cmp"
28 "google.golang.org/grpc"
29 "google.golang.org/grpc/balancer"
30 "google.golang.org/grpc/balancer/roundrobin"
31 "google.golang.org/grpc/codes"
32 "google.golang.org/grpc/connectivity"
33 "google.golang.org/grpc/credentials/insecure"
34 "google.golang.org/grpc/internal"
35 "google.golang.org/grpc/internal/balancer/stub"
36 iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
37 "google.golang.org/grpc/internal/stubserver"
38 "google.golang.org/grpc/internal/testutils"
39 "google.golang.org/grpc/internal/testutils/xds/e2e"
40 "google.golang.org/grpc/resolver"
41 "google.golang.org/grpc/resolver/manual"
42 "google.golang.org/grpc/serviceconfig"
43 "google.golang.org/grpc/status"
44 "google.golang.org/grpc/xds/internal/balancer/clusterimpl"
45 "google.golang.org/grpc/xds/internal/balancer/outlierdetection"
46 "google.golang.org/grpc/xds/internal/balancer/priority"
47 "google.golang.org/grpc/xds/internal/balancer/wrrlocality"
48 "google.golang.org/grpc/xds/internal/xdsclient"
49 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
50 "google.golang.org/protobuf/types/known/durationpb"
51 "google.golang.org/protobuf/types/known/wrapperspb"
52
53 v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
54 v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
55 v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
56 v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
57 testgrpc "google.golang.org/grpc/interop/grpc_testing"
58 testpb "google.golang.org/grpc/interop/grpc_testing"
59
60 _ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer"
61 )
62
63
64
65
66
67
68
69
70
71 func setupAndDial(t *testing.T, bootstrapContents []byte) (*grpc.ClientConn, func()) {
72 t.Helper()
73
74
75 xdsC, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
76 if err != nil {
77 t.Fatalf("Failed to create xDS client: %v", err)
78 }
79
80
81
82
83 r := manual.NewBuilderWithScheme("whatever")
84 jsonSC := fmt.Sprintf(`{
85 "loadBalancingConfig":[{
86 "cds_experimental":{
87 "cluster": "%s"
88 }
89 }]
90 }`, clusterName)
91 scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
92 r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsC))
93
94
95 cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
96 if err != nil {
97 xdsClose()
98 t.Fatalf("Failed to dial local test server: %v", err)
99 }
100 return cc, func() {
101 xdsClose()
102 cc.Close()
103 }
104 }
105
106
107
108
109
110
111
112 func (s) TestErrorFromParentLB_ConnectionError(t *testing.T) {
113
114
115 lis, err := testutils.LocalTCPListener()
116 if err != nil {
117 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
118 }
119
120
121
122 streamClosedCh := make(chan struct{}, 1)
123 managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
124 Listener: lis,
125 OnStreamClosed: func(int64, *v3corepb.Node) {
126 select {
127 case streamClosedCh <- struct{}{}:
128 default:
129 }
130 },
131 })
132 defer cleanup()
133
134 server := stubserver.StartTestService(t, nil)
135 defer server.Stop()
136
137
138 resources := e2e.UpdateOptions{
139 NodeID: nodeID,
140 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)},
141 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
142 SkipValidation: true,
143 }
144 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
145 defer cancel()
146 if err := managementServer.Update(ctx, resources); err != nil {
147 t.Fatal(err)
148 }
149
150
151
152 cc, cleanup := setupAndDial(t, bootstrapContents)
153 defer cleanup()
154
155 client := testgrpc.NewTestServiceClient(cc)
156 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
157 t.Fatalf("EmptyCall() failed: %v", err)
158 }
159
160
161 lis.Close()
162 select {
163 case <-ctx.Done():
164 t.Fatal("Timeout when waiting for ADS stream to close")
165 default:
166 }
167
168
169 for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
170 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
171 t.Fatalf("EmptyCall() failed: %v", err)
172 }
173 }
174 }
175
176
177
178
179
180
181
182
183 func (s) TestErrorFromParentLB_ResourceNotFound(t *testing.T) {
184
185
186
187
188
189
190 edsResourceRequestedCh := make(chan struct{}, 1)
191 edsResourceCanceledCh := make(chan struct{}, 1)
192 managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
193 OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
194 if req.GetTypeUrl() == version.V3EndpointsURL {
195 switch len(req.GetResourceNames()) {
196 case 0:
197 select {
198 case edsResourceCanceledCh <- struct{}{}:
199 default:
200 }
201 case 1:
202 if req.GetResourceNames()[0] == edsServiceName {
203 select {
204 case edsResourceRequestedCh <- struct{}{}:
205 default:
206 }
207 }
208 default:
209 t.Errorf("Unexpected number of resources, %d, in an EDS request", len(req.GetResourceNames()))
210 }
211 }
212 return nil
213 },
214 })
215 defer cleanup()
216
217 server := stubserver.StartTestService(t, nil)
218 defer server.Stop()
219
220
221 resources := e2e.UpdateOptions{
222 NodeID: nodeID,
223 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)},
224 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
225 SkipValidation: true,
226 }
227 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
228 defer cancel()
229 if err := managementServer.Update(ctx, resources); err != nil {
230 t.Fatal(err)
231 }
232
233
234
235 cc, cleanup := setupAndDial(t, bootstrapContents)
236 defer cleanup()
237
238
239 select {
240 case <-ctx.Done():
241 t.Fatal("Timeout when waiting for EDS resource to be requested")
242 case <-edsResourceRequestedCh:
243 }
244
245
246 client := testgrpc.NewTestServiceClient(cc)
247 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
248 t.Fatalf("EmptyCall() failed: %v", err)
249 }
250
251
252 resources.Clusters = nil
253 if err := managementServer.Update(ctx, resources); err != nil {
254 t.Fatal(err)
255 }
256
257
258 select {
259 case <-ctx.Done():
260 t.Fatal("Timeout when waiting for EDS resource to not requested")
261 case <-edsResourceCanceledCh:
262 }
263
264
265 for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
266 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
267 defer sCancel()
268 _, err := client.EmptyCall(sCtx, &testpb.Empty{})
269 if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "all priorities are removed") {
270 break
271 }
272 if err != nil {
273 t.Logf("EmptyCall failed: %v", err)
274 }
275 }
276 if ctx.Err() != nil {
277 t.Fatalf("RPCs did not fail after removal of Cluster resource")
278 }
279
280 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
281
282
283 resources = e2e.UpdateOptions{
284 NodeID: nodeID,
285 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)},
286 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
287 SkipValidation: true,
288 }
289 if err := managementServer.Update(ctx, resources); err != nil {
290 t.Fatal(err)
291 }
292
293
294 select {
295 case <-ctx.Done():
296 t.Fatal("Timeout when waiting for EDS resource to be requested")
297 case <-edsResourceRequestedCh:
298 }
299
300
301 for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
302 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
303 defer sCancel()
304 if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); err != nil {
305 t.Logf("EmptyCall failed: %v", err)
306 continue
307 }
308 break
309 }
310 if ctx.Err() != nil {
311 t.Fatalf("RPCs did not fail after removal of Cluster resource")
312 }
313 }
314
315
316
317
318 func (s) TestOutlierDetectionConfigPropagationToChildPolicy(t *testing.T) {
319
320
321
322 priorityBuilder := balancer.Get(priority.Name)
323 internal.BalancerUnregister(priorityBuilder.Name())
324 lbCfgCh := make(chan serviceconfig.LoadBalancingConfig, 1)
325 stub.Register(priority.Name, stub.BalancerFuncs{
326 Init: func(bd *stub.BalancerData) {
327 bd.Data = priorityBuilder.Build(bd.ClientConn, bd.BuildOptions)
328 },
329 ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
330 return priorityBuilder.(balancer.ConfigParser).ParseConfig(lbCfg)
331 },
332 UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
333 select {
334 case lbCfgCh <- ccs.BalancerConfig:
335 default:
336 }
337 bal := bd.Data.(balancer.Balancer)
338 return bal.UpdateClientConnState(ccs)
339 },
340 Close: func(bd *stub.BalancerData) {
341 bal := bd.Data.(balancer.Balancer)
342 bal.Close()
343 },
344 })
345 defer balancer.Register(priorityBuilder)
346
347 managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
348 defer cleanup()
349
350 server := stubserver.StartTestService(t, nil)
351 defer server.Stop()
352
353
354 cluster := e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)
355 cluster.OutlierDetection = &v3clusterpb.OutlierDetection{
356 Interval: durationpb.New(10 * time.Second),
357 BaseEjectionTime: durationpb.New(30 * time.Second),
358 MaxEjectionTime: durationpb.New(300 * time.Second),
359 MaxEjectionPercent: wrapperspb.UInt32(10),
360 SuccessRateStdevFactor: wrapperspb.UInt32(2000),
361 EnforcingSuccessRate: wrapperspb.UInt32(50),
362 SuccessRateMinimumHosts: wrapperspb.UInt32(10),
363 SuccessRateRequestVolume: wrapperspb.UInt32(50),
364 }
365 resources := e2e.UpdateOptions{
366 NodeID: nodeID,
367 Clusters: []*v3clusterpb.Cluster{cluster},
368 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
369 SkipValidation: true,
370 }
371 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
372 defer cancel()
373 if err := managementServer.Update(ctx, resources); err != nil {
374 t.Fatal(err)
375 }
376
377
378
379 _, cleanup = setupAndDial(t, bootstrapContents)
380 defer cleanup()
381
382
383
384 wantCfg := &priority.LBConfig{
385 Children: map[string]*priority.Child{
386 "priority-0-0": {
387 Config: &iserviceconfig.BalancerConfig{
388 Name: outlierdetection.Name,
389 Config: &outlierdetection.LBConfig{
390 Interval: iserviceconfig.Duration(10 * time.Second),
391 BaseEjectionTime: iserviceconfig.Duration(30 * time.Second),
392 MaxEjectionTime: iserviceconfig.Duration(300 * time.Second),
393 MaxEjectionPercent: 10,
394 SuccessRateEjection: &outlierdetection.SuccessRateEjection{
395 StdevFactor: 2000,
396 EnforcementPercentage: 50,
397 MinimumHosts: 10,
398 RequestVolume: 50,
399 },
400 ChildPolicy: &iserviceconfig.BalancerConfig{
401 Name: clusterimpl.Name,
402 Config: &clusterimpl.LBConfig{
403 Cluster: clusterName,
404 EDSServiceName: edsServiceName,
405 ChildPolicy: &iserviceconfig.BalancerConfig{
406 Name: wrrlocality.Name,
407 Config: &wrrlocality.LBConfig{
408 ChildPolicy: &iserviceconfig.BalancerConfig{
409 Name: roundrobin.Name,
410 },
411 },
412 },
413 },
414 },
415 },
416 },
417 IgnoreReresolutionRequests: true,
418 },
419 },
420 Priorities: []string{"priority-0-0"},
421 }
422
423 select {
424 case lbCfg := <-lbCfgCh:
425 gotCfg := lbCfg.(*priority.LBConfig)
426 if diff := cmp.Diff(wantCfg, gotCfg); diff != "" {
427 t.Fatalf("Child policy received unexpected diff in config (-want +got):\n%s", diff)
428 }
429 case <-ctx.Done():
430 t.Fatalf("Timeout when waiting for child policy to receive its configuration")
431 }
432 }
433
View as plain text