1
18 package xds_test
19
20 import (
21 "context"
22 "fmt"
23 "testing"
24
25 "google.golang.org/grpc"
26 "google.golang.org/grpc/credentials/insecure"
27 istats "google.golang.org/grpc/internal/stats"
28 "google.golang.org/grpc/internal/stubserver"
29 "google.golang.org/grpc/internal/testutils"
30 "google.golang.org/grpc/internal/testutils/xds/e2e"
31 "google.golang.org/grpc/stats"
32
33 v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
34 testgrpc "google.golang.org/grpc/interop/grpc_testing"
35 testpb "google.golang.org/grpc/interop/grpc_testing"
36 "google.golang.org/protobuf/types/known/structpb"
37 )
38
// Telemetry label keys and values shared by the test and the fake stats
// handler: the test places them in the cluster metadata, and the handler
// expects to read the same pairs back from the RPC's telemetry labels.
const (
	// serviceNameKey is the label key under which the service name is stored.
	serviceNameKey = "service_name"
	// serviceNamespaceKey is the label key under which the service namespace
	// is stored.
	serviceNamespaceKey = "service_namespace"
	// serviceNameValue is the service name the test configures and expects.
	serviceNameValue = "grpc-service"
	// serviceNamespaceValue is the service namespace the test configures and
	// expects.
	serviceNamespaceValue = "grpc-service-namespace"
)
43
44
45
46
47
48
49 func (s) TestTelemetryLabels(t *testing.T) {
50 managementServer, nodeID, _, resolver, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
51 defer cleanup()
52
53 server := stubserver.StartTestService(t, nil)
54 defer server.Stop()
55
56 const xdsServiceName = "my-service-client-side-xds"
57 resources := e2e.DefaultClientResources(e2e.ResourceParams{
58 DialTarget: xdsServiceName,
59 NodeID: nodeID,
60 Host: "localhost",
61 Port: testutils.ParsePort(t, server.Address),
62 SecLevel: e2e.SecurityLevelNone,
63 })
64
65 resources.Clusters[0].Metadata = &v3corepb.Metadata{
66 FilterMetadata: map[string]*structpb.Struct{
67 "com.google.csm.telemetry_labels": {
68 Fields: map[string]*structpb.Value{
69 serviceNameKey: structpb.NewStringValue(serviceNameValue),
70 serviceNamespaceKey: structpb.NewStringValue(serviceNamespaceValue),
71 },
72 },
73 },
74 }
75
76 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
77 defer cancel()
78 if err := managementServer.Update(ctx, resources); err != nil {
79 t.Fatal(err)
80 }
81
82 fsh := &fakeStatsHandler{
83 t: t,
84 }
85
86 cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", xdsServiceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver), grpc.WithStatsHandler(fsh))
87 if err != nil {
88 t.Fatalf("failed to create a new client to local test server: %v", err)
89 }
90 defer cc.Close()
91
92 client := testgrpc.NewTestServiceClient(cc)
93 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
94 t.Fatalf("rpc EmptyCall() failed: %v", err)
95 }
96 }
97
98 type fakeStatsHandler struct {
99 labels *istats.Labels
100
101 t *testing.T
102 }
103
104 func (fsh *fakeStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
105 return ctx
106 }
107
108 func (fsh *fakeStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
109
110 func (fsh *fakeStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
111 labels := &istats.Labels{
112 TelemetryLabels: make(map[string]string),
113 }
114 fsh.labels = labels
115 ctx = istats.SetLabels(ctx, labels)
116 return ctx
117 }
118
119 func (fsh *fakeStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
120 switch rs.(type) {
121
122
123
124
125
126
127 case *stats.OutPayload, *stats.InPayload, *stats.End:
128 if label, ok := fsh.labels.TelemetryLabels[serviceNameKey]; !ok || label != serviceNameValue {
129 fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNameKey, serviceNameValue, label)
130 }
131 if label, ok := fsh.labels.TelemetryLabels[serviceNamespaceKey]; !ok || label != serviceNamespaceValue {
132 fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNamespaceKey, serviceNamespaceValue, label)
133 }
134
135 default:
136
137 }
138
139 }
140
View as plain text