...

Source file src/cloud.google.com/go/bigquery/load.go

Documentation: cloud.google.com/go/bigquery

     1  // Copyright 2016 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package bigquery
    16  
    17  import (
    18  	"context"
    19  	"io"
    20  	"time"
    21  
    22  	"cloud.google.com/go/internal/trace"
    23  	bq "google.golang.org/api/bigquery/v2"
    24  	"google.golang.org/api/googleapi"
    25  )
    26  
// LoadConfig holds the configuration for a load job.
type LoadConfig struct {
	// Src is the source from which data will be loaded.
	Src LoadSource

	// Dst is the table into which the data will be loaded.
	Dst *Table

	// CreateDisposition specifies the circumstances under which the destination table will be created.
	// The default is CreateIfNeeded.
	CreateDisposition TableCreateDisposition

	// WriteDisposition specifies how existing data in the destination table is treated.
	// The default is WriteAppend.
	WriteDisposition TableWriteDisposition

	// The labels associated with this job.
	Labels map[string]string

	// If non-nil, the destination table is partitioned by time.
	TimePartitioning *TimePartitioning

	// If non-nil, the destination table is partitioned by integer range.
	RangePartitioning *RangePartitioning

	// Clustering specifies the data clustering configuration for the destination table.
	Clustering *Clustering

	// Custom encryption configuration (e.g., Cloud KMS keys).
	DestinationEncryptionConfig *EncryptionConfig

	// Allows the schema of the destination table to be updated as a side effect of
	// the load job.
	SchemaUpdateOptions []string

	// For Avro-based loads, controls whether logical type annotations are used.
	// See https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#logical_types
	// for additional information.
	UseAvroLogicalTypes bool

	// For ingestion from datastore backups, ProjectionFields governs which fields
	// are projected from the backup.  The default behavior projects all fields.
	ProjectionFields []string

	// HivePartitioningOptions allows use of Hive partitioning based on the
	// layout of objects in Cloud Storage.
	HivePartitioningOptions *HivePartitioningOptions

	// DecimalTargetTypes allows selection of how decimal values are converted when
	// processed in bigquery, subject to the value type having sufficient precision/scale
	// to support the values.  In the order of NUMERIC, BIGNUMERIC, and STRING, a type is
	// selected if it is present in the list and if it supports the necessary precision and scale.
	//
	// StringTargetType supports all precision and scale values.
	DecimalTargetTypes []DecimalTargetType

	// Sets a best-effort deadline on a specific job.  If job execution exceeds this
	// timeout, BigQuery may attempt to cancel this work automatically.
	//
	// This deadline cannot be adjusted or removed once the job is created.  Consider
	// using Job.Cancel in situations where you need more dynamic behavior.
	//
	// Experimental: this option is experimental and may be modified or removed in future versions,
	// regardless of any other documented package stability guarantees.
	JobTimeout time.Duration

	// When loading a table with external data, the user can provide a reference file with the table schema.
	// This is enabled for the following formats: AVRO, PARQUET, ORC.
	ReferenceFileSchemaURI string

	// If true, creates a new session, where the session id will
	// be a server generated random id. If false, runs the load job with an
	// existing session_id passed in ConnectionProperty; otherwise runs the
	// load job in non-session mode.
	CreateSession bool

	// ConnectionProperties are optional key-value settings.
	ConnectionProperties []*ConnectionProperty

	// MediaOptions stores options for customizing media upload.
	MediaOptions []googleapi.MediaOption
}
   109  
   110  func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) {
   111  	config := &bq.JobConfiguration{
   112  		Labels: l.Labels,
   113  		Load: &bq.JobConfigurationLoad{
   114  			CreateDisposition:                  string(l.CreateDisposition),
   115  			WriteDisposition:                   string(l.WriteDisposition),
   116  			DestinationTable:                   l.Dst.toBQ(),
   117  			TimePartitioning:                   l.TimePartitioning.toBQ(),
   118  			RangePartitioning:                  l.RangePartitioning.toBQ(),
   119  			Clustering:                         l.Clustering.toBQ(),
   120  			DestinationEncryptionConfiguration: l.DestinationEncryptionConfig.toBQ(),
   121  			SchemaUpdateOptions:                l.SchemaUpdateOptions,
   122  			UseAvroLogicalTypes:                l.UseAvroLogicalTypes,
   123  			ProjectionFields:                   l.ProjectionFields,
   124  			HivePartitioningOptions:            l.HivePartitioningOptions.toBQ(),
   125  			ReferenceFileSchemaUri:             l.ReferenceFileSchemaURI,
   126  			CreateSession:                      l.CreateSession,
   127  		},
   128  		JobTimeoutMs: l.JobTimeout.Milliseconds(),
   129  	}
   130  	for _, v := range l.DecimalTargetTypes {
   131  		config.Load.DecimalTargetTypes = append(config.Load.DecimalTargetTypes, string(v))
   132  	}
   133  	for _, v := range l.ConnectionProperties {
   134  		config.Load.ConnectionProperties = append(config.Load.ConnectionProperties, v.toBQ())
   135  	}
   136  	media := l.Src.populateLoadConfig(config.Load)
   137  	return config, media
   138  }
   139  
   140  func bqToLoadConfig(q *bq.JobConfiguration, c *Client) *LoadConfig {
   141  	lc := &LoadConfig{
   142  		Labels:                      q.Labels,
   143  		CreateDisposition:           TableCreateDisposition(q.Load.CreateDisposition),
   144  		WriteDisposition:            TableWriteDisposition(q.Load.WriteDisposition),
   145  		Dst:                         bqToTable(q.Load.DestinationTable, c),
   146  		TimePartitioning:            bqToTimePartitioning(q.Load.TimePartitioning),
   147  		RangePartitioning:           bqToRangePartitioning(q.Load.RangePartitioning),
   148  		Clustering:                  bqToClustering(q.Load.Clustering),
   149  		DestinationEncryptionConfig: bqToEncryptionConfig(q.Load.DestinationEncryptionConfiguration),
   150  		SchemaUpdateOptions:         q.Load.SchemaUpdateOptions,
   151  		UseAvroLogicalTypes:         q.Load.UseAvroLogicalTypes,
   152  		ProjectionFields:            q.Load.ProjectionFields,
   153  		HivePartitioningOptions:     bqToHivePartitioningOptions(q.Load.HivePartitioningOptions),
   154  		ReferenceFileSchemaURI:      q.Load.ReferenceFileSchemaUri,
   155  		CreateSession:               q.Load.CreateSession,
   156  	}
   157  	if q.JobTimeoutMs > 0 {
   158  		lc.JobTimeout = time.Duration(q.JobTimeoutMs) * time.Millisecond
   159  	}
   160  	for _, v := range q.Load.DecimalTargetTypes {
   161  		lc.DecimalTargetTypes = append(lc.DecimalTargetTypes, DecimalTargetType(v))
   162  	}
   163  	for _, v := range q.Load.ConnectionProperties {
   164  		lc.ConnectionProperties = append(lc.ConnectionProperties, bqToConnectionProperty(v))
   165  	}
   166  	var fc *FileConfig
   167  	if len(q.Load.SourceUris) == 0 {
   168  		s := NewReaderSource(nil)
   169  		fc = &s.FileConfig
   170  		lc.Src = s
   171  	} else {
   172  		s := NewGCSReference(q.Load.SourceUris...)
   173  		fc = &s.FileConfig
   174  		lc.Src = s
   175  	}
   176  	bqPopulateFileConfig(q.Load, fc)
   177  	return lc
   178  }
   179  
// A Loader loads data from Google Cloud Storage into a BigQuery table.
type Loader struct {
	// JobIDConfig carries optional job-identity settings (embedded).
	JobIDConfig
	// LoadConfig describes the load job itself (embedded).
	LoadConfig
	// c is the client used to run the job.
	c *Client
}
   186  
// A LoadSource represents a source of data that can be loaded into
// a BigQuery table.
//
// This package defines two LoadSources: GCSReference, for Google Cloud Storage
// objects, and ReaderSource, for data read from an io.Reader.
type LoadSource interface {
	// populateLoadConfig fills in the source-specific fields of the load
	// configuration and returns the media reader for local uploads (nil
	// when the data lives in Cloud Storage).
	populateLoadConfig(*bq.JobConfigurationLoad) io.Reader
}
   196  
   197  // LoaderFrom returns a Loader which can be used to load data into a BigQuery table.
   198  // The returned Loader may optionally be further configured before its Run method is called.
   199  // See GCSReference and ReaderSource for additional configuration options that
   200  // affect loading.
   201  func (t *Table) LoaderFrom(src LoadSource) *Loader {
   202  	return &Loader{
   203  		c: t.c,
   204  		LoadConfig: LoadConfig{
   205  			Src: src,
   206  			Dst: t,
   207  		},
   208  	}
   209  }
   210  
   211  // Run initiates a load job.
   212  func (l *Loader) Run(ctx context.Context) (j *Job, err error) {
   213  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Load.Run")
   214  	defer func() { trace.EndSpan(ctx, err) }()
   215  
   216  	job, media := l.newJob()
   217  	return l.c.insertJob(ctx, job, media, l.LoadConfig.MediaOptions...)
   218  }
   219  
   220  func (l *Loader) newJob() (*bq.Job, io.Reader) {
   221  	config, media := l.LoadConfig.toBQ()
   222  	return &bq.Job{
   223  		JobReference:  l.JobIDConfig.createJobRef(l.c),
   224  		Configuration: config,
   225  	}, media
   226  }
   227  
// DecimalTargetType is used to express preference ordering for converting values from external formats.
type DecimalTargetType string

// The values below mirror the string forms sent to the BigQuery API in
// JobConfigurationLoad.DecimalTargetTypes (see LoadConfig.toBQ).
var (
	// NumericTargetType indicates the preferred type is NUMERIC when supported.
	NumericTargetType DecimalTargetType = "NUMERIC"

	// BigNumericTargetType indicates the preferred type is BIGNUMERIC when supported.
	BigNumericTargetType DecimalTargetType = "BIGNUMERIC"

	// StringTargetType indicates the preferred type is STRING when supported.
	StringTargetType DecimalTargetType = "STRING"
)
   241  

View as plain text