...

Source file src/cloud.google.com/go/bigquery/query.go

Documentation: cloud.google.com/go/bigquery

     1  // Copyright 2015 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package bigquery
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"time"
    22  
    23  	"cloud.google.com/go/internal/trace"
    24  	"cloud.google.com/go/internal/uid"
    25  	bq "google.golang.org/api/bigquery/v2"
    26  )
    27  
    28  // QueryConfig holds the configuration for a query job.
    29  type QueryConfig struct {
    30  	// Dst is the table into which the results of the query will be written.
    31  	// If this field is nil, a temporary table will be created.
    32  	Dst *Table
    33  
    34  	// The query to execute. See https://cloud.google.com/bigquery/query-reference for details.
    35  	Q string
    36  
    37  	// DefaultProjectID and DefaultDatasetID specify the dataset to use for unqualified table names in the query.
    38  	// If DefaultProjectID is set, DefaultDatasetID must also be set.
    39  	DefaultProjectID string
    40  	DefaultDatasetID string
    41  
    42  	// TableDefinitions describes data sources outside of BigQuery.
    43  	// The map keys may be used as table names in the query string.
    44  	//
    45  	// When a QueryConfig is returned from Job.Config, the map values
    46  	// are always of type *ExternalDataConfig.
    47  	TableDefinitions map[string]ExternalData
    48  
    49  	// CreateDisposition specifies the circumstances under which the destination table will be created.
    50  	// The default is CreateIfNeeded.
    51  	CreateDisposition TableCreateDisposition
    52  
    53  	// WriteDisposition specifies how existing data in the destination table is treated.
    54  	// The default is WriteEmpty.
    55  	WriteDisposition TableWriteDisposition
    56  
    57  	// DisableQueryCache prevents results being fetched from the query cache.
    58  	// If this field is false, results are fetched from the cache if they are available.
    59  	// The query cache is a best-effort cache that is flushed whenever tables in the query are modified.
    60  	// Cached results are only available when TableID is unspecified in the query's destination Table.
    61  	// For more information, see https://cloud.google.com/bigquery/querying-data#querycaching
    62  	DisableQueryCache bool
    63  
    64  	// DisableFlattenedResults prevents results being flattened.
    65  	// If this field is false, results from nested and repeated fields are flattened.
    66  	// DisableFlattenedResults implies AllowLargeResults
    67  	// For more information, see https://cloud.google.com/bigquery/docs/data#nested
    68  	DisableFlattenedResults bool
    69  
    70  	// AllowLargeResults allows the query to produce arbitrarily large result tables.
    71  	// The destination must be a table.
    72  	// When using this option, queries will take longer to execute, even if the result set is small.
    73  	// For additional limitations, see https://cloud.google.com/bigquery/querying-data#largequeryresults
    74  	AllowLargeResults bool
    75  
    76  	// Priority specifies the priority with which to schedule the query.
    77  	// The default priority is InteractivePriority.
    78  	// For more information, see https://cloud.google.com/bigquery/querying-data#batchqueries
    79  	Priority QueryPriority
    80  
    81  	// MaxBillingTier sets the maximum billing tier for a Query.
    82  	// Queries that have resource usage beyond this tier will fail (without
    83  	// incurring a charge). If this field is zero, the project default will be used.
    84  	MaxBillingTier int
    85  
    86  	// MaxBytesBilled limits the number of bytes billed for
    87  	// this job.  Queries that would exceed this limit will fail (without incurring
    88  	// a charge).
    89  	// If this field is less than 1, the project default will be
    90  	// used.
    91  	MaxBytesBilled int64
    92  
    93  	// UseStandardSQL causes the query to use standard SQL. The default.
    94  	// Deprecated: use UseLegacySQL.
    95  	UseStandardSQL bool
    96  
    97  	// UseLegacySQL causes the query to use legacy SQL.
    98  	UseLegacySQL bool
    99  
   100  	// Parameters is a list of query parameters. The presence of parameters
   101  	// implies the use of standard SQL.
   102  	// If the query uses positional syntax ("?"), then no parameter may have a name.
   103  	// If the query uses named syntax ("@p"), then all parameters must have names.
   104  	// It is illegal to mix positional and named syntax.
   105  	Parameters []QueryParameter
   106  
   107  	// TimePartitioning specifies time-based partitioning
   108  	// for the destination table.
   109  	TimePartitioning *TimePartitioning
   110  
   111  	// RangePartitioning specifies integer range-based partitioning
   112  	// for the destination table.
   113  	RangePartitioning *RangePartitioning
   114  
   115  	// Clustering specifies the data clustering configuration for the destination table.
   116  	Clustering *Clustering
   117  
   118  	// The labels associated with this job.
   119  	Labels map[string]string
   120  
   121  	// If true, don't actually run this job. A valid query will return a mostly
   122  	// empty response with some processing statistics, while an invalid query will
   123  	// return the same error it would if it wasn't a dry run.
   124  	//
   125  	// Query.Read will fail with dry-run queries. Call Query.Run instead, and then
   126  	// call LastStatus on the returned job to get statistics. Calling Status on a
   127  	// dry-run job will fail.
   128  	DryRun bool
   129  
   130  	// Custom encryption configuration (e.g., Cloud KMS keys).
   131  	DestinationEncryptionConfig *EncryptionConfig
   132  
   133  	// Allows the schema of the destination table to be updated as a side effect of
   134  	// the query job.
   135  	SchemaUpdateOptions []string
   136  
   137  	// CreateSession will trigger creation of a new session when true.
   138  	CreateSession bool
   139  
   140  	// ConnectionProperties are optional key-values settings.
   141  	ConnectionProperties []*ConnectionProperty
   142  
   143  	// Sets a best-effort deadline on a specific job.  If job execution exceeds this
   144  	// timeout, BigQuery may attempt to cancel this work automatically.
   145  	//
   146  	// This deadline cannot be adjusted or removed once the job is created.  Consider
   147  	// using Job.Cancel in situations where you need more dynamic behavior.
   148  	//
   149  	// Experimental: this option is experimental and may be modified or removed in future versions,
   150  	// regardless of any other documented package stability guarantees.
   151  	JobTimeout time.Duration
   152  
   153  	// Force usage of Storage API if client is available. For test scenarios
   154  	forceStorageAPI bool
   155  }
   156  
   157  func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
   158  	qconf := &bq.JobConfigurationQuery{
   159  		Query:                              qc.Q,
   160  		CreateDisposition:                  string(qc.CreateDisposition),
   161  		WriteDisposition:                   string(qc.WriteDisposition),
   162  		AllowLargeResults:                  qc.AllowLargeResults,
   163  		Priority:                           string(qc.Priority),
   164  		MaximumBytesBilled:                 qc.MaxBytesBilled,
   165  		TimePartitioning:                   qc.TimePartitioning.toBQ(),
   166  		RangePartitioning:                  qc.RangePartitioning.toBQ(),
   167  		Clustering:                         qc.Clustering.toBQ(),
   168  		DestinationEncryptionConfiguration: qc.DestinationEncryptionConfig.toBQ(),
   169  		SchemaUpdateOptions:                qc.SchemaUpdateOptions,
   170  		CreateSession:                      qc.CreateSession,
   171  	}
   172  	if len(qc.TableDefinitions) > 0 {
   173  		qconf.TableDefinitions = make(map[string]bq.ExternalDataConfiguration)
   174  	}
   175  	for name, data := range qc.TableDefinitions {
   176  		qconf.TableDefinitions[name] = data.toBQ()
   177  	}
   178  	if qc.DefaultProjectID != "" || qc.DefaultDatasetID != "" {
   179  		qconf.DefaultDataset = &bq.DatasetReference{
   180  			DatasetId: qc.DefaultDatasetID,
   181  			ProjectId: qc.DefaultProjectID,
   182  		}
   183  	}
   184  	if tier := int64(qc.MaxBillingTier); tier > 0 {
   185  		qconf.MaximumBillingTier = &tier
   186  	}
   187  	f := false
   188  	if qc.DisableQueryCache {
   189  		qconf.UseQueryCache = &f
   190  	}
   191  	if qc.DisableFlattenedResults {
   192  		qconf.FlattenResults = &f
   193  		// DisableFlattenResults implies AllowLargeResults.
   194  		qconf.AllowLargeResults = true
   195  	}
   196  	if qc.UseStandardSQL && qc.UseLegacySQL {
   197  		return nil, errors.New("bigquery: cannot provide both UseStandardSQL and UseLegacySQL")
   198  	}
   199  	if len(qc.Parameters) > 0 && qc.UseLegacySQL {
   200  		return nil, errors.New("bigquery: cannot provide both Parameters (implying standard SQL) and UseLegacySQL")
   201  	}
   202  	ptrue := true
   203  	pfalse := false
   204  	if qc.UseLegacySQL {
   205  		qconf.UseLegacySql = &ptrue
   206  	} else {
   207  		qconf.UseLegacySql = &pfalse
   208  	}
   209  	if qc.Dst != nil && !qc.Dst.implicitTable() {
   210  		qconf.DestinationTable = qc.Dst.toBQ()
   211  	}
   212  	for _, p := range qc.Parameters {
   213  		qp, err := p.toBQ()
   214  		if err != nil {
   215  			return nil, err
   216  		}
   217  		qconf.QueryParameters = append(qconf.QueryParameters, qp)
   218  	}
   219  	if len(qc.ConnectionProperties) > 0 {
   220  		bqcp := make([]*bq.ConnectionProperty, len(qc.ConnectionProperties))
   221  		for k, v := range qc.ConnectionProperties {
   222  			bqcp[k] = v.toBQ()
   223  		}
   224  		qconf.ConnectionProperties = bqcp
   225  	}
   226  	jc := &bq.JobConfiguration{
   227  		Labels: qc.Labels,
   228  		DryRun: qc.DryRun,
   229  		Query:  qconf,
   230  	}
   231  	if qc.JobTimeout > 0 {
   232  		jc.JobTimeoutMs = qc.JobTimeout.Milliseconds()
   233  	}
   234  	return jc, nil
   235  }
   236  
   237  func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) {
   238  	qq := q.Query
   239  	qc := &QueryConfig{
   240  		Labels:                      q.Labels,
   241  		DryRun:                      q.DryRun,
   242  		JobTimeout:                  time.Duration(q.JobTimeoutMs) * time.Millisecond,
   243  		Q:                           qq.Query,
   244  		CreateDisposition:           TableCreateDisposition(qq.CreateDisposition),
   245  		WriteDisposition:            TableWriteDisposition(qq.WriteDisposition),
   246  		AllowLargeResults:           qq.AllowLargeResults,
   247  		Priority:                    QueryPriority(qq.Priority),
   248  		MaxBytesBilled:              qq.MaximumBytesBilled,
   249  		UseLegacySQL:                qq.UseLegacySql == nil || *qq.UseLegacySql,
   250  		TimePartitioning:            bqToTimePartitioning(qq.TimePartitioning),
   251  		RangePartitioning:           bqToRangePartitioning(qq.RangePartitioning),
   252  		Clustering:                  bqToClustering(qq.Clustering),
   253  		DestinationEncryptionConfig: bqToEncryptionConfig(qq.DestinationEncryptionConfiguration),
   254  		SchemaUpdateOptions:         qq.SchemaUpdateOptions,
   255  		CreateSession:               qq.CreateSession,
   256  	}
   257  	qc.UseStandardSQL = !qc.UseLegacySQL
   258  
   259  	if len(qq.TableDefinitions) > 0 {
   260  		qc.TableDefinitions = make(map[string]ExternalData)
   261  	}
   262  	for name, qedc := range qq.TableDefinitions {
   263  		edc, err := bqToExternalDataConfig(&qedc)
   264  		if err != nil {
   265  			return nil, err
   266  		}
   267  		qc.TableDefinitions[name] = edc
   268  	}
   269  	if qq.DefaultDataset != nil {
   270  		qc.DefaultProjectID = qq.DefaultDataset.ProjectId
   271  		qc.DefaultDatasetID = qq.DefaultDataset.DatasetId
   272  	}
   273  	if qq.MaximumBillingTier != nil {
   274  		qc.MaxBillingTier = int(*qq.MaximumBillingTier)
   275  	}
   276  	if qq.UseQueryCache != nil && !*qq.UseQueryCache {
   277  		qc.DisableQueryCache = true
   278  	}
   279  	if qq.FlattenResults != nil && !*qq.FlattenResults {
   280  		qc.DisableFlattenedResults = true
   281  	}
   282  	if qq.DestinationTable != nil {
   283  		qc.Dst = bqToTable(qq.DestinationTable, c)
   284  	}
   285  	for _, qp := range qq.QueryParameters {
   286  		p, err := bqToQueryParameter(qp)
   287  		if err != nil {
   288  			return nil, err
   289  		}
   290  		qc.Parameters = append(qc.Parameters, p)
   291  	}
   292  	if len(qq.ConnectionProperties) > 0 {
   293  		props := make([]*ConnectionProperty, len(qq.ConnectionProperties))
   294  		for k, v := range qq.ConnectionProperties {
   295  			props[k] = bqToConnectionProperty(v)
   296  		}
   297  		qc.ConnectionProperties = props
   298  	}
   299  	return qc, nil
   300  }
   301  
   302  // QueryPriority specifies a priority with which a query is to be executed.
   303  type QueryPriority string
   304  
   305  const (
   306  	// BatchPriority specifies that the query should be scheduled with the
   307  	// batch priority.  BigQuery queues each batch query on your behalf, and
   308  	// starts the query as soon as idle resources are available, usually within
   309  	// a few minutes. If BigQuery hasn't started the query within 24 hours,
   310  	// BigQuery changes the job priority to interactive. Batch queries don't
   311  	// count towards your concurrent rate limit, which can make it easier to
   312  	// start many queries at once.
   313  	//
   314  	// More information can be found at https://cloud.google.com/bigquery/docs/running-queries#batchqueries.
   315  	BatchPriority QueryPriority = "BATCH"
   316  	// InteractivePriority specifies that the query should be scheduled with
   317  	// interactive priority, which means that the query is executed as soon as
   318  	// possible. Interactive queries count towards your concurrent rate limit
   319  	// and your daily limit. It is the default priority with which queries get
   320  	// executed.
   321  	//
   322  	// More information can be found at https://cloud.google.com/bigquery/docs/running-queries#queries.
   323  	InteractivePriority QueryPriority = "INTERACTIVE"
   324  )
   325  
   326  // A Query queries data from a BigQuery table. Use Client.Query to create a Query.
   327  type Query struct {
   328  	JobIDConfig
   329  	QueryConfig
   330  	client *Client
   331  }
   332  
   333  // Query creates a query with string q.
   334  // The returned Query may optionally be further configured before its Run method is called.
   335  func (c *Client) Query(q string) *Query {
   336  	return &Query{
   337  		client:      c,
   338  		QueryConfig: QueryConfig{Q: q},
   339  	}
   340  }
   341  
   342  // Run initiates a query job.
   343  func (q *Query) Run(ctx context.Context) (j *Job, err error) {
   344  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Query.Run")
   345  	defer func() { trace.EndSpan(ctx, err) }()
   346  
   347  	job, err := q.newJob()
   348  	if err != nil {
   349  		return nil, err
   350  	}
   351  	j, err = q.client.insertJob(ctx, job, nil)
   352  	if err != nil {
   353  		return nil, err
   354  	}
   355  	return j, nil
   356  }
   357  
   358  func (q *Query) newJob() (*bq.Job, error) {
   359  	config, err := q.QueryConfig.toBQ()
   360  	if err != nil {
   361  		return nil, err
   362  	}
   363  	return &bq.Job{
   364  		JobReference:  q.JobIDConfig.createJobRef(q.client),
   365  		Configuration: config,
   366  	}, nil
   367  }
   368  
   369  // Read submits a query for execution and returns the results via a RowIterator.
   370  // If the request can be satisfied by running using the optimized query path, it
   371  // is used in place of the jobs.insert path as this path does not expose a job
   372  // object.
   373  func (q *Query) Read(ctx context.Context) (it *RowIterator, err error) {
   374  	if q.QueryConfig.DryRun {
   375  		return nil, errors.New("bigquery: cannot evaluate Query.Read() for dry-run queries")
   376  	}
   377  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Query.Run")
   378  	defer func() { trace.EndSpan(ctx, err) }()
   379  	queryRequest, err := q.probeFastPath()
   380  	if err != nil {
   381  		// Any error means we fallback to the older mechanism.
   382  		job, err := q.Run(ctx)
   383  		if err != nil {
   384  			return nil, err
   385  		}
   386  		return job.Read(ctx)
   387  	}
   388  
   389  	// we have a config, run on fastPath.
   390  	resp, err := q.client.runQuery(ctx, queryRequest)
   391  	if err != nil {
   392  		return nil, err
   393  	}
   394  
   395  	// construct a minimal job for backing the row iterator.
   396  	var minimalJob *Job
   397  	if resp.JobReference != nil {
   398  		minimalJob = &Job{
   399  			c:         q.client,
   400  			jobID:     resp.JobReference.JobId,
   401  			location:  resp.JobReference.Location,
   402  			projectID: resp.JobReference.ProjectId,
   403  		}
   404  	}
   405  
   406  	if resp.JobComplete {
   407  		// If more pages are available, discard and use the Storage API instead
   408  		if resp.PageToken != "" && q.client.isStorageReadAvailable() {
   409  			it, err = newStorageRowIteratorFromJob(ctx, minimalJob)
   410  			if err == nil {
   411  				return it, nil
   412  			}
   413  		}
   414  		rowSource := &rowSource{
   415  			j:       minimalJob,
   416  			queryID: resp.QueryId,
   417  			// RowIterator can precache results from the iterator to save a lookup.
   418  			cachedRows:      resp.Rows,
   419  			cachedSchema:    resp.Schema,
   420  			cachedNextToken: resp.PageToken,
   421  		}
   422  		return newRowIterator(ctx, rowSource, fetchPage), nil
   423  	}
   424  	// We're on the fastPath, but we need to poll because the job is incomplete.
   425  	// Fallback to job-based Read().
   426  	//
   427  	// (Issue 2937) In order to satisfy basic probing of the job in classic path,
   428  	// we need to supply additional config which is probed for presence, not contents.
   429  	//
   430  	minimalJob.config = &bq.JobConfiguration{
   431  		Query: &bq.JobConfigurationQuery{},
   432  	}
   433  
   434  	return minimalJob.Read(ctx)
   435  }
   436  
   437  // probeFastPath is used to attempt configuring a jobs.Query request based on a
   438  // user's Query configuration.  If all the options set on the job are supported on the
   439  // faster query path, this method returns a QueryRequest suitable for execution.
   440  func (q *Query) probeFastPath() (*bq.QueryRequest, error) {
   441  	if q.forceStorageAPI && q.client.isStorageReadAvailable() {
   442  		return nil, fmt.Errorf("force Storage API usage")
   443  	}
   444  	// This is a denylist of settings which prevent us from composing an equivalent
   445  	// bq.QueryRequest due to differences between configuration parameters accepted
   446  	// by jobs.insert vs jobs.query.
   447  	if q.QueryConfig.Dst != nil ||
   448  		q.QueryConfig.TableDefinitions != nil ||
   449  		q.QueryConfig.CreateDisposition != "" ||
   450  		q.QueryConfig.WriteDisposition != "" ||
   451  		!(q.QueryConfig.Priority == "" || q.QueryConfig.Priority == InteractivePriority) ||
   452  		q.QueryConfig.UseLegacySQL ||
   453  		q.QueryConfig.MaxBillingTier != 0 ||
   454  		q.QueryConfig.TimePartitioning != nil ||
   455  		q.QueryConfig.RangePartitioning != nil ||
   456  		q.QueryConfig.Clustering != nil ||
   457  		q.QueryConfig.DestinationEncryptionConfig != nil ||
   458  		q.QueryConfig.SchemaUpdateOptions != nil ||
   459  		q.QueryConfig.JobTimeout != 0 ||
   460  		// User has defined the jobID generation behavior
   461  		q.JobIDConfig.JobID != "" {
   462  		return nil, fmt.Errorf("QueryConfig incompatible with fastPath")
   463  	}
   464  	pfalse := false
   465  	qRequest := &bq.QueryRequest{
   466  		Query:              q.QueryConfig.Q,
   467  		CreateSession:      q.CreateSession,
   468  		Location:           q.Location,
   469  		UseLegacySql:       &pfalse,
   470  		MaximumBytesBilled: q.QueryConfig.MaxBytesBilled,
   471  		RequestId:          uid.NewSpace("request", nil).New(),
   472  		Labels:             q.Labels,
   473  		FormatOptions: &bq.DataFormatOptions{
   474  			UseInt64Timestamp: true,
   475  		},
   476  	}
   477  	if q.QueryConfig.DisableQueryCache {
   478  		qRequest.UseQueryCache = &pfalse
   479  	}
   480  	// Convert query parameters
   481  	for _, p := range q.QueryConfig.Parameters {
   482  		qp, err := p.toBQ()
   483  		if err != nil {
   484  			return nil, err
   485  		}
   486  		qRequest.QueryParameters = append(qRequest.QueryParameters, qp)
   487  	}
   488  	if q.QueryConfig.DefaultDatasetID != "" {
   489  		qRequest.DefaultDataset = &bq.DatasetReference{
   490  			ProjectId: q.QueryConfig.DefaultProjectID,
   491  			DatasetId: q.QueryConfig.DefaultDatasetID,
   492  		}
   493  	}
   494  	if q.client.enableQueryPreview {
   495  		qRequest.JobCreationMode = "JOB_CREATION_OPTIONAL"
   496  	}
   497  	return qRequest, nil
   498  }
   499  
   500  // ConnectionProperty represents a single key and value pair that can be sent alongside a query request or load job.
   501  type ConnectionProperty struct {
   502  	// Name of the connection property to set.
   503  	Key string
   504  	// Value of the connection property.
   505  	Value string
   506  }
   507  
   508  func (cp *ConnectionProperty) toBQ() *bq.ConnectionProperty {
   509  	if cp == nil {
   510  		return nil
   511  	}
   512  	return &bq.ConnectionProperty{
   513  		Key:   cp.Key,
   514  		Value: cp.Value,
   515  	}
   516  }
   517  
   518  func bqToConnectionProperty(in *bq.ConnectionProperty) *ConnectionProperty {
   519  	if in == nil {
   520  		return nil
   521  	}
   522  	return &ConnectionProperty{
   523  		Key:   in.Key,
   524  		Value: in.Value,
   525  	}
   526  }
   527  

View as plain text