...
1 package agent_test
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "io"
8 "net"
9 "testing"
10
11 "google.golang.org/grpc"
12 "google.golang.org/grpc/credentials/insecure"
13
14 "github.com/datawire/ambassador/v2/pkg/agent"
15 envoyMetrics "github.com/datawire/ambassador/v2/pkg/api/envoy/service/metrics/v3"
16 "github.com/datawire/dlib/dgroup"
17 "github.com/datawire/dlib/dlog"
18 )
19
20
21
22
23
24
25
26
27
28 func TestMetricsContext(t *testing.T) {
29 grp := dgroup.NewGroup(dlog.NewTestContext(t, true), dgroup.GroupConfig{
30 EnableWithSoftness: true,
31 ShutdownOnNonError: true,
32 })
33
34 listener, err := net.Listen("tcp", ":0")
35 if err != nil {
36 t.Fatal(err)
37 }
38
39 grp.Go("server", func(ctx context.Context) error {
40 type testCtxKey struct{}
41 ctx = context.WithValue(ctx, testCtxKey{}, "sentinel")
42 srv := agent.NewMetricsServer(func(ctx context.Context, _ *envoyMetrics.StreamMetricsMessage) {
43 if val, _ := ctx.Value(testCtxKey{}).(string); val != "sentinel" {
44 t.Error("context did not get passed through")
45 } else {
46 t.Log("SUCCESS!!")
47 }
48 })
49 return srv.Serve(ctx, listener)
50 })
51 grp.Go("client", func(ctx context.Context) error {
52 grpcClient, err := grpc.DialContext(ctx, listener.Addr().String(),
53 grpc.WithTransportCredentials(insecure.NewCredentials()))
54 if err != nil {
55 return fmt.Errorf("grpc.DialContext: %w", err)
56 }
57 metricsClient := envoyMetrics.NewMetricsServiceClient(grpcClient)
58 stream, err := metricsClient.StreamMetrics(ctx)
59 if err != nil {
60 return fmt.Errorf("metricsClient.StreamMetrics: %w", err)
61 }
62 if err := stream.Send(&envoyMetrics.StreamMetricsMessage{}); err != nil {
63 return fmt.Errorf("stream.Send: %w", err)
64 }
65 if _, err := stream.CloseAndRecv(); err != nil && !errors.Is(err, io.EOF) {
66 return fmt.Errorf("stream.CloseAndRecv: %w", err)
67 }
68 if err := grpcClient.Close(); err != nil {
69 return err
70 }
71 return nil
72 })
73 if err := grp.Wait(); err != nil {
74 t.Error(err)
75 }
76 }
77
View as plain text