1
18
19
22 package benchmark
23
24 import (
25 "context"
26 "fmt"
27 "io"
28 "log"
29 "math/rand"
30 "net"
31 "strconv"
32 "time"
33
34 "google.golang.org/grpc"
35 "google.golang.org/grpc/codes"
36 "google.golang.org/grpc/grpclog"
37 "google.golang.org/grpc/metadata"
38 "google.golang.org/grpc/status"
39
40 testgrpc "google.golang.org/grpc/interop/grpc_testing"
41 testpb "google.golang.org/grpc/interop/grpc_testing"
42 )
43
44 var logger = grpclog.Component("benchmark")
45
46
47 func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
48 if size < 0 {
49 logger.Fatalf("Requested a response with invalid length %d", size)
50 }
51 body := make([]byte, size)
52 switch t {
53 case testpb.PayloadType_COMPRESSABLE:
54 default:
55 logger.Fatalf("Unsupported payload type: %d", t)
56 }
57 p.Type = t
58 p.Body = body
59 }
60
61
62 func NewPayload(t testpb.PayloadType, size int) *testpb.Payload {
63 p := new(testpb.Payload)
64 setPayload(p, t, size)
65 return p
66 }
67
68 type testServer struct {
69 testgrpc.UnimplementedBenchmarkServiceServer
70 }
71
72 func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
73 return &testpb.SimpleResponse{
74 Payload: NewPayload(in.ResponseType, int(in.ResponseSize)),
75 }, nil
76 }
77
78
79
80
81 const UnconstrainedStreamingHeader = "unconstrained-streaming"
82
83
84
85 const UnconstrainedStreamingDelayHeader = "unconstrained-streaming-delay"
86
87
88
89
90 const PreloadMsgSizeHeader = "preload-msg-size"
91
92 func (s *testServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error {
93 preloadMsgSize := 0
94 if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[PreloadMsgSizeHeader]) != 0 {
95 val := md[PreloadMsgSizeHeader][0]
96 var err error
97 preloadMsgSize, err = strconv.Atoi(val)
98 if err != nil {
99 return fmt.Errorf("%q header value is not an integer: %s", PreloadMsgSizeHeader, err)
100 }
101 }
102
103 if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[UnconstrainedStreamingHeader]) != 0 {
104 return s.UnconstrainedStreamingCall(stream, preloadMsgSize)
105 }
106 response := &testpb.SimpleResponse{
107 Payload: new(testpb.Payload),
108 }
109 preloadedResponse := &grpc.PreparedMsg{}
110 if preloadMsgSize > 0 {
111 setPayload(response.Payload, testpb.PayloadType_COMPRESSABLE, preloadMsgSize)
112 if err := preloadedResponse.Encode(stream, response); err != nil {
113 return err
114 }
115 }
116 in := new(testpb.SimpleRequest)
117 for {
118
119 err := stream.(grpc.ServerStream).RecvMsg(in)
120 if err == io.EOF {
121
122 return nil
123 }
124 if err != nil {
125 return err
126 }
127 if preloadMsgSize > 0 {
128 err = stream.SendMsg(preloadedResponse)
129 } else {
130 setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
131 err = stream.Send(response)
132 }
133 if err != nil {
134 return err
135 }
136 }
137 }
138
139 func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer, preloadMsgSize int) error {
140 maxSleep := 0
141 if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[UnconstrainedStreamingDelayHeader]) != 0 {
142 val := md[UnconstrainedStreamingDelayHeader][0]
143 d, err := time.ParseDuration(val)
144 if err != nil {
145 return fmt.Errorf("can't parse %q header: %s", UnconstrainedStreamingDelayHeader, err)
146 }
147 maxSleep = int(d)
148 }
149
150 in := new(testpb.SimpleRequest)
151
152 err := stream.RecvMsg(in)
153 if err == io.EOF {
154
155 return nil
156 }
157 if err != nil {
158 return err
159 }
160
161 response := &testpb.SimpleResponse{
162 Payload: new(testpb.Payload),
163 }
164 setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
165
166 preloadedResponse := &grpc.PreparedMsg{}
167 if preloadMsgSize > 0 {
168 if err := preloadedResponse.Encode(stream, response); err != nil {
169 return err
170 }
171 }
172
173 go func() {
174 for {
175
176 err := stream.RecvMsg(in)
177 switch status.Code(err) {
178 case codes.Canceled:
179 return
180 case codes.OK:
181 default:
182 log.Fatalf("server recv error: %v", err)
183 }
184 }
185 }()
186
187 go func() {
188 for {
189 if maxSleep > 0 {
190 time.Sleep(time.Duration(rand.Intn(maxSleep)))
191 }
192 var err error
193 if preloadMsgSize > 0 {
194 err = stream.SendMsg(preloadedResponse)
195 } else {
196 err = stream.Send(response)
197 }
198 switch status.Code(err) {
199 case codes.Unavailable, codes.Canceled:
200 return
201 case codes.OK:
202 default:
203 log.Fatalf("server send error: %v", err)
204 }
205 }
206 }()
207
208 <-stream.Context().Done()
209 return stream.Context().Err()
210 }
211
212
213
214 type byteBufServer struct {
215 testgrpc.UnimplementedBenchmarkServiceServer
216 respSize int32
217 }
218
219
220
221 func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
222 return &testpb.SimpleResponse{}, nil
223 }
224
225 func (s *byteBufServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error {
226 for {
227 var in []byte
228 err := stream.(grpc.ServerStream).RecvMsg(&in)
229 if err == io.EOF {
230 return nil
231 }
232 if err != nil {
233 return err
234 }
235 out := make([]byte, s.respSize)
236 if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
237 return err
238 }
239 }
240 }
241
242
243 type ServerInfo struct {
244
245
246 Type string
247
248
249
250
251 Metadata any
252
253
254 Listener net.Listener
255 }
256
257
258
259 func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() {
260 s := grpc.NewServer(opts...)
261 switch info.Type {
262 case "protobuf":
263 testgrpc.RegisterBenchmarkServiceServer(s, &testServer{})
264 case "bytebuf":
265 respSize, ok := info.Metadata.(int32)
266 if !ok {
267 logger.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type)
268 }
269 testgrpc.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
270 default:
271 logger.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
272 }
273 go s.Serve(info.Listener)
274 return func() {
275 s.Stop()
276 }
277 }
278
279
280 func DoUnaryCall(tc testgrpc.BenchmarkServiceClient, reqSize, respSize int) error {
281 pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
282 req := &testpb.SimpleRequest{
283 ResponseType: pl.Type,
284 ResponseSize: int32(respSize),
285 Payload: pl,
286 }
287 if _, err := tc.UnaryCall(context.Background(), req); err != nil {
288 return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
289 }
290 return nil
291 }
292
293
294 func DoStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
295 pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
296 req := &testpb.SimpleRequest{
297 ResponseType: pl.Type,
298 ResponseSize: int32(respSize),
299 Payload: pl,
300 }
301 return DoStreamingRoundTripPreloaded(stream, req)
302 }
303
304
305 func DoStreamingRoundTripPreloaded(stream testgrpc.BenchmarkService_StreamingCallClient, req any) error {
306
307 if err := stream.SendMsg(req); err != nil {
308 return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
309 }
310 if _, err := stream.Recv(); err != nil {
311
312 if err == io.EOF {
313 return nil
314 }
315 return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
316 }
317 return nil
318 }
319
320
321 func DoByteBufStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
322 out := make([]byte, reqSize)
323 if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
324 return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
325 }
326 var in []byte
327 if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
328
329 if err == io.EOF {
330 return nil
331 }
332 return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
333 }
334 return nil
335 }
336
337
338 func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
339 return NewClientConnWithContext(context.Background(), addr, opts...)
340 }
341
342
343 func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn {
344 conn, err := grpc.DialContext(ctx, addr, opts...)
345 if err != nil {
346 logger.Fatalf("NewClientConn(%q) failed to create a ClientConn: %v", addr, err)
347 }
348 return conn
349 }
350
View as plain text