1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "encoding/base64"
19 "unicode/utf8"
20
21 bq "google.golang.org/api/bigquery/v2"
22 )
23
24
// DataFormat names the format of a data source. Values are the string
// identifiers declared in the constant block below.
type DataFormat string

// Known DataFormat identifiers.
const (
	// CSV is the "CSV" format identifier.
	CSV DataFormat = "CSV"
	// Avro is the "AVRO" format identifier.
	Avro DataFormat = "AVRO"
	// JSON is the "NEWLINE_DELIMITED_JSON" format identifier.
	JSON DataFormat = "NEWLINE_DELIMITED_JSON"
	// DatastoreBackup is the "DATASTORE_BACKUP" format identifier.
	DatastoreBackup DataFormat = "DATASTORE_BACKUP"
	// GoogleSheets is the "GOOGLE_SHEETS" format identifier.
	GoogleSheets DataFormat = "GOOGLE_SHEETS"
	// Bigtable is the "BIGTABLE" format identifier.
	Bigtable DataFormat = "BIGTABLE"
	// Parquet is the "PARQUET" format identifier.
	Parquet DataFormat = "PARQUET"
	// ORC is the "ORC" format identifier.
	ORC DataFormat = "ORC"

	// TFSavedModel is the "ML_TF_SAVED_MODEL" format identifier.
	// NOTE(review): presumably used for TensorFlow SavedModel artifacts —
	// confirm against the BigQuery API reference.
	TFSavedModel DataFormat = "ML_TF_SAVED_MODEL"

	// XGBoostBooster is the "ML_XGBOOST_BOOSTER" format identifier.
	// NOTE(review): presumably used for XGBoost Booster artifacts —
	// confirm against the BigQuery API reference.
	XGBoostBooster DataFormat = "ML_XGBOOST_BOOSTER"
	// Iceberg is the "ICEBERG" format identifier.
	Iceberg DataFormat = "ICEBERG"
)
43
44
45
46
47 type ExternalData interface {
48 toBQ() bq.ExternalDataConfiguration
49 }
50
51
52
53 type ExternalDataConfig struct {
54
55 SourceFormat DataFormat
56
57
58
59
60
61
62
63
64
65
66
67
68
69 SourceURIs []string
70
71
72
73 Schema Schema
74
75
76
77 AutoDetect bool
78
79
80 Compression Compression
81
82
83
84
85
86
87
88 IgnoreUnknownValues bool
89
90
91
92 MaxBadRecords int64
93
94
95 Options ExternalDataConfigOptions
96
97
98
99 HivePartitioningOptions *HivePartitioningOptions
100
101
102
103
104
105
106
107 DecimalTargetTypes []DecimalTargetType
108
109
110
111
112 ConnectionID string
113
114
115
116 ReferenceFileSchemaURI string
117 }
118
119 func (e *ExternalDataConfig) toBQ() bq.ExternalDataConfiguration {
120 q := bq.ExternalDataConfiguration{
121 SourceFormat: string(e.SourceFormat),
122 SourceUris: e.SourceURIs,
123 Autodetect: e.AutoDetect,
124 Compression: string(e.Compression),
125 IgnoreUnknownValues: e.IgnoreUnknownValues,
126 MaxBadRecords: e.MaxBadRecords,
127 HivePartitioningOptions: e.HivePartitioningOptions.toBQ(),
128 ConnectionId: e.ConnectionID,
129 ReferenceFileSchemaUri: e.ReferenceFileSchemaURI,
130 }
131 if e.Schema != nil {
132 q.Schema = e.Schema.toBQ()
133 }
134 if e.Options != nil {
135 e.Options.populateExternalDataConfig(&q)
136 }
137 for _, v := range e.DecimalTargetTypes {
138 q.DecimalTargetTypes = append(q.DecimalTargetTypes, string(v))
139 }
140 return q
141 }
142
143 func bqToExternalDataConfig(q *bq.ExternalDataConfiguration) (*ExternalDataConfig, error) {
144 e := &ExternalDataConfig{
145 SourceFormat: DataFormat(q.SourceFormat),
146 SourceURIs: q.SourceUris,
147 AutoDetect: q.Autodetect,
148 Compression: Compression(q.Compression),
149 IgnoreUnknownValues: q.IgnoreUnknownValues,
150 MaxBadRecords: q.MaxBadRecords,
151 Schema: bqToSchema(q.Schema),
152 HivePartitioningOptions: bqToHivePartitioningOptions(q.HivePartitioningOptions),
153 ConnectionID: q.ConnectionId,
154 ReferenceFileSchemaURI: q.ReferenceFileSchemaUri,
155 }
156 for _, v := range q.DecimalTargetTypes {
157 e.DecimalTargetTypes = append(e.DecimalTargetTypes, DecimalTargetType(v))
158 }
159 switch {
160 case q.AvroOptions != nil:
161 e.Options = bqToAvroOptions(q.AvroOptions)
162 case q.CsvOptions != nil:
163 e.Options = bqToCSVOptions(q.CsvOptions)
164 case q.GoogleSheetsOptions != nil:
165 e.Options = bqToGoogleSheetsOptions(q.GoogleSheetsOptions)
166 case q.BigtableOptions != nil:
167 var err error
168 e.Options, err = bqToBigtableOptions(q.BigtableOptions)
169 if err != nil {
170 return nil, err
171 }
172 case q.ParquetOptions != nil:
173 e.Options = bqToParquetOptions(q.ParquetOptions)
174 }
175 return e, nil
176 }
177
178
179
180 type ExternalDataConfigOptions interface {
181 populateExternalDataConfig(*bq.ExternalDataConfiguration)
182 }
183
184
// AvroOptions holds the Avro-specific settings of an external data source.
type AvroOptions struct {
	// UseAvroLogicalTypes is copied to the API field of the same name.
	// NOTE(review): presumably makes BigQuery interpret Avro logical
	// types as their corresponding types rather than the raw underlying
	// types — confirm against the BigQuery API reference.
	UseAvroLogicalTypes bool
}
191
192 func (o *AvroOptions) populateExternalDataConfig(c *bq.ExternalDataConfiguration) {
193 c.AvroOptions = &bq.AvroOptions{
194 UseAvroLogicalTypes: o.UseAvroLogicalTypes,
195 }
196 }
197
198 func bqToAvroOptions(q *bq.AvroOptions) *AvroOptions {
199 if q == nil {
200 return nil
201 }
202 return &AvroOptions{
203 UseAvroLogicalTypes: q.UseAvroLogicalTypes,
204 }
205 }
206
207
208 type CSVOptions struct {
209
210
211 AllowJaggedRows bool
212
213
214
215 AllowQuotedNewlines bool
216
217
218 Encoding Encoding
219
220
221
222 FieldDelimiter string
223
224
225
226
227
228
229
230 Quote string
231 ForceZeroQuote bool
232
233
234
235 SkipLeadingRows int64
236
237
238
239 NullMarker string
240
241
242
243 PreserveASCIIControlCharacters bool
244 }
245
246 func (o *CSVOptions) populateExternalDataConfig(c *bq.ExternalDataConfiguration) {
247 c.CsvOptions = &bq.CsvOptions{
248 AllowJaggedRows: o.AllowJaggedRows,
249 AllowQuotedNewlines: o.AllowQuotedNewlines,
250 Encoding: string(o.Encoding),
251 FieldDelimiter: o.FieldDelimiter,
252 Quote: o.quote(),
253 SkipLeadingRows: o.SkipLeadingRows,
254 NullMarker: o.NullMarker,
255 PreserveAsciiControlCharacters: o.PreserveASCIIControlCharacters,
256 }
257 }
258
259
260 func (o *CSVOptions) quote() *string {
261 if o.ForceZeroQuote {
262 quote := ""
263 return "e
264 }
265 if o.Quote == "" {
266 return nil
267 }
268 return &o.Quote
269 }
270
271 func (o *CSVOptions) setQuote(ps *string) {
272 if ps != nil {
273 o.Quote = *ps
274 if o.Quote == "" {
275 o.ForceZeroQuote = true
276 }
277 }
278 }
279
280 func bqToCSVOptions(q *bq.CsvOptions) *CSVOptions {
281 o := &CSVOptions{
282 AllowJaggedRows: q.AllowJaggedRows,
283 AllowQuotedNewlines: q.AllowQuotedNewlines,
284 Encoding: Encoding(q.Encoding),
285 FieldDelimiter: q.FieldDelimiter,
286 SkipLeadingRows: q.SkipLeadingRows,
287 NullMarker: q.NullMarker,
288 PreserveASCIIControlCharacters: q.PreserveAsciiControlCharacters,
289 }
290 o.setQuote(q.Quote)
291 return o
292 }
293
294
// GoogleSheetsOptions holds the Google Sheets-specific settings of an
// external data source. Both fields are copied unchanged to the API fields
// of the same name.
type GoogleSheetsOptions struct {
	// SkipLeadingRows is copied to the API field of the same name.
	// NOTE(review): presumably the number of rows at the top of the
	// sheet to skip — confirm against the BigQuery API reference.
	SkipLeadingRows int64

	// Range is copied to the API field of the same name.
	// NOTE(review): presumably selects the sheet range to read —
	// confirm the accepted syntax against the BigQuery API reference.
	Range string
}
305
306 func (o *GoogleSheetsOptions) populateExternalDataConfig(c *bq.ExternalDataConfiguration) {
307 c.GoogleSheetsOptions = &bq.GoogleSheetsOptions{
308 SkipLeadingRows: o.SkipLeadingRows,
309 Range: o.Range,
310 }
311 }
312
313 func bqToGoogleSheetsOptions(q *bq.GoogleSheetsOptions) *GoogleSheetsOptions {
314 return &GoogleSheetsOptions{
315 SkipLeadingRows: q.SkipLeadingRows,
316 Range: q.Range,
317 }
318 }
319
320
321 type BigtableOptions struct {
322
323
324
325 ColumnFamilies []*BigtableColumnFamily
326
327
328
329
330 IgnoreUnspecifiedColumnFamilies bool
331
332
333
334
335 ReadRowkeyAsString bool
336 }
337
338 func (o *BigtableOptions) populateExternalDataConfig(c *bq.ExternalDataConfiguration) {
339 q := &bq.BigtableOptions{
340 IgnoreUnspecifiedColumnFamilies: o.IgnoreUnspecifiedColumnFamilies,
341 ReadRowkeyAsString: o.ReadRowkeyAsString,
342 }
343 for _, f := range o.ColumnFamilies {
344 q.ColumnFamilies = append(q.ColumnFamilies, f.toBQ())
345 }
346 c.BigtableOptions = q
347 }
348
349 func bqToBigtableOptions(q *bq.BigtableOptions) (*BigtableOptions, error) {
350 b := &BigtableOptions{
351 IgnoreUnspecifiedColumnFamilies: q.IgnoreUnspecifiedColumnFamilies,
352 ReadRowkeyAsString: q.ReadRowkeyAsString,
353 }
354 for _, f := range q.ColumnFamilies {
355 f2, err := bqToBigtableColumnFamily(f)
356 if err != nil {
357 return nil, err
358 }
359 b.ColumnFamilies = append(b.ColumnFamilies, f2)
360 }
361 return b, nil
362 }
363
364
365 type BigtableColumnFamily struct {
366
367 FamilyID string
368
369
370
371
372
373 Columns []*BigtableColumn
374
375
376
377
378
379
380 Encoding string
381
382
383
384
385 OnlyReadLatest bool
386
387
388
389
390
391
392
393
394 Type string
395 }
396
397 func (b *BigtableColumnFamily) toBQ() *bq.BigtableColumnFamily {
398 q := &bq.BigtableColumnFamily{
399 FamilyId: b.FamilyID,
400 Encoding: b.Encoding,
401 OnlyReadLatest: b.OnlyReadLatest,
402 Type: b.Type,
403 }
404 for _, col := range b.Columns {
405 q.Columns = append(q.Columns, col.toBQ())
406 }
407 return q
408 }
409
410 func bqToBigtableColumnFamily(q *bq.BigtableColumnFamily) (*BigtableColumnFamily, error) {
411 b := &BigtableColumnFamily{
412 FamilyID: q.FamilyId,
413 Encoding: q.Encoding,
414 OnlyReadLatest: q.OnlyReadLatest,
415 Type: q.Type,
416 }
417 for _, col := range q.Columns {
418 c, err := bqToBigtableColumn(col)
419 if err != nil {
420 return nil, err
421 }
422 b.Columns = append(b.Columns, c)
423 }
424 return b, nil
425 }
426
427
// BigtableColumn describes one Bigtable column to expose.
type BigtableColumn struct {
	// Qualifier is the column qualifier. It may hold arbitrary bytes:
	// valid UTF-8 is sent to the API as a plain string, anything else is
	// sent base64-encoded.
	Qualifier string

	// FieldName is copied to the API field of the same name.
	// NOTE(review): presumably an alternative identifier for the column
	// when the qualifier is not usable as a field name — confirm against
	// the BigQuery API reference.
	FieldName string

	// OnlyReadLatest is copied to the API field of the same name.
	// NOTE(review): presumably restricts reads to the latest version of
	// the value — confirm against the BigQuery API reference.
	OnlyReadLatest bool

	// Encoding is copied to the API field of the same name.
	// NOTE(review): presumably the encoding of the stored values —
	// confirm the allowed values against the BigQuery API reference.
	Encoding string

	// Type is copied to the API field of the same name.
	// NOTE(review): presumably the type the values are converted to —
	// confirm the allowed values against the BigQuery API reference.
	Type string
}
451
452 func (b *BigtableColumn) toBQ() *bq.BigtableColumn {
453 q := &bq.BigtableColumn{
454 FieldName: b.FieldName,
455 OnlyReadLatest: b.OnlyReadLatest,
456 Encoding: b.Encoding,
457 Type: b.Type,
458 }
459 if utf8.ValidString(b.Qualifier) {
460 q.QualifierString = b.Qualifier
461 } else {
462 q.QualifierEncoded = base64.RawStdEncoding.EncodeToString([]byte(b.Qualifier))
463 }
464 return q
465 }
466
467 func bqToBigtableColumn(q *bq.BigtableColumn) (*BigtableColumn, error) {
468 b := &BigtableColumn{
469 FieldName: q.FieldName,
470 OnlyReadLatest: q.OnlyReadLatest,
471 Encoding: q.Encoding,
472 Type: q.Type,
473 }
474 if q.QualifierString != "" {
475 b.Qualifier = q.QualifierString
476 } else {
477 bytes, err := base64.RawStdEncoding.DecodeString(q.QualifierEncoded)
478 if err != nil {
479 return nil, err
480 }
481 b.Qualifier = string(bytes)
482 }
483 return b, nil
484 }
485
486
// ParquetOptions holds the Parquet-specific settings of an external data
// source. Both fields are copied unchanged to the API fields of the same
// name.
type ParquetOptions struct {
	// EnumAsString is copied to the API field of the same name.
	// NOTE(review): presumably makes the ENUM logical type be inferred
	// as a string rather than bytes — confirm against the BigQuery API
	// reference.
	EnumAsString bool

	// EnableListInference is copied to the API field of the same name.
	// NOTE(review): presumably enables schema inference for the LIST
	// logical type — confirm against the BigQuery API reference.
	EnableListInference bool
}
496
497 func (o *ParquetOptions) populateExternalDataConfig(c *bq.ExternalDataConfiguration) {
498 if o != nil {
499 c.ParquetOptions = &bq.ParquetOptions{
500 EnumAsString: o.EnumAsString,
501 EnableListInference: o.EnableListInference,
502 }
503 }
504 }
505
506 func bqToParquetOptions(q *bq.ParquetOptions) *ParquetOptions {
507 if q == nil {
508 return nil
509 }
510 return &ParquetOptions{
511 EnumAsString: q.EnumAsString,
512 EnableListInference: q.EnableListInference,
513 }
514 }
515
516
// HivePartitioningMode names a hive partitioning mode. Values are the
// string identifiers declared in the constant block below.
type HivePartitioningMode string

// Known HivePartitioningMode identifiers.
const (
	// AutoHivePartitioningMode is the "AUTO" mode identifier.
	// NOTE(review): presumably lets partition key names and types be
	// inferred automatically — confirm against the BigQuery API
	// reference.
	AutoHivePartitioningMode HivePartitioningMode = "AUTO"

	// StringHivePartitioningMode is the "STRINGS" mode identifier.
	// NOTE(review): presumably treats every partition key as a string —
	// confirm against the BigQuery API reference.
	StringHivePartitioningMode HivePartitioningMode = "STRINGS"

	// CustomHivePartitioningMode is the "CUSTOM" mode identifier.
	// NOTE(review): presumably expects the partition key schema to be
	// supplied by the caller — confirm against the BigQuery API
	// reference.
	CustomHivePartitioningMode HivePartitioningMode = "CUSTOM"
)
527
528
529
530 type HivePartitioningOptions struct {
531
532
533 Mode HivePartitioningMode
534
535
536
537
538
539
540
541
542
543
544
545
546
547 SourceURIPrefix string
548
549
550
551
552
553 RequirePartitionFilter bool
554 }
555
556 func (o *HivePartitioningOptions) toBQ() *bq.HivePartitioningOptions {
557 if o == nil {
558 return nil
559 }
560 return &bq.HivePartitioningOptions{
561 Mode: string(o.Mode),
562 SourceUriPrefix: o.SourceURIPrefix,
563 RequirePartitionFilter: o.RequirePartitionFilter,
564 }
565 }
566
567 func bqToHivePartitioningOptions(q *bq.HivePartitioningOptions) *HivePartitioningOptions {
568 if q == nil {
569 return nil
570 }
571 return &HivePartitioningOptions{
572 Mode: HivePartitioningMode(q.Mode),
573 SourceURIPrefix: q.SourceUriPrefix,
574 RequirePartitionFilter: q.RequirePartitionFilter,
575 }
576 }
577
View as plain text