...

Source file src/google.golang.org/api/transport/grpc/dial.go

Documentation: google.golang.org/api/transport/grpc

     1  // Copyright 2015 Google LLC.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Package grpc supports network connections to GRPC servers.
     6  // This package is not intended for use by end developers. Use the
     7  // google.golang.org/api/option package to configure API clients.
     8  package grpc
     9  
    10  import (
    11  	"context"
    12  	"errors"
    13  	"log"
    14  	"net"
    15  	"os"
    16  	"strings"
    17  	"sync"
    18  	"time"
    19  
    20  	"cloud.google.com/go/auth"
    21  	"cloud.google.com/go/auth/credentials"
    22  	"cloud.google.com/go/auth/grpctransport"
    23  	"cloud.google.com/go/auth/oauth2adapt"
    24  	"cloud.google.com/go/compute/metadata"
    25  	"go.opencensus.io/plugin/ocgrpc"
    26  	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
    27  	"golang.org/x/oauth2"
    28  	"golang.org/x/time/rate"
    29  	"google.golang.org/api/internal"
    30  	"google.golang.org/api/option"
    31  	"google.golang.org/grpc"
    32  	grpcgoogle "google.golang.org/grpc/credentials/google"
    33  	grpcinsecure "google.golang.org/grpc/credentials/insecure"
    34  	"google.golang.org/grpc/credentials/oauth"
    35  	"google.golang.org/grpc/stats"
    36  
    37  	// Install grpclb, which is required for direct path.
    38  	_ "google.golang.org/grpc/balancer/grpclb"
    39  )
    40  
    41  // Check env to disable DirectPath traffic.
    42  const disableDirectPath = "GOOGLE_CLOUD_DISABLE_DIRECT_PATH"
    43  
    44  // Check env to decide if using google-c2p resolver for DirectPath traffic.
    45  const enableDirectPathXds = "GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS"
    46  
    47  // Set at init time by dial_socketopt.go. If nil, socketopt is not supported.
    48  var timeoutDialerOption grpc.DialOption
    49  
    50  // Log rate limiter
    51  var logRateLimiter = rate.Sometimes{Interval: 1 * time.Second}
    52  
    53  // Assign to var for unit test replacement
    54  var dialContext = grpc.DialContext
    55  
    56  // otelStatsHandler is a singleton otelgrpc.clientHandler to be used across
    57  // all dial connections to avoid the memory leak documented in
    58  // https://github.com/open-telemetry/opentelemetry-go-contrib/issues/4226
    59  //
    60  // TODO: If 4226 has been fixed in opentelemetry-go-contrib, replace this
    61  // singleton with inline usage for simplicity.
    62  var (
    63  	initOtelStatsHandlerOnce sync.Once
    64  	otelStatsHandler         stats.Handler
    65  )
    66  
    67  // otelGRPCStatsHandler returns singleton otelStatsHandler for reuse across all
    68  // dial connections.
    69  func otelGRPCStatsHandler() stats.Handler {
    70  	initOtelStatsHandlerOnce.Do(func() {
    71  		otelStatsHandler = otelgrpc.NewClientHandler()
    72  	})
    73  	return otelStatsHandler
    74  }
    75  
    76  // Dial returns a GRPC connection for use communicating with a Google cloud
    77  // service, configured with the given ClientOptions.
    78  func Dial(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
    79  	o, err := processAndValidateOpts(opts)
    80  	if err != nil {
    81  		return nil, err
    82  	}
    83  	if o.GRPCConnPool != nil {
    84  		return o.GRPCConnPool.Conn(), nil
    85  	}
    86  	if o.IsNewAuthLibraryEnabled() {
    87  		pool, err := dialPoolNewAuth(ctx, true, 1, o)
    88  		if err != nil {
    89  			return nil, err
    90  		}
    91  		return pool.Connection(), nil
    92  	}
    93  	// NOTE(cbro): We removed support for option.WithGRPCConnPool (GRPCConnPoolSize)
    94  	// on 2020-02-12 because RoundRobin and WithBalancer are deprecated and we need to remove usages of it.
    95  	//
    96  	// Connection pooling is only done via DialPool.
    97  	return dial(ctx, false, o)
    98  }
    99  
   100  // DialInsecure returns an insecure GRPC connection for use communicating
   101  // with fake or mock Google cloud service implementations, such as emulators.
   102  // The connection is configured with the given ClientOptions.
   103  func DialInsecure(ctx context.Context, opts ...option.ClientOption) (*grpc.ClientConn, error) {
   104  	o, err := processAndValidateOpts(opts)
   105  	if err != nil {
   106  		return nil, err
   107  	}
   108  	if o.IsNewAuthLibraryEnabled() {
   109  		pool, err := dialPoolNewAuth(ctx, false, 1, o)
   110  		if err != nil {
   111  			return nil, err
   112  		}
   113  		return pool.Connection(), nil
   114  	}
   115  	return dial(ctx, true, o)
   116  }
   117  
   118  // DialPool returns a pool of GRPC connections for the given service.
   119  // This differs from the connection pooling implementation used by Dial, which uses a custom GRPC load balancer.
   120  // DialPool should be used instead of Dial when a pool is used by default or a different custom GRPC load balancer is needed.
   121  // The context and options are shared between each Conn in the pool.
   122  // The pool size is configured using the WithGRPCConnectionPool option.
   123  //
   124  // This API is subject to change as we further refine requirements. It will go away if gRPC stubs accept an interface instead of the concrete ClientConn type. See https://github.com/grpc/grpc-go/issues/1287.
   125  func DialPool(ctx context.Context, opts ...option.ClientOption) (ConnPool, error) {
   126  	o, err := processAndValidateOpts(opts)
   127  	if err != nil {
   128  		return nil, err
   129  	}
   130  	if o.GRPCConnPool != nil {
   131  		return o.GRPCConnPool, nil
   132  	}
   133  
   134  	if o.IsNewAuthLibraryEnabled() {
   135  		if o.GRPCConn != nil {
   136  			return &singleConnPool{o.GRPCConn}, nil
   137  		}
   138  		pool, err := dialPoolNewAuth(ctx, true, o.GRPCConnPoolSize, o)
   139  		if err != nil {
   140  			return nil, err
   141  		}
   142  		return &poolAdapter{pool}, nil
   143  	}
   144  
   145  	poolSize := o.GRPCConnPoolSize
   146  	if o.GRPCConn != nil {
   147  		// WithGRPCConn is technically incompatible with WithGRPCConnectionPool.
   148  		// Always assume pool size is 1 when a grpc.ClientConn is explicitly used.
   149  		poolSize = 1
   150  	}
   151  	o.GRPCConnPoolSize = 0 // we don't *need* to set this to zero, but it's safe to.
   152  
   153  	if poolSize == 0 || poolSize == 1 {
   154  		// Fast path for common case for a connection pool with a single connection.
   155  		conn, err := dial(ctx, false, o)
   156  		if err != nil {
   157  			return nil, err
   158  		}
   159  		return &singleConnPool{conn}, nil
   160  	}
   161  
   162  	pool := &roundRobinConnPool{}
   163  	for i := 0; i < poolSize; i++ {
   164  		conn, err := dial(ctx, false, o)
   165  		if err != nil {
   166  			defer pool.Close() // NOTE: error from Close is ignored.
   167  			return nil, err
   168  		}
   169  		pool.conns = append(pool.conns, conn)
   170  	}
   171  	return pool, nil
   172  }
   173  
   174  // dialPoolNewAuth is an adapter to call new auth library.
   175  func dialPoolNewAuth(ctx context.Context, secure bool, poolSize int, ds *internal.DialSettings) (grpctransport.GRPCClientConnPool, error) {
   176  	// honor options if set
   177  	var creds *auth.Credentials
   178  	if ds.InternalCredentials != nil {
   179  		creds = oauth2adapt.AuthCredentialsFromOauth2Credentials(ds.InternalCredentials)
   180  	} else if ds.Credentials != nil {
   181  		creds = oauth2adapt.AuthCredentialsFromOauth2Credentials(ds.Credentials)
   182  	} else if ds.AuthCredentials != nil {
   183  		creds = ds.AuthCredentials
   184  	} else if ds.TokenSource != nil {
   185  		credOpts := &auth.CredentialsOptions{
   186  			TokenProvider: oauth2adapt.TokenProviderFromTokenSource(ds.TokenSource),
   187  		}
   188  		if ds.QuotaProject != "" {
   189  			credOpts.QuotaProjectIDProvider = auth.CredentialsPropertyFunc(func(ctx context.Context) (string, error) {
   190  				return ds.QuotaProject, nil
   191  			})
   192  		}
   193  		creds = auth.NewCredentials(credOpts)
   194  	}
   195  
   196  	var skipValidation bool
   197  	// If our clients explicitly setup the credential skip validation as it is
   198  	// assumed correct
   199  	if ds.SkipValidation || ds.InternalCredentials != nil {
   200  		skipValidation = true
   201  	}
   202  
   203  	var aud string
   204  	if len(ds.Audiences) > 0 {
   205  		aud = ds.Audiences[0]
   206  	}
   207  	metadata := map[string]string{}
   208  	if ds.QuotaProject != "" {
   209  		metadata["X-goog-user-project"] = ds.QuotaProject
   210  	}
   211  	if ds.RequestReason != "" {
   212  		metadata["X-goog-request-reason"] = ds.RequestReason
   213  	}
   214  
   215  	// Defaults for older clients that don't set this value yet
   216  	defaultEndpointTemplate := ds.DefaultEndpointTemplate
   217  	if defaultEndpointTemplate == "" {
   218  		defaultEndpointTemplate = ds.DefaultEndpoint
   219  	}
   220  
   221  	pool, err := grpctransport.Dial(ctx, secure, &grpctransport.Options{
   222  		DisableTelemetry:      ds.TelemetryDisabled,
   223  		DisableAuthentication: ds.NoAuth,
   224  		Endpoint:              ds.Endpoint,
   225  		Metadata:              metadata,
   226  		GRPCDialOpts:          ds.GRPCDialOpts,
   227  		PoolSize:              poolSize,
   228  		Credentials:           creds,
   229  		DetectOpts: &credentials.DetectOptions{
   230  			Scopes:          ds.Scopes,
   231  			Audience:        aud,
   232  			CredentialsFile: ds.CredentialsFile,
   233  			CredentialsJSON: ds.CredentialsJSON,
   234  			Client:          oauth2.NewClient(ctx, nil),
   235  		},
   236  		InternalOptions: &grpctransport.InternalOptions{
   237  			EnableNonDefaultSAForDirectPath: ds.AllowNonDefaultServiceAccount,
   238  			EnableDirectPath:                ds.EnableDirectPath,
   239  			EnableDirectPathXds:             ds.EnableDirectPathXds,
   240  			EnableJWTWithScope:              ds.EnableJwtWithScope,
   241  			DefaultAudience:                 ds.DefaultAudience,
   242  			DefaultEndpointTemplate:         defaultEndpointTemplate,
   243  			DefaultMTLSEndpoint:             ds.DefaultMTLSEndpoint,
   244  			DefaultScopes:                   ds.DefaultScopes,
   245  			SkipValidation:                  skipValidation,
   246  		},
   247  	})
   248  	return pool, err
   249  }
   250  
   251  func dial(ctx context.Context, insecure bool, o *internal.DialSettings) (*grpc.ClientConn, error) {
   252  	if o.HTTPClient != nil {
   253  		return nil, errors.New("unsupported HTTP client specified")
   254  	}
   255  	if o.GRPCConn != nil {
   256  		return o.GRPCConn, nil
   257  	}
   258  	transportCreds, endpoint, err := internal.GetGRPCTransportConfigAndEndpoint(o)
   259  	if err != nil {
   260  		return nil, err
   261  	}
   262  
   263  	if insecure {
   264  		transportCreds = grpcinsecure.NewCredentials()
   265  	}
   266  
   267  	// Initialize gRPC dial options with transport-level security options.
   268  	grpcOpts := []grpc.DialOption{
   269  		grpc.WithTransportCredentials(transportCreds),
   270  	}
   271  
   272  	// Authentication can only be sent when communicating over a secure connection.
   273  	//
   274  	// TODO: Should we be more lenient in the future and allow sending credentials
   275  	// when dialing an insecure connection?
   276  	if !o.NoAuth && !insecure {
   277  		if o.APIKey != "" {
   278  			grpcOpts = append(grpcOpts, grpc.WithPerRPCCredentials(grpcAPIKey{
   279  				apiKey:        o.APIKey,
   280  				requestReason: o.RequestReason,
   281  			}))
   282  		} else {
   283  			creds, err := internal.Creds(ctx, o)
   284  			if err != nil {
   285  				return nil, err
   286  			}
   287  			if o.TokenSource == nil {
   288  				// We only validate non-tokensource creds, as TokenSource-based credentials
   289  				// don't propagate universe.
   290  				credsUniverseDomain, err := internal.GetUniverseDomain(creds)
   291  				if err != nil {
   292  					return nil, err
   293  				}
   294  				if o.GetUniverseDomain() != credsUniverseDomain {
   295  					return nil, internal.ErrUniverseNotMatch(o.GetUniverseDomain(), credsUniverseDomain)
   296  				}
   297  			}
   298  			grpcOpts = append(grpcOpts, grpc.WithPerRPCCredentials(grpcTokenSource{
   299  				TokenSource:   oauth.TokenSource{TokenSource: creds.TokenSource},
   300  				quotaProject:  internal.GetQuotaProject(creds, o.QuotaProject),
   301  				requestReason: o.RequestReason,
   302  			}))
   303  			// Attempt Direct Path:
   304  			logRateLimiter.Do(func() {
   305  				logDirectPathMisconfig(endpoint, creds.TokenSource, o)
   306  			})
   307  			if isDirectPathEnabled(endpoint, o) && isTokenSourceDirectPathCompatible(creds.TokenSource, o) && metadata.OnGCE() {
   308  				// Overwrite all of the previously specific DialOptions, DirectPath uses its own set of credentials and certificates.
   309  				grpcOpts = []grpc.DialOption{
   310  					grpc.WithCredentialsBundle(grpcgoogle.NewDefaultCredentialsWithOptions(
   311  						grpcgoogle.DefaultCredentialsOptions{
   312  							PerRPCCreds: oauth.TokenSource{TokenSource: creds.TokenSource},
   313  						})),
   314  				}
   315  				if timeoutDialerOption != nil {
   316  					grpcOpts = append(grpcOpts, timeoutDialerOption)
   317  				}
   318  				// Check if google-c2p resolver is enabled for DirectPath
   319  				if isDirectPathXdsUsed(o) {
   320  					// google-c2p resolver target must not have a port number
   321  					if addr, _, err := net.SplitHostPort(endpoint); err == nil {
   322  						endpoint = "google-c2p:///" + addr
   323  					} else {
   324  						endpoint = "google-c2p:///" + endpoint
   325  					}
   326  				} else {
   327  					if !strings.HasPrefix(endpoint, "dns:///") {
   328  						endpoint = "dns:///" + endpoint
   329  					}
   330  					grpcOpts = append(grpcOpts,
   331  						// For now all DirectPath go clients will be using the following lb config, but in future
   332  						// when different services need different configs, then we should change this to a
   333  						// per-service config.
   334  						grpc.WithDisableServiceConfig(),
   335  						grpc.WithDefaultServiceConfig(`{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`))
   336  				}
   337  				// TODO(cbro): add support for system parameters (quota project, request reason) via chained interceptor.
   338  			}
   339  		}
   340  	}
   341  
   342  	// Add tracing, but before the other options, so that clients can override the
   343  	// gRPC stats handler.
   344  	// This assumes that gRPC options are processed in order, left to right.
   345  	grpcOpts = addOCStatsHandler(grpcOpts, o)
   346  	grpcOpts = addOpenTelemetryStatsHandler(grpcOpts, o)
   347  	grpcOpts = append(grpcOpts, o.GRPCDialOpts...)
   348  	if o.UserAgent != "" {
   349  		grpcOpts = append(grpcOpts, grpc.WithUserAgent(o.UserAgent))
   350  	}
   351  
   352  	return dialContext(ctx, endpoint, grpcOpts...)
   353  }
   354  
   355  func addOCStatsHandler(opts []grpc.DialOption, settings *internal.DialSettings) []grpc.DialOption {
   356  	if settings.TelemetryDisabled {
   357  		return opts
   358  	}
   359  	return append(opts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{}))
   360  }
   361  
   362  func addOpenTelemetryStatsHandler(opts []grpc.DialOption, settings *internal.DialSettings) []grpc.DialOption {
   363  	if settings.TelemetryDisabled {
   364  		return opts
   365  	}
   366  	return append(opts, grpc.WithStatsHandler(otelGRPCStatsHandler()))
   367  }
   368  
   369  // grpcTokenSource supplies PerRPCCredentials from an oauth.TokenSource.
   370  type grpcTokenSource struct {
   371  	oauth.TokenSource
   372  
   373  	// Additional metadata attached as headers.
   374  	quotaProject  string
   375  	requestReason string
   376  }
   377  
   378  // GetRequestMetadata gets the request metadata as a map from a grpcTokenSource.
   379  func (ts grpcTokenSource) GetRequestMetadata(ctx context.Context, uri ...string) (
   380  	map[string]string, error) {
   381  	metadata, err := ts.TokenSource.GetRequestMetadata(ctx, uri...)
   382  	if err != nil {
   383  		return nil, err
   384  	}
   385  
   386  	// Attach system parameter
   387  	if ts.quotaProject != "" {
   388  		metadata["X-goog-user-project"] = ts.quotaProject
   389  	}
   390  	if ts.requestReason != "" {
   391  		metadata["X-goog-request-reason"] = ts.requestReason
   392  	}
   393  	return metadata, nil
   394  }
   395  
   396  // grpcAPIKey supplies PerRPCCredentials from an API Key.
   397  type grpcAPIKey struct {
   398  	apiKey string
   399  
   400  	// Additional metadata attached as headers.
   401  	requestReason string
   402  }
   403  
   404  // GetRequestMetadata gets the request metadata as a map from a grpcAPIKey.
   405  func (ts grpcAPIKey) GetRequestMetadata(ctx context.Context, uri ...string) (
   406  	map[string]string, error) {
   407  	metadata := map[string]string{
   408  		"X-goog-api-key": ts.apiKey,
   409  	}
   410  	if ts.requestReason != "" {
   411  		metadata["X-goog-request-reason"] = ts.requestReason
   412  	}
   413  	return metadata, nil
   414  }
   415  
   416  // RequireTransportSecurity indicates whether the credentials requires transport security.
   417  func (ts grpcAPIKey) RequireTransportSecurity() bool {
   418  	return true
   419  }
   420  
   421  func isDirectPathEnabled(endpoint string, o *internal.DialSettings) bool {
   422  	if !o.EnableDirectPath {
   423  		return false
   424  	}
   425  	if !checkDirectPathEndPoint(endpoint) {
   426  		return false
   427  	}
   428  	if strings.EqualFold(os.Getenv(disableDirectPath), "true") {
   429  		return false
   430  	}
   431  	return true
   432  }
   433  
   434  func isDirectPathXdsUsed(o *internal.DialSettings) bool {
   435  	// Method 1: Enable DirectPath xDS by env;
   436  	if strings.EqualFold(os.Getenv(enableDirectPathXds), "true") {
   437  		return true
   438  	}
   439  	// Method 2: Enable DirectPath xDS by option;
   440  	if o.EnableDirectPathXds {
   441  		return true
   442  	}
   443  	return false
   444  
   445  }
   446  
   447  func isTokenSourceDirectPathCompatible(ts oauth2.TokenSource, o *internal.DialSettings) bool {
   448  	if ts == nil {
   449  		return false
   450  	}
   451  	tok, err := ts.Token()
   452  	if err != nil {
   453  		return false
   454  	}
   455  	if tok == nil {
   456  		return false
   457  	}
   458  	if o.AllowNonDefaultServiceAccount {
   459  		return true
   460  	}
   461  	if source, _ := tok.Extra("oauth2.google.tokenSource").(string); source != "compute-metadata" {
   462  		return false
   463  	}
   464  	if acct, _ := tok.Extra("oauth2.google.serviceAccount").(string); acct != "default" {
   465  		return false
   466  	}
   467  	return true
   468  }
   469  
   470  func checkDirectPathEndPoint(endpoint string) bool {
   471  	// Only [dns:///]host[:port] is supported, not other schemes (e.g., "tcp://" or "unix://").
   472  	// Also don't try direct path if the user has chosen an alternate name resolver
   473  	// (i.e., via ":///" prefix).
   474  	//
   475  	// TODO(cbro): once gRPC has introspectible options, check the user hasn't
   476  	// provided a custom dialer in gRPC options.
   477  	if strings.Contains(endpoint, "://") && !strings.HasPrefix(endpoint, "dns:///") {
   478  		return false
   479  	}
   480  
   481  	if endpoint == "" {
   482  		return false
   483  	}
   484  
   485  	return true
   486  }
   487  
   488  func logDirectPathMisconfig(endpoint string, ts oauth2.TokenSource, o *internal.DialSettings) {
   489  	if isDirectPathXdsUsed(o) {
   490  		// Case 1: does not enable DirectPath
   491  		if !isDirectPathEnabled(endpoint, o) {
   492  			log.Println("WARNING: DirectPath is misconfigured. Please set the EnableDirectPath option along with the EnableDirectPathXds option.")
   493  		} else {
   494  			// Case 2: credential is not correctly set
   495  			if !isTokenSourceDirectPathCompatible(ts, o) {
   496  				log.Println("WARNING: DirectPath is misconfigured. Please make sure the token source is fetched from GCE metadata server and the default service account is used.")
   497  			}
   498  			// Case 3: not running on GCE
   499  			if !metadata.OnGCE() {
   500  				log.Println("WARNING: DirectPath is misconfigured. DirectPath is only available in a GCE environment.")
   501  			}
   502  		}
   503  	}
   504  }
   505  
   506  func processAndValidateOpts(opts []option.ClientOption) (*internal.DialSettings, error) {
   507  	var o internal.DialSettings
   508  	for _, opt := range opts {
   509  		opt.Apply(&o)
   510  	}
   511  	if err := o.Validate(); err != nil {
   512  		return nil, err
   513  	}
   514  
   515  	return &o, nil
   516  }
   517  
   518  type connPoolOption struct{ ConnPool }
   519  
   520  // WithConnPool returns a ClientOption that specifies the ConnPool
   521  // connection to use as the basis of communications.
   522  //
   523  // This is only to be used by Google client libraries internally, for example
   524  // when creating a longrunning API client that shares the same connection pool
   525  // as a service client.
   526  func WithConnPool(p ConnPool) option.ClientOption {
   527  	return connPoolOption{p}
   528  }
   529  
   530  func (o connPoolOption) Apply(s *internal.DialSettings) {
   531  	s.GRPCConnPool = o.ConnPool
   532  }
   533  

View as plain text