// Copyright 2017 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"encoding/base64"
	"unicode/utf8"

	bq "google.golang.org/api/bigquery/v2"
)

// DataFormat describes the format of BigQuery table data.
type DataFormat string

// Constants describing the format of BigQuery table data.
const (
	CSV             DataFormat = "CSV"
	Avro            DataFormat = "AVRO"
	JSON            DataFormat = "NEWLINE_DELIMITED_JSON"
	DatastoreBackup DataFormat = "DATASTORE_BACKUP"
	GoogleSheets    DataFormat = "GOOGLE_SHEETS"
	Bigtable        DataFormat = "BIGTABLE"
	Parquet         DataFormat = "PARQUET"
	ORC             DataFormat = "ORC"
	// For BQ ML Models, TensorFlow Saved Model format.
	TFSavedModel DataFormat = "ML_TF_SAVED_MODEL"
	// For BQ ML Models, xgBoost Booster format.
	XGBoostBooster DataFormat = "ML_XGBOOST_BOOSTER"
	Iceberg        DataFormat = "ICEBERG"
)

// ExternalData is a table which is stored outside of BigQuery. It is implemented by
// *ExternalDataConfig.
// GCSReference also implements it, for backwards compatibility.
type ExternalData interface {
	toBQ() bq.ExternalDataConfiguration
}

// ExternalDataConfig describes data external to BigQuery that can be used
// in queries and to create external tables.
type ExternalDataConfig struct {
	// The format of the data. Required.
	SourceFormat DataFormat

	// The fully-qualified URIs that point to your
	// data in Google Cloud. Required.
	//
	// For Google Cloud Storage URIs, each URI can contain one '*' wildcard character
	// and it must come after the 'bucket' name. Size limits related to load jobs
	// apply to external data sources.
	//
	// For Google Cloud Bigtable URIs, exactly one URI can be specified and it has to be
	// a fully specified and valid HTTPS URL for a Google Cloud Bigtable table.
	//
	// For Google Cloud Datastore backups, exactly one URI can be specified. Also,
	// the '*' wildcard character is not allowed.
	SourceURIs []string

	// The schema of the data. Required for CSV and JSON; disallowed for the
	// other formats.
	Schema Schema

	// Try to detect schema and format options automatically.
	// Any option specified explicitly will be honored.
	AutoDetect bool

	// The compression type of the data.
	Compression Compression

	// IgnoreUnknownValues causes values not matching the schema to be
	// tolerated. Unknown values are ignored. For CSV this ignores extra values
	// at the end of a line. For JSON this ignores named values that do not
	// match any column name. If this field is not set, records containing
	// unknown values are treated as bad records. The MaxBadRecords field can
	// be used to customize how bad records are handled.
	IgnoreUnknownValues bool

	// MaxBadRecords is the maximum number of bad records that will be ignored
	// when reading data.
	MaxBadRecords int64

	// Additional options for CSV, Avro, GoogleSheets, Bigtable, and Parquet formats.
	Options ExternalDataConfigOptions

	// HivePartitioningOptions allows use of Hive partitioning based on the
	// layout of objects in Google Cloud Storage.
	HivePartitioningOptions *HivePartitioningOptions

	// DecimalTargetTypes allows selection of how decimal values are converted
	// when processed in BigQuery, subject to the value type having sufficient
	// precision/scale to support the values. In the order of NUMERIC,
	// BIGNUMERIC, and STRING, a type is selected if it is present in the list
	// and if it supports the necessary precision and scale.
	//
	// StringTargetType supports all precision and scale values.
	DecimalTargetTypes []DecimalTargetType

	// ConnectionID associates an external data configuration with a connection ID.
	// Connections are managed through the BigQuery Connection API:
	// https://pkg.go.dev/cloud.google.com/go/bigquery/connection/apiv1
	ConnectionID string

	// When creating an external table, the user can provide a reference file with the table schema.
	// This is enabled for the following formats: AVRO, PARQUET, ORC.
	ReferenceFileSchemaURI string
}
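// exampleCSVExternalDataConfig is an illustrative sketch, not part of the
// original file: it shows how an ExternalDataConfig might be assembled to
// query CSV files in Cloud Storage directly. The bucket path is
// hypothetical, and schema auto-detection stands in for an explicit Schema.
func exampleCSVExternalDataConfig() *ExternalDataConfig {
	return &ExternalDataConfig{
		SourceFormat: CSV,
		// Hypothetical bucket; note the single '*' wildcard after the bucket name.
		SourceURIs: []string{"gs://my-bucket/data/*.csv"},
		AutoDetect: true,
		Options: &CSVOptions{
			FieldDelimiter:  ",",
			SkipLeadingRows: 1, // skip a header row
		},
	}
}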
func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
	q := bq.ExternalDataConfiguration{
		SourceFormat:            string(e.SourceFormat),
		SourceUris:              e.SourceURIs,
		Autodetect:              e.AutoDetect,
		Compression:             string(e.Compression),
		IgnoreUnknownValues:     e.IgnoreUnknownValues,
		MaxBadRecords:           e.MaxBadRecords,
		HivePartitioningOptions: e.HivePartitioningOptions.toBQ(),
		ConnectionId:            e.ConnectionID,
		ReferenceFileSchemaUri:  e.ReferenceFileSchemaURI,
	}
	if e.Schema != nil {
		q.Schema = e.Schema.toBQ()
	}
	if e.Options != nil {
		e.Options.populateExternalDataConfig(&q)
	}
	for _, v := range e.DecimalTargetTypes {
		q.DecimalTargetTypes = append(q.DecimalTargetTypes, string(v))
	}
	return q
}

func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfig, error) {
	e := &ExternalDataConfig{
		SourceFormat:            DataFormat(q.SourceFormat),
		SourceURIs:              q.SourceUris,
		AutoDetect:              q.Autodetect,
		Compression:             Compression(q.Compression),
		IgnoreUnknownValues:     q.IgnoreUnknownValues,
		MaxBadRecords:           q.MaxBadRecords,
		Schema:                  bqToSchema(q.Schema),
		HivePartitioningOptions: bqToHivePartitioningOptions(q.HivePartitioningOptions),
		ConnectionID:            q.ConnectionId,
		ReferenceFileSchemaURI:  q.ReferenceFileSchemaUri,
	}
	for _, v := range q.DecimalTargetTypes {
		e.DecimalTargetTypes = append(e.DecimalTargetTypes, DecimalTargetType(v))
	}
	switch {
	case q.AvroOptions != nil:
		e.Options = bqToAvroOptions(q.AvroOptions)
	case q.CsvOptions != nil:
		e.Options = bqToCSVOptions(q.CsvOptions)
	case q.GoogleSheetsOptions != nil:
		e.Options = bqToGoogleSheetsOptions(q.GoogleSheetsOptions)
	case q.BigtableOptions != nil:
		var err error
		e.Options, err = bqToBigtableOptions(q.BigtableOptions)
		if err != nil {
			return nil, err
		}
	case q.ParquetOptions != nil:
		e.Options = bqToParquetOptions(q.ParquetOptions)
	}
	return e, nil
}

// ExternalDataConfigOptions are additional options for external data configurations.
// This interface is implemented by AvroOptions, CSVOptions, GoogleSheetsOptions,
// BigtableOptions and ParquetOptions.
type ExternalDataConfigOptions interface {
	populateExternalDataConfig(*bq.ExternalDataConfiguration)
}

// AvroOptions are additional options for Avro external data sources.
type AvroOptions struct {
	// UseAvroLogicalTypes indicates whether to interpret logical types as the
	// corresponding BigQuery data type (for example, TIMESTAMP), instead of using
	// the raw type (for example, INTEGER).
	UseAvroLogicalTypes bool
}

func (o *AvroOptions) populateExternalDataConfig(c *bq.ExternalDataConfiguration) {
	c.AvroOptions = &bq.AvroOptions{
		UseAvroLogicalTypes: o.UseAvroLogicalTypes,
	}
}

func bqToAvroOptions(q *bq.AvroOptions) *AvroOptions {
	if q == nil {
		return nil
	}
	return &AvroOptions{
		UseAvroLogicalTypes: q.UseAvroLogicalTypes,
	}
}
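// exampleAvroExternalDataConfig is an illustrative sketch, not part of the
// original file: an external Avro source where Avro logical types should be
// read as the corresponding BigQuery types (for example, TIMESTAMP) rather
// than as the raw underlying types. The URI is hypothetical.
func exampleAvroExternalDataConfig() *ExternalDataConfig {
	return &ExternalDataConfig{
		SourceFormat: Avro,
		SourceURIs:   []string{"gs://my-bucket/events/*.avro"}, // hypothetical bucket
		Options:      &AvroOptions{UseAvroLogicalTypes: true},
	}
}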
// CSVOptions are additional options for CSV external data sources.
type CSVOptions struct {
	// AllowJaggedRows causes missing trailing optional columns to be tolerated
	// when reading CSV data. Missing values are treated as nulls.
	AllowJaggedRows bool

	// AllowQuotedNewlines sets whether quoted data sections containing
	// newlines are allowed when reading CSV data.
	AllowQuotedNewlines bool

	// Encoding is the character encoding of data to be read.
	Encoding Encoding

	// FieldDelimiter is the separator for fields in a CSV file, used when
	// reading or exporting data. The default is ",".
	FieldDelimiter string

	// Quote is the value used to quote data sections in a CSV file. The
	// default quotation character is the double quote ("), which is used if
	// both Quote and ForceZeroQuote are unset.
	// To specify that no character should be interpreted as a quotation
	// character, set ForceZeroQuote to true.
	// Only used when reading data.
	Quote          string
	ForceZeroQuote bool

	// The number of rows at the top of a CSV file that BigQuery will skip when
	// reading data.
	SkipLeadingRows int64

	// An optional custom string that will represent a NULL
	// value in CSV import data.
	NullMarker string

	// Preserves the embedded ASCII control characters (the first 32 characters in the ASCII table,
	// from '\x00' to '\x1F') when loading from CSV. Only applicable to CSV, ignored for other formats.
	PreserveASCIIControlCharacters bool
}

func (o *CSVOptions) populateExternalDataConfig(c *bq.ExternalDataConfiguration) {
	c.CsvOptions = &bq.CsvOptions{
		AllowJaggedRows:                o.AllowJaggedRows,
		AllowQuotedNewlines:            o.AllowQuotedNewlines,
		Encoding:                       string(o.Encoding),
		FieldDelimiter:                 o.FieldDelimiter,
		Quote:                          o.quote(),
		SkipLeadingRows:                o.SkipLeadingRows,
		NullMarker:                     o.NullMarker,
		PreserveAsciiControlCharacters: o.PreserveASCIIControlCharacters,
	}
}

// quote returns the CSV quote character, or nil if unset.
func (o *CSVOptions) quote() *string {
	if o.ForceZeroQuote {
		quote := ""
		return &quote
	}
	if o.Quote == "" {
		return nil
	}
	return &o.Quote
}

func (o *CSVOptions) setQuote(ps *string) {
	if ps != nil {
		o.Quote = *ps
		if o.Quote == "" {
			o.ForceZeroQuote = true
		}
	}
}

func bqToCSVOptions(q *bq.CsvOptions) *CSVOptions {
	o := &CSVOptions{
		AllowJaggedRows:                q.AllowJaggedRows,
		AllowQuotedNewlines:            q.AllowQuotedNewlines,
		Encoding:                       Encoding(q.Encoding),
		FieldDelimiter:                 q.FieldDelimiter,
		SkipLeadingRows:                q.SkipLeadingRows,
		NullMarker:                     q.NullMarker,
		PreserveASCIIControlCharacters: q.PreserveAsciiControlCharacters,
	}
	o.setQuote(q.Quote)
	return o
}
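// exampleUnquotedCSVOptions is an illustrative sketch, not part of the
// original file: tab-separated data in which no character should be treated
// as a quote. Leaving Quote empty alone means "use the default double
// quote"; setting ForceZeroQuote is what makes quote() serialize an
// explicit empty quote string and disable quoting entirely.
func exampleUnquotedCSVOptions() *CSVOptions {
	return &CSVOptions{
		FieldDelimiter: "\t",
		ForceZeroQuote: true, // sent to the API as an explicit empty quote
	}
}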
// GoogleSheetsOptions are additional options for GoogleSheets external data sources.
type GoogleSheetsOptions struct {
	// The number of rows at the top of a sheet that BigQuery will skip when
	// reading data.
	SkipLeadingRows int64
	// Optionally specifies a more specific range of cells to include.
	// Typical format: sheet_name!top_left_cell_id:bottom_right_cell_id
	//
	// Example: sheet1!A1:B20
	Range string
}

func (o *GoogleSheetsOptions) populateExternalDataConfig(c *bq.ExternalDataConfiguration) {
	c.GoogleSheetsOptions = &bq.GoogleSheetsOptions{
		SkipLeadingRows: o.SkipLeadingRows,
		Range:           o.Range,
	}
}

func bqToGoogleSheetsOptions(q *bq.GoogleSheetsOptions) *GoogleSheetsOptions {
	return &GoogleSheetsOptions{
		SkipLeadingRows: q.SkipLeadingRows,
		Range:           q.Range,
	}
}

// BigtableOptions are additional options for Bigtable external data sources.
type BigtableOptions struct {
	// A list of column families to expose in the table schema along with their
	// types. If omitted, all column families are present in the table schema and
	// their values are read as BYTES.
	ColumnFamilies []*BigtableColumnFamily

	// If true, then the column families that are not specified in the
	// ColumnFamilies list are not exposed in the table schema. Otherwise, they
	// are read with BYTES type values. The default is false.
	IgnoreUnspecifiedColumnFamilies bool

	// If true, then the rowkey column families will be read and converted to string.
	// Otherwise they are read with BYTES type values and users need to manually cast
	// them with CAST if necessary. The default is false.
	ReadRowkeyAsString bool
}

func (o *BigtableOptions) populateExternalDataConfig(c *bq.ExternalDataConfiguration) {
	q := &bq.BigtableOptions{
		IgnoreUnspecifiedColumnFamilies: o.IgnoreUnspecifiedColumnFamilies,
		ReadRowkeyAsString:              o.ReadRowkeyAsString,
	}
	for _, f := range o.ColumnFamilies {
		q.ColumnFamilies = append(q.ColumnFamilies, f.toBQ())
	}
	c.BigtableOptions = q
}

func bqToBigtableOptions(q *bq.BigtableOptions) (*BigtableOptions, error) {
	b := &BigtableOptions{
		IgnoreUnspecifiedColumnFamilies: q.IgnoreUnspecifiedColumnFamilies,
		ReadRowkeyAsString:              q.ReadRowkeyAsString,
	}
	for _, f := range q.ColumnFamilies {
		f2, err := bqToBigtableColumnFamily(f)
		if err != nil {
			return nil, err
		}
		b.ColumnFamilies = append(b.ColumnFamilies, f2)
	}
	return b, nil
}

// BigtableColumnFamily describes how BigQuery should access a Bigtable column family.
type BigtableColumnFamily struct {
	// Identifier of the column family.
	FamilyID string

	// Lists of columns that should be exposed as individual fields as opposed to a
	// list of (column name, value) pairs. All columns whose qualifier matches a
	// qualifier in this list can be accessed as individual fields named after the
	// column. Other columns can be accessed as a list through the column family's
	// Column field.
	Columns []*BigtableColumn

	// The encoding of the values when the type is not STRING. Acceptable encoding values are:
	// - TEXT - indicates values are alphanumeric text strings.
	// - BINARY - indicates values are encoded using HBase Bytes.toBytes family of functions.
	// This can be overridden for a specific column by listing that column in 'columns' and
	// specifying an encoding for it.
	Encoding string

	// If true, only the latest version of values are exposed for all columns in this
	// column family. This can be overridden for a specific column by listing that
	// column in 'columns' and specifying a different setting for that column.
	OnlyReadLatest bool

	// The type to convert the value in cells of this column family.
	// The values are expected to be encoded using the HBase
	// Bytes.toBytes function when using the BINARY encoding value.
	// The following BigQuery types are allowed (case-sensitive):
	// BYTES STRING INTEGER FLOAT BOOLEAN.
	// The default type is BYTES. This can be overridden for a specific column by
	// listing that column in 'columns' and specifying a type for it.
	Type string
}
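// exampleBigtableOptions is an illustrative sketch, not part of the original
// file: it exposes a single column family, reads its "name" column as
// STRING, and keeps the row key readable. The family and column names are
// hypothetical.
func exampleBigtableOptions() *BigtableOptions {
	return &BigtableOptions{
		ReadRowkeyAsString: true,
		ColumnFamilies: []*BigtableColumnFamily{
			{
				FamilyID:       "profile", // hypothetical column family
				OnlyReadLatest: true,
				Columns: []*BigtableColumn{
					{Qualifier: "name", Type: "STRING"}, // hypothetical column
				},
			},
		},
	}
}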
func (b *BigtableColumnFamily) toBQ() *bq.BigtableColumnFamily {
	q := &bq.BigtableColumnFamily{
		FamilyId:       b.FamilyID,
		Encoding:       b.Encoding,
		OnlyReadLatest: b.OnlyReadLatest,
		Type:           b.Type,
	}
	for _, col := range b.Columns {
		q.Columns = append(q.Columns, col.toBQ())
	}
	return q
}

func bqToBigtableColumnFamily(q *bq.BigtableColumnFamily) (*BigtableColumnFamily, error) {
	b := &BigtableColumnFamily{
		FamilyID:       q.FamilyId,
		Encoding:       q.Encoding,
		OnlyReadLatest: q.OnlyReadLatest,
		Type:           q.Type,
	}
	for _, col := range q.Columns {
		c, err := bqToBigtableColumn(col)
		if err != nil {
			return nil, err
		}
		b.Columns = append(b.Columns, c)
	}
	return b, nil
}

// BigtableColumn describes how BigQuery should access a Bigtable column.
type BigtableColumn struct {
	// Qualifier of the column. Columns in the parent column family that have this
	// exact qualifier are exposed as individual fields. The column field name is
	// the same as the column qualifier.
	Qualifier string

	// If the qualifier is not a valid BigQuery field identifier (i.e. it does not
	// match [a-zA-Z][a-zA-Z0-9_]*), a valid identifier must be provided as the
	// column field name and is used as the field name in queries.
	FieldName string

	// If true, only the latest version of values are exposed for this column.
	// See BigtableColumnFamily.OnlyReadLatest.
	OnlyReadLatest bool

	// The encoding of the values when the type is not STRING.
	// See BigtableColumnFamily.Encoding.
	Encoding string

	// The type to convert the value in cells of this column.
	// See BigtableColumnFamily.Type.
	Type string
}

func (b *BigtableColumn) toBQ() *bq.BigtableColumn {
	q := &bq.BigtableColumn{
		FieldName:      b.FieldName,
		OnlyReadLatest: b.OnlyReadLatest,
		Encoding:       b.Encoding,
		Type:           b.Type,
	}
	if utf8.ValidString(b.Qualifier) {
		q.QualifierString = b.Qualifier
	} else {
		q.QualifierEncoded = base64.RawStdEncoding.EncodeToString([]byte(b.Qualifier))
	}
	return q
}

func bqToBigtableColumn(q *bq.BigtableColumn) (*BigtableColumn, error) {
	b := &BigtableColumn{
		FieldName:      q.FieldName,
		OnlyReadLatest: q.OnlyReadLatest,
		Encoding:       q.Encoding,
		Type:           q.Type,
	}
	if q.QualifierString != "" {
		b.Qualifier = q.QualifierString
	} else {
		bytes, err := base64.RawStdEncoding.DecodeString(q.QualifierEncoded)
		if err != nil {
			return nil, err
		}
		b.Qualifier = string(bytes)
	}
	return b, nil
}

// ParquetOptions are additional options for Parquet external data sources.
type ParquetOptions struct {
	// EnumAsString indicates whether to infer Parquet ENUM logical type as
	// STRING instead of BYTES by default.
	EnumAsString bool

	// EnableListInference indicates whether to use schema inference
	// specifically for Parquet LIST logical type.
	EnableListInference bool
}

func (o *ParquetOptions) populateExternalDataConfig(c *bq.ExternalDataConfiguration) {
	if o != nil {
		c.ParquetOptions = &bq.ParquetOptions{
			EnumAsString:        o.EnumAsString,
			EnableListInference: o.EnableListInference,
		}
	}
}

func bqToParquetOptions(q *bq.ParquetOptions) *ParquetOptions {
	if q == nil {
		return nil
	}
	return &ParquetOptions{
		EnumAsString:        q.EnumAsString,
		EnableListInference: q.EnableListInference,
	}
}
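// exampleParquetExternalDataConfig is an illustrative sketch, not part of
// the original file: Parquet files where ENUM values should surface as
// STRING and LIST columns should be inferred as repeated fields. The URI is
// hypothetical.
func exampleParquetExternalDataConfig() *ExternalDataConfig {
	return &ExternalDataConfig{
		SourceFormat: Parquet,
		SourceURIs:   []string{"gs://my-bucket/warehouse/*.parquet"}, // hypothetical bucket
		Options: &ParquetOptions{
			EnumAsString:        true,
			EnableListInference: true,
		},
	}
}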
// HivePartitioningMode is used in conjunction with HivePartitioningOptions.
type HivePartitioningMode string

const (
	// AutoHivePartitioningMode automatically infers partitioning keys and types.
	AutoHivePartitioningMode HivePartitioningMode = "AUTO"
	// StringHivePartitioningMode automatically infers partitioning keys and treats values as strings.
	StringHivePartitioningMode HivePartitioningMode = "STRINGS"
	// CustomHivePartitioningMode allows custom definition of the external partitioning.
	CustomHivePartitioningMode HivePartitioningMode = "CUSTOM"
)

// HivePartitioningOptions defines the behavior of Hive partitioning
// when working with external data.
type HivePartitioningOptions struct {
	// Mode defines which hive partitioning mode to use when reading data.
	Mode HivePartitioningMode

	// When hive partition detection is requested, a common prefix for
	// all source URIs should be supplied. The prefix must end immediately
	// before the partition key encoding begins.
	//
	// For example, consider files following this data layout:
	//
	//	gs://bucket/path_to_table/dt=2019-01-01/country=BR/id=7/file.avro
	//	gs://bucket/path_to_table/dt=2018-12-31/country=CA/id=3/file.avro
	//
	// When hive partitioning is requested with either AUTO or STRINGS
	// detection, the common prefix can be either of
	// gs://bucket/path_to_table or gs://bucket/path_to_table/ (the trailing
	// slash does not matter).
	SourceURIPrefix string

	// If set to true, queries against this external table require
	// a partition filter to be present that can perform partition
	// elimination. Hive-partitioned load jobs with this field
	// set to true will fail.
	RequirePartitionFilter bool
}

func (o *HivePartitioningOptions) toBQ() *bq.HivePartitioningOptions {
	if o == nil {
		return nil
	}
	return &bq.HivePartitioningOptions{
		Mode:                   string(o.Mode),
		SourceUriPrefix:        o.SourceURIPrefix,
		RequirePartitionFilter: o.RequirePartitionFilter,
	}
}

func bqToHivePartitioningOptions(q *bq.HivePartitioningOptions) *HivePartitioningOptions {
	if q == nil {
		return nil
	}
	return &HivePartitioningOptions{
		Mode:                   HivePartitioningMode(q.Mode),
		SourceURIPrefix:        q.SourceUriPrefix,
		RequirePartitionFilter: q.RequirePartitionFilter,
	}
}
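// exampleHivePartitionedConfig is an illustrative sketch, not part of the
// original file: Hive-partitioned Avro data laid out as in the
// SourceURIPrefix comment above. AUTO mode infers both the partition keys
// (dt, country, id) and their types from the object paths; the bucket
// layout is hypothetical.
func exampleHivePartitionedConfig() *ExternalDataConfig {
	return &ExternalDataConfig{
		SourceFormat: Avro,
		SourceURIs:   []string{"gs://bucket/path_to_table/*"},
		HivePartitioningOptions: &HivePartitioningOptions{
			Mode:                   AutoHivePartitioningMode,
			SourceURIPrefix:        "gs://bucket/path_to_table",
			RequirePartitionFilter: true, // queries must filter on a partition key
		},
	}
}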