1
18
19
20
21
22
23
24 package interop
25
26 import (
27 "bytes"
28 "context"
29 "fmt"
30 "io"
31 "os"
32 "strings"
33 "sync"
34 "time"
35
36 "golang.org/x/oauth2"
37 "golang.org/x/oauth2/google"
38 "google.golang.org/grpc"
39 "google.golang.org/grpc/benchmark/stats"
40 "google.golang.org/grpc/codes"
41 "google.golang.org/grpc/grpclog"
42 "google.golang.org/grpc/metadata"
43 "google.golang.org/grpc/orca"
44 "google.golang.org/grpc/peer"
45 "google.golang.org/grpc/status"
46 "google.golang.org/protobuf/proto"
47
48 v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
49 testgrpc "google.golang.org/grpc/interop/grpc_testing"
50 testpb "google.golang.org/grpc/interop/grpc_testing"
51 )
52
53 var (
54 reqSizes = []int{27182, 8, 1828, 45904}
55 respSizes = []int{31415, 9, 2653, 58979}
56 largeReqSize = 271828
57 largeRespSize = 314159
58 initialMetadataKey = "x-grpc-test-echo-initial"
59 trailingMetadataKey = "x-grpc-test-echo-trailing-bin"
60
61 logger = grpclog.Component("interop")
62 )
63
64
65 func ClientNewPayload(t testpb.PayloadType, size int) *testpb.Payload {
66 if size < 0 {
67 logger.Fatalf("Requested a response with invalid length %d", size)
68 }
69 body := make([]byte, size)
70 switch t {
71 case testpb.PayloadType_COMPRESSABLE:
72 default:
73 logger.Fatalf("Unsupported payload type: %d", t)
74 }
75 return &testpb.Payload{
76 Type: t,
77 Body: body,
78 }
79 }
80
81
82 func DoEmptyUnaryCall(ctx context.Context, tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
83 reply, err := tc.EmptyCall(ctx, &testpb.Empty{}, args...)
84 if err != nil {
85 logger.Fatal("/TestService/EmptyCall RPC failed: ", err)
86 }
87 if !proto.Equal(&testpb.Empty{}, reply) {
88 logger.Fatalf("/TestService/EmptyCall receives %v, want %v", reply, testpb.Empty{})
89 }
90 }
91
92
93 func DoLargeUnaryCall(ctx context.Context, tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
94 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
95 req := &testpb.SimpleRequest{
96 ResponseType: testpb.PayloadType_COMPRESSABLE,
97 ResponseSize: int32(largeRespSize),
98 Payload: pl,
99 }
100 reply, err := tc.UnaryCall(ctx, req, args...)
101 if err != nil {
102 logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
103 }
104 t := reply.GetPayload().GetType()
105 s := len(reply.GetPayload().GetBody())
106 if t != testpb.PayloadType_COMPRESSABLE || s != largeRespSize {
107 logger.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, largeRespSize)
108 }
109 }
110
111
112 func DoClientStreaming(ctx context.Context, tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
113 stream, err := tc.StreamingInputCall(ctx, args...)
114 if err != nil {
115 logger.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
116 }
117 var sum int
118 for _, s := range reqSizes {
119 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, s)
120 req := &testpb.StreamingInputCallRequest{
121 Payload: pl,
122 }
123 if err := stream.Send(req); err != nil {
124 logger.Fatalf("%v has error %v while sending %v", stream, err, req)
125 }
126 sum += s
127 }
128 reply, err := stream.CloseAndRecv()
129 if err != nil {
130 logger.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
131 }
132 if reply.GetAggregatedPayloadSize() != int32(sum) {
133 logger.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum)
134 }
135 }
136
137
138 func DoServerStreaming(ctx context.Context, tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
139 respParam := make([]*testpb.ResponseParameters, len(respSizes))
140 for i, s := range respSizes {
141 respParam[i] = &testpb.ResponseParameters{
142 Size: int32(s),
143 }
144 }
145 req := &testpb.StreamingOutputCallRequest{
146 ResponseType: testpb.PayloadType_COMPRESSABLE,
147 ResponseParameters: respParam,
148 }
149 stream, err := tc.StreamingOutputCall(ctx, req, args...)
150 if err != nil {
151 logger.Fatalf("%v.StreamingOutputCall(_) = _, %v", tc, err)
152 }
153 var rpcStatus error
154 var respCnt int
155 var index int
156 for {
157 reply, err := stream.Recv()
158 if err != nil {
159 rpcStatus = err
160 break
161 }
162 t := reply.GetPayload().GetType()
163 if t != testpb.PayloadType_COMPRESSABLE {
164 logger.Fatalf("Got the reply of type %d, want %d", t, testpb.PayloadType_COMPRESSABLE)
165 }
166 size := len(reply.GetPayload().GetBody())
167 if size != respSizes[index] {
168 logger.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
169 }
170 index++
171 respCnt++
172 }
173 if rpcStatus != io.EOF {
174 logger.Fatalf("Failed to finish the server streaming rpc: %v", rpcStatus)
175 }
176 if respCnt != len(respSizes) {
177 logger.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
178 }
179 }
180
181
182 func DoPingPong(ctx context.Context, tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
183 stream, err := tc.FullDuplexCall(ctx, args...)
184 if err != nil {
185 logger.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
186 }
187 var index int
188 for index < len(reqSizes) {
189 respParam := []*testpb.ResponseParameters{
190 {
191 Size: int32(respSizes[index]),
192 },
193 }
194 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, reqSizes[index])
195 req := &testpb.StreamingOutputCallRequest{
196 ResponseType: testpb.PayloadType_COMPRESSABLE,
197 ResponseParameters: respParam,
198 Payload: pl,
199 }
200 if err := stream.Send(req); err != nil {
201 logger.Fatalf("%v has error %v while sending %v", stream, err, req)
202 }
203 reply, err := stream.Recv()
204 if err != nil {
205 logger.Fatalf("%v.Recv() = %v", stream, err)
206 }
207 t := reply.GetPayload().GetType()
208 if t != testpb.PayloadType_COMPRESSABLE {
209 logger.Fatalf("Got the reply of type %d, want %d", t, testpb.PayloadType_COMPRESSABLE)
210 }
211 size := len(reply.GetPayload().GetBody())
212 if size != respSizes[index] {
213 logger.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
214 }
215 index++
216 }
217 if err := stream.CloseSend(); err != nil {
218 logger.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
219 }
220 if _, err := stream.Recv(); err != io.EOF {
221 logger.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
222 }
223 }
224
225
226 func DoEmptyStream(ctx context.Context, tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
227 stream, err := tc.FullDuplexCall(ctx, args...)
228 if err != nil {
229 logger.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
230 }
231 if err := stream.CloseSend(); err != nil {
232 logger.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
233 }
234 if _, err := stream.Recv(); err != io.EOF {
235 logger.Fatalf("%v failed to complete the empty stream test: %v", stream, err)
236 }
237 }
238
239
240 func DoTimeoutOnSleepingServer(ctx context.Context, tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
241 ctx, cancel := context.WithTimeout(ctx, 1*time.Millisecond)
242 defer cancel()
243 stream, err := tc.FullDuplexCall(ctx, args...)
244 if err != nil {
245 if status.Code(err) == codes.DeadlineExceeded {
246 return
247 }
248 logger.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
249 }
250 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
251 req := &testpb.StreamingOutputCallRequest{
252 ResponseType: testpb.PayloadType_COMPRESSABLE,
253 Payload: pl,
254 }
255 if err := stream.Send(req); err != nil && err != io.EOF {
256 logger.Fatalf("%v.Send(_) = %v", stream, err)
257 }
258 if _, err := stream.Recv(); status.Code(err) != codes.DeadlineExceeded {
259 logger.Fatalf("%v.Recv() = _, %v, want error code %d", stream, err, codes.DeadlineExceeded)
260 }
261 }
262
263
264 func DoComputeEngineCreds(ctx context.Context, tc testgrpc.TestServiceClient, serviceAccount, oauthScope string) {
265 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
266 req := &testpb.SimpleRequest{
267 ResponseType: testpb.PayloadType_COMPRESSABLE,
268 ResponseSize: int32(largeRespSize),
269 Payload: pl,
270 FillUsername: true,
271 FillOauthScope: true,
272 }
273 reply, err := tc.UnaryCall(ctx, req)
274 if err != nil {
275 logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
276 }
277 user := reply.GetUsername()
278 scope := reply.GetOauthScope()
279 if user != serviceAccount {
280 logger.Fatalf("Got user name %q, want %q.", user, serviceAccount)
281 }
282 if !strings.Contains(oauthScope, scope) {
283 logger.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
284 }
285 }
286
287 func getServiceAccountJSONKey(keyFile string) []byte {
288 jsonKey, err := os.ReadFile(keyFile)
289 if err != nil {
290 logger.Fatalf("Failed to read the service account key file: %v", err)
291 }
292 return jsonKey
293 }
294
295
296 func DoServiceAccountCreds(ctx context.Context, tc testgrpc.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
297 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
298 req := &testpb.SimpleRequest{
299 ResponseType: testpb.PayloadType_COMPRESSABLE,
300 ResponseSize: int32(largeRespSize),
301 Payload: pl,
302 FillUsername: true,
303 FillOauthScope: true,
304 }
305 reply, err := tc.UnaryCall(ctx, req)
306 if err != nil {
307 logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
308 }
309 jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
310 user := reply.GetUsername()
311 scope := reply.GetOauthScope()
312 if !strings.Contains(string(jsonKey), user) {
313 logger.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
314 }
315 if !strings.Contains(oauthScope, scope) {
316 logger.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
317 }
318 }
319
320
321 func DoJWTTokenCreds(ctx context.Context, tc testgrpc.TestServiceClient, serviceAccountKeyFile string) {
322 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
323 req := &testpb.SimpleRequest{
324 ResponseType: testpb.PayloadType_COMPRESSABLE,
325 ResponseSize: int32(largeRespSize),
326 Payload: pl,
327 FillUsername: true,
328 }
329 reply, err := tc.UnaryCall(ctx, req)
330 if err != nil {
331 logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
332 }
333 jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
334 user := reply.GetUsername()
335 if !strings.Contains(string(jsonKey), user) {
336 logger.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
337 }
338 }
339
340
341 func GetToken(ctx context.Context, serviceAccountKeyFile string, oauthScope string) *oauth2.Token {
342 jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
343 config, err := google.JWTConfigFromJSON(jsonKey, oauthScope)
344 if err != nil {
345 logger.Fatalf("Failed to get the config: %v", err)
346 }
347 token, err := config.TokenSource(ctx).Token()
348 if err != nil {
349 logger.Fatalf("Failed to get the token: %v", err)
350 }
351 return token
352 }
353
354
355 func DoOauth2TokenCreds(ctx context.Context, tc testgrpc.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
356 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
357 req := &testpb.SimpleRequest{
358 ResponseType: testpb.PayloadType_COMPRESSABLE,
359 ResponseSize: int32(largeRespSize),
360 Payload: pl,
361 FillUsername: true,
362 FillOauthScope: true,
363 }
364 reply, err := tc.UnaryCall(ctx, req)
365 if err != nil {
366 logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
367 }
368 jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
369 user := reply.GetUsername()
370 scope := reply.GetOauthScope()
371 if !strings.Contains(string(jsonKey), user) {
372 logger.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
373 }
374 if !strings.Contains(oauthScope, scope) {
375 logger.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
376 }
377 }
378
379
380 func DoPerRPCCreds(ctx context.Context, tc testgrpc.TestServiceClient, serviceAccountKeyFile, oauthScope string) {
381 jsonKey := getServiceAccountJSONKey(serviceAccountKeyFile)
382 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
383 req := &testpb.SimpleRequest{
384 ResponseType: testpb.PayloadType_COMPRESSABLE,
385 ResponseSize: int32(largeRespSize),
386 Payload: pl,
387 FillUsername: true,
388 FillOauthScope: true,
389 }
390 token := GetToken(ctx, serviceAccountKeyFile, oauthScope)
391 kv := map[string]string{"authorization": token.Type() + " " + token.AccessToken}
392 ctx = metadata.NewOutgoingContext(ctx, metadata.MD{"authorization": []string{kv["authorization"]}})
393 reply, err := tc.UnaryCall(ctx, req)
394 if err != nil {
395 logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
396 }
397 user := reply.GetUsername()
398 scope := reply.GetOauthScope()
399 if !strings.Contains(string(jsonKey), user) {
400 logger.Fatalf("Got user name %q which is NOT a substring of %q.", user, jsonKey)
401 }
402 if !strings.Contains(oauthScope, scope) {
403 logger.Fatalf("Got OAuth scope %q which is NOT a substring of %q.", scope, oauthScope)
404 }
405 }
406
407
408 func DoGoogleDefaultCredentials(ctx context.Context, tc testgrpc.TestServiceClient, defaultServiceAccount string) {
409 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
410 req := &testpb.SimpleRequest{
411 ResponseType: testpb.PayloadType_COMPRESSABLE,
412 ResponseSize: int32(largeRespSize),
413 Payload: pl,
414 FillUsername: true,
415 FillOauthScope: true,
416 }
417 reply, err := tc.UnaryCall(ctx, req)
418 if err != nil {
419 logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
420 }
421 if reply.GetUsername() != defaultServiceAccount {
422 logger.Fatalf("Got user name %q; wanted %q. ", reply.GetUsername(), defaultServiceAccount)
423 }
424 }
425
426
427 func DoComputeEngineChannelCredentials(ctx context.Context, tc testgrpc.TestServiceClient, defaultServiceAccount string) {
428 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
429 req := &testpb.SimpleRequest{
430 ResponseType: testpb.PayloadType_COMPRESSABLE,
431 ResponseSize: int32(largeRespSize),
432 Payload: pl,
433 FillUsername: true,
434 FillOauthScope: true,
435 }
436 reply, err := tc.UnaryCall(ctx, req)
437 if err != nil {
438 logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
439 }
440 if reply.GetUsername() != defaultServiceAccount {
441 logger.Fatalf("Got user name %q; wanted %q. ", reply.GetUsername(), defaultServiceAccount)
442 }
443 }
444
445 var testMetadata = metadata.MD{
446 "key1": []string{"value1"},
447 "key2": []string{"value2"},
448 }
449
450
451 func DoCancelAfterBegin(ctx context.Context, tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
452 ctx, cancel := context.WithCancel(metadata.NewOutgoingContext(ctx, testMetadata))
453 stream, err := tc.StreamingInputCall(ctx, args...)
454 if err != nil {
455 logger.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
456 }
457 cancel()
458 _, err = stream.CloseAndRecv()
459 if status.Code(err) != codes.Canceled {
460 logger.Fatalf("%v.CloseAndRecv() got error code %d, want %d", stream, status.Code(err), codes.Canceled)
461 }
462 }
463
464
465 func DoCancelAfterFirstResponse(ctx context.Context, tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
466 ctx, cancel := context.WithCancel(ctx)
467 stream, err := tc.FullDuplexCall(ctx, args...)
468 if err != nil {
469 logger.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
470 }
471 respParam := []*testpb.ResponseParameters{
472 {
473 Size: 31415,
474 },
475 }
476 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 27182)
477 req := &testpb.StreamingOutputCallRequest{
478 ResponseType: testpb.PayloadType_COMPRESSABLE,
479 ResponseParameters: respParam,
480 Payload: pl,
481 }
482 if err := stream.Send(req); err != nil {
483 logger.Fatalf("%v has error %v while sending %v", stream, err, req)
484 }
485 if _, err := stream.Recv(); err != nil {
486 logger.Fatalf("%v.Recv() = %v", stream, err)
487 }
488 cancel()
489 if _, err := stream.Recv(); status.Code(err) != codes.Canceled {
490 logger.Fatalf("%v compleled with error code %d, want %d", stream, status.Code(err), codes.Canceled)
491 }
492 }
493
494 var (
495 initialMetadataValue = "test_initial_metadata_value"
496 trailingMetadataValue = "\x0a\x0b\x0a\x0b\x0a\x0b"
497 customMetadata = metadata.Pairs(
498 initialMetadataKey, initialMetadataValue,
499 trailingMetadataKey, trailingMetadataValue,
500 )
501 )
502
503 func validateMetadata(header, trailer metadata.MD) {
504 if len(header[initialMetadataKey]) != 1 {
505 logger.Fatalf("Expected exactly one header from server. Received %d", len(header[initialMetadataKey]))
506 }
507 if header[initialMetadataKey][0] != initialMetadataValue {
508 logger.Fatalf("Got header %s; want %s", header[initialMetadataKey][0], initialMetadataValue)
509 }
510 if len(trailer[trailingMetadataKey]) != 1 {
511 logger.Fatalf("Expected exactly one trailer from server. Received %d", len(trailer[trailingMetadataKey]))
512 }
513 if trailer[trailingMetadataKey][0] != trailingMetadataValue {
514 logger.Fatalf("Got trailer %s; want %s", trailer[trailingMetadataKey][0], trailingMetadataValue)
515 }
516 }
517
518
519 func DoCustomMetadata(ctx context.Context, tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
520
521 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 1)
522 req := &testpb.SimpleRequest{
523 ResponseType: testpb.PayloadType_COMPRESSABLE,
524 ResponseSize: int32(1),
525 Payload: pl,
526 }
527 ctx = metadata.NewOutgoingContext(ctx, customMetadata)
528 var header, trailer metadata.MD
529 args = append(args, grpc.Header(&header), grpc.Trailer(&trailer))
530 reply, err := tc.UnaryCall(
531 ctx,
532 req,
533 args...,
534 )
535 if err != nil {
536 logger.Fatal("/TestService/UnaryCall RPC failed: ", err)
537 }
538 t := reply.GetPayload().GetType()
539 s := len(reply.GetPayload().GetBody())
540 if t != testpb.PayloadType_COMPRESSABLE || s != 1 {
541 logger.Fatalf("Got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, 1)
542 }
543 validateMetadata(header, trailer)
544
545
546 stream, err := tc.FullDuplexCall(ctx, args...)
547 if err != nil {
548 logger.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
549 }
550 respParam := []*testpb.ResponseParameters{
551 {
552 Size: 1,
553 },
554 }
555 streamReq := &testpb.StreamingOutputCallRequest{
556 ResponseType: testpb.PayloadType_COMPRESSABLE,
557 ResponseParameters: respParam,
558 Payload: pl,
559 }
560 if err := stream.Send(streamReq); err != nil {
561 logger.Fatalf("%v has error %v while sending %v", stream, err, streamReq)
562 }
563 streamHeader, err := stream.Header()
564 if err != nil {
565 logger.Fatalf("%v.Header() = %v", stream, err)
566 }
567 if _, err := stream.Recv(); err != nil {
568 logger.Fatalf("%v.Recv() = %v", stream, err)
569 }
570 if err := stream.CloseSend(); err != nil {
571 logger.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
572 }
573 if _, err := stream.Recv(); err != io.EOF {
574 logger.Fatalf("%v failed to complete the custom metadata test: %v", stream, err)
575 }
576 streamTrailer := stream.Trailer()
577 validateMetadata(streamHeader, streamTrailer)
578 }
579
580
581 func DoStatusCodeAndMessage(ctx context.Context, tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
582 var code int32 = 2
583 msg := "test status message"
584 expectedErr := status.Error(codes.Code(code), msg)
585 respStatus := &testpb.EchoStatus{
586 Code: code,
587 Message: msg,
588 }
589
590 req := &testpb.SimpleRequest{
591 ResponseStatus: respStatus,
592 }
593 if _, err := tc.UnaryCall(ctx, req, args...); err == nil || err.Error() != expectedErr.Error() {
594 logger.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr)
595 }
596
597 stream, err := tc.FullDuplexCall(ctx, args...)
598 if err != nil {
599 logger.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
600 }
601 streamReq := &testpb.StreamingOutputCallRequest{
602 ResponseStatus: respStatus,
603 }
604 if err := stream.Send(streamReq); err != nil {
605 logger.Fatalf("%v has error %v while sending %v, want <nil>", stream, err, streamReq)
606 }
607 if err := stream.CloseSend(); err != nil {
608 logger.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
609 }
610 if _, err = stream.Recv(); err.Error() != expectedErr.Error() {
611 logger.Fatalf("%v.Recv() returned error %v, want %v", stream, err, expectedErr)
612 }
613 }
614
615
616
617 func DoSpecialStatusMessage(ctx context.Context, tc testgrpc.TestServiceClient, args ...grpc.CallOption) {
618 const (
619 code int32 = 2
620 msg string = "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP 😈\t\n"
621 )
622 expectedErr := status.Error(codes.Code(code), msg)
623 req := &testpb.SimpleRequest{
624 ResponseStatus: &testpb.EchoStatus{
625 Code: code,
626 Message: msg,
627 },
628 }
629 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
630 defer cancel()
631 if _, err := tc.UnaryCall(ctx, req, args...); err == nil || err.Error() != expectedErr.Error() {
632 logger.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr)
633 }
634 }
635
636
637 func DoUnimplementedService(ctx context.Context, tc testgrpc.UnimplementedServiceClient) {
638 _, err := tc.UnimplementedCall(ctx, &testpb.Empty{})
639 if status.Code(err) != codes.Unimplemented {
640 logger.Fatalf("%v.UnimplementedCall() = _, %v, want _, %v", tc, status.Code(err), codes.Unimplemented)
641 }
642 }
643
644
645 func DoUnimplementedMethod(ctx context.Context, cc *grpc.ClientConn) {
646 var req, reply proto.Message
647 if err := cc.Invoke(ctx, "/grpc.testing.TestService/UnimplementedCall", req, reply); err == nil || status.Code(err) != codes.Unimplemented {
648 logger.Fatalf("ClientConn.Invoke(_, _, _, _, _) = %v, want error code %s", err, codes.Unimplemented)
649 }
650 }
651
652
653
654 func DoPickFirstUnary(ctx context.Context, tc testgrpc.TestServiceClient) {
655 const rpcCount = 100
656
657 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, 1)
658 req := &testpb.SimpleRequest{
659 ResponseType: testpb.PayloadType_COMPRESSABLE,
660 ResponseSize: int32(1),
661 Payload: pl,
662 FillServerId: true,
663 }
664
665 ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
666 defer cancel()
667 var serverID string
668 for i := 0; i < rpcCount; i++ {
669 resp, err := tc.UnaryCall(ctx, req)
670 if err != nil {
671 logger.Fatalf("iteration %d, failed to do UnaryCall: %v", i, err)
672 }
673 id := resp.ServerId
674 if id == "" {
675 logger.Fatalf("iteration %d, got empty server ID", i)
676 }
677 if i == 0 {
678 serverID = id
679 continue
680 }
681 if serverID != id {
682 logger.Fatalf("iteration %d, got different server ids: %q vs %q", i, serverID, id)
683 }
684 }
685 }
686
687 func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, resetChannel bool, serverAddr string, soakRequestSize int, soakResponseSize int, dopts []grpc.DialOption, copts []grpc.CallOption) (latency time.Duration, err error) {
688 start := time.Now()
689 client := tc
690 if resetChannel {
691 var conn *grpc.ClientConn
692 conn, err = grpc.Dial(serverAddr, dopts...)
693 if err != nil {
694 return
695 }
696 defer conn.Close()
697 client = testgrpc.NewTestServiceClient(conn)
698 }
699
700 defer func() { latency = time.Since(start) }()
701
702 pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, soakRequestSize)
703 req := &testpb.SimpleRequest{
704 ResponseType: testpb.PayloadType_COMPRESSABLE,
705 ResponseSize: int32(soakResponseSize),
706 Payload: pl,
707 }
708 var reply *testpb.SimpleResponse
709 reply, err = client.UnaryCall(ctx, req, copts...)
710 if err != nil {
711 err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err)
712 return
713 }
714 t := reply.GetPayload().GetType()
715 s := len(reply.GetPayload().GetBody())
716 if t != testpb.PayloadType_COMPRESSABLE || s != soakResponseSize {
717 err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, soakResponseSize)
718 return
719 }
720 return
721 }
722
723
724
725
726
727 func DoSoakTest(ctx context.Context, tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, soakRequestSize int, soakResponseSize int, perIterationMaxAcceptableLatency time.Duration, minTimeBetweenRPCs time.Duration) {
728 start := time.Now()
729 var elapsedTime float64
730 iterationsDone := 0
731 totalFailures := 0
732 hopts := stats.HistogramOptions{
733 NumBuckets: 20,
734 GrowthFactor: 1,
735 BaseBucketSize: 1,
736 MinValue: 0,
737 }
738 h := stats.NewHistogram(hopts)
739 for i := 0; i < soakIterations; i++ {
740 if ctx.Err() != nil {
741 elapsedTime = time.Since(start).Seconds()
742 break
743 }
744 earliestNextStart := time.After(minTimeBetweenRPCs)
745 iterationsDone++
746 var p peer.Peer
747 latency, err := doOneSoakIteration(ctx, tc, resetChannel, serverAddr, soakRequestSize, soakResponseSize, dopts, []grpc.CallOption{grpc.Peer(&p)})
748 latencyMs := int64(latency / time.Millisecond)
749 h.Add(latencyMs)
750 if err != nil {
751 totalFailures++
752 addrStr := "nil"
753 if p.Addr != nil {
754 addrStr = p.Addr.String()
755 }
756 fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s failed: %s\n", i, latencyMs, addrStr, serverAddr, err)
757 <-earliestNextStart
758 continue
759 }
760 if latency > perIterationMaxAcceptableLatency {
761 totalFailures++
762 fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s exceeds max acceptable latency: %d\n", i, latencyMs, p.Addr.String(), serverAddr, perIterationMaxAcceptableLatency.Milliseconds())
763 <-earliestNextStart
764 continue
765 }
766 fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s succeeded\n", i, latencyMs, p.Addr.String(), serverAddr)
767 <-earliestNextStart
768 }
769 var b bytes.Buffer
770 h.Print(&b)
771 fmt.Fprintf(os.Stderr, "(server_uri: %s) histogram of per-iteration latencies in milliseconds: %s\n", serverAddr, b.String())
772 fmt.Fprintf(os.Stderr, "(server_uri: %s) soak test ran: %d / %d iterations. total failures: %d. max failures threshold: %d. See breakdown above for which iterations succeeded, failed, and why for more info.\n", serverAddr, iterationsDone, soakIterations, totalFailures, maxFailures)
773 if iterationsDone < soakIterations {
774 logger.Fatalf("(server_uri: %s) soak test consumed all %f seconds of time and quit early, only having ran %d out of desired %d iterations.", serverAddr, elapsedTime, iterationsDone, soakIterations)
775 }
776 if totalFailures > maxFailures {
777 logger.Fatalf("(server_uri: %s) soak test total failures: %d exceeds max failures threshold: %d.", serverAddr, totalFailures, maxFailures)
778 }
779 }
780
781 type testServer struct {
782 testgrpc.UnimplementedTestServiceServer
783
784 orcaMu sync.Mutex
785 metricsRecorder orca.ServerMetricsRecorder
786 }
787
788
789
790 type NewTestServerOptions struct {
791 MetricsRecorder orca.ServerMetricsRecorder
792 }
793
794
795
796
797 func NewTestServer(opts ...NewTestServerOptions) testgrpc.TestServiceServer {
798 if len(opts) > 0 {
799 return &testServer{metricsRecorder: opts[0].MetricsRecorder}
800 }
801 return &testServer{}
802 }
803
804 func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
805 return new(testpb.Empty), nil
806 }
807
808 func serverNewPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) {
809 if size < 0 {
810 return nil, fmt.Errorf("requested a response with invalid length %d", size)
811 }
812 body := make([]byte, size)
813 switch t {
814 case testpb.PayloadType_COMPRESSABLE:
815 default:
816 return nil, fmt.Errorf("unsupported payload type: %d", t)
817 }
818 return &testpb.Payload{
819 Type: t,
820 Body: body,
821 }, nil
822 }
823
824 func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
825 st := in.GetResponseStatus()
826 if md, ok := metadata.FromIncomingContext(ctx); ok {
827 if initialMetadata, ok := md[initialMetadataKey]; ok {
828 header := metadata.Pairs(initialMetadataKey, initialMetadata[0])
829 grpc.SendHeader(ctx, header)
830 }
831 if trailingMetadata, ok := md[trailingMetadataKey]; ok {
832 trailer := metadata.Pairs(trailingMetadataKey, trailingMetadata[0])
833 grpc.SetTrailer(ctx, trailer)
834 }
835 }
836 if st != nil && st.Code != 0 {
837 return nil, status.Error(codes.Code(st.Code), st.Message)
838 }
839 pl, err := serverNewPayload(in.GetResponseType(), in.GetResponseSize())
840 if err != nil {
841 return nil, err
842 }
843 if r, orcaData := orca.CallMetricsRecorderFromContext(ctx), in.GetOrcaPerQueryReport(); r != nil && orcaData != nil {
844
845
846 setORCAMetrics(r, orcaData)
847 }
848 return &testpb.SimpleResponse{
849 Payload: pl,
850 }, nil
851 }
852
853 func setORCAMetrics(r orca.ServerMetricsRecorder, orcaData *testpb.TestOrcaReport) {
854 r.SetCPUUtilization(orcaData.CpuUtilization)
855 r.SetMemoryUtilization(orcaData.MemoryUtilization)
856 if rq, ok := r.(orca.CallMetricsRecorder); ok {
857 for k, v := range orcaData.RequestCost {
858 rq.SetRequestCost(k, v)
859 }
860 }
861 for k, v := range orcaData.Utilization {
862 r.SetNamedUtilization(k, v)
863 }
864 }
865
866 func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
867 cs := args.GetResponseParameters()
868 for _, c := range cs {
869 if us := c.GetIntervalUs(); us > 0 {
870 time.Sleep(time.Duration(us) * time.Microsecond)
871 }
872 pl, err := serverNewPayload(args.GetResponseType(), c.GetSize())
873 if err != nil {
874 return err
875 }
876 if err := stream.Send(&testpb.StreamingOutputCallResponse{
877 Payload: pl,
878 }); err != nil {
879 return err
880 }
881 }
882 return nil
883 }
884
885 func (s *testServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error {
886 var sum int
887 for {
888 in, err := stream.Recv()
889 if err == io.EOF {
890 return stream.SendAndClose(&testpb.StreamingInputCallResponse{
891 AggregatedPayloadSize: int32(sum),
892 })
893 }
894 if err != nil {
895 return err
896 }
897 p := in.GetPayload().GetBody()
898 sum += len(p)
899 }
900 }
901
902 func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
903 if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
904 if initialMetadata, ok := md[initialMetadataKey]; ok {
905 header := metadata.Pairs(initialMetadataKey, initialMetadata[0])
906 stream.SendHeader(header)
907 }
908 if trailingMetadata, ok := md[trailingMetadataKey]; ok {
909 trailer := metadata.Pairs(trailingMetadataKey, trailingMetadata[0])
910 stream.SetTrailer(trailer)
911 }
912 }
913 hasORCALock := false
914 for {
915 in, err := stream.Recv()
916 if err == io.EOF {
917
918 return nil
919 }
920 if err != nil {
921 return err
922 }
923 st := in.GetResponseStatus()
924 if st != nil && st.Code != 0 {
925 return status.Error(codes.Code(st.Code), st.Message)
926 }
927
928 if r, orcaData := s.metricsRecorder, in.GetOrcaOobReport(); r != nil && orcaData != nil {
929
930
931 if !hasORCALock {
932 s.orcaMu.Lock()
933 defer s.orcaMu.Unlock()
934 hasORCALock = true
935 }
936 setORCAMetrics(r, orcaData)
937 }
938
939 cs := in.GetResponseParameters()
940 for _, c := range cs {
941 if us := c.GetIntervalUs(); us > 0 {
942 time.Sleep(time.Duration(us) * time.Microsecond)
943 }
944 pl, err := serverNewPayload(in.GetResponseType(), c.GetSize())
945 if err != nil {
946 return err
947 }
948 if err := stream.Send(&testpb.StreamingOutputCallResponse{
949 Payload: pl,
950 }); err != nil {
951 return err
952 }
953 }
954 }
955 }
956
957 func (s *testServer) HalfDuplexCall(stream testgrpc.TestService_HalfDuplexCallServer) error {
958 var msgBuf []*testpb.StreamingOutputCallRequest
959 for {
960 in, err := stream.Recv()
961 if err == io.EOF {
962
963 break
964 }
965 if err != nil {
966 return err
967 }
968 msgBuf = append(msgBuf, in)
969 }
970 for _, m := range msgBuf {
971 cs := m.GetResponseParameters()
972 for _, c := range cs {
973 if us := c.GetIntervalUs(); us > 0 {
974 time.Sleep(time.Duration(us) * time.Microsecond)
975 }
976 pl, err := serverNewPayload(m.GetResponseType(), c.GetSize())
977 if err != nil {
978 return err
979 }
980 if err := stream.Send(&testpb.StreamingOutputCallResponse{
981 Payload: pl,
982 }); err != nil {
983 return err
984 }
985 }
986 }
987 return nil
988 }
989
990
991
992 func DoORCAPerRPCTest(ctx context.Context, tc testgrpc.TestServiceClient) {
993 ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
994 defer cancel()
995 orcaRes := &v3orcapb.OrcaLoadReport{}
996 _, err := tc.UnaryCall(contextWithORCAResult(ctx, &orcaRes), &testpb.SimpleRequest{
997 OrcaPerQueryReport: &testpb.TestOrcaReport{
998 CpuUtilization: 0.8210,
999 MemoryUtilization: 0.5847,
1000 RequestCost: map[string]float64{"cost": 3456.32},
1001 Utilization: map[string]float64{"util": 0.30499},
1002 },
1003 })
1004 if err != nil {
1005 logger.Fatalf("/TestService/UnaryCall RPC failed: ", err)
1006 }
1007 want := &v3orcapb.OrcaLoadReport{
1008 CpuUtilization: 0.8210,
1009 MemUtilization: 0.5847,
1010 RequestCost: map[string]float64{"cost": 3456.32},
1011 Utilization: map[string]float64{"util": 0.30499},
1012 }
1013 if !proto.Equal(orcaRes, want) {
1014 logger.Fatalf("/TestService/UnaryCall RPC received ORCA load report %+v; want %+v", orcaRes, want)
1015 }
1016 }
1017
1018
1019
1020 func DoORCAOOBTest(ctx context.Context, tc testgrpc.TestServiceClient) {
1021 ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
1022 defer cancel()
1023 stream, err := tc.FullDuplexCall(ctx)
1024 if err != nil {
1025 logger.Fatalf("/TestService/FullDuplexCall received error starting stream: %v", err)
1026 }
1027 err = stream.Send(&testpb.StreamingOutputCallRequest{
1028 OrcaOobReport: &testpb.TestOrcaReport{
1029 CpuUtilization: 0.8210,
1030 MemoryUtilization: 0.5847,
1031 Utilization: map[string]float64{"util": 0.30499},
1032 },
1033 ResponseParameters: []*testpb.ResponseParameters{{Size: 1}},
1034 })
1035 if err != nil {
1036 logger.Fatalf("/TestService/FullDuplexCall received error sending: %v", err)
1037 }
1038 _, err = stream.Recv()
1039 if err != nil {
1040 logger.Fatalf("/TestService/FullDuplexCall received error receiving: %v", err)
1041 }
1042
1043 want := &v3orcapb.OrcaLoadReport{
1044 CpuUtilization: 0.8210,
1045 MemUtilization: 0.5847,
1046 Utilization: map[string]float64{"util": 0.30499},
1047 }
1048 checkORCAMetrics(ctx, tc, want)
1049
1050 err = stream.Send(&testpb.StreamingOutputCallRequest{
1051 OrcaOobReport: &testpb.TestOrcaReport{
1052 CpuUtilization: 0.29309,
1053 MemoryUtilization: 0.2,
1054 Utilization: map[string]float64{"util": 0.2039},
1055 },
1056 ResponseParameters: []*testpb.ResponseParameters{{Size: 1}},
1057 })
1058 if err != nil {
1059 logger.Fatalf("/TestService/FullDuplexCall received error sending: %v", err)
1060 }
1061 _, err = stream.Recv()
1062 if err != nil {
1063 logger.Fatalf("/TestService/FullDuplexCall received error receiving: %v", err)
1064 }
1065
1066 want = &v3orcapb.OrcaLoadReport{
1067 CpuUtilization: 0.29309,
1068 MemUtilization: 0.2,
1069 Utilization: map[string]float64{"util": 0.2039},
1070 }
1071 checkORCAMetrics(ctx, tc, want)
1072 }
1073
1074 func checkORCAMetrics(ctx context.Context, tc testgrpc.TestServiceClient, want *v3orcapb.OrcaLoadReport) {
1075 for ctx.Err() == nil {
1076 orcaRes := &v3orcapb.OrcaLoadReport{}
1077 if _, err := tc.UnaryCall(contextWithORCAResult(ctx, &orcaRes), &testpb.SimpleRequest{}); err != nil {
1078 logger.Fatalf("/TestService/UnaryCall RPC failed: ", err)
1079 }
1080 if proto.Equal(orcaRes, want) {
1081 return
1082 }
1083 logger.Infof("/TestService/UnaryCall RPC received ORCA load report %+v; want %+v", orcaRes, want)
1084 time.Sleep(time.Second)
1085 }
1086 logger.Fatalf("timed out waiting for expected ORCA load report")
1087 }
1088
View as plain text