...

Source file src/google.golang.org/grpc/test/xds/xds_telemetry_labels_test.go

Documentation: google.golang.org/grpc/test/xds

     1  /*
     2   *
     3   * Copyright 2024 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  package xds_test
    19  
    20  import (
    21  	"context"
    22  	"fmt"
    23  	"testing"
    24  
    25  	"google.golang.org/grpc"
    26  	"google.golang.org/grpc/credentials/insecure"
    27  	istats "google.golang.org/grpc/internal/stats"
    28  	"google.golang.org/grpc/internal/stubserver"
    29  	"google.golang.org/grpc/internal/testutils"
    30  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    31  	"google.golang.org/grpc/stats"
    32  
    33  	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    34  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    35  	testpb "google.golang.org/grpc/interop/grpc_testing"
    36  	"google.golang.org/protobuf/types/known/structpb"
    37  )
    38  
    39  const serviceNameKey = "service_name"
    40  const serviceNamespaceKey = "service_namespace"
    41  const serviceNameValue = "grpc-service"
    42  const serviceNamespaceValue = "grpc-service-namespace"
    43  
    44  // TestTelemetryLabels tests that telemetry labels from CDS make their way to
    45  // the stats handler. The stats handler sets the mutable context value that the
    46  // cluster impl picker will write telemetry labels to, and then the stats
    47  // handler asserts that subsequent HandleRPC calls from the RPC lifecycle
    48  // contain telemetry labels that it can see.
    49  func (s) TestTelemetryLabels(t *testing.T) {
    50  	managementServer, nodeID, _, resolver, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
    51  	defer cleanup()
    52  
    53  	server := stubserver.StartTestService(t, nil)
    54  	defer server.Stop()
    55  
    56  	const xdsServiceName = "my-service-client-side-xds"
    57  	resources := e2e.DefaultClientResources(e2e.ResourceParams{
    58  		DialTarget: xdsServiceName,
    59  		NodeID:     nodeID,
    60  		Host:       "localhost",
    61  		Port:       testutils.ParsePort(t, server.Address),
    62  		SecLevel:   e2e.SecurityLevelNone,
    63  	})
    64  
    65  	resources.Clusters[0].Metadata = &v3corepb.Metadata{
    66  		FilterMetadata: map[string]*structpb.Struct{
    67  			"com.google.csm.telemetry_labels": {
    68  				Fields: map[string]*structpb.Value{
    69  					serviceNameKey:      structpb.NewStringValue(serviceNameValue),
    70  					serviceNamespaceKey: structpb.NewStringValue(serviceNamespaceValue),
    71  				},
    72  			},
    73  		},
    74  	}
    75  
    76  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    77  	defer cancel()
    78  	if err := managementServer.Update(ctx, resources); err != nil {
    79  		t.Fatal(err)
    80  	}
    81  
    82  	fsh := &fakeStatsHandler{
    83  		t: t,
    84  	}
    85  
    86  	cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", xdsServiceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver), grpc.WithStatsHandler(fsh))
    87  	if err != nil {
    88  		t.Fatalf("failed to create a new client to local test server: %v", err)
    89  	}
    90  	defer cc.Close()
    91  
    92  	client := testgrpc.NewTestServiceClient(cc)
    93  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
    94  		t.Fatalf("rpc EmptyCall() failed: %v", err)
    95  	}
    96  }
    97  
    98  type fakeStatsHandler struct {
    99  	labels *istats.Labels
   100  
   101  	t *testing.T
   102  }
   103  
   104  func (fsh *fakeStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
   105  	return ctx
   106  }
   107  
   108  func (fsh *fakeStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
   109  
   110  func (fsh *fakeStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
   111  	labels := &istats.Labels{
   112  		TelemetryLabels: make(map[string]string),
   113  	}
   114  	fsh.labels = labels
   115  	ctx = istats.SetLabels(ctx, labels) // ctx passed is immutable, however cluster_impl writes to the map of Telemetry Labels on the heap.
   116  	return ctx
   117  }
   118  
   119  func (fsh *fakeStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
   120  	switch rs.(type) {
   121  	// stats.Begin won't get Telemetry Labels because happens after picker
   122  	// picks.
   123  
   124  	// These three stats callouts trigger all metrics for OpenTelemetry that
   125  	// aren't started. All of these should have access to the desired telemetry
   126  	// labels.
   127  	case *stats.OutPayload, *stats.InPayload, *stats.End:
   128  		if label, ok := fsh.labels.TelemetryLabels[serviceNameKey]; !ok || label != serviceNameValue {
   129  			fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNameKey, serviceNameValue, label)
   130  		}
   131  		if label, ok := fsh.labels.TelemetryLabels[serviceNamespaceKey]; !ok || label != serviceNamespaceValue {
   132  			fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNamespaceKey, serviceNamespaceValue, label)
   133  		}
   134  
   135  	default:
   136  		// Nothing to assert for the other stats.Handler callouts.
   137  	}
   138  
   139  }
   140  

View as plain text