1
18
19
20 package main
21
22 import (
23 "context"
24 "flag"
25 "strings"
26 "sync"
27 "time"
28
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/credentials/google"
31 "google.golang.org/grpc/credentials/insecure"
32 "google.golang.org/grpc/grpclog"
33 "google.golang.org/grpc/interop"
34
35 _ "google.golang.org/grpc/balancer/grpclb"
36 _ "google.golang.org/grpc/balancer/rls"
37 _ "google.golang.org/grpc/xds/googledirectpath"
38
39 testgrpc "google.golang.org/grpc/interop/grpc_testing"
40 )
41
// Values accepted as entries of the --credentials_types flag.
const (
	// computeEngineCredsName makes main dial the corresponding server with
	// the bundle returned by google.NewComputeEngineCredentials.
	computeEngineCredsName = "compute_engine_channel_creds"

	// insecureCredsName makes main dial the corresponding server with the
	// transport credentials returned by insecure.NewCredentials.
	insecureCredsName = "INSECURE_CREDENTIALS"
)
46
47 var (
48 serverURIs = flag.String("server_uris", "", "Comma-separated list of sever URIs to make RPCs to")
49 credentialsTypes = flag.String("credentials_types", "", "Comma-separated list of credentials, each entry is used for the server of the corresponding index in server_uris. Supported values: compute_engine_channel_creds, INSECURE_CREDENTIALS")
50 soakIterations = flag.Int("soak_iterations", 10, "The number of iterations to use for the two soak tests: rpc_soak and channel_soak")
51 soakMaxFailures = flag.Int("soak_max_failures", 0, "The number of iterations in soak tests that are allowed to fail (either due to non-OK status code or exceeding the per-iteration max acceptable latency).")
52 soakPerIterationMaxAcceptableLatencyMs = flag.Int("soak_per_iteration_max_acceptable_latency_ms", 1000, "The number of milliseconds a single iteration in the two soak tests (rpc_soak and channel_soak) should take.")
53 soakOverallTimeoutSeconds = flag.Int("soak_overall_timeout_seconds", 10, "The overall number of seconds after which a soak test should stop and fail, if the desired number of iterations have not yet completed.")
54 soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS")
55 soakRequestSize = flag.Int("soak_request_size", 271828, "The request size in a soak RPC. The default value is set based on the interop large unary test case.")
56 soakResponseSize = flag.Int("soak_response_size", 314159, "The response size in a soak RPC. The default value is set based on the interop large unary test case.")
57 testCase = flag.String("test_case", "rpc_soak",
58 `Configure different test cases. Valid options are:
59 rpc_soak: sends --soak_iterations large_unary RPCs;
60 channel_soak: sends --soak_iterations RPCs, rebuilding the channel each time`)
61
62 logger = grpclog.Component("interop")
63 )
64
65 type clientConfig struct {
66 tc testgrpc.TestServiceClient
67 opts []grpc.DialOption
68 uri string
69 }
70
71 func main() {
72 flag.Parse()
73
74 uris := strings.Split(*serverURIs, ",")
75 creds := strings.Split(*credentialsTypes, ",")
76 if len(uris) != len(creds) {
77 logger.Fatalf("Number of entries in --server_uris (%d) != number of entries in --credentials_types (%d)", len(uris), len(creds))
78 }
79 for _, c := range creds {
80 if c != computeEngineCredsName && c != insecureCredsName {
81 logger.Fatalf("Unsupported credentials type: %v", c)
82 }
83 }
84 var resetChannel bool
85 switch *testCase {
86 case "rpc_soak":
87 resetChannel = false
88 case "channel_soak":
89 resetChannel = true
90 default:
91 logger.Fatal("Unsupported test case: ", *testCase)
92 }
93
94
95 var clients []clientConfig
96 for i := range uris {
97 var opts []grpc.DialOption
98 switch creds[i] {
99 case computeEngineCredsName:
100 opts = append(opts, grpc.WithCredentialsBundle(google.NewComputeEngineCredentials()))
101 case insecureCredsName:
102 opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
103 }
104 cc, err := grpc.Dial(uris[i], opts...)
105 if err != nil {
106 logger.Fatalf("Fail to dial %v: %v", uris[i], err)
107 }
108 defer cc.Close()
109 clients = append(clients, clientConfig{
110 tc: testgrpc.NewTestServiceClient(cc),
111 opts: opts,
112 uri: uris[i],
113 })
114 }
115
116
117 logger.Infof("Clients running with test case %q", *testCase)
118 var wg sync.WaitGroup
119 ctx := context.Background()
120 for i := range clients {
121 wg.Add(1)
122 go func(c clientConfig) {
123 ctxWithDeadline, cancel := context.WithTimeout(ctx, time.Duration(*soakOverallTimeoutSeconds)*time.Second)
124 defer cancel()
125 interop.DoSoakTest(ctxWithDeadline, c.tc, c.uri, c.opts, resetChannel, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond)
126 logger.Infof("%s test done for server: %s", *testCase, c.uri)
127 wg.Done()
128 }(clients[i])
129 }
130 wg.Wait()
131 logger.Infoln("All clients done!")
132 }
133
View as plain text