...

Source file src/cloud.google.com/go/bigquery/bigquery.go

Documentation: cloud.google.com/go/bigquery

     1  // Copyright 2015 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package bigquery
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"io"
    22  	"net/http"
    23  	"net/url"
    24  	"os"
    25  	"strings"
    26  	"time"
    27  
    28  	"cloud.google.com/go/bigquery/internal"
    29  	cloudinternal "cloud.google.com/go/internal"
    30  	"cloud.google.com/go/internal/detect"
    31  	"cloud.google.com/go/internal/trace"
    32  	"cloud.google.com/go/internal/version"
    33  	gax "github.com/googleapis/gax-go/v2"
    34  	bq "google.golang.org/api/bigquery/v2"
    35  	"google.golang.org/api/googleapi"
    36  	"google.golang.org/api/option"
    37  )
    38  
    39  const (
    40  	// Scope is the Oauth2 scope for the service.
    41  	// For relevant BigQuery scopes, see:
    42  	// https://developers.google.com/identity/protocols/googlescopes#bigqueryv2
    43  	Scope           = "https://www.googleapis.com/auth/bigquery"
    44  	userAgentPrefix = "gcloud-golang-bigquery"
    45  )
    46  
    47  var xGoogHeader = fmt.Sprintf("gl-go/%s gccl/%s", version.Go(), internal.Version)
    48  
    49  func setClientHeader(headers http.Header) {
    50  	headers.Set("x-goog-api-client", xGoogHeader)
    51  }
    52  
// Client may be used to perform BigQuery operations.
// Instances are constructed by NewClient.
type Client struct {
	// Location, if set, will be used as the default location for all subsequent
	// dataset creation and job operations. A location specified directly in one of
	// those operations will override this value.
	Location string

	// projectID is the project that operations are billed to. It is either the
	// ID passed to NewClient or the one resolved from credentials when
	// DetectProjectID was given.
	projectID string
	// bqs is the BigQuery v2 API service created by NewClient.
	bqs *bq.Service
	// rc is the Storage API read client installed by EnableStorageReadClient.
	// It is nil until then; isStorageReadAvailable checks for nil.
	rc *readClient

	// governs use of preview query features.
	// NewClient sets this from the QUERY_PREVIEW_ENABLED environment variable.
	enableQueryPreview bool
}
    67  
    68  // DetectProjectID is a sentinel value that instructs NewClient to detect the
    69  // project ID. It is given in place of the projectID argument. NewClient will
    70  // use the project ID from the given credentials or the default credentials
    71  // (https://developers.google.com/accounts/docs/application-default-credentials)
    72  // if no credentials were provided. When providing credentials, not all
    73  // options will allow NewClient to extract the project ID. Specifically a JWT
    74  // does not have the project ID encoded.
    75  const DetectProjectID = "*detect-project-id*"
    76  
    77  // NewClient constructs a new Client which can perform BigQuery operations.
    78  // Operations performed via the client are billed to the specified GCP project.
    79  //
    80  // If the project ID is set to DetectProjectID, NewClient will attempt to detect
    81  // the project ID from credentials.
    82  //
    83  // This client supports enabling query-related preview features via environmental
    84  // variables.  By setting the environment variable QUERY_PREVIEW_ENABLED to the string
    85  // "TRUE", the client will enable preview features, though behavior may still be
    86  // controlled via the bigquery service as well.  Currently, the feature(s) in scope
    87  // include: stateless queries (query execution without corresponding job metadata).
    88  func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) {
    89  	o := []option.ClientOption{
    90  		option.WithScopes(Scope),
    91  		option.WithUserAgent(fmt.Sprintf("%s/%s", userAgentPrefix, internal.Version)),
    92  	}
    93  	o = append(o, opts...)
    94  	bqs, err := bq.NewService(ctx, o...)
    95  	if err != nil {
    96  		return nil, fmt.Errorf("bigquery: constructing client: %w", err)
    97  	}
    98  
    99  	// Handle project autodetection.
   100  	projectID, err = detect.ProjectID(ctx, projectID, "", opts...)
   101  	if err != nil {
   102  		return nil, err
   103  	}
   104  
   105  	var preview bool
   106  	if v, ok := os.LookupEnv("QUERY_PREVIEW_ENABLED"); ok {
   107  		if strings.ToUpper(v) == "TRUE" {
   108  			preview = true
   109  		}
   110  	}
   111  
   112  	c := &Client{
   113  		projectID:          projectID,
   114  		bqs:                bqs,
   115  		enableQueryPreview: preview,
   116  	}
   117  	return c, nil
   118  }
   119  
   120  // EnableStorageReadClient sets up Storage API connection to be used when fetching
   121  // large datasets from tables, jobs or queries.
   122  // Currently out of pagination methods like PageInfo().Token and RowIterator.StartIndex
   123  // are not supported when the Storage API is enabled.
   124  // Calling this method twice will return an error.
   125  func (c *Client) EnableStorageReadClient(ctx context.Context, opts ...option.ClientOption) error {
   126  	if c.isStorageReadAvailable() {
   127  		return fmt.Errorf("failed: storage read client already set up")
   128  	}
   129  	rc, err := newReadClient(ctx, c.projectID, opts...)
   130  	if err != nil {
   131  		return err
   132  	}
   133  	c.rc = rc
   134  	return nil
   135  }
   136  
   137  func (c *Client) isStorageReadAvailable() bool {
   138  	return c.rc != nil
   139  }
   140  
   141  // Project returns the project ID or number for this instance of the client, which may have
   142  // either been explicitly specified or autodetected.
   143  func (c *Client) Project() string {
   144  	return c.projectID
   145  }
   146  
   147  // Close closes any resources held by the client.
   148  // Close should be called when the client is no longer needed.
   149  // It need not be called at program exit.
   150  func (c *Client) Close() error {
   151  	if c.isStorageReadAvailable() {
   152  		err := c.rc.close()
   153  		if err != nil {
   154  			return err
   155  		}
   156  	}
   157  	return nil
   158  }
   159  
   160  // Calls the Jobs.Insert RPC and returns a Job.
   161  func (c *Client) insertJob(ctx context.Context, job *bq.Job, media io.Reader, mediaOpts ...googleapi.MediaOption) (*Job, error) {
   162  	call := c.bqs.Jobs.Insert(c.projectID, job).Context(ctx)
   163  	setClientHeader(call.Header())
   164  	if media != nil {
   165  		call.Media(media, mediaOpts...)
   166  	}
   167  	var res *bq.Job
   168  	var err error
   169  	invoke := func() error {
   170  		sCtx := trace.StartSpan(ctx, "bigquery.jobs.insert")
   171  		res, err = call.Do()
   172  		trace.EndSpan(sCtx, err)
   173  		return err
   174  	}
   175  	// A job with a client-generated ID can be retried; the presence of the
   176  	// ID makes the insert operation idempotent.
   177  	// We don't retry if there is media, because it is an io.Reader. We'd
   178  	// have to read the contents and keep it in memory, and that could be expensive.
   179  	// TODO(jba): Look into retrying if media != nil.
   180  	if job.JobReference != nil && media == nil {
   181  		// We deviate from default retries due to BigQuery wanting to retry structured internal job errors.
   182  		err = runWithRetryExplicit(ctx, invoke, jobRetryReasons)
   183  	} else {
   184  		err = invoke()
   185  	}
   186  	if err != nil {
   187  		return nil, err
   188  	}
   189  	return bqToJob(res, c)
   190  }
   191  
   192  // runQuery invokes the optimized query path.
   193  // Due to differences in options it supports, it cannot be used for all existing
   194  // jobs.insert requests that are query jobs.
   195  func (c *Client) runQuery(ctx context.Context, queryRequest *bq.QueryRequest) (*bq.QueryResponse, error) {
   196  	call := c.bqs.Jobs.Query(c.projectID, queryRequest).Context(ctx)
   197  	setClientHeader(call.Header())
   198  
   199  	var res *bq.QueryResponse
   200  	var err error
   201  	invoke := func() error {
   202  		sCtx := trace.StartSpan(ctx, "bigquery.jobs.query")
   203  		res, err = call.Do()
   204  		trace.EndSpan(sCtx, err)
   205  		return err
   206  	}
   207  
   208  	// We control request ID, so we can always runWithRetry.
   209  	err = runWithRetryExplicit(ctx, invoke, jobRetryReasons)
   210  	if err != nil {
   211  		return nil, err
   212  	}
   213  	return res, nil
   214  }
   215  
   216  // Convert a number of milliseconds since the Unix epoch to a time.Time.
   217  // Treat an input of zero specially: convert it to the zero time,
   218  // rather than the start of the epoch.
   219  func unixMillisToTime(m int64) time.Time {
   220  	if m == 0 {
   221  		return time.Time{}
   222  	}
   223  	return time.Unix(0, m*1e6)
   224  }
   225  
   226  // runWithRetry calls the function until it returns nil or a non-retryable error, or
   227  // the context is done.
   228  // See the similar function in ../storage/invoke.go. The main difference is the
   229  // reason for retrying.
   230  func runWithRetry(ctx context.Context, call func() error) error {
   231  	return runWithRetryExplicit(ctx, call, defaultRetryReasons)
   232  }
   233  
   234  func runWithRetryExplicit(ctx context.Context, call func() error, allowedReasons []string) error {
   235  	// These parameters match the suggestions in https://cloud.google.com/bigquery/sla.
   236  	backoff := gax.Backoff{
   237  		Initial:    1 * time.Second,
   238  		Max:        32 * time.Second,
   239  		Multiplier: 2,
   240  	}
   241  	return cloudinternal.Retry(ctx, backoff, func() (stop bool, err error) {
   242  		err = call()
   243  		if err == nil {
   244  			return true, nil
   245  		}
   246  		return !retryableError(err, allowedReasons), err
   247  	})
   248  }
   249  
   250  var (
   251  	defaultRetryReasons = []string{"backendError", "rateLimitExceeded"}
   252  
   253  	// These reasons are used exclusively for enqueuing jobs (jobs.insert and jobs.query).
   254  	// Using them for polling may cause unwanted retries until context deadline/cancellation/etc.
   255  	jobRetryReasons = []string{"backendError", "rateLimitExceeded", "jobRateLimitExceeded", "internalError"}
   256  
   257  	retry5xxCodes = []int{
   258  		http.StatusInternalServerError,
   259  		http.StatusBadGateway,
   260  		http.StatusServiceUnavailable,
   261  		http.StatusGatewayTimeout,
   262  	}
   263  )
   264  
   265  // retryableError is the unary retry predicate for this library.  In addition to structured error
   266  // reasons, it specifies some HTTP codes (500, 502, 503, 504) and network/transport reasons.
   267  func retryableError(err error, allowedReasons []string) bool {
   268  	if err == nil {
   269  		return false
   270  	}
   271  	if err == io.ErrUnexpectedEOF {
   272  		return true
   273  	}
   274  	// Special case due to http2: https://github.com/googleapis/google-cloud-go/issues/1793
   275  	// Due to Go's default being higher for streams-per-connection than is accepted by the
   276  	// BQ backend, it's possible to get streams refused immediately after a connection is
   277  	// started but before we receive SETTINGS frame from the backend.  This generally only
   278  	// happens when we try to enqueue > 100 requests onto a newly initiated connection.
   279  	if err.Error() == "http2: stream closed" {
   280  		return true
   281  	}
   282  
   283  	switch e := err.(type) {
   284  	case *googleapi.Error:
   285  		// We received a structured error from backend.
   286  		var reason string
   287  		if len(e.Errors) > 0 {
   288  			reason = e.Errors[0].Reason
   289  			for _, r := range allowedReasons {
   290  				if reason == r {
   291  					return true
   292  				}
   293  			}
   294  		}
   295  		for _, code := range retry5xxCodes {
   296  			if e.Code == code {
   297  				return true
   298  			}
   299  		}
   300  	case *url.Error:
   301  		retryable := []string{"connection refused", "connection reset"}
   302  		for _, s := range retryable {
   303  			if strings.Contains(e.Error(), s) {
   304  				return true
   305  			}
   306  		}
   307  	case interface{ Temporary() bool }:
   308  		if e.Temporary() {
   309  			return true
   310  		}
   311  	}
   312  	// Check wrapped error.
   313  	return retryableError(errors.Unwrap(err), allowedReasons)
   314  }
   315  

View as plain text