...

Source file src/cloud.google.com/go/bigquery/job.go

Documentation: cloud.google.com/go/bigquery

     1  // Copyright 2015 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package bigquery
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"time"
    22  
    23  	"cloud.google.com/go/internal"
    24  	"cloud.google.com/go/internal/trace"
    25  	gax "github.com/googleapis/gax-go/v2"
    26  	bq "google.golang.org/api/bigquery/v2"
    27  	"google.golang.org/api/googleapi"
    28  	"google.golang.org/api/iterator"
    29  )
    30  
// A Job represents an operation which has been submitted to BigQuery for processing.
//
// A Job is a client-side handle: it records the job's identity (project, ID,
// location) together with the most recently fetched configuration and status.
type Job struct {
	// c is the client used to issue API calls for this job.
	c *Client
	// projectID, jobID and location together identify the job in BigQuery.
	// location may be empty when none was specified.
	projectID string
	jobID     string
	location  string
	// email is the job creator's email, returned by Email.
	email string
	// config is the raw API configuration; Config converts it on each call.
	config *bq.JobConfiguration
	// lastStatus holds the most recently retrieved status, returned by LastStatus.
	lastStatus *JobStatus
}
    41  
    42  // JobFromID creates a Job which refers to an existing BigQuery job. The job
    43  // need not have been created by this package. For example, the job may have
    44  // been created in the BigQuery console.
    45  //
    46  // For jobs whose location is other than "US" or "EU", set Client.Location or use
    47  // JobFromIDLocation.
    48  func (c *Client) JobFromID(ctx context.Context, id string) (*Job, error) {
    49  	return c.JobFromProject(ctx, c.projectID, id, c.Location)
    50  }
    51  
    52  // JobFromIDLocation creates a Job which refers to an existing BigQuery job. The job
    53  // need not have been created by this package (for example, it may have
    54  // been created in the BigQuery console), but it must exist in the specified location.
    55  func (c *Client) JobFromIDLocation(ctx context.Context, id, location string) (j *Job, err error) {
    56  	return c.JobFromProject(ctx, c.projectID, id, location)
    57  }
    58  
    59  // JobFromProject creates a Job which refers to an existing BigQuery job. The job
    60  // need not have been created by this package, nor does it need to reside within the same
    61  // project or location as the instantiated client.
    62  func (c *Client) JobFromProject(ctx context.Context, projectID, jobID, location string) (j *Job, err error) {
    63  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.JobFromProject")
    64  	defer func() { trace.EndSpan(ctx, err) }()
    65  
    66  	bqjob, err := c.getJobInternal(ctx, jobID, location, projectID, "user_email", "configuration", "jobReference", "status", "statistics")
    67  	if err != nil {
    68  		return nil, err
    69  	}
    70  	return bqToJob(bqjob, c)
    71  }
    72  
    73  // ProjectID returns the job's associated project.
    74  func (j *Job) ProjectID() string {
    75  	return j.projectID
    76  }
    77  
    78  // ID returns the job's ID.
    79  func (j *Job) ID() string {
    80  	return j.jobID
    81  }
    82  
    83  // Location returns the job's location.
    84  func (j *Job) Location() string {
    85  	return j.location
    86  }
    87  
    88  // Email returns the email of the job's creator.
    89  func (j *Job) Email() string {
    90  	return j.email
    91  }
    92  
// State is one of a sequence of states that a Job progresses through as it is processed.
type State int

const (
	// StateUnspecified is the default JobIterator state. It is the zero value
	// of State; JobIterator applies no state filter when it is set.
	StateUnspecified State = iota
	// Pending is a state that describes that the job is pending.
	Pending
	// Running is a state that describes that the job is running.
	Running
	// Done is a state that describes that the job is done.
	Done
)
   106  
// JobStatus contains the current State of a job, and errors encountered while processing that job.
type JobStatus struct {
	// State is the job's current processing state; Done reports whether it is Done.
	State State

	// err is the error, if any, that caused the job to complete
	// unsuccessfully. It is exposed through Err.
	err error

	// All errors encountered during the running of the job.
	// Not all Errors are fatal, so errors here do not necessarily mean that the job has completed or was unsuccessful.
	Errors []*Error

	// Statistics about the job.
	Statistics *JobStatistics
}
   120  
   121  // JobConfig contains configuration information for a job. It is implemented by
   122  // *CopyConfig, *ExtractConfig, *LoadConfig and *QueryConfig.
   123  type JobConfig interface {
   124  	isJobConfig()
   125  }
   126  
   127  func (*CopyConfig) isJobConfig()    {}
   128  func (*ExtractConfig) isJobConfig() {}
   129  func (*LoadConfig) isJobConfig()    {}
   130  func (*QueryConfig) isJobConfig()   {}
   131  
   132  // Config returns the configuration information for j.
   133  func (j *Job) Config() (JobConfig, error) {
   134  	return bqToJobConfig(j.config, j.c)
   135  }
   136  
   137  // Children returns a job iterator for enumerating child jobs
   138  // of the current job.  Currently only scripts, a form of query job,
   139  // will create child jobs.
   140  func (j *Job) Children(ctx context.Context) *JobIterator {
   141  	it := j.c.Jobs(ctx)
   142  	it.ParentJobID = j.ID()
   143  	return it
   144  }
   145  
   146  func bqToJobConfig(q *bq.JobConfiguration, c *Client) (JobConfig, error) {
   147  	switch {
   148  	case q == nil:
   149  		return nil, nil
   150  	case q.Copy != nil:
   151  		return bqToCopyConfig(q, c), nil
   152  	case q.Extract != nil:
   153  		return bqToExtractConfig(q, c), nil
   154  	case q.Load != nil:
   155  		return bqToLoadConfig(q, c), nil
   156  	case q.Query != nil:
   157  		return bqToQueryConfig(q, c)
   158  	default:
   159  		return nil, nil
   160  	}
   161  }
   162  
// JobIDConfig describes how to create an ID for a job.
type JobIDConfig struct {
	// JobID is the ID to use for the job. If empty, a random job ID will be generated.
	JobID string

	// If AddJobIDSuffix is true, then a random string will be appended to JobID,
	// separated by "-". It has no effect when JobID is empty.
	AddJobIDSuffix bool

	// Location is the location for the job. If empty, the client's Location is used.
	Location string

	// ProjectID is the Google Cloud project associated with the job. If empty,
	// the client's project is used.
	ProjectID string
}
   177  
   178  // createJobRef creates a JobReference.
   179  func (j *JobIDConfig) createJobRef(c *Client) *bq.JobReference {
   180  	projectID := j.ProjectID
   181  	if projectID == "" { // Use Client.ProjectID as a default.
   182  		projectID = c.projectID
   183  	}
   184  	loc := j.Location
   185  	if loc == "" { // Use Client.Location as a default.
   186  		loc = c.Location
   187  	}
   188  	jr := &bq.JobReference{ProjectId: projectID, Location: loc}
   189  	if j.JobID == "" {
   190  		jr.JobId = randomIDFn()
   191  	} else if j.AddJobIDSuffix {
   192  		jr.JobId = j.JobID + "-" + randomIDFn()
   193  	} else {
   194  		jr.JobId = j.JobID
   195  	}
   196  	return jr
   197  }
   198  
   199  // Done reports whether the job has completed.
   200  // After Done returns true, the Err method will return an error if the job completed unsuccessfully.
   201  func (s *JobStatus) Done() bool {
   202  	return s.State == Done
   203  }
   204  
   205  // Err returns the error that caused the job to complete unsuccessfully (if any).
   206  func (s *JobStatus) Err() error {
   207  	return s.err
   208  }
   209  
   210  // Status retrieves the current status of the job from BigQuery. It fails if the Status could not be determined.
   211  func (j *Job) Status(ctx context.Context) (js *JobStatus, err error) {
   212  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Status")
   213  	defer func() { trace.EndSpan(ctx, err) }()
   214  
   215  	bqjob, err := j.c.getJobInternal(ctx, j.jobID, j.location, j.projectID, "status", "statistics")
   216  	if err != nil {
   217  		return nil, err
   218  	}
   219  	if err := j.setStatus(bqjob.Status); err != nil {
   220  		return nil, err
   221  	}
   222  	j.setStatistics(bqjob.Statistics, j.c)
   223  	return j.lastStatus, nil
   224  }
   225  
   226  // LastStatus returns the most recently retrieved status of the job. The status is
   227  // retrieved when a new job is created, or when JobFromID or Job.Status is called.
   228  // Call Job.Status to get the most up-to-date information about a job.
   229  func (j *Job) LastStatus() *JobStatus {
   230  	return j.lastStatus
   231  }
   232  
   233  // Cancel requests that a job be cancelled. This method returns without waiting for
   234  // cancellation to take effect. To check whether the job has terminated, use Job.Status.
   235  // Cancelled jobs may still incur costs.
   236  func (j *Job) Cancel(ctx context.Context) error {
   237  	// Jobs.Cancel returns a job entity, but the only relevant piece of
   238  	// data it may contain (the status of the job) is unreliable.  From the
   239  	// docs: "This call will return immediately, and the client will need
   240  	// to poll for the job status to see if the cancel completed
   241  	// successfully".  So it would be misleading to return a status.
   242  	call := j.c.bqs.Jobs.Cancel(j.projectID, j.jobID).
   243  		Location(j.location).
   244  		Fields(). // We don't need any of the response data.
   245  		Context(ctx)
   246  	setClientHeader(call.Header())
   247  	return runWithRetry(ctx, func() error {
   248  		sCtx := trace.StartSpan(ctx, "bigquery.jobs.cancel")
   249  		_, err := call.Do()
   250  		trace.EndSpan(sCtx, err)
   251  		return err
   252  	})
   253  }
   254  
   255  // Delete deletes the job.
   256  func (j *Job) Delete(ctx context.Context) (err error) {
   257  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Delete")
   258  	defer func() { trace.EndSpan(ctx, err) }()
   259  
   260  	call := j.c.bqs.Jobs.Delete(j.projectID, j.jobID).Context(ctx)
   261  	if j.location != "" {
   262  		call = call.Location(j.location)
   263  	}
   264  	setClientHeader(call.Header())
   265  
   266  	return runWithRetry(ctx, func() (err error) {
   267  		sCtx := trace.StartSpan(ctx, "bigquery.jobs.delete")
   268  		err = call.Do()
   269  		trace.EndSpan(sCtx, err)
   270  		return err
   271  	})
   272  }
   273  
   274  // Wait blocks until the job or the context is done. It returns the final status
   275  // of the job.
   276  // If an error occurs while retrieving the status, Wait returns that error. But
   277  // Wait returns nil if the status was retrieved successfully, even if
   278  // status.Err() != nil. So callers must check both errors. See the example.
   279  func (j *Job) Wait(ctx context.Context) (js *JobStatus, err error) {
   280  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Wait")
   281  	defer func() { trace.EndSpan(ctx, err) }()
   282  
   283  	if j.isQuery() {
   284  		// We can avoid polling for query jobs.
   285  		if _, _, err := j.waitForQuery(ctx, j.projectID); err != nil {
   286  			return nil, err
   287  		}
   288  		// Note: extra RPC even if you just want to wait for the query to finish.
   289  		js, err := j.Status(ctx)
   290  		if err != nil {
   291  			return nil, err
   292  		}
   293  		return js, nil
   294  	}
   295  	// Non-query jobs must poll.
   296  	err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
   297  		js, err = j.Status(ctx)
   298  		if err != nil {
   299  			return true, err
   300  		}
   301  		if js.Done() {
   302  			return true, nil
   303  		}
   304  		return false, nil
   305  	})
   306  	if err != nil {
   307  		return nil, err
   308  	}
   309  	return js, nil
   310  }
   311  
   312  // Read fetches the results of a query job.
   313  // If j is not a query job, Read returns an error.
   314  func (j *Job) Read(ctx context.Context) (ri *RowIterator, err error) {
   315  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Read")
   316  	defer func() { trace.EndSpan(ctx, err) }()
   317  
   318  	return j.read(ctx, j.waitForQuery, fetchPage)
   319  }
   320  
   321  func (j *Job) read(ctx context.Context, waitForQuery func(context.Context, string) (Schema, uint64, error), pf pageFetcher) (*RowIterator, error) {
   322  	if !j.isQuery() {
   323  		return nil, errors.New("bigquery: cannot read from a non-query job")
   324  	}
   325  	schema, totalRows, err := waitForQuery(ctx, j.projectID)
   326  	if err != nil {
   327  		return nil, err
   328  	}
   329  	var it *RowIterator
   330  	if j.c.isStorageReadAvailable() {
   331  		it, err = newStorageRowIteratorFromJob(ctx, j)
   332  		if err != nil {
   333  			it = nil
   334  		}
   335  	}
   336  	if it == nil {
   337  		// Shave off some potential overhead by only retaining the minimal job representation in the iterator.
   338  		itJob := &Job{
   339  			c:         j.c,
   340  			projectID: j.projectID,
   341  			jobID:     j.jobID,
   342  			location:  j.location,
   343  		}
   344  		it = newRowIterator(ctx, &rowSource{j: itJob}, pf)
   345  		it.TotalRows = totalRows
   346  	}
   347  	it.Schema = schema
   348  	return it, nil
   349  }
   350  
   351  // waitForQuery waits for the query job to complete and returns its schema. It also
   352  // returns the total number of rows in the result set.
   353  func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, uint64, error) {
   354  	// Use GetQueryResults only to wait for completion, not to read results.
   355  	call := j.c.bqs.Jobs.GetQueryResults(projectID, j.jobID).Location(j.location).Context(ctx).MaxResults(0)
   356  	call = call.FormatOptionsUseInt64Timestamp(true)
   357  	setClientHeader(call.Header())
   358  	backoff := gax.Backoff{
   359  		Initial:    1 * time.Second,
   360  		Multiplier: 2,
   361  		Max:        60 * time.Second,
   362  	}
   363  	var res *bq.GetQueryResultsResponse
   364  	err := internal.Retry(ctx, backoff, func() (stop bool, err error) {
   365  		sCtx := trace.StartSpan(ctx, "bigquery.jobs.getQueryResults")
   366  		res, err = call.Do()
   367  		trace.EndSpan(sCtx, err)
   368  		if err != nil {
   369  			return !retryableError(err, jobRetryReasons), err
   370  		}
   371  		if !res.JobComplete { // GetQueryResults may return early without error; retry.
   372  			return false, nil
   373  		}
   374  		return true, nil
   375  	})
   376  	if err != nil {
   377  		return nil, 0, err
   378  	}
   379  	return bqToSchema(res.Schema), res.TotalRows, nil
   380  }
   381  
// JobStatistics contains statistics about a job.
type JobStatistics struct {
	// CreationTime, StartTime and EndTime record when the job was created,
	// started and finished.
	CreationTime        time.Time
	StartTime           time.Time
	EndTime             time.Time
	// TotalBytesProcessed is the total number of bytes processed by the job.
	TotalBytesProcessed int64

	// Details holds job-type-specific statistics. In this package, Statistics
	// is implemented by *ExtractStatistics, *LoadStatistics and *QueryStatistics.
	Details Statistics

	// NumChildJobs indicates the number of child jobs run as part of a script.
	NumChildJobs int64

	// ParentJobID indicates the origin job for jobs run as part of a script.
	ParentJobID string

	// ScriptStatistics contains script-related information for a job that
	// runs as a child job within a script.
	ScriptStatistics *ScriptStatistics

	// ReservationUsage attributes slot consumption to reservations.
	ReservationUsage []*ReservationUsage

	// TransactionInfo indicates the transaction ID associated with the job, if any.
	TransactionInfo *TransactionInfo

	// SessionInfo contains information about the session if this job is part of one.
	SessionInfo *SessionInfo
}
   410  
// Statistics holds job-type-specific statistics: one of *ExtractStatistics,
// *LoadStatistics or *QueryStatistics. The unexported marker method means the
// interface cannot be implemented outside this package.
type Statistics interface {
	implementsStatistics()
}
   415  
// ExtractStatistics contains statistics about an extract job.
type ExtractStatistics struct {
	// The number of files per destination URI or URI pattern specified in the
	// extract configuration. These values are in the same order as the URIs
	// specified in the 'destinationUris' field.
	DestinationURIFileCounts []int64
}
   423  
// LoadStatistics contains statistics about a load job.
type LoadStatistics struct {
	// The number of bytes of source data in a load job.
	InputFileBytes int64

	// The number of source files in a load job.
	InputFiles int64

	// Size of the loaded data in bytes. Note that while a load job is in the
	// running state, this value may change.
	OutputBytes int64

	// The number of rows imported in a load job. Note that while a load job is
	// in the running state, this value may change.
	OutputRows int64
}
   440  
// QueryStatistics contains statistics about a query job.
type QueryStatistics struct {

	// BI-Engine specific statistics.
	BIEngineStatistics *BIEngineStatistics

	// Billing tier for the job.
	BillingTier int64

	// Whether the query result was fetched from the query cache.
	CacheHit bool

	// The type of query statement, if valid.
	StatementType string

	// Total bytes billed for the job.
	TotalBytesBilled int64

	// Total bytes processed for the job.
	TotalBytesProcessed int64

	// For dry run queries, indicates how accurate the TotalBytesProcessed value is.
	// When indicated, values include:
	// UNKNOWN: accuracy of the estimate is unknown.
	// PRECISE: estimate is precise.
	// LOWER_BOUND: estimate is lower bound of what the query would cost.
	// UPPER_BOUND: estimate is upper bound of what the query would cost.
	TotalBytesProcessedAccuracy string

	// Describes execution plan for the query.
	QueryPlan []*ExplainQueryStage

	// The number of rows affected by a DML statement. Present only for DML
	// statements INSERT, UPDATE or DELETE.
	NumDMLAffectedRows int64

	// DMLStats provides statistics about the row mutations performed by
	// DML statements.
	DMLStats *DMLStatistics

	// Describes a timeline of job execution.
	Timeline []*QueryTimelineSample

	// ReferencedTables lists the tables referenced by the job (output only).
	// Queries that reference more than 50 tables will not have a complete
	// list.
	ReferencedTables []*Table

	// The schema of the results. Present only for successful dry run of
	// non-legacy SQL queries.
	Schema Schema

	// Slot-milliseconds consumed by this query job.
	SlotMillis int64

	// Standard SQL: list of undeclared query parameter names detected during a
	// dry run validation.
	UndeclaredQueryParameterNames []string

	// DDL target table.
	DDLTargetTable *Table

	// DDL operation performed on the target table. Used to report how the
	// query impacted the DDL target table.
	DDLOperationPerformed string

	// The DDL target routine, present only for CREATE/DROP FUNCTION/PROCEDURE queries.
	DDLTargetRoutine *Routine

	// Statistics for an EXPORT DATA statement run as part of the query job.
	ExportDataStatistics *ExportDataStatistics
}
   513  
// ExportDataStatistics represents statistics for
// an EXPORT DATA statement run as part of a query job.
type ExportDataStatistics struct {
	// Number of destination files generated.
	FileCount int64

	// Number of destination rows generated.
	RowCount int64
}
   523  
   524  func bqToExportDataStatistics(in *bq.ExportDataStatistics) *ExportDataStatistics {
   525  	if in == nil {
   526  		return nil
   527  	}
   528  	stats := &ExportDataStatistics{
   529  		FileCount: in.FileCount,
   530  		RowCount:  in.RowCount,
   531  	}
   532  	return stats
   533  }
   534  
// BIEngineStatistics contains query statistics specific to the use of BI Engine.
type BIEngineStatistics struct {
	// Specifies which mode of BI Engine acceleration was performed.
	BIEngineMode string

	// In case of DISABLED or PARTIAL BIEngineMode, these
	// contain the explanatory reasons why BI Engine could not
	// accelerate. If the full query was accelerated, this field is not
	// populated.
	BIEngineReasons []*BIEngineReason
}
   546  
   547  func bqToBIEngineStatistics(in *bq.BiEngineStatistics) *BIEngineStatistics {
   548  	if in == nil {
   549  		return nil
   550  	}
   551  	stats := &BIEngineStatistics{
   552  		BIEngineMode: in.BiEngineMode,
   553  	}
   554  	for _, v := range in.BiEngineReasons {
   555  		stats.BIEngineReasons = append(stats.BIEngineReasons, bqToBIEngineReason(v))
   556  	}
   557  	return stats
   558  }
   559  
// BIEngineReason contains more detailed information about why a query wasn't fully
// accelerated by BI Engine.
type BIEngineReason struct {
	// High-level BI Engine reason for partial or disabled acceleration.
	Code string

	// Human-readable reason for partial or disabled acceleration.
	Message string
}
   569  
   570  func bqToBIEngineReason(in *bq.BiEngineReason) *BIEngineReason {
   571  	if in == nil {
   572  		return nil
   573  	}
   574  	return &BIEngineReason{
   575  		Code:    in.Code,
   576  		Message: in.Message,
   577  	}
   578  }
   579  
// ExplainQueryStage describes one stage of a query.
//
// Most timing fields come in Avg/Max pairs: Avg describes the average shard
// and Max the slowest shard. Each phase (compute, read, wait, write) has both
// an absolute duration and a relative ratio.
type ExplainQueryStage struct {
	// CompletedParallelInputs is the number of parallel input segments completed.
	CompletedParallelInputs int64

	// ComputeAvg is the duration the average shard spent on CPU-bound tasks.
	ComputeAvg time.Duration

	// ComputeMax is the duration the slowest shard spent on CPU-bound tasks.
	ComputeMax time.Duration

	// Relative amount of the total time the average shard spent on CPU-bound tasks.
	ComputeRatioAvg float64

	// Relative amount of the total time the slowest shard spent on CPU-bound tasks.
	ComputeRatioMax float64

	// EndTime is the stage end time.
	EndTime time.Time

	// Unique ID for stage within plan.
	ID int64

	// InputStages lists the IDs of stages that are inputs to this stage.
	InputStages []int64

	// Human-readable name for stage.
	Name string

	// ParallelInputs is the number of parallel input segments to be processed.
	ParallelInputs int64

	// ReadAvg is the duration the average shard spent reading input.
	ReadAvg time.Duration

	// ReadMax is the duration the slowest shard spent reading input.
	ReadMax time.Duration

	// Relative amount of the total time the average shard spent reading input.
	ReadRatioAvg float64

	// Relative amount of the total time the slowest shard spent reading input.
	ReadRatioMax float64

	// Number of records read into the stage.
	RecordsRead int64

	// Number of records written by the stage.
	RecordsWritten int64

	// ShuffleOutputBytes is the total number of bytes written to shuffle.
	ShuffleOutputBytes int64

	// ShuffleOutputBytesSpilled is the total number of bytes written to shuffle
	// and spilled to disk.
	ShuffleOutputBytesSpilled int64

	// StartTime is the stage start time.
	StartTime time.Time

	// Current status for the stage.
	Status string

	// List of operations within the stage in dependency order (approximately
	// chronological).
	Steps []*ExplainQueryStep

	// WaitAvg is the duration the average shard spent waiting to be scheduled.
	WaitAvg time.Duration

	// WaitMax is the duration the slowest shard spent waiting to be scheduled.
	WaitMax time.Duration

	// Relative amount of the total time the average shard spent waiting to be scheduled.
	WaitRatioAvg float64

	// Relative amount of the total time the slowest shard spent waiting to be scheduled.
	WaitRatioMax float64

	// WriteAvg is the duration the average shard spent on writing output.
	WriteAvg time.Duration

	// WriteMax is the duration the slowest shard spent on writing output.
	WriteMax time.Duration

	// Relative amount of the total time the average shard spent on writing output.
	WriteRatioAvg float64

	// Relative amount of the total time the slowest shard spent on writing output.
	WriteRatioMax float64
}
   671  
// ExplainQueryStep describes one step of a query stage.
type ExplainQueryStep struct {
	// Machine-readable operation type.
	Kind string

	// Human-readable descriptions of the step's sub-operations.
	Substeps []string
}
   680  
// QueryTimelineSample represents a sample of execution statistics at a point in time.
type QueryTimelineSample struct {

	// Total number of units currently being processed by workers, represented as largest value since last sample.
	ActiveUnits int64

	// Total parallel units of work completed by this query.
	CompletedUnits int64

	// Time elapsed since start of query execution.
	Elapsed time.Duration

	// Total parallel units of work remaining for the active stages.
	PendingUnits int64

	// Cumulative slot-milliseconds consumed by the query at the time of the sample.
	SlotMillis int64
}
   699  
// ReservationUsage contains information about a job's usage of a single reservation.
type ReservationUsage struct {
	// SlotMillis reports the slot milliseconds utilized within the given reservation.
	SlotMillis int64
	// Name indicates the utilized reservation name, or "unreserved" for ondemand usage.
	Name string
}
   707  
   708  func bqToReservationUsage(ru []*bq.JobStatisticsReservationUsage) []*ReservationUsage {
   709  	var usage []*ReservationUsage
   710  	for _, in := range ru {
   711  		usage = append(usage, &ReservationUsage{
   712  			SlotMillis: in.SlotMs,
   713  			Name:       in.Name,
   714  		})
   715  	}
   716  	return usage
   717  }
   718  
// ScriptStatistics reports information about script-based query jobs.
type ScriptStatistics struct {
	// EvaluationKind is copied verbatim from the API's evaluation kind.
	EvaluationKind string
	// StackFrames lists the locations of the statement/expression being
	// evaluated; see ScriptStackFrame.
	StackFrames []*ScriptStackFrame
}
   724  
   725  func bqToScriptStatistics(bs *bq.ScriptStatistics) *ScriptStatistics {
   726  	if bs == nil {
   727  		return nil
   728  	}
   729  	ss := &ScriptStatistics{
   730  		EvaluationKind: bs.EvaluationKind,
   731  	}
   732  	for _, f := range bs.StackFrames {
   733  		ss.StackFrames = append(ss.StackFrames, bqToScriptStackFrame(f))
   734  	}
   735  	return ss
   736  }
   737  
// ScriptStackFrame represents the location of the statement/expression being evaluated.
//
// Line and column numbers are defined as follows:
//
//   - Line and column numbers start with one. That is, line 1 column 1 denotes
//     the start of the script.
//   - When inside a stored procedure, all line/column numbers are relative
//     to the procedure body, not the script in which the procedure was defined.
//   - Start/end positions exclude leading/trailing comments and whitespace.
//     The end position always ends with a ";", when present.
//   - Multi-byte Unicode characters are treated as just one column.
//   - If the original script (or procedure definition) contains TAB characters,
//     a tab "snaps" the indentation forward to the nearest multiple of 8
//     characters, plus 1. For example, a TAB on column 1, 2, 3, 4, 5, 6, 7, or 8
//     will advance the next character to column 9. A TAB on column 9, 10, 11,
//     12, 13, 14, 15, or 16 will advance the next character to column 17.
type ScriptStackFrame struct {
	// StartLine/StartColumn and EndLine/EndColumn delimit the statement or
	// expression, using the numbering rules above.
	StartLine   int64
	StartColumn int64
	EndLine     int64
	EndColumn   int64
	// Name of the active procedure. Empty if in a top-level script.
	ProcedureID string
	// Text of the current statement/expression.
	Text string
}
   764  
   765  func bqToScriptStackFrame(bsf *bq.ScriptStackFrame) *ScriptStackFrame {
   766  	if bsf == nil {
   767  		return nil
   768  	}
   769  	return &ScriptStackFrame{
   770  		StartLine:   bsf.StartLine,
   771  		StartColumn: bsf.StartColumn,
   772  		EndLine:     bsf.EndLine,
   773  		EndColumn:   bsf.EndColumn,
   774  		ProcedureID: bsf.ProcedureId,
   775  		Text:        bsf.Text,
   776  	}
   777  }
   778  
// DMLStatistics contains counts of row mutations triggered by a DML query statement.
type DMLStatistics struct {
	// Rows added by the statement.
	InsertedRowCount int64
	// Rows removed by the statement.
	DeletedRowCount int64
	// Rows changed by the statement.
	UpdatedRowCount int64
}
   788  
   789  func bqToDMLStatistics(q *bq.DmlStatistics) *DMLStatistics {
   790  	if q == nil {
   791  		return nil
   792  	}
   793  	return &DMLStatistics{
   794  		InsertedRowCount: q.InsertedRowCount,
   795  		DeletedRowCount:  q.DeletedRowCount,
   796  		UpdatedRowCount:  q.UpdatedRowCount,
   797  	}
   798  }
   799  
   800  func (*ExtractStatistics) implementsStatistics() {}
   801  func (*LoadStatistics) implementsStatistics()    {}
   802  func (*QueryStatistics) implementsStatistics()   {}
   803  
   804  // Jobs lists jobs within a project.
   805  func (c *Client) Jobs(ctx context.Context) *JobIterator {
   806  	it := &JobIterator{
   807  		ctx:       ctx,
   808  		c:         c,
   809  		ProjectID: c.projectID,
   810  	}
   811  	it.pageInfo, it.nextFunc = iterator.NewPageInfo(
   812  		it.fetch,
   813  		func() int { return len(it.items) },
   814  		func() interface{} { b := it.items; it.items = nil; return b })
   815  	return it
   816  }
   817  
// JobIterator iterates over jobs in a project. Obtain one from Client.Jobs or
// Job.Children. Set the exported filter fields before calling Next.
type JobIterator struct {
	ProjectID       string    // Project ID of the jobs to list. Default is the client's project.
	AllUsers        bool      // Whether to list jobs owned by all users in the project, or just the current caller.
	State           State     // List only jobs in the given state. Defaults to all states.
	MinCreationTime time.Time // List only jobs created after this time.
	MaxCreationTime time.Time // List only jobs created before this time.
	ParentJobID     string    // List only jobs that are children of a given scripting job.

	// ctx is the context given to Client.Jobs; it is used for every page request.
	ctx context.Context
	// c is the client that issues the list requests.
	c *Client
	// pageInfo and nextFunc come from iterator.NewPageInfo in Client.Jobs.
	pageInfo *iterator.PageInfo
	nextFunc func() error
	// items buffers the jobs of the current page that Next has not yet returned.
	items []*Job
}
   833  
   834  // PageInfo is a getter for the JobIterator's PageInfo.
   835  func (it *JobIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }
   836  
   837  // Next returns the next Job. Its second return value is iterator.Done if
   838  // there are no more results. Once Next returns Done, all subsequent calls will
   839  // return Done.
   840  func (it *JobIterator) Next() (*Job, error) {
   841  	if err := it.nextFunc(); err != nil {
   842  		return nil, err
   843  	}
   844  	item := it.items[0]
   845  	it.items = it.items[1:]
   846  	return item, nil
   847  }
   848  
   849  func (it *JobIterator) fetch(pageSize int, pageToken string) (string, error) {
   850  	var st string
   851  	switch it.State {
   852  	case StateUnspecified:
   853  		st = ""
   854  	case Pending:
   855  		st = "pending"
   856  	case Running:
   857  		st = "running"
   858  	case Done:
   859  		st = "done"
   860  	default:
   861  		return "", fmt.Errorf("bigquery: invalid value for JobIterator.State: %d", it.State)
   862  	}
   863  
   864  	req := it.c.bqs.Jobs.List(it.ProjectID).
   865  		Context(it.ctx).
   866  		PageToken(pageToken).
   867  		Projection("full").
   868  		AllUsers(it.AllUsers)
   869  	if st != "" {
   870  		req.StateFilter(st)
   871  	}
   872  	if !it.MinCreationTime.IsZero() {
   873  		req.MinCreationTime(uint64(it.MinCreationTime.UnixNano() / 1e6))
   874  	}
   875  	if !it.MaxCreationTime.IsZero() {
   876  		req.MaxCreationTime(uint64(it.MaxCreationTime.UnixNano() / 1e6))
   877  	}
   878  	setClientHeader(req.Header())
   879  	if pageSize > 0 {
   880  		req.MaxResults(int64(pageSize))
   881  	}
   882  	if it.ParentJobID != "" {
   883  		req.ParentJobId(it.ParentJobID)
   884  	}
   885  	var res *bq.JobList
   886  	err := runWithRetry(it.ctx, func() (err error) {
   887  		sCtx := trace.StartSpan(it.ctx, "bigquery.jobs.list")
   888  		res, err = req.Do()
   889  		trace.EndSpan(sCtx, err)
   890  		return err
   891  	})
   892  
   893  	if err != nil {
   894  		return "", err
   895  	}
   896  	for _, j := range res.Jobs {
   897  		job, err := convertListedJob(j, it.c)
   898  		if err != nil {
   899  			return "", err
   900  		}
   901  		it.items = append(it.items, job)
   902  	}
   903  	return res.NextPageToken, nil
   904  }
   905  
   906  func convertListedJob(j *bq.JobListJobs, c *Client) (*Job, error) {
   907  	return bqToJob2(j.JobReference, j.Configuration, j.Status, j.Statistics, j.UserEmail, c)
   908  }
   909  
   910  func (c *Client) getJobInternal(ctx context.Context, jobID, location, projectID string, fields ...googleapi.Field) (*bq.Job, error) {
   911  	var job *bq.Job
   912  	proj := projectID
   913  	if proj == "" {
   914  		proj = c.projectID
   915  	}
   916  	call := c.bqs.Jobs.Get(proj, jobID).Context(ctx)
   917  	if location != "" {
   918  		call = call.Location(location)
   919  	}
   920  	if len(fields) > 0 {
   921  		call = call.Fields(fields...)
   922  	}
   923  	setClientHeader(call.Header())
   924  	err := runWithRetry(ctx, func() (err error) {
   925  		sCtx := trace.StartSpan(ctx, "bigquery.jobs.get")
   926  		job, err = call.Do()
   927  		trace.EndSpan(sCtx, err)
   928  		return err
   929  	})
   930  	if err != nil {
   931  		return nil, err
   932  	}
   933  	return job, nil
   934  }
   935  
   936  func bqToJob(q *bq.Job, c *Client) (*Job, error) {
   937  	return bqToJob2(q.JobReference, q.Configuration, q.Status, q.Statistics, q.UserEmail, c)
   938  }
   939  
   940  func bqToJob2(qr *bq.JobReference, qc *bq.JobConfiguration, qs *bq.JobStatus, qt *bq.JobStatistics, email string, c *Client) (*Job, error) {
   941  	j := &Job{
   942  		projectID: qr.ProjectId,
   943  		jobID:     qr.JobId,
   944  		location:  qr.Location,
   945  		c:         c,
   946  		email:     email,
   947  	}
   948  	j.setConfig(qc)
   949  	if err := j.setStatus(qs); err != nil {
   950  		return nil, err
   951  	}
   952  	j.setStatistics(qt, c)
   953  	return j, nil
   954  }
   955  
   956  func (j *Job) setConfig(config *bq.JobConfiguration) {
   957  	if config == nil {
   958  		return
   959  	}
   960  	j.config = config
   961  }
   962  
   963  func (j *Job) isQuery() bool {
   964  	return j.config != nil && j.config.Query != nil
   965  }
   966  
   967  func (j *Job) isScript() bool {
   968  	return j.hasStatementType("SCRIPT")
   969  }
   970  
   971  func (j *Job) isSelectQuery() bool {
   972  	return j.hasStatementType("SELECT")
   973  }
   974  
   975  func (j *Job) hasStatementType(statementType string) bool {
   976  	if !j.isQuery() {
   977  		return false
   978  	}
   979  	if j.lastStatus == nil {
   980  		return false
   981  	}
   982  	queryStats, ok := j.lastStatus.Statistics.Details.(*QueryStatistics)
   983  	if !ok {
   984  		return false
   985  	}
   986  	return queryStats.StatementType == statementType
   987  }
   988  
   989  var stateMap = map[string]State{"PENDING": Pending, "RUNNING": Running, "DONE": Done}
   990  
   991  func (j *Job) setStatus(qs *bq.JobStatus) error {
   992  	if qs == nil {
   993  		return nil
   994  	}
   995  	state, ok := stateMap[qs.State]
   996  	if !ok {
   997  		return fmt.Errorf("unexpected job state: %s", qs.State)
   998  	}
   999  	j.lastStatus = &JobStatus{
  1000  		State: state,
  1001  		err:   nil,
  1002  	}
  1003  	if err := bqToError(qs.ErrorResult); state == Done && err != nil {
  1004  		j.lastStatus.err = err
  1005  	}
  1006  	for _, ep := range qs.Errors {
  1007  		j.lastStatus.Errors = append(j.lastStatus.Errors, bqToError(ep))
  1008  	}
  1009  	return nil
  1010  }
  1011  
  1012  func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) {
  1013  	if s == nil || j.lastStatus == nil {
  1014  		return
  1015  	}
  1016  	js := &JobStatistics{
  1017  		CreationTime:        unixMillisToTime(s.CreationTime),
  1018  		StartTime:           unixMillisToTime(s.StartTime),
  1019  		EndTime:             unixMillisToTime(s.EndTime),
  1020  		TotalBytesProcessed: s.TotalBytesProcessed,
  1021  		NumChildJobs:        s.NumChildJobs,
  1022  		ParentJobID:         s.ParentJobId,
  1023  		ScriptStatistics:    bqToScriptStatistics(s.ScriptStatistics),
  1024  		ReservationUsage:    bqToReservationUsage(s.ReservationUsage),
  1025  		TransactionInfo:     bqToTransactionInfo(s.TransactionInfo),
  1026  		SessionInfo:         bqToSessionInfo(s.SessionInfo),
  1027  	}
  1028  	switch {
  1029  	case s.Extract != nil:
  1030  		js.Details = &ExtractStatistics{
  1031  			DestinationURIFileCounts: []int64(s.Extract.DestinationUriFileCounts),
  1032  		}
  1033  	case s.Load != nil:
  1034  		js.Details = &LoadStatistics{
  1035  			InputFileBytes: s.Load.InputFileBytes,
  1036  			InputFiles:     s.Load.InputFiles,
  1037  			OutputBytes:    s.Load.OutputBytes,
  1038  			OutputRows:     s.Load.OutputRows,
  1039  		}
  1040  	case s.Query != nil:
  1041  		var names []string
  1042  		for _, qp := range s.Query.UndeclaredQueryParameters {
  1043  			names = append(names, qp.Name)
  1044  		}
  1045  		var tables []*Table
  1046  		for _, tr := range s.Query.ReferencedTables {
  1047  			tables = append(tables, bqToTable(tr, c))
  1048  		}
  1049  		js.Details = &QueryStatistics{
  1050  			BIEngineStatistics:            bqToBIEngineStatistics(s.Query.BiEngineStatistics),
  1051  			BillingTier:                   s.Query.BillingTier,
  1052  			CacheHit:                      s.Query.CacheHit,
  1053  			DDLTargetTable:                bqToTable(s.Query.DdlTargetTable, c),
  1054  			DDLOperationPerformed:         s.Query.DdlOperationPerformed,
  1055  			DDLTargetRoutine:              bqToRoutine(s.Query.DdlTargetRoutine, c),
  1056  			ExportDataStatistics:          bqToExportDataStatistics(s.Query.ExportDataStatistics),
  1057  			StatementType:                 s.Query.StatementType,
  1058  			TotalBytesBilled:              s.Query.TotalBytesBilled,
  1059  			TotalBytesProcessed:           s.Query.TotalBytesProcessed,
  1060  			TotalBytesProcessedAccuracy:   s.Query.TotalBytesProcessedAccuracy,
  1061  			NumDMLAffectedRows:            s.Query.NumDmlAffectedRows,
  1062  			DMLStats:                      bqToDMLStatistics(s.Query.DmlStats),
  1063  			QueryPlan:                     queryPlanFromProto(s.Query.QueryPlan),
  1064  			Schema:                        bqToSchema(s.Query.Schema),
  1065  			SlotMillis:                    s.Query.TotalSlotMs,
  1066  			Timeline:                      timelineFromProto(s.Query.Timeline),
  1067  			ReferencedTables:              tables,
  1068  			UndeclaredQueryParameterNames: names,
  1069  		}
  1070  	}
  1071  	j.lastStatus.Statistics = js
  1072  }
  1073  
  1074  func queryPlanFromProto(stages []*bq.ExplainQueryStage) []*ExplainQueryStage {
  1075  	var res []*ExplainQueryStage
  1076  	for _, s := range stages {
  1077  		var steps []*ExplainQueryStep
  1078  		for _, p := range s.Steps {
  1079  			steps = append(steps, &ExplainQueryStep{
  1080  				Kind:     p.Kind,
  1081  				Substeps: p.Substeps,
  1082  			})
  1083  		}
  1084  		res = append(res, &ExplainQueryStage{
  1085  			CompletedParallelInputs:   s.CompletedParallelInputs,
  1086  			ComputeAvg:                time.Duration(s.ComputeMsAvg) * time.Millisecond,
  1087  			ComputeMax:                time.Duration(s.ComputeMsMax) * time.Millisecond,
  1088  			ComputeRatioAvg:           s.ComputeRatioAvg,
  1089  			ComputeRatioMax:           s.ComputeRatioMax,
  1090  			EndTime:                   time.Unix(0, s.EndMs*1e6),
  1091  			ID:                        s.Id,
  1092  			InputStages:               s.InputStages,
  1093  			Name:                      s.Name,
  1094  			ParallelInputs:            s.ParallelInputs,
  1095  			ReadAvg:                   time.Duration(s.ReadMsAvg) * time.Millisecond,
  1096  			ReadMax:                   time.Duration(s.ReadMsMax) * time.Millisecond,
  1097  			ReadRatioAvg:              s.ReadRatioAvg,
  1098  			ReadRatioMax:              s.ReadRatioMax,
  1099  			RecordsRead:               s.RecordsRead,
  1100  			RecordsWritten:            s.RecordsWritten,
  1101  			ShuffleOutputBytes:        s.ShuffleOutputBytes,
  1102  			ShuffleOutputBytesSpilled: s.ShuffleOutputBytesSpilled,
  1103  			StartTime:                 time.Unix(0, s.StartMs*1e6),
  1104  			Status:                    s.Status,
  1105  			Steps:                     steps,
  1106  			WaitAvg:                   time.Duration(s.WaitMsAvg) * time.Millisecond,
  1107  			WaitMax:                   time.Duration(s.WaitMsMax) * time.Millisecond,
  1108  			WaitRatioAvg:              s.WaitRatioAvg,
  1109  			WaitRatioMax:              s.WaitRatioMax,
  1110  			WriteAvg:                  time.Duration(s.WriteMsAvg) * time.Millisecond,
  1111  			WriteMax:                  time.Duration(s.WriteMsMax) * time.Millisecond,
  1112  			WriteRatioAvg:             s.WriteRatioAvg,
  1113  			WriteRatioMax:             s.WriteRatioMax,
  1114  		})
  1115  	}
  1116  	return res
  1117  }
  1118  
  1119  func timelineFromProto(timeline []*bq.QueryTimelineSample) []*QueryTimelineSample {
  1120  	var res []*QueryTimelineSample
  1121  	for _, s := range timeline {
  1122  		res = append(res, &QueryTimelineSample{
  1123  			ActiveUnits:    s.ActiveUnits,
  1124  			CompletedUnits: s.CompletedUnits,
  1125  			Elapsed:        time.Duration(s.ElapsedMs) * time.Millisecond,
  1126  			PendingUnits:   s.PendingUnits,
  1127  			SlotMillis:     s.TotalSlotMs,
  1128  		})
  1129  	}
  1130  	return res
  1131  }
  1132  
  1133  // TransactionInfo contains information about a multi-statement transaction that may have associated with a job.
  1134  type TransactionInfo struct {
  1135  	// TransactionID is the system-generated identifier for the transaction.
  1136  	TransactionID string
  1137  }
  1138  
  1139  func bqToTransactionInfo(in *bq.TransactionInfo) *TransactionInfo {
  1140  	if in == nil {
  1141  		return nil
  1142  	}
  1143  	return &TransactionInfo{
  1144  		TransactionID: in.TransactionId,
  1145  	}
  1146  }
  1147  
  1148  // SessionInfo contains information about a session associated with a job.
  1149  type SessionInfo struct {
  1150  	SessionID string
  1151  }
  1152  
  1153  func bqToSessionInfo(in *bq.SessionInfo) *SessionInfo {
  1154  	if in == nil {
  1155  		return nil
  1156  	}
  1157  	return &SessionInfo{
  1158  		SessionID: in.SessionId,
  1159  	}
  1160  }
  1161  

View as plain text