1
18
19 package orca_test
20
21 import (
22 "context"
23 "fmt"
24 "sync"
25 "testing"
26 "time"
27
28 "github.com/google/go-cmp/cmp"
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/credentials/insecure"
31 "google.golang.org/grpc/internal/pretty"
32 "google.golang.org/grpc/internal/testutils"
33 "google.golang.org/grpc/orca"
34 "google.golang.org/grpc/orca/internal"
35 "google.golang.org/protobuf/proto"
36 "google.golang.org/protobuf/types/known/durationpb"
37
38 v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
39 v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
40 v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
41 testgrpc "google.golang.org/grpc/interop/grpc_testing"
42 testpb "google.golang.org/grpc/interop/grpc_testing"
43 )
44
const (
	// requestsMetricKey is the name under which the test service publishes its
	// named utilization metric; the value reported for it is derived from the
	// number of unary calls the service has handled.
	requestsMetricKey = "test-service-requests"
)
46
47
48
49
50 type testServiceImpl struct {
51 mu sync.Mutex
52 requests int64
53
54 testgrpc.TestServiceServer
55 smr orca.ServerMetricsRecorder
56 }
57
58 func (t *testServiceImpl) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
59 t.mu.Lock()
60 t.requests++
61 t.mu.Unlock()
62
63 t.smr.SetNamedUtilization(requestsMetricKey, float64(t.requests)*0.01)
64 t.smr.SetCPUUtilization(50.0)
65 t.smr.SetMemoryUtilization(0.9)
66 t.smr.SetApplicationUtilization(1.2)
67 return &testpb.SimpleResponse{}, nil
68 }
69
70 func (t *testServiceImpl) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
71 t.smr.DeleteNamedUtilization(requestsMetricKey)
72 t.smr.SetCPUUtilization(0)
73 t.smr.SetMemoryUtilization(0)
74 t.smr.DeleteApplicationUtilization()
75 return &testpb.Empty{}, nil
76 }
77
78
79
80
81
82
83
84 func (s) TestE2E_CustomBackendMetrics_OutOfBand(t *testing.T) {
85 lis, err := testutils.LocalTCPListener()
86 if err != nil {
87 t.Fatal(err)
88 }
89
90
91 const shortReportingInterval = 10 * time.Millisecond
92 smr := orca.NewServerMetricsRecorder()
93 opts := orca.ServiceOptions{MinReportingInterval: shortReportingInterval, ServerMetricsProvider: smr}
94 internal.AllowAnyMinReportingInterval.(func(*orca.ServiceOptions))(&opts)
95
96
97 s := grpc.NewServer()
98 if err := orca.Register(s, opts); err != nil {
99 t.Fatalf("orca.EnableOutOfBandMetricsReportingForTesting() failed: %v", err)
100 }
101
102
103 testgrpc.RegisterTestServiceServer(s, &testServiceImpl{smr: smr})
104 go s.Serve(lis)
105 defer s.Stop()
106 t.Logf("Started gRPC server at %s...", lis.Addr().String())
107
108
109 cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
110 if err != nil {
111 t.Fatalf("grpc.NewClient(%s) failed: %v", lis.Addr().String(), err)
112 }
113 defer cc.Close()
114
115
116
117
118 const numRequests = 20
119 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
120 defer cancel()
121 testStub := testgrpc.NewTestServiceClient(cc)
122 errCh := make(chan error, 1)
123 go func() {
124 for i := 0; i < numRequests; i++ {
125 if _, err := testStub.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
126 errCh <- fmt.Errorf("UnaryCall failed: %v", err)
127 return
128 }
129 time.Sleep(time.Millisecond)
130 }
131 errCh <- nil
132 }()
133
134
135 oobStub := v3orcaservicegrpc.NewOpenRcaServiceClient(cc)
136 stream, err := oobStub.StreamCoreMetrics(ctx, &v3orcaservicepb.OrcaLoadReportRequest{ReportInterval: durationpb.New(shortReportingInterval)})
137 if err != nil {
138 t.Fatalf("Failed to create a stream for out-of-band metrics")
139 }
140
141
142
143 for {
144 select {
145 case <-ctx.Done():
146 t.Fatal("Timeout when waiting for out-of-band custom backend metrics to match expected values")
147 case err := <-errCh:
148 if err != nil {
149 t.Fatal(err)
150 }
151 default:
152 }
153
154 wantProto := &v3orcapb.OrcaLoadReport{
155 CpuUtilization: 50.0,
156 MemUtilization: 0.9,
157 ApplicationUtilization: 1.2,
158 Utilization: map[string]float64{requestsMetricKey: numRequests * 0.01},
159 }
160 gotProto, err := stream.Recv()
161 if err != nil {
162 t.Fatalf("Recv() failed: %v", err)
163 }
164 if !cmp.Equal(gotProto, wantProto, cmp.Comparer(proto.Equal)) {
165 t.Logf("Received load report from stream: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(wantProto))
166 continue
167 }
168
169 break
170 }
171
172
173 if _, err := testStub.EmptyCall(ctx, &testpb.Empty{}); err != nil {
174 t.Fatalf("EmptyCall failed: %v", err)
175 }
176
177
178 for {
179 select {
180 case <-ctx.Done():
181 t.Fatal("Timeout when waiting for out-of-band custom backend metrics to match expected values")
182 default:
183 }
184
185 wantProto := &v3orcapb.OrcaLoadReport{}
186 gotProto, err := stream.Recv()
187 if err != nil {
188 t.Fatalf("Recv() failed: %v", err)
189 }
190 if !cmp.Equal(gotProto, wantProto, cmp.Comparer(proto.Equal)) {
191 t.Logf("Received load report from stream: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(wantProto))
192 continue
193 }
194
195 break
196 }
197 }
198
View as plain text