1
18
19 package xds_test
20
21 import (
22 "context"
23 "fmt"
24 "net"
25 "testing"
26 "time"
27
28 "google.golang.org/grpc"
29 "google.golang.org/grpc/connectivity"
30 "google.golang.org/grpc/credentials/insecure"
31 xdscreds "google.golang.org/grpc/credentials/xds"
32 "google.golang.org/grpc/internal/testutils"
33 "google.golang.org/grpc/internal/testutils/xds/e2e"
34 "google.golang.org/grpc/xds"
35
36 v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
37 testgrpc "google.golang.org/grpc/interop/grpc_testing"
38 testpb "google.golang.org/grpc/interop/grpc_testing"
39 )
40
41
42
43
44
45 func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) {
46 managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
47 defer cleanup()
48
49 creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
50 if err != nil {
51 t.Fatal(err)
52 }
53 lis, err := testutils.LocalTCPListener()
54 if err != nil {
55 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
56 }
57 updateCh := make(chan connectivity.ServingMode, 1)
58
59
60 modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
61 t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
62 updateCh <- args.Mode
63 })
64
65
66 server, err := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
67 if err != nil {
68 t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
69 }
70 defer server.Stop()
71 testgrpc.RegisterTestServiceServer(server, &testService{})
72
73
74 host, port, err := hostPortFromListener(lis)
75 if err != nil {
76 t.Fatalf("failed to retrieve host and port of server: %v", err)
77 }
78 listener := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")
79 resources := e2e.UpdateOptions{
80 NodeID: nodeID,
81 Listeners: []*v3listenerpb.Listener{listener},
82 }
83 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
84 defer cancel()
85 if err := managementServer.Update(ctx, resources); err != nil {
86 t.Fatal(err)
87 }
88
89 go func() {
90 if err := server.Serve(lis); err != nil {
91 t.Errorf("Serve() failed: %v", err)
92 }
93 }()
94
95
96 select {
97 case <-ctx.Done():
98 t.Fatalf("timed out waiting for a mode change update: %v", err)
99 case mode := <-updateCh:
100 if mode != connectivity.ServingModeServing {
101 t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
102 }
103 }
104
105
106 cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
107 if err != nil {
108 t.Fatalf("failed to dial local test server: %v", err)
109 }
110 defer cc.Close()
111 waitForSuccessfulRPC(ctx, t, cc)
112
113
114
115
116 errCh := make(chan error, 1)
117 go func() {
118 prev := connectivity.Ready
119 for {
120 curr := cc.GetState()
121 if !(curr == connectivity.Ready || curr == connectivity.Idle) {
122 errCh <- fmt.Errorf("unexpected connectivity state change {%s --> %s} on the client connection", prev, curr)
123 return
124 }
125 if !cc.WaitForStateChange(ctx, curr) {
126
127 break
128 }
129 prev = curr
130 }
131 errCh <- nil
132 }()
133
134
135
136
137 if err := managementServer.Update(ctx, e2e.UpdateOptions{
138 NodeID: nodeID,
139 Listeners: []*v3listenerpb.Listener{listener},
140 }); err != nil {
141 t.Fatal(err)
142 }
143
144
145
146 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
147 defer sCancel()
148 select {
149 case <-sCtx.Done():
150 case mode := <-updateCh:
151 t.Fatalf("unexpected mode change callback with new mode %v", mode)
152 }
153
154
155 waitForSuccessfulRPC(ctx, t, cc)
156
157
158
159 cancel()
160 if err := <-errCh; err != nil {
161 t.Fatal(err)
162 }
163 }
164
165
166
167
168 func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
169 managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
170 defer cleanup()
171
172
173 creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{
174 FallbackCreds: insecure.NewCredentials(),
175 })
176 if err != nil {
177 t.Fatal(err)
178 }
179
180
181 lis1, err := testutils.LocalTCPListener()
182 if err != nil {
183 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
184 }
185 lis2, err := testutils.LocalTCPListener()
186 if err != nil {
187 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
188 }
189
190
191 updateCh1 := make(chan connectivity.ServingMode, 1)
192 updateCh2 := make(chan connectivity.ServingMode, 1)
193
194
195
196 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
197 defer cancel()
198 modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
199 t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
200 switch addr.String() {
201 case lis1.Addr().String():
202 updateCh1 <- args.Mode
203 case lis2.Addr().String():
204 updateCh2 <- args.Mode
205 default:
206 t.Errorf("serving mode callback invoked for unknown listener address: %q", addr.String())
207 }
208 })
209
210
211 server, err := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
212 if err != nil {
213 t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
214 }
215 defer server.Stop()
216 testgrpc.RegisterTestServiceServer(server, &testService{})
217
218
219
220 host1, port1, err := hostPortFromListener(lis1)
221 if err != nil {
222 t.Fatalf("failed to retrieve host and port of server: %v", err)
223 }
224 listener1 := e2e.DefaultServerListener(host1, port1, e2e.SecurityLevelNone, "routeName")
225 host2, port2, err := hostPortFromListener(lis2)
226 if err != nil {
227 t.Fatalf("failed to retrieve host and port of server: %v", err)
228 }
229 listener2 := e2e.DefaultServerListener(host2, port2, e2e.SecurityLevelNone, "routeName")
230 resources := e2e.UpdateOptions{
231 NodeID: nodeID,
232 Listeners: []*v3listenerpb.Listener{listener1, listener2},
233 }
234 if err := managementServer.Update(ctx, resources); err != nil {
235 t.Fatal(err)
236 }
237
238 go func() {
239 if err := server.Serve(lis1); err != nil {
240 t.Errorf("Serve() failed: %v", err)
241 }
242 }()
243 go func() {
244 if err := server.Serve(lis2); err != nil {
245 t.Errorf("Serve() failed: %v", err)
246 }
247 }()
248
249
250 select {
251 case <-ctx.Done():
252 t.Fatalf("timed out waiting for a mode change update: %v", err)
253 case mode := <-updateCh1:
254 if mode != connectivity.ServingModeServing {
255 t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
256 }
257 }
258 select {
259 case <-ctx.Done():
260 t.Fatalf("timed out waiting for a mode change update: %v", err)
261 case mode := <-updateCh2:
262 if mode != connectivity.ServingModeServing {
263 t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
264 }
265 }
266
267
268 cc1, err := grpc.NewClient(lis1.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
269 if err != nil {
270 t.Fatalf("failed to dial local test server: %v", err)
271 }
272 defer cc1.Close()
273 waitForSuccessfulRPC(ctx, t, cc1)
274
275
276 cc2, err := grpc.NewClient(lis2.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
277 if err != nil {
278 t.Fatalf("failed to dial local test server: %v", err)
279 }
280 defer cc2.Close()
281 waitForSuccessfulRPC(ctx, t, cc2)
282
283
284
285 if err := managementServer.Update(ctx, e2e.UpdateOptions{
286 NodeID: nodeID,
287 Listeners: []*v3listenerpb.Listener{listener1},
288 }); err != nil {
289 t.Fatal(err)
290 }
291
292
293 select {
294 case <-ctx.Done():
295 t.Fatalf("timed out waiting for a mode change update: %v", err)
296 case mode := <-updateCh2:
297 if mode != connectivity.ServingModeNotServing {
298 t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing)
299 }
300 }
301
302
303 waitForSuccessfulRPC(ctx, t, cc1)
304 waitForFailedRPC(ctx, t, cc2)
305
306
307
308
309 if err := managementServer.Update(ctx, e2e.UpdateOptions{
310 NodeID: nodeID,
311 Listeners: []*v3listenerpb.Listener{},
312 }); err != nil {
313 t.Fatal(err)
314 }
315
316
317
318
319 select {
320 case <-ctx.Done():
321 t.Fatalf("timed out waiting for a mode change update: %v", err)
322 case mode := <-updateCh1:
323 if mode != connectivity.ServingModeNotServing {
324 t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing)
325 }
326 }
327
328
329 waitForFailedRPC(ctx, t, cc1)
330 waitForFailedRPC(ctx, t, cc2)
331
332
333
334 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
335 defer sCancel()
336 if _, err := grpc.DialContext(sCtx, lis1.Addr().String(), grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())); err == nil {
337 t.Fatal("successfully created clientConn to a server in \"not-serving\" state")
338 }
339
340
341 if err := managementServer.Update(ctx, e2e.UpdateOptions{
342 NodeID: nodeID,
343 Listeners: []*v3listenerpb.Listener{listener1, listener2},
344 }); err != nil {
345 t.Fatal(err)
346 }
347
348
349 select {
350 case <-ctx.Done():
351 t.Fatalf("timed out waiting for a mode change update: %v", err)
352 case mode := <-updateCh1:
353 if mode != connectivity.ServingModeServing {
354 t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
355 }
356 }
357 select {
358 case <-ctx.Done():
359 t.Fatalf("timed out waiting for a mode change update: %v", err)
360 case mode := <-updateCh2:
361 if mode != connectivity.ServingModeServing {
362 t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
363 }
364 }
365
366
367 waitForSuccessfulRPC(ctx, t, cc1)
368 waitForSuccessfulRPC(ctx, t, cc2)
369 }
370
371 func waitForSuccessfulRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
372 t.Helper()
373
374 c := testgrpc.NewTestServiceClient(cc)
375 if _, err := c.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
376 t.Fatalf("rpc EmptyCall() failed: %v", err)
377 }
378 }
379
380 func waitForFailedRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
381 t.Helper()
382
383
384 c := testgrpc.NewTestServiceClient(cc)
385 if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil {
386 return
387 }
388
389 ticker := time.NewTicker(10 * time.Millisecond)
390 defer ticker.Stop()
391 for {
392 select {
393 case <-ctx.Done():
394 t.Fatalf("failure when waiting for RPCs to fail: %v", ctx.Err())
395 case <-ticker.C:
396 if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil {
397 return
398 }
399 }
400 }
401 }
402
View as plain text