1
18
19 package xds_test
20
21 import (
22 "context"
23 "errors"
24 "fmt"
25 "testing"
26 "time"
27
28 v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
29 v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
30 v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
31 v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
32 "github.com/google/go-cmp/cmp"
33 "google.golang.org/grpc"
34 "google.golang.org/grpc/credentials/insecure"
35 "google.golang.org/grpc/internal/stubserver"
36 "google.golang.org/grpc/internal/testutils"
37 "google.golang.org/grpc/internal/testutils/xds/e2e"
38 testgrpc "google.golang.org/grpc/interop/grpc_testing"
39 testpb "google.golang.org/grpc/interop/grpc_testing"
40 "google.golang.org/grpc/peer"
41 "google.golang.org/grpc/resolver"
42 "google.golang.org/protobuf/types/known/durationpb"
43 "google.golang.org/protobuf/types/known/wrapperspb"
44 )
45
46
47
48
49
50
51
52 func (s) TestOutlierDetection_NoopConfig(t *testing.T) {
53 managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
54 defer cleanup1()
55
56 server := &stubserver.StubServer{
57 EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
58 }
59 server.StartServer()
60 t.Logf("Started test service backend at %q", server.Address)
61 defer server.Stop()
62
63 const serviceName = "my-service-client-side-xds"
64 resources := e2e.DefaultClientResources(e2e.ResourceParams{
65 DialTarget: serviceName,
66 NodeID: nodeID,
67 Host: "localhost",
68 Port: testutils.ParsePort(t, server.Address),
69 SecLevel: e2e.SecurityLevelNone,
70 })
71 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
72 defer cancel()
73 if err := managementServer.Update(ctx, resources); err != nil {
74 t.Fatal(err)
75 }
76
77
78 cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
79 if err != nil {
80 t.Fatalf("failed to dial local test server: %v", err)
81 }
82 defer cc.Close()
83
84 client := testgrpc.NewTestServiceClient(cc)
85 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
86 t.Fatalf("rpc EmptyCall() failed: %v", err)
87 }
88 }
89
90
91
92
93
94
95 func clientResourcesMultipleBackendsAndOD(params e2e.ResourceParams, ports []uint32, od *v3clusterpb.OutlierDetection) e2e.UpdateOptions {
96 routeConfigName := "route-" + params.DialTarget
97 clusterName := "cluster-" + params.DialTarget
98 endpointsName := "endpoints-" + params.DialTarget
99 return e2e.UpdateOptions{
100 NodeID: params.NodeID,
101 Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)},
102 Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, params.DialTarget, clusterName)},
103 Clusters: []*v3clusterpb.Cluster{clusterWithOutlierDetection(clusterName, endpointsName, params.SecLevel, od)},
104 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, ports)},
105 }
106 }
107
108 func clusterWithOutlierDetection(clusterName, edsServiceName string, secLevel e2e.SecurityLevel, od *v3clusterpb.OutlierDetection) *v3clusterpb.Cluster {
109 cluster := e2e.DefaultCluster(clusterName, edsServiceName, secLevel)
110 cluster.OutlierDetection = od
111 return cluster
112 }
113
114
115
116
117
118
119
120 func checkRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error {
121 wantAddrCount := make(map[string]int)
122 for _, addr := range addrs {
123 wantAddrCount[addr.Addr]++
124 }
125 for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
126
127 var iterations [][]string
128 for i := 0; i < 3; i++ {
129 iteration := make([]string, len(addrs))
130 for c := 0; c < len(addrs); c++ {
131 var peer peer.Peer
132 client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer))
133 if peer.Addr != nil {
134 iteration[c] = peer.Addr.String()
135 }
136 }
137 iterations = append(iterations, iteration)
138 }
139
140 gotAddrCount := make(map[string]int)
141 for _, addr := range iterations[0] {
142 gotAddrCount[addr]++
143 }
144 if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" {
145 continue
146 }
147
148 if !cmp.Equal(iterations[0], iterations[1]) || !cmp.Equal(iterations[0], iterations[2]) {
149 continue
150 }
151 return nil
152 }
153 return fmt.Errorf("timeout when waiting for roundrobin distribution of RPCs across addresses: %v", addrs)
154 }
155
156
157
158
159
160
161
162
163
164 func (s) TestOutlierDetectionWithOutlier(t *testing.T) {
165 managementServer, nodeID, _, r, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
166 defer cleanup()
167
168
169 backend1 := stubserver.StartTestService(t, nil)
170 port1 := testutils.ParsePort(t, backend1.Address)
171 defer backend1.Stop()
172
173
174 backend2 := stubserver.StartTestService(t, nil)
175 port2 := testutils.ParsePort(t, backend2.Address)
176 defer backend2.Stop()
177
178
179 backend3 := stubserver.StartTestService(t, &stubserver.StubServer{
180 EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return nil, errors.New("some error") },
181 })
182 port3 := testutils.ParsePort(t, backend3.Address)
183 defer backend3.Stop()
184
185 const serviceName = "my-service-client-side-xds"
186 resources := clientResourcesMultipleBackendsAndOD(e2e.ResourceParams{
187 DialTarget: serviceName,
188 NodeID: nodeID,
189 Host: "localhost",
190 SecLevel: e2e.SecurityLevelNone,
191 }, []uint32{port1, port2, port3}, &v3clusterpb.OutlierDetection{
192 Interval: &durationpb.Duration{Nanos: 50000000},
193 BaseEjectionTime: &durationpb.Duration{Seconds: 30},
194 MaxEjectionTime: &durationpb.Duration{Seconds: 300},
195 MaxEjectionPercent: &wrapperspb.UInt32Value{Value: 1},
196 FailurePercentageThreshold: &wrapperspb.UInt32Value{Value: 50},
197 EnforcingFailurePercentage: &wrapperspb.UInt32Value{Value: 100},
198 FailurePercentageRequestVolume: &wrapperspb.UInt32Value{Value: 8},
199 FailurePercentageMinimumHosts: &wrapperspb.UInt32Value{Value: 3},
200 })
201 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
202 defer cancel()
203 if err := managementServer.Update(ctx, resources); err != nil {
204 t.Fatal(err)
205 }
206
207 cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
208 if err != nil {
209 t.Fatalf("failed to dial local test server: %v", err)
210 }
211 defer cc.Close()
212
213 client := testgrpc.NewTestServiceClient(cc)
214
215 fullAddresses := []resolver.Address{
216 {Addr: backend1.Address},
217 {Addr: backend2.Address},
218 {Addr: backend3.Address},
219 }
220
221
222 if err = checkRoundRobinRPCs(ctx, client, fullAddresses); err != nil {
223 t.Fatalf("error in expected round robin: %v", err)
224 }
225
226
227 okAddresses := []resolver.Address{
228 {Addr: backend1.Address},
229 {Addr: backend2.Address},
230 }
231
232
233
234
235 if err = checkRoundRobinRPCs(ctx, client, okAddresses); err != nil {
236 t.Fatalf("error in expected round robin: %v", err)
237 }
238 }
239
240
241
242
243
244
245
246 func (s) TestOutlierDetectionXDSDefaultOn(t *testing.T) {
247 managementServer, nodeID, _, r, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
248 defer cleanup()
249
250
251 backend1 := stubserver.StartTestService(t, nil)
252 port1 := testutils.ParsePort(t, backend1.Address)
253 defer backend1.Stop()
254
255
256 backend2 := stubserver.StartTestService(t, nil)
257 port2 := testutils.ParsePort(t, backend2.Address)
258 defer backend2.Stop()
259
260
261 backend3 := stubserver.StartTestService(t, &stubserver.StubServer{
262 EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return nil, errors.New("some error") },
263 })
264 port3 := testutils.ParsePort(t, backend3.Address)
265 defer backend3.Stop()
266
267
268
269
270
271
272
273 const serviceName = "my-service-client-side-xds"
274 resources := clientResourcesMultipleBackendsAndOD(e2e.ResourceParams{
275 DialTarget: serviceName,
276 NodeID: nodeID,
277 Host: "localhost",
278 SecLevel: e2e.SecurityLevelNone,
279 }, []uint32{port1, port2, port3}, &v3clusterpb.OutlierDetection{
280
281 Interval: &durationpb.Duration{Nanos: 50000000},
282
283
284 SuccessRateMinimumHosts: &wrapperspb.UInt32Value{Value: 1},
285 SuccessRateRequestVolume: &wrapperspb.UInt32Value{Value: 8},
286 SuccessRateStdevFactor: &wrapperspb.UInt32Value{Value: 1},
287 })
288 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
289 defer cancel()
290 if err := managementServer.Update(ctx, resources); err != nil {
291 t.Fatal(err)
292 }
293
294 cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
295 if err != nil {
296 t.Fatalf("failed to dial local test server: %v", err)
297 }
298 defer cc.Close()
299
300 client := testgrpc.NewTestServiceClient(cc)
301
302 fullAddresses := []resolver.Address{
303 {Addr: backend1.Address},
304 {Addr: backend2.Address},
305 {Addr: backend3.Address},
306 }
307
308
309 if err = checkRoundRobinRPCs(ctx, client, fullAddresses); err != nil {
310 t.Fatalf("error in expected round robin: %v", err)
311 }
312
313
314 okAddresses := []resolver.Address{
315 {Addr: backend1.Address},
316 {Addr: backend2.Address},
317 }
318
319
320
321
322 if err = checkRoundRobinRPCs(ctx, client, okAddresses); err != nil {
323 t.Fatalf("error in expected round robin: %v", err)
324 }
325 }
326
View as plain text