1
2
3
4
5
6
7
8 package grpc
9
10 import (
11 "context"
12 "errors"
13 "log"
14 "net"
15 "os"
16 "strings"
17 "sync"
18 "time"
19
20 "cloud.google.com/go/auth"
21 "cloud.google.com/go/auth/credentials"
22 "cloud.google.com/go/auth/grpctransport"
23 "cloud.google.com/go/auth/oauth2adapt"
24 "cloud.google.com/go/compute/metadata"
25 "go.opencensus.io/plugin/ocgrpc"
26 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
27 "golang.org/x/oauth2"
28 "golang.org/x/time/rate"
29 "google.golang.org/api/internal"
30 "google.golang.org/api/option"
31 "google.golang.org/grpc"
32 grpcgoogle "google.golang.org/grpc/credentials/google"
33 grpcinsecure "google.golang.org/grpc/credentials/insecure"
34 "google.golang.org/grpc/credentials/oauth"
35 "google.golang.org/grpc/stats"
36
37
38 _ "google.golang.org/grpc/balancer/grpclb"
39 )
40
41
42 const disableDirectPath = "GOOGLE_CLOUD_DISABLE_DIRECT_PATH"
43
44
45 const enableDirectPathXds = "GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS"
46
47
48 var timeoutDialerOption grpc.DialOption
49
50
51 var logRateLimiter = rate.Sometimes{Interval: 1 * time.Second}
52
53
54 var dialContext = grpc.DialContext
55
56
57
58
59
60
61
62 var (
63 initOtelStatsHandlerOnce sync.Once
64 otelStatsHandler stats.Handler
65 )
66
67
68
69 func otelGRPCStatsHandler() stats.Handler {
70 initOtelStatsHandlerOnce.Do(func() {
71 otelStatsHandler = otelgrpc.NewClientHandler()
72 })
73 return otelStatsHandler
74 }
75
76
77
78 func Dial(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
79 o, err := processAndValidateOpts(opts)
80 if err != nil {
81 return nil, err
82 }
83 if o.GRPCConnPool != nil {
84 return o.GRPCConnPool.Conn(), nil
85 }
86 if o.IsNewAuthLibraryEnabled() {
87 pool, err := dialPoolNewAuth(ctx, true, 1, o)
88 if err != nil {
89 return nil, err
90 }
91 return pool.Connection(), nil
92 }
93
94
95
96
97 return dial(ctx, false, o)
98 }
99
100
101
102
103 func DialInsecure(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
104 o, err := processAndValidateOpts(opts)
105 if err != nil {
106 return nil, err
107 }
108 if o.IsNewAuthLibraryEnabled() {
109 pool, err := dialPoolNewAuth(ctx, false, 1, o)
110 if err != nil {
111 return nil, err
112 }
113 return pool.Connection(), nil
114 }
115 return dial(ctx, true, o)
116 }
117
118
119
120
121
122
123
124
125 func DialPool(ctx context.Context, opts ...option.ClientOption) (ConnPool, error) {
126 o, err := processAndValidateOpts(opts)
127 if err != nil {
128 return nil, err
129 }
130 if o.GRPCConnPool != nil {
131 return o.GRPCConnPool, nil
132 }
133
134 if o.IsNewAuthLibraryEnabled() {
135 if o.GRPCConn != nil {
136 return &singleConnPool{o.GRPCConn}, nil
137 }
138 pool, err := dialPoolNewAuth(ctx, true, o.GRPCConnPoolSize, o)
139 if err != nil {
140 return nil, err
141 }
142 return &poolAdapter{pool}, nil
143 }
144
145 poolSize := o.GRPCConnPoolSize
146 if o.GRPCConn != nil {
147
148
149 poolSize = 1
150 }
151 o.GRPCConnPoolSize = 0
152
153 if poolSize == 0 || poolSize == 1 {
154
155 conn, err := dial(ctx, false, o)
156 if err != nil {
157 return nil, err
158 }
159 return &singleConnPool{conn}, nil
160 }
161
162 pool := &roundRobinConnPool{}
163 for i := 0; i < poolSize; i++ {
164 conn, err := dial(ctx, false, o)
165 if err != nil {
166 defer pool.Close()
167 return nil, err
168 }
169 pool.conns = append(pool.conns, conn)
170 }
171 return pool, nil
172 }
173
174
175 func dialPoolNewAuth(ctx context.Context, secure bool, poolSize int, ds *internal.DialSettings) (grpctransport.GRPCClientConnPool, error) {
176
177 var creds *auth.Credentials
178 if ds.InternalCredentials != nil {
179 creds = oauth2adapt.AuthCredentialsFromOauth2Credentials(ds.InternalCredentials)
180 } else if ds.Credentials != nil {
181 creds = oauth2adapt.AuthCredentialsFromOauth2Credentials(ds.Credentials)
182 } else if ds.AuthCredentials != nil {
183 creds = ds.AuthCredentials
184 } else if ds.TokenSource != nil {
185 credOpts := &auth.CredentialsOptions{
186 TokenProvider: oauth2adapt.TokenProviderFromTokenSource(ds.TokenSource),
187 }
188 if ds.QuotaProject != "" {
189 credOpts.QuotaProjectIDProvider = auth.CredentialsPropertyFunc(func(ctx context.Context) (string, error) {
190 return ds.QuotaProject, nil
191 })
192 }
193 creds = auth.NewCredentials(credOpts)
194 }
195
196 var skipValidation bool
197
198
199 if ds.SkipValidation || ds.InternalCredentials != nil {
200 skipValidation = true
201 }
202
203 var aud string
204 if len(ds.Audiences) > 0 {
205 aud = ds.Audiences[0]
206 }
207 metadata := map[string]string{}
208 if ds.QuotaProject != "" {
209 metadata["X-goog-user-project"] = ds.QuotaProject
210 }
211 if ds.RequestReason != "" {
212 metadata["X-goog-request-reason"] = ds.RequestReason
213 }
214
215
216 defaultEndpointTemplate := ds.DefaultEndpointTemplate
217 if defaultEndpointTemplate == "" {
218 defaultEndpointTemplate = ds.DefaultEndpoint
219 }
220
221 pool, err := grpctransport.Dial(ctx, secure, &grpctransport.Options{
222 DisableTelemetry: ds.TelemetryDisabled,
223 DisableAuthentication: ds.NoAuth,
224 Endpoint: ds.Endpoint,
225 Metadata: metadata,
226 GRPCDialOpts: ds.GRPCDialOpts,
227 PoolSize: poolSize,
228 Credentials: creds,
229 DetectOpts: &credentials.DetectOptions{
230 Scopes: ds.Scopes,
231 Audience: aud,
232 CredentialsFile: ds.CredentialsFile,
233 CredentialsJSON: ds.CredentialsJSON,
234 Client: oauth2.NewClient(ctx, nil),
235 },
236 InternalOptions: &grpctransport.InternalOptions{
237 EnableNonDefaultSAForDirectPath: ds.AllowNonDefaultServiceAccount,
238 EnableDirectPath: ds.EnableDirectPath,
239 EnableDirectPathXds: ds.EnableDirectPathXds,
240 EnableJWTWithScope: ds.EnableJwtWithScope,
241 DefaultAudience: ds.DefaultAudience,
242 DefaultEndpointTemplate: defaultEndpointTemplate,
243 DefaultMTLSEndpoint: ds.DefaultMTLSEndpoint,
244 DefaultScopes: ds.DefaultScopes,
245 SkipValidation: skipValidation,
246 },
247 })
248 return pool, err
249 }
250
251 func dial(ctx context.Context, insecure bool, o *internal.DialSettings) (*grpc.ClientConn, error) {
252 if o.HTTPClient != nil {
253 return nil, errors.New("unsupported HTTP client specified")
254 }
255 if o.GRPCConn != nil {
256 return o.GRPCConn, nil
257 }
258 transportCreds, endpoint, err := internal.GetGRPCTransportConfigAndEndpoint(o)
259 if err != nil {
260 return nil, err
261 }
262
263 if insecure {
264 transportCreds = grpcinsecure.NewCredentials()
265 }
266
267
268 grpcOpts := []grpc.DialOption{
269 grpc.WithTransportCredentials(transportCreds),
270 }
271
272
273
274
275
276 if !o.NoAuth && !insecure {
277 if o.APIKey != "" {
278 grpcOpts = append(grpcOpts, grpc.WithPerRPCCredentials(grpcAPIKey{
279 apiKey: o.APIKey,
280 requestReason: o.RequestReason,
281 }))
282 } else {
283 creds, err := internal.Creds(ctx, o)
284 if err != nil {
285 return nil, err
286 }
287 if o.TokenSource == nil {
288
289
290 credsUniverseDomain, err := internal.GetUniverseDomain(creds)
291 if err != nil {
292 return nil, err
293 }
294 if o.GetUniverseDomain() != credsUniverseDomain {
295 return nil, internal.ErrUniverseNotMatch(o.GetUniverseDomain(), credsUniverseDomain)
296 }
297 }
298 grpcOpts = append(grpcOpts, grpc.WithPerRPCCredentials(grpcTokenSource{
299 TokenSource: oauth.TokenSource{TokenSource: creds.TokenSource},
300 quotaProject: internal.GetQuotaProject(creds, o.QuotaProject),
301 requestReason: o.RequestReason,
302 }))
303
304 logRateLimiter.Do(func() {
305 logDirectPathMisconfig(endpoint, creds.TokenSource, o)
306 })
307 if isDirectPathEnabled(endpoint, o) && isTokenSourceDirectPathCompatible(creds.TokenSource, o) && metadata.OnGCE() {
308
309 grpcOpts = []grpc.DialOption{
310 grpc.WithCredentialsBundle(grpcgoogle.NewDefaultCredentialsWithOptions(
311 grpcgoogle.DefaultCredentialsOptions{
312 PerRPCCreds: oauth.TokenSource{TokenSource: creds.TokenSource},
313 })),
314 }
315 if timeoutDialerOption != nil {
316 grpcOpts = append(grpcOpts, timeoutDialerOption)
317 }
318
319 if isDirectPathXdsUsed(o) {
320
321 if addr, _, err := net.SplitHostPort(endpoint); err == nil {
322 endpoint = "google-c2p:///" + addr
323 } else {
324 endpoint = "google-c2p:///" + endpoint
325 }
326 } else {
327 if !strings.HasPrefix(endpoint, "dns:///") {
328 endpoint = "dns:///" + endpoint
329 }
330 grpcOpts = append(grpcOpts,
331
332
333
334 grpc.WithDisableServiceConfig(),
335 grpc.WithDefaultServiceConfig(`{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`))
336 }
337
338 }
339 }
340 }
341
342
343
344
345 grpcOpts = addOCStatsHandler(grpcOpts, o)
346 grpcOpts = addOpenTelemetryStatsHandler(grpcOpts, o)
347 grpcOpts = append(grpcOpts, o.GRPCDialOpts...)
348 if o.UserAgent != "" {
349 grpcOpts = append(grpcOpts, grpc.WithUserAgent(o.UserAgent))
350 }
351
352 return dialContext(ctx, endpoint, grpcOpts...)
353 }
354
355 func addOCStatsHandler(opts []grpc.DialOption, settings *internal.DialSettings) []grpc.DialOption {
356 if settings.TelemetryDisabled {
357 return opts
358 }
359 return append(opts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{}))
360 }
361
362 func addOpenTelemetryStatsHandler(opts []grpc.DialOption, settings *internal.DialSettings) []grpc.DialOption {
363 if settings.TelemetryDisabled {
364 return opts
365 }
366 return append(opts, grpc.WithStatsHandler(otelGRPCStatsHandler()))
367 }
368
369
370 type grpcTokenSource struct {
371 oauth.TokenSource
372
373
374 quotaProject string
375 requestReason string
376 }
377
378
379 func (ts grpcTokenSource) GetRequestMetadata(ctx context.Context, uri ...string) (
380 map[string]string, error) {
381 metadata, err := ts.TokenSource.GetRequestMetadata(ctx, uri...)
382 if err != nil {
383 return nil, err
384 }
385
386
387 if ts.quotaProject != "" {
388 metadata["X-goog-user-project"] = ts.quotaProject
389 }
390 if ts.requestReason != "" {
391 metadata["X-goog-request-reason"] = ts.requestReason
392 }
393 return metadata, nil
394 }
395
396
397 type grpcAPIKey struct {
398 apiKey string
399
400
401 requestReason string
402 }
403
404
405 func (ts grpcAPIKey) GetRequestMetadata(ctx context.Context, uri ...string) (
406 map[string]string, error) {
407 metadata := map[string]string{
408 "X-goog-api-key": ts.apiKey,
409 }
410 if ts.requestReason != "" {
411 metadata["X-goog-request-reason"] = ts.requestReason
412 }
413 return metadata, nil
414 }
415
416
417 func (ts grpcAPIKey) RequireTransportSecurity() bool {
418 return true
419 }
420
421 func isDirectPathEnabled(endpoint string, o *internal.DialSettings) bool {
422 if !o.EnableDirectPath {
423 return false
424 }
425 if !checkDirectPathEndPoint(endpoint) {
426 return false
427 }
428 if strings.EqualFold(os.Getenv(disableDirectPath), "true") {
429 return false
430 }
431 return true
432 }
433
434 func isDirectPathXdsUsed(o *internal.DialSettings) bool {
435
436 if strings.EqualFold(os.Getenv(enableDirectPathXds), "true") {
437 return true
438 }
439
440 if o.EnableDirectPathXds {
441 return true
442 }
443 return false
444
445 }
446
447 func isTokenSourceDirectPathCompatible(ts oauth2.TokenSource, o *internal.DialSettings) bool {
448 if ts == nil {
449 return false
450 }
451 tok, err := ts.Token()
452 if err != nil {
453 return false
454 }
455 if tok == nil {
456 return false
457 }
458 if o.AllowNonDefaultServiceAccount {
459 return true
460 }
461 if source, _ := tok.Extra("oauth2.google.tokenSource").(string); source != "compute-metadata" {
462 return false
463 }
464 if acct, _ := tok.Extra("oauth2.google.serviceAccount").(string); acct != "default" {
465 return false
466 }
467 return true
468 }
469
470 func checkDirectPathEndPoint(endpoint string) bool {
471
472
473
474
475
476
477 if strings.Contains(endpoint, "://") && !strings.HasPrefix(endpoint, "dns:///") {
478 return false
479 }
480
481 if endpoint == "" {
482 return false
483 }
484
485 return true
486 }
487
488 func logDirectPathMisconfig(endpoint string, ts oauth2.TokenSource, o *internal.DialSettings) {
489 if isDirectPathXdsUsed(o) {
490
491 if !isDirectPathEnabled(endpoint, o) {
492 log.Println("WARNING: DirectPath is misconfigured. Please set the EnableDirectPath option along with the EnableDirectPathXds option.")
493 } else {
494
495 if !isTokenSourceDirectPathCompatible(ts, o) {
496 log.Println("WARNING: DirectPath is misconfigured. Please make sure the token source is fetched from GCE metadata server and the default service account is used.")
497 }
498
499 if !metadata.OnGCE() {
500 log.Println("WARNING: DirectPath is misconfigured. DirectPath is only available in a GCE environment.")
501 }
502 }
503 }
504 }
505
506 func processAndValidateOpts(opts []option.ClientOption) (*internal.DialSettings, error) {
507 var o internal.DialSettings
508 for _, opt := range opts {
509 opt.Apply(&o)
510 }
511 if err := o.Validate(); err != nil {
512 return nil, err
513 }
514
515 return &o, nil
516 }
517
518 type connPoolOption struct{ ConnPool }
519
520
521
522
523
524
525
526 func WithConnPool(p ConnPool) option.ClientOption {
527 return connPoolOption{p}
528 }
529
530 func (o connPoolOption) Apply(s *internal.DialSettings) {
531 s.GRPCConnPool = o.ConnPool
532 }
533
View as plain text