1
18
19 package csds_test
20
21 import (
22 "context"
23 "fmt"
24 "io"
25 "sort"
26 "strings"
27 "testing"
28 "time"
29
30 "github.com/google/go-cmp/cmp"
31 "github.com/google/uuid"
32 "google.golang.org/grpc"
33 "google.golang.org/grpc/credentials/insecure"
34 "google.golang.org/grpc/internal/grpctest"
35 "google.golang.org/grpc/internal/testutils"
36 "google.golang.org/grpc/internal/testutils/xds/bootstrap"
37 "google.golang.org/grpc/internal/testutils/xds/e2e"
38 "google.golang.org/grpc/xds/csds"
39 "google.golang.org/grpc/xds/internal/xdsclient"
40 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
41 "google.golang.org/protobuf/encoding/prototext"
42 "google.golang.org/protobuf/testing/protocmp"
43 "google.golang.org/protobuf/types/known/anypb"
44
45 v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
46 v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
47 v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
48 v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
49 v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
50 v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
51 v3statuspbgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
52
53 _ "google.golang.org/grpc/xds/internal/httpfilter/router"
54 )
55
// defaultTestTimeout is the deadline applied to the context that governs each
// test's CSDS stream and all polling performed on it.
const defaultTestTimeout = time.Second * 5
57
58 var cmpOpts = cmp.Options{
59 cmp.Transformer("sort", func(in []*v3statuspb.ClientConfig_GenericXdsConfig) []*v3statuspb.ClientConfig_GenericXdsConfig {
60 out := append([]*v3statuspb.ClientConfig_GenericXdsConfig(nil), in...)
61 sort.Slice(out, func(i, j int) bool {
62 a, b := out[i], out[j]
63 if a == nil {
64 return true
65 }
66 if b == nil {
67 return false
68 }
69 if strings.Compare(a.TypeUrl, b.TypeUrl) == 0 {
70 return strings.Compare(a.Name, b.Name) < 0
71 }
72 return strings.Compare(a.TypeUrl, b.TypeUrl) < 0
73 })
74 return out
75 }),
76 protocmp.Transform(),
77 protocmp.IgnoreFields((*v3statuspb.ClientConfig_GenericXdsConfig)(nil), "last_updated"),
78 protocmp.IgnoreFields((*v3adminpb.UpdateFailureState)(nil), "last_update_attempt", "details"),
79 }
80
81 type s struct {
82 grpctest.Tester
83 }
84
85 func Test(t *testing.T) {
86 grpctest.RunSubTests(t, s{})
87 }
88
89
90
91
92
93 type unimplementedListenerWatcher struct{}
94
95 func (unimplementedListenerWatcher) OnUpdate(*xdsresource.ListenerResourceData) {}
96 func (unimplementedListenerWatcher) OnError(error) {}
97 func (unimplementedListenerWatcher) OnResourceDoesNotExist() {}
98
99 type unimplementedRouteConfigWatcher struct{}
100
101 func (unimplementedRouteConfigWatcher) OnUpdate(*xdsresource.RouteConfigResourceData) {}
102 func (unimplementedRouteConfigWatcher) OnError(error) {}
103 func (unimplementedRouteConfigWatcher) OnResourceDoesNotExist() {}
104
105 type unimplementedClusterWatcher struct{}
106
107 func (unimplementedClusterWatcher) OnUpdate(*xdsresource.ClusterResourceData) {}
108 func (unimplementedClusterWatcher) OnError(error) {}
109 func (unimplementedClusterWatcher) OnResourceDoesNotExist() {}
110
111 type unimplementedEndpointsWatcher struct{}
112
113 func (unimplementedEndpointsWatcher) OnUpdate(*xdsresource.EndpointsResourceData) {}
114 func (unimplementedEndpointsWatcher) OnError(error) {}
115 func (unimplementedEndpointsWatcher) OnResourceDoesNotExist() {}
116
117 func (s) TestCSDS(t *testing.T) {
118
119 nodeID := uuid.New().String()
120 mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
121 if err != nil {
122 t.Fatal(err)
123 }
124 defer mgmtServer.Stop()
125
126
127 bootstrapCleanup, err := bootstrap.CreateFile(bootstrap.Options{
128 NodeID: nodeID,
129 ServerURI: mgmtServer.Address,
130 })
131 if err != nil {
132 t.Fatal(err)
133 }
134 defer bootstrapCleanup()
135
136
137
138 xdsC, close, err := xdsclient.New()
139 if err != nil {
140 t.Fatalf("Failed to create xDS client: %v", err)
141 }
142 defer close()
143
144
145 server := grpc.NewServer()
146 csdss, err := csds.NewClientStatusDiscoveryServer()
147 if err != nil {
148 t.Fatal(err)
149 }
150 v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss)
151 defer func() {
152 server.Stop()
153 csdss.Close()
154 }()
155
156
157 lis, err := testutils.LocalTCPListener()
158 if err != nil {
159 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
160 }
161 go func() {
162 if err := server.Serve(lis); err != nil {
163 t.Errorf("Serve() failed: %v", err)
164 }
165 }()
166
167
168 conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
169 if err != nil {
170 t.Fatalf("Failed to dial CSDS server %q: %v", lis.Addr().String(), err)
171 }
172 c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn)
173 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
174 defer cancel()
175 stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true))
176 if err != nil {
177 t.Fatalf("Failed to create a stream for CSDS: %v", err)
178 }
179 defer conn.Close()
180
181
182 if err := checkClientStatusResponse(stream, nil); err != nil {
183 t.Fatal(err)
184 }
185
186
187 ldsTargets := []string{"lds.target.good:0000", "lds.target.good:1111"}
188 rdsTargets := []string{"route-config-0", "route-config-1"}
189 cdsTargets := []string{"cluster-0", "cluster-1"}
190 edsTargets := []string{"endpoints-0", "endpoints-1"}
191 listeners := make([]*v3listenerpb.Listener, len(ldsTargets))
192 listenerAnys := make([]*anypb.Any, len(ldsTargets))
193 for i := range ldsTargets {
194 listeners[i] = e2e.DefaultClientListener(ldsTargets[i], rdsTargets[i])
195 listenerAnys[i] = testutils.MarshalAny(t, listeners[i])
196 }
197 routes := make([]*v3routepb.RouteConfiguration, len(rdsTargets))
198 routeAnys := make([]*anypb.Any, len(rdsTargets))
199 for i := range rdsTargets {
200 routes[i] = e2e.DefaultRouteConfig(rdsTargets[i], ldsTargets[i], cdsTargets[i])
201 routeAnys[i] = testutils.MarshalAny(t, routes[i])
202 }
203 clusters := make([]*v3clusterpb.Cluster, len(cdsTargets))
204 clusterAnys := make([]*anypb.Any, len(cdsTargets))
205 for i := range cdsTargets {
206 clusters[i] = e2e.DefaultCluster(cdsTargets[i], edsTargets[i], e2e.SecurityLevelNone)
207 clusterAnys[i] = testutils.MarshalAny(t, clusters[i])
208 }
209 endpoints := make([]*v3endpointpb.ClusterLoadAssignment, len(edsTargets))
210 endpointAnys := make([]*anypb.Any, len(edsTargets))
211 ips := []string{"0.0.0.0", "1.1.1.1"}
212 ports := []uint32{123, 456}
213 for i := range edsTargets {
214 endpoints[i] = e2e.DefaultEndpoint(edsTargets[i], ips[i], ports[i:i+1])
215 endpointAnys[i] = testutils.MarshalAny(t, endpoints[i])
216 }
217
218
219 for _, target := range ldsTargets {
220 xdsresource.WatchListener(xdsC, target, unimplementedListenerWatcher{})
221 }
222 for _, target := range rdsTargets {
223 xdsresource.WatchRouteConfig(xdsC, target, unimplementedRouteConfigWatcher{})
224 }
225 for _, target := range cdsTargets {
226 xdsresource.WatchCluster(xdsC, target, unimplementedClusterWatcher{})
227 }
228 for _, target := range edsTargets {
229 xdsresource.WatchEndpoints(xdsC, target, unimplementedEndpointsWatcher{})
230 }
231
232
233
234 want := []*v3statuspb.ClientConfig_GenericXdsConfig{}
235 for i := range ldsTargets {
236 want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil))
237 }
238 for i := range rdsTargets {
239 want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil))
240 }
241 for i := range cdsTargets {
242 want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil))
243 }
244 for i := range edsTargets {
245 want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil))
246 }
247 for {
248 if err := ctx.Err(); err != nil {
249 t.Fatalf("Timeout when waiting for resources in \"Requested\" state: %v", err)
250 }
251 if err := checkClientStatusResponse(stream, want); err == nil {
252 break
253 }
254 time.Sleep(time.Millisecond * 100)
255 }
256
257
258
259 if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
260 NodeID: nodeID,
261 Listeners: listeners,
262 Routes: routes,
263 Clusters: clusters,
264 Endpoints: endpoints,
265 }); err != nil {
266 t.Fatal(err)
267 }
268
269
270
271 want = nil
272 for i := range ldsTargets {
273 want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[i]))
274 }
275 for i := range rdsTargets {
276 want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, routeAnys[i]))
277 }
278 for i := range cdsTargets {
279 want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[i]))
280 }
281 for i := range edsTargets {
282 want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[i]))
283 }
284 for {
285 if err := ctx.Err(); err != nil {
286 t.Fatalf("Timeout when waiting for resources in \"ACKed\" state: %v", err)
287 }
288 err := checkClientStatusResponse(stream, want)
289 if err == nil {
290 break
291 }
292 time.Sleep(time.Millisecond * 100)
293 }
294
295
296
297 const nackResourceIdx = 0
298 listeners[nackResourceIdx].ApiListener = &v3listenerpb.ApiListener{}
299 routes[nackResourceIdx].VirtualHosts = []*v3routepb.VirtualHost{{Routes: []*v3routepb.Route{{}}}}
300 clusters[nackResourceIdx].ClusterDiscoveryType = &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC}
301 endpoints[nackResourceIdx].Endpoints = []*v3endpointpb.LocalityLbEndpoints{{}}
302 if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
303 NodeID: nodeID,
304 Listeners: listeners,
305 Routes: routes,
306 Clusters: clusters,
307 Endpoints: endpoints,
308 SkipValidation: true,
309 }); err != nil {
310 t.Fatal(err)
311 }
312
313
314
315
316
317
318 want = nil
319 for i := range ldsTargets {
320 config := makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[i])
321 if i == nackResourceIdx {
322 config.VersionInfo = "1"
323 config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
324 config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"}
325 }
326 want = append(want, config)
327 }
328 for i := range rdsTargets {
329 config := makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, routeAnys[i])
330 if i == nackResourceIdx {
331 config.VersionInfo = "1"
332 config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
333 config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"}
334 }
335 want = append(want, config)
336 }
337 for i := range cdsTargets {
338 config := makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[i])
339 if i == nackResourceIdx {
340 config.VersionInfo = "1"
341 config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
342 config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"}
343 }
344 want = append(want, config)
345 }
346 for i := range edsTargets {
347 config := makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[i])
348 if i == nackResourceIdx {
349 config.VersionInfo = "1"
350 config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
351 config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"}
352 }
353 want = append(want, config)
354 }
355 for {
356 if err := ctx.Err(); err != nil {
357 t.Fatalf("Timeout when waiting for resources in \"NACKed\" state: %v", err)
358 }
359 err := checkClientStatusResponse(stream, want)
360 if err == nil {
361 break
362 }
363 time.Sleep(time.Millisecond * 100)
364 }
365 }
366
367 func makeGenericXdsConfig(typeURL, name, version string, status v3adminpb.ClientResourceStatus, config *anypb.Any) *v3statuspb.ClientConfig_GenericXdsConfig {
368 return &v3statuspb.ClientConfig_GenericXdsConfig{
369 TypeUrl: typeURL,
370 Name: name,
371 VersionInfo: version,
372 ClientStatus: status,
373 XdsConfig: config,
374 }
375 }
376
377 func checkClientStatusResponse(stream v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, want []*v3statuspb.ClientConfig_GenericXdsConfig) error {
378 if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
379 if err != io.EOF {
380 return fmt.Errorf("failed to send ClientStatusRequest: %v", err)
381 }
382
383
384 for {
385 if _, err := stream.Recv(); err != nil {
386 return fmt.Errorf("failed to recv ClientStatusResponse: %v", err)
387 }
388 }
389 }
390 resp, err := stream.Recv()
391 if err != nil {
392 return fmt.Errorf("failed to recv ClientStatusResponse: %v", err)
393 }
394
395 if n := len(resp.Config); n != 1 {
396 return fmt.Errorf("got %d configs, want 1: %v", n, prototext.Format(resp))
397 }
398
399 if diff := cmp.Diff(resp.Config[0].GenericXdsConfigs, want, cmpOpts); diff != "" {
400 return fmt.Errorf(diff)
401 }
402 return nil
403 }
404
405 func (s) TestCSDSNoXDSClient(t *testing.T) {
406
407
408
409 bootstrapCleanup, err := bootstrap.CreateFile(bootstrap.Options{})
410 if err != nil {
411 t.Fatal(err)
412 }
413 t.Cleanup(func() { bootstrapCleanup() })
414
415
416 server := grpc.NewServer()
417 csdss, err := csds.NewClientStatusDiscoveryServer()
418 if err != nil {
419 t.Fatal(err)
420 }
421 defer csdss.Close()
422 v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss)
423
424
425 lis, err := testutils.LocalTCPListener()
426 if err != nil {
427 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
428 }
429 go func() {
430 if err := server.Serve(lis); err != nil {
431 t.Errorf("Serve() failed: %v", err)
432 }
433 }()
434 defer server.Stop()
435
436
437 conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
438 if err != nil {
439 t.Fatalf("Failed to dial CSDS server %q: %v", lis.Addr().String(), err)
440 }
441 defer conn.Close()
442 c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn)
443 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
444 defer cancel()
445 stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true))
446 if err != nil {
447 t.Fatalf("Failed to create a stream for CSDS: %v", err)
448 }
449
450 if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
451 t.Fatalf("Failed to send ClientStatusRequest: %v", err)
452 }
453 r, err := stream.Recv()
454 if err != nil {
455
456 t.Fatalf("Failed to recv ClientStatusResponse: %v", err)
457 }
458 if n := len(r.Config); n != 0 {
459 t.Fatalf("got %d configs, want 0: %v", n, prototext.Format(r))
460 }
461 }
462
View as plain text