1
18
19
20
21
22
23
24 package main
25
26 import (
27 "context"
28 "crypto/tls"
29 "crypto/x509"
30 "flag"
31 "net"
32 "os"
33 "strconv"
34 "strings"
35 "time"
36
37 "golang.org/x/oauth2"
38 "google.golang.org/grpc"
39 "google.golang.org/grpc/credentials"
40 "google.golang.org/grpc/credentials/alts"
41 "google.golang.org/grpc/credentials/google"
42 "google.golang.org/grpc/credentials/insecure"
43 "google.golang.org/grpc/credentials/oauth"
44 "google.golang.org/grpc/grpclog"
45 "google.golang.org/grpc/interop"
46 "google.golang.org/grpc/metadata"
47 "google.golang.org/grpc/resolver"
48 "google.golang.org/grpc/testdata"
49
50 _ "google.golang.org/grpc/balancer/grpclb"
51 _ "google.golang.org/grpc/balancer/rls"
52 _ "google.golang.org/grpc/xds/googledirectpath"
53
54 testgrpc "google.golang.org/grpc/interop/grpc_testing"
55 )
56
// Values accepted by the --custom_credentials_type flag; main rejects any
// other non-empty value.
const (
	// googleDefaultCredsName makes main dial with google.NewDefaultCredentials.
	googleDefaultCredsName = "google_default_credentials"
	// computeEngineCredsName makes main dial with
	// google.NewComputeEngineCredentials.
	computeEngineCredsName = "compute_engine_channel_creds"
)
61
62 var (
63 caFile = flag.String("ca_file", "", "The file containning the CA root cert file")
64 useTLS = flag.Bool("use_tls", false, "Connection uses TLS if true")
65 useALTS = flag.Bool("use_alts", false, "Connection uses ALTS if true (this option can only be used on GCP)")
66 customCredentialsType = flag.String("custom_credentials_type", "", "Custom creds to use, excluding TLS or ALTS")
67 altsHSAddr = flag.String("alts_handshaker_service_address", "", "ALTS handshaker gRPC service address")
68 testCA = flag.Bool("use_test_ca", false, "Whether to replace platform root CAs with test CA as the CA root")
69 serviceAccountKeyFile = flag.String("service_account_key_file", "", "Path to service account json key file")
70 oauthScope = flag.String("oauth_scope", "", "The scope for OAuth2 tokens")
71 defaultServiceAccount = flag.String("default_service_account", "", "Email of GCE default service account")
72 serverHost = flag.String("server_host", "localhost", "The server host name")
73 serverPort = flag.Int("server_port", 10000, "The server port number")
74 serviceConfigJSON = flag.String("service_config_json", "", "Disables service config lookups and sets the provided string as the default service config.")
75 soakIterations = flag.Int("soak_iterations", 10, "The number of iterations to use for the two soak tests: rpc_soak and channel_soak")
76 soakMaxFailures = flag.Int("soak_max_failures", 0, "The number of iterations in soak tests that are allowed to fail (either due to non-OK status code or exceeding the per-iteration max acceptable latency).")
77 soakPerIterationMaxAcceptableLatencyMs = flag.Int("soak_per_iteration_max_acceptable_latency_ms", 1000, "The number of milliseconds a single iteration in the two soak tests (rpc_soak and channel_soak) should take.")
78 soakOverallTimeoutSeconds = flag.Int("soak_overall_timeout_seconds", 10, "The overall number of seconds after which a soak test should stop and fail, if the desired number of iterations have not yet completed.")
79 soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS")
80 soakRequestSize = flag.Int("soak_request_size", 271828, "The request size in a soak RPC. The default value is set based on the interop large unary test case.")
81 soakResponseSize = flag.Int("soak_response_size", 314159, "The response size in a soak RPC. The default value is set based on the interop large unary test case.")
82 tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.")
83 additionalMetadata = flag.String("additional_metadata", "", "Additional metadata to send in each request, as a semicolon-separated list of key:value pairs.")
84 testCase = flag.String("test_case", "large_unary",
85 `Configure different test cases. Valid options are:
86 empty_unary : empty (zero bytes) request and response;
87 large_unary : single request and (large) response;
88 client_streaming : request streaming with single response;
89 server_streaming : single request with response streaming;
90 ping_pong : full-duplex streaming;
91 empty_stream : full-duplex streaming with zero message;
92 timeout_on_sleeping_server: fullduplex streaming on a sleeping server;
93 compute_engine_creds: large_unary with compute engine auth;
94 service_account_creds: large_unary with service account auth;
95 jwt_token_creds: large_unary with jwt token auth;
96 per_rpc_creds: large_unary with per rpc token;
97 oauth2_auth_token: large_unary with oauth2 token auth;
98 google_default_credentials: large_unary with google default credentials
99 compute_engine_channel_credentials: large_unary with compute engine creds
100 cancel_after_begin: cancellation after metadata has been sent but before payloads are sent;
101 cancel_after_first_response: cancellation after receiving 1st message from the server;
102 status_code_and_message: status code propagated back to client;
103 special_status_message: Unicode and whitespace is correctly processed in status message;
104 custom_metadata: server will echo custom metadata;
105 unimplemented_method: client attempts to call unimplemented method;
106 unimplemented_service: client attempts to call unimplemented service;
107 pick_first_unary: all requests are sent to one server despite multiple servers are resolved;
108 orca_per_rpc: the client verifies ORCA per-RPC metrics are provided;
109 orca_oob: the client verifies ORCA out-of-band metrics are provided.`)
110
111 logger = grpclog.Component("interop")
112 )
113
// credsMode identifies which kind of channel credentials main configures
// before dialing the server.
type credsMode uint8

const (
	// credsNone is the zero value, used when no credential flag is set; main
	// then dials with insecure transport credentials.
	credsNone credsMode = iota
	// credsTLS is chosen by --use_tls.
	credsTLS
	// credsALTS is chosen by --use_alts.
	credsALTS
	// credsGoogleDefaultCreds is chosen by
	// --custom_credentials_type=google_default_credentials.
	credsGoogleDefaultCreds
	// credsComputeEngineCreds is chosen by
	// --custom_credentials_type=compute_engine_channel_creds.
	credsComputeEngineCreds
)
123
124
125
126
127 func parseAdditionalMetadataFlag() []string {
128 if len(*additionalMetadata) == 0 {
129 return nil
130 }
131 r := *additionalMetadata
132 addMd := make([]string, 0)
133 for len(r) > 0 {
134 i := strings.Index(r, ":")
135 if i < 0 {
136 logger.Fatalf("Error parsing --additional_metadata flag: missing colon separator")
137 }
138 addMd = append(addMd, r[:i])
139 r = r[i+1:]
140 i = strings.Index(r, ";")
141
142 if i < 0 {
143 addMd = append(addMd, r)
144 break
145 }
146 addMd = append(addMd, r[:i])
147 r = r[i+1:]
148 }
149 return addMd
150 }
151
152 func main() {
153 flag.Parse()
154 logger.Infof("Client running with test case %q", *testCase)
155 var useGDC bool
156 var useCEC bool
157 if *customCredentialsType != "" {
158 switch *customCredentialsType {
159 case googleDefaultCredsName:
160 useGDC = true
161 case computeEngineCredsName:
162 useCEC = true
163 default:
164 logger.Fatalf("If set, custom_credentials_type can only be set to one of %v or %v",
165 googleDefaultCredsName, computeEngineCredsName)
166 }
167 }
168 if (*useTLS && *useALTS) || (*useTLS && useGDC) || (*useALTS && useGDC) || (*useTLS && useCEC) || (*useALTS && useCEC) {
169 logger.Fatalf("only one of TLS, ALTS, google default creds, or compute engine creds can be used")
170 }
171
172 ctx := context.Background()
173
174 var credsChosen credsMode
175 switch {
176 case *useTLS:
177 credsChosen = credsTLS
178 case *useALTS:
179 credsChosen = credsALTS
180 case useGDC:
181 credsChosen = credsGoogleDefaultCreds
182 case useCEC:
183 credsChosen = credsComputeEngineCreds
184 }
185
186 resolver.SetDefaultScheme("dns")
187 serverAddr := *serverHost
188 if *serverPort != 0 {
189 serverAddr = net.JoinHostPort(*serverHost, strconv.Itoa(*serverPort))
190 }
191 var opts []grpc.DialOption
192 switch credsChosen {
193 case credsTLS:
194 var roots *x509.CertPool
195 if *testCA {
196 if *caFile == "" {
197 *caFile = testdata.Path("ca.pem")
198 }
199 b, err := os.ReadFile(*caFile)
200 if err != nil {
201 logger.Fatalf("Failed to read root certificate file %q: %v", *caFile, err)
202 }
203 roots = x509.NewCertPool()
204 if !roots.AppendCertsFromPEM(b) {
205 logger.Fatalf("Failed to append certificates: %s", string(b))
206 }
207 }
208 var creds credentials.TransportCredentials
209 if *tlsServerName != "" {
210 creds = credentials.NewClientTLSFromCert(roots, *tlsServerName)
211 } else {
212 creds = credentials.NewTLS(&tls.Config{RootCAs: roots})
213 }
214 opts = append(opts, grpc.WithTransportCredentials(creds))
215 case credsALTS:
216 altsOpts := alts.DefaultClientOptions()
217 if *altsHSAddr != "" {
218 altsOpts.HandshakerServiceAddress = *altsHSAddr
219 }
220 altsTC := alts.NewClientCreds(altsOpts)
221 opts = append(opts, grpc.WithTransportCredentials(altsTC))
222 case credsGoogleDefaultCreds:
223 opts = append(opts, grpc.WithCredentialsBundle(google.NewDefaultCredentials()))
224 case credsComputeEngineCreds:
225 opts = append(opts, grpc.WithCredentialsBundle(google.NewComputeEngineCredentials()))
226 case credsNone:
227 opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
228 default:
229 logger.Fatal("Invalid creds")
230 }
231 if credsChosen == credsTLS {
232 if *testCase == "compute_engine_creds" {
233 opts = append(opts, grpc.WithPerRPCCredentials(oauth.NewComputeEngine()))
234 } else if *testCase == "service_account_creds" {
235 jwtCreds, err := oauth.NewServiceAccountFromFile(*serviceAccountKeyFile, *oauthScope)
236 if err != nil {
237 logger.Fatalf("Failed to create JWT credentials: %v", err)
238 }
239 opts = append(opts, grpc.WithPerRPCCredentials(jwtCreds))
240 } else if *testCase == "jwt_token_creds" {
241 jwtCreds, err := oauth.NewJWTAccessFromFile(*serviceAccountKeyFile)
242 if err != nil {
243 logger.Fatalf("Failed to create JWT credentials: %v", err)
244 }
245 opts = append(opts, grpc.WithPerRPCCredentials(jwtCreds))
246 } else if *testCase == "oauth2_auth_token" {
247 opts = append(opts, grpc.WithPerRPCCredentials(oauth.TokenSource{TokenSource: oauth2.StaticTokenSource(interop.GetToken(ctx, *serviceAccountKeyFile, *oauthScope))}))
248 }
249 }
250 if len(*serviceConfigJSON) > 0 {
251 opts = append(opts, grpc.WithDisableServiceConfig(), grpc.WithDefaultServiceConfig(*serviceConfigJSON))
252 }
253 if addMd := parseAdditionalMetadataFlag(); addMd != nil {
254 unaryAddMd := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
255 ctx = metadata.AppendToOutgoingContext(ctx, addMd...)
256 return invoker(ctx, method, req, reply, cc, opts...)
257 }
258 streamingAddMd := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
259 ctx = metadata.AppendToOutgoingContext(ctx, addMd...)
260 return streamer(ctx, desc, cc, method, opts...)
261 }
262 opts = append(opts, grpc.WithUnaryInterceptor(unaryAddMd), grpc.WithStreamInterceptor(streamingAddMd))
263 }
264 conn, err := grpc.Dial(serverAddr, opts...)
265 if err != nil {
266 logger.Fatalf("Fail to dial: %v", err)
267 }
268 defer conn.Close()
269 tc := testgrpc.NewTestServiceClient(conn)
270 ctxWithDeadline, cancel := context.WithTimeout(ctx, time.Duration(*soakOverallTimeoutSeconds)*time.Second)
271 defer cancel()
272 switch *testCase {
273 case "empty_unary":
274 interop.DoEmptyUnaryCall(ctx, tc)
275 logger.Infoln("EmptyUnaryCall done")
276 case "large_unary":
277 interop.DoLargeUnaryCall(ctx, tc)
278 logger.Infoln("LargeUnaryCall done")
279 case "client_streaming":
280 interop.DoClientStreaming(ctx, tc)
281 logger.Infoln("ClientStreaming done")
282 case "server_streaming":
283 interop.DoServerStreaming(ctx, tc)
284 logger.Infoln("ServerStreaming done")
285 case "ping_pong":
286 interop.DoPingPong(ctx, tc)
287 logger.Infoln("Pingpong done")
288 case "empty_stream":
289 interop.DoEmptyStream(ctx, tc)
290 logger.Infoln("Emptystream done")
291 case "timeout_on_sleeping_server":
292 interop.DoTimeoutOnSleepingServer(ctx, tc)
293 logger.Infoln("TimeoutOnSleepingServer done")
294 case "compute_engine_creds":
295 if credsChosen != credsTLS {
296 logger.Fatalf("TLS credentials need to be set for compute_engine_creds test case.")
297 }
298 interop.DoComputeEngineCreds(ctx, tc, *defaultServiceAccount, *oauthScope)
299 logger.Infoln("ComputeEngineCreds done")
300 case "service_account_creds":
301 if credsChosen != credsTLS {
302 logger.Fatalf("TLS credentials need to be set for service_account_creds test case.")
303 }
304 interop.DoServiceAccountCreds(ctx, tc, *serviceAccountKeyFile, *oauthScope)
305 logger.Infoln("ServiceAccountCreds done")
306 case "jwt_token_creds":
307 if credsChosen != credsTLS {
308 logger.Fatalf("TLS credentials need to be set for jwt_token_creds test case.")
309 }
310 interop.DoJWTTokenCreds(ctx, tc, *serviceAccountKeyFile)
311 logger.Infoln("JWTtokenCreds done")
312 case "per_rpc_creds":
313 if credsChosen != credsTLS {
314 logger.Fatalf("TLS credentials need to be set for per_rpc_creds test case.")
315 }
316 interop.DoPerRPCCreds(ctx, tc, *serviceAccountKeyFile, *oauthScope)
317 logger.Infoln("PerRPCCreds done")
318 case "oauth2_auth_token":
319 if credsChosen != credsTLS {
320 logger.Fatalf("TLS credentials need to be set for oauth2_auth_token test case.")
321 }
322 interop.DoOauth2TokenCreds(ctx, tc, *serviceAccountKeyFile, *oauthScope)
323 logger.Infoln("Oauth2TokenCreds done")
324 case "google_default_credentials":
325 if credsChosen != credsGoogleDefaultCreds {
326 logger.Fatalf("GoogleDefaultCredentials need to be set for google_default_credentials test case.")
327 }
328 interop.DoGoogleDefaultCredentials(ctx, tc, *defaultServiceAccount)
329 logger.Infoln("GoogleDefaultCredentials done")
330 case "compute_engine_channel_credentials":
331 if credsChosen != credsComputeEngineCreds {
332 logger.Fatalf("ComputeEngineCreds need to be set for compute_engine_channel_credentials test case.")
333 }
334 interop.DoComputeEngineChannelCredentials(ctx, tc, *defaultServiceAccount)
335 logger.Infoln("ComputeEngineChannelCredentials done")
336 case "cancel_after_begin":
337 interop.DoCancelAfterBegin(ctx, tc)
338 logger.Infoln("CancelAfterBegin done")
339 case "cancel_after_first_response":
340 interop.DoCancelAfterFirstResponse(ctx, tc)
341 logger.Infoln("CancelAfterFirstResponse done")
342 case "status_code_and_message":
343 interop.DoStatusCodeAndMessage(ctx, tc)
344 logger.Infoln("StatusCodeAndMessage done")
345 case "special_status_message":
346 interop.DoSpecialStatusMessage(ctx, tc)
347 logger.Infoln("SpecialStatusMessage done")
348 case "custom_metadata":
349 interop.DoCustomMetadata(ctx, tc)
350 logger.Infoln("CustomMetadata done")
351 case "unimplemented_method":
352 interop.DoUnimplementedMethod(ctx, conn)
353 logger.Infoln("UnimplementedMethod done")
354 case "unimplemented_service":
355 interop.DoUnimplementedService(ctx, testgrpc.NewUnimplementedServiceClient(conn))
356 logger.Infoln("UnimplementedService done")
357 case "pick_first_unary":
358 interop.DoPickFirstUnary(ctx, tc)
359 logger.Infoln("PickFirstUnary done")
360 case "rpc_soak":
361 interop.DoSoakTest(ctxWithDeadline, tc, serverAddr, opts, false , *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond)
362 logger.Infoln("RpcSoak done")
363 case "channel_soak":
364 interop.DoSoakTest(ctxWithDeadline, tc, serverAddr, opts, true , *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond)
365 logger.Infoln("ChannelSoak done")
366 case "orca_per_rpc":
367 interop.DoORCAPerRPCTest(ctx, tc)
368 logger.Infoln("ORCAPerRPC done")
369 case "orca_oob":
370 interop.DoORCAOOBTest(ctx, tc)
371 logger.Infoln("ORCAOOB done")
372 default:
373 logger.Fatal("Unsupported test case: ", *testCase)
374 }
375 }
376
View as plain text