1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "encoding/json"
19 "errors"
20 "fmt"
21 "reflect"
22 "sync"
23
24 bq "google.golang.org/api/bigquery/v2"
25 )
26
27
// Schema is an ordered list of field definitions. It converts to and from
// the API representation via Schema.toBQ and bqToSchema.
type Schema []*FieldSchema
29
30
31
32 func (s Schema) Relax() Schema {
33 var out Schema
34 for _, v := range s {
35 relaxed := &FieldSchema{
36 Name: v.Name,
37 Description: v.Description,
38 Repeated: v.Repeated,
39 Required: false,
40 Type: v.Type,
41 Schema: v.Schema.Relax(),
42 }
43 out = append(out, relaxed)
44 }
45 return out
46 }
47
48
49
50
51
52
53
54
55 func (s Schema) ToJSONFields() ([]byte, error) {
56 var rawSchema []*bq.TableFieldSchema
57 for _, f := range s {
58 rawSchema = append(rawSchema, f.toBQ())
59 }
60
61 return json.MarshalIndent(rawSchema, "", " ")
62 }
63
64
// FieldSchema describes a single field. It mirrors bq.TableFieldSchema; see
// toBQ and bqToFieldSchema for the exact mapping.
type FieldSchema struct {
	// Name is the field name.
	Name string

	// Description is the free-form description of the field.
	Description string

	// Repeated marks the field as having mode "REPEATED". When both
	// Repeated and Required are set, toBQ emits "REPEATED".
	Repeated bool

	// Required marks the field as having mode "REQUIRED" (only emitted
	// when Repeated is false).
	Required bool

	// Type is the field's type. It is sent to the API as its string value.
	Type FieldType

	// PolicyTags optionally lists policy tags for the field. A nil value
	// results in no policy tags being sent.
	PolicyTags *PolicyTagList

	// Schema holds the nested fields. inferFieldSchema populates it for
	// fields of RecordFieldType.
	Schema Schema

	// MaxLength is passed through unchanged to the API field of the same
	// name. NOTE(review): presumably only meaningful for string/bytes
	// fields — confirm against the BigQuery API reference.
	MaxLength int64

	// Precision is passed through unchanged to the API field of the same
	// name. NOTE(review): presumably only meaningful for numeric types —
	// confirm against the BigQuery API reference.
	Precision int64

	// Scale is passed through unchanged to the API field of the same
	// name. NOTE(review): presumably only meaningful for numeric types —
	// confirm against the BigQuery API reference.
	Scale int64

	// DefaultValueExpression is passed through unchanged to the API field
	// of the same name.
	DefaultValueExpression string

	// Collation is passed through unchanged to the API field of the same
	// name.
	Collation string

	// RangeElementType optionally describes the element type of a range
	// field. A nil value results in no range element type being sent.
	RangeElementType *RangeElementType
}
156
157 func (fs *FieldSchema) toBQ() *bq.TableFieldSchema {
158 tfs := &bq.TableFieldSchema{
159 Description: fs.Description,
160 Name: fs.Name,
161 Type: string(fs.Type),
162 PolicyTags: fs.PolicyTags.toBQ(),
163 MaxLength: fs.MaxLength,
164 Precision: fs.Precision,
165 Scale: fs.Scale,
166 DefaultValueExpression: fs.DefaultValueExpression,
167 Collation: string(fs.Collation),
168 RangeElementType: fs.RangeElementType.toBQ(),
169 }
170
171 if fs.Repeated {
172 tfs.Mode = "REPEATED"
173 } else if fs.Required {
174 tfs.Mode = "REQUIRED"
175 }
176
177 for _, f := range fs.Schema {
178 tfs.Fields = append(tfs.Fields, f.toBQ())
179 }
180
181 return tfs
182 }
183
184
// RangeElementType describes the element type of a range field.
type RangeElementType struct {
	// Type is the element type; it is sent to the API as its string value.
	Type FieldType
}
191
192 func (rt *RangeElementType) toBQ() *bq.TableFieldSchemaRangeElementType {
193 if rt == nil {
194 return nil
195 }
196 return &bq.TableFieldSchemaRangeElementType{
197 Type: string(rt.Type),
198 }
199 }
200
201 func bqToRangeElementType(rt *bq.TableFieldSchemaRangeElementType) *RangeElementType {
202 if rt == nil {
203 return nil
204 }
205 return &RangeElementType{
206 Type: FieldType(rt.Type),
207 }
208 }
209
210
211
// PolicyTagList holds the policy tags attached to a field.
type PolicyTagList struct {
	// Names lists the policy tag names; the slice is passed to and from
	// the API representation without copying.
	Names []string
}
215
216 func (ptl *PolicyTagList) toBQ() *bq.TableFieldSchemaPolicyTags {
217 if ptl == nil {
218 return nil
219 }
220 return &bq.TableFieldSchemaPolicyTags{
221 Names: ptl.Names,
222 }
223 }
224
225 func bqToPolicyTagList(pt *bq.TableFieldSchemaPolicyTags) *PolicyTagList {
226 if pt == nil {
227 return nil
228 }
229 return &PolicyTagList{
230 Names: pt.Names,
231 }
232 }
233
234 func (s Schema) toBQ() *bq.TableSchema {
235 var fields []*bq.TableFieldSchema
236 for _, f := range s {
237 fields = append(fields, f.toBQ())
238 }
239 return &bq.TableSchema{Fields: fields}
240 }
241
242 func bqToFieldSchema(tfs *bq.TableFieldSchema) *FieldSchema {
243 fs := &FieldSchema{
244 Description: tfs.Description,
245 Name: tfs.Name,
246 Repeated: tfs.Mode == "REPEATED",
247 Required: tfs.Mode == "REQUIRED",
248 Type: FieldType(tfs.Type),
249 PolicyTags: bqToPolicyTagList(tfs.PolicyTags),
250 MaxLength: tfs.MaxLength,
251 Precision: tfs.Precision,
252 Scale: tfs.Scale,
253 DefaultValueExpression: tfs.DefaultValueExpression,
254 Collation: tfs.Collation,
255 RangeElementType: bqToRangeElementType(tfs.RangeElementType),
256 }
257
258 for _, f := range tfs.Fields {
259 fs.Schema = append(fs.Schema, bqToFieldSchema(f))
260 }
261 return fs
262 }
263
264 func bqToSchema(ts *bq.TableSchema) Schema {
265 if ts == nil {
266 return nil
267 }
268 var s Schema
269 for _, f := range ts.Fields {
270 s = append(s, bqToFieldSchema(f))
271 }
272 return s
273 }
274
275
// FieldType is the type of a field, expressed as the type-name string used
// by the API (for example "STRING").
type FieldType string
277
// Known field types. Each constant's value is the type name sent to and
// received from the API.
const (
	// StringFieldType is the "STRING" field type.
	StringFieldType FieldType = "STRING"
	// BytesFieldType is the "BYTES" field type.
	BytesFieldType FieldType = "BYTES"
	// IntegerFieldType is the "INTEGER" field type ("INT64" is accepted as
	// an alias by validateKnownType).
	IntegerFieldType FieldType = "INTEGER"
	// FloatFieldType is the "FLOAT" field type ("FLOAT64" is accepted as
	// an alias by validateKnownType).
	FloatFieldType FieldType = "FLOAT"
	// BooleanFieldType is the "BOOLEAN" field type ("BOOL" is accepted as
	// an alias by validateKnownType).
	BooleanFieldType FieldType = "BOOLEAN"
	// TimestampFieldType is the "TIMESTAMP" field type.
	TimestampFieldType FieldType = "TIMESTAMP"
	// RecordFieldType is the "RECORD" field type, used for fields with a
	// nested schema ("STRUCT" is accepted as an alias by validateKnownType).
	RecordFieldType FieldType = "RECORD"
	// DateFieldType is the "DATE" field type.
	DateFieldType FieldType = "DATE"
	// TimeFieldType is the "TIME" field type.
	TimeFieldType FieldType = "TIME"
	// DateTimeFieldType is the "DATETIME" field type.
	DateTimeFieldType FieldType = "DATETIME"
	// NumericFieldType is the "NUMERIC" field type ("DECIMAL" is accepted
	// as an alias by validateKnownType).
	NumericFieldType FieldType = "NUMERIC"
	// GeographyFieldType is the "GEOGRAPHY" field type.
	GeographyFieldType FieldType = "GEOGRAPHY"
	// BigNumericFieldType is the "BIGNUMERIC" field type ("BIGDECIMAL" is
	// accepted as an alias by validateKnownType).
	BigNumericFieldType FieldType = "BIGNUMERIC"
	// IntervalFieldType is the "INTERVAL" field type.
	IntervalFieldType FieldType = "INTERVAL"
	// JSONFieldType is the "JSON" field type.
	JSONFieldType FieldType = "JSON"
	// RangeFieldType is the "RANGE" field type.
	RangeFieldType FieldType = "RANGE"
)
315
var (
	// errEmptyJSONSchema is returned by SchemaFromJSON when given
	// zero-length input.
	errEmptyJSONSchema = errors.New("bigquery: empty JSON schema")

	// fieldTypes is the set of canonical field types accepted by
	// validateKnownType. Only key presence is consulted.
	fieldTypes = map[FieldType]bool{
		StringFieldType:     true,
		BytesFieldType:      true,
		IntegerFieldType:    true,
		FloatFieldType:      true,
		BooleanFieldType:    true,
		TimestampFieldType:  true,
		RecordFieldType:     true,
		DateFieldType:       true,
		TimeFieldType:       true,
		DateTimeFieldType:   true,
		NumericFieldType:    true,
		GeographyFieldType:  true,
		BigNumericFieldType: true,
		IntervalFieldType:   true,
		JSONFieldType:       true,
		RangeFieldType:      true,
	}

	// fieldAliases maps alternative type names to the canonical FieldType
	// that validateKnownType resolves them to.
	fieldAliases = map[FieldType]FieldType{
		"BOOL":       BooleanFieldType,
		"FLOAT64":    FloatFieldType,
		"INT64":      IntegerFieldType,
		"STRUCT":     RecordFieldType,
		"DECIMAL":    NumericFieldType,
		"BIGDECIMAL": BigNumericFieldType,
	}
)
346
// typeOfByteSlice is the reflect.Type of []byte, used to special-case byte
// slices during schema inference.
var typeOfByteSlice = reflect.TypeOf([]byte(nil))
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430 func InferSchema(st interface{}) (Schema, error) {
431 return inferSchemaReflectCached(reflect.TypeOf(st))
432 }
433
// schemaCache memoizes schema inference results. Keys are reflect.Type
// values and values are cacheVal (see inferSchemaReflectCached).
var schemaCache sync.Map
435
// cacheVal is the value stored in schemaCache: the outcome of one call to
// inferSchemaReflect, whether it succeeded or failed.
type cacheVal struct {
	schema Schema // inferred schema; nil when err is set
	err    error  // inference error, cached alongside successful results
}
440
441 func inferSchemaReflectCached(t reflect.Type) (Schema, error) {
442 var cv cacheVal
443 v, ok := schemaCache.Load(t)
444 if ok {
445 cv = v.(cacheVal)
446 } else {
447 s, err := inferSchemaReflect(t)
448 cv = cacheVal{s, err}
449 schemaCache.Store(t, cv)
450 }
451 return cv.schema, cv.err
452 }
453
454 func inferSchemaReflect(t reflect.Type) (Schema, error) {
455 rec, err := hasRecursiveType(t, nil)
456 if err != nil {
457 return nil, err
458 }
459 if rec {
460 return nil, fmt.Errorf("bigquery: schema inference for recursive type %s", t)
461 }
462 return inferStruct(t)
463 }
464
465 func inferStruct(t reflect.Type) (Schema, error) {
466 switch t.Kind() {
467 case reflect.Ptr:
468 if t.Elem().Kind() != reflect.Struct {
469 return nil, noStructError{t}
470 }
471 t = t.Elem()
472 fallthrough
473
474 case reflect.Struct:
475 return inferFields(t)
476 default:
477 return nil, noStructError{t}
478 }
479 }
480
481
// inferFieldSchema infers the FieldSchema for a Go value of type rt.
//
// fieldName is used only to build error values. nullable and json report
// whether the corresponding tag options were present on the field (see
// inferFields). The returned FieldSchema has no Name; the caller sets it.
//
// The checks below are order-dependent: exact type matches are tried first,
// then the nullable wrapper types, then integer kinds, and finally a switch
// on the reflect.Kind.
func inferFieldSchema(fieldName string, rt reflect.Type, nullable, json bool) (*FieldSchema, error) {
	// The nullable option is accepted only on []byte and on pointers to
	// structs.
	if nullable && !(rt == typeOfByteSlice || rt.Kind() == reflect.Ptr && rt.Elem().Kind() == reflect.Struct) {
		return nil, badNullableError{fieldName, rt}
	}
	// The json option is accepted only on structs and pointers to structs.
	if json && !(rt.Kind() == reflect.Struct || rt.Kind() == reflect.Ptr && rt.Elem().Kind() == reflect.Struct) {
		return nil, badJSONError{fieldName, rt}
	}
	// Exact type matches. The typeOfXxx values other than typeOfByteSlice
	// are defined outside this view.
	switch rt {
	case typeOfByteSlice:
		return &FieldSchema{Required: !nullable, Type: BytesFieldType}, nil
	case typeOfGoTime:
		return &FieldSchema{Required: true, Type: TimestampFieldType}, nil
	case typeOfDate:
		return &FieldSchema{Required: true, Type: DateFieldType}, nil
	case typeOfTime:
		return &FieldSchema{Required: true, Type: TimeFieldType}, nil
	case typeOfDateTime:
		return &FieldSchema{Required: true, Type: DateTimeFieldType}, nil
	case typeOfRat:
		// NOTE(review): nullable can only be true here if typeOfRat is a
		// pointer-to-struct type (see the guard above) — confirm against
		// its definition.
		return &FieldSchema{Required: !nullable, Type: NumericFieldType}, nil
	case typeOfIntervalValue:
		return &FieldSchema{Required: !nullable, Type: IntervalFieldType}, nil
	case typeOfRangeValue:
		// No RangeElementType is set on the inferred field.
		return &FieldSchema{Required: !nullable, Type: RangeFieldType}, nil
	}
	// Types recognized by nullableFieldType (defined elsewhere) are never
	// required.
	if ft := nullableFieldType(rt); ft != "" {
		return &FieldSchema{Required: false, Type: ft}, nil
	}
	if isSupportedIntType(rt) || isSupportedUintType(rt) {
		return &FieldSchema{Required: true, Type: IntegerFieldType}, nil
	}
	switch rt.Kind() {
	case reflect.Slice, reflect.Array:
		et := rt.Elem()
		if et != typeOfByteSlice && (et.Kind() == reflect.Slice || et.Kind() == reflect.Array) {
			// Nested slices/arrays are rejected, except slices of []byte.
			return nil, unsupportedFieldTypeError{fieldName, rt}
		}
		if nullableFieldType(et) != "" {
			// Slices of the nullable wrapper types are rejected.
			return nil, unsupportedFieldTypeError{fieldName, rt}
		}
		// Infer the element schema, then mark it repeated. Tag options are
		// not propagated to the element.
		f, err := inferFieldSchema(fieldName, et, false, false)
		if err != nil {
			return nil, err
		}
		f.Repeated = true
		f.Required = false
		return f, nil
	case reflect.Ptr:
		// Only pointers to structs are supported; they are handled like
		// structs below.
		if rt.Elem().Kind() != reflect.Struct {
			return nil, unsupportedFieldTypeError{fieldName, rt}
		}
		fallthrough
	case reflect.Struct:
		// With the json option the struct becomes a single JSON field
		// instead of a nested record.
		if json {
			return &FieldSchema{Required: !nullable, Type: JSONFieldType}, nil
		}

		nested, err := inferStruct(rt)
		if err != nil {
			return nil, err
		}
		return &FieldSchema{Required: !nullable, Type: RecordFieldType, Schema: nested}, nil
	case reflect.String:
		return &FieldSchema{Required: !nullable, Type: StringFieldType}, nil
	case reflect.Bool:
		return &FieldSchema{Required: !nullable, Type: BooleanFieldType}, nil
	case reflect.Float32, reflect.Float64:
		return &FieldSchema{Required: !nullable, Type: FloatFieldType}, nil
	case reflect.Map:
		// Maps are supported only with string-kind keys and become JSON
		// fields.
		if rt.Key().Kind() != reflect.String {
			return nil, unsupportedFieldTypeError{fieldName, rt}
		}
		return &FieldSchema{Required: !nullable, Type: JSONFieldType}, nil
	default:
		return nil, unsupportedFieldTypeError{fieldName, rt}
	}
}
569
570
571 func inferFields(rt reflect.Type) (Schema, error) {
572 var s Schema
573 fields, err := fieldCache.Fields(rt)
574 if err != nil {
575 return nil, err
576 }
577 for _, field := range fields {
578 var nullable, json bool
579 for _, opt := range field.ParsedTag.([]string) {
580 if opt == nullableTagOption {
581 nullable = true
582 }
583 if opt == jsonTagOption {
584 json = true
585 }
586 }
587 f, err := inferFieldSchema(field.Name, field.Type, nullable, json)
588 if err != nil {
589 return nil, err
590 }
591 f.Name = field.Name
592 s = append(s, f)
593 }
594 return s, nil
595 }
596
597
598
// isSupportedIntType reports whether t's kind is one of the signed integer
// kinds: int, int8, int16, int32 or int64.
func isSupportedIntType(t reflect.Type) bool {
	k := t.Kind()
	return k == reflect.Int ||
		k == reflect.Int8 ||
		k == reflect.Int16 ||
		k == reflect.Int32 ||
		k == reflect.Int64
}
607
608
609
// isSupportedUintType reports whether t's kind is uint8, uint16 or uint32.
// The kinds uint, uint64 and uintptr are deliberately not in the list.
func isSupportedUintType(t reflect.Type) bool {
	k := t.Kind()
	return k == reflect.Uint8 || k == reflect.Uint16 || k == reflect.Uint32
}
618
619
// typeList is a singly linked list of types. hasRecursiveType uses it to
// record the struct types on the path currently being walked.
type typeList struct {
	t    reflect.Type
	next *typeList
}

// has reports whether t occurs anywhere in the list. It is safe to call on
// a nil list, which contains nothing.
func (l *typeList) has(t reflect.Type) bool {
	for node := l; node != nil; node = node.next {
		if node.t == t {
			return true
		}
	}
	return false
}
634
635
636
637 func hasRecursiveType(t reflect.Type, seen *typeList) (bool, error) {
638 for t.Kind() == reflect.Ptr || t.Kind() == reflect.Slice || t.Kind() == reflect.Array {
639 t = t.Elem()
640 }
641 if t.Kind() != reflect.Struct {
642 return false, nil
643 }
644 if seen.has(t) {
645 return true, nil
646 }
647 fields, err := fieldCache.Fields(t)
648 if err != nil {
649 return false, err
650 }
651 seen = &typeList{t, seen}
652
653
654 for _, field := range fields {
655 ok, err := hasRecursiveType(field.Type, seen)
656 if err != nil {
657 return false, err
658 }
659 if ok {
660 return true, nil
661 }
662 }
663 return false, nil
664 }
665
666
667 func validateKnownType(in FieldType) (FieldType, error) {
668 if _, ok := fieldTypes[in]; !ok {
669
670 if resolved, ok := fieldAliases[in]; ok {
671 return resolved, nil
672 }
673 return "", fmt.Errorf("unknown field type (%s)", in)
674 }
675 return in, nil
676 }
677
678
679
680
681
682
683
684 func SchemaFromJSON(schemaJSON []byte) (Schema, error) {
685
686
687 if len(schemaJSON) == 0 {
688 return nil, errEmptyJSONSchema
689 }
690
691 var rawSchema []*bq.TableFieldSchema
692
693 if err := json.Unmarshal(schemaJSON, &rawSchema); err != nil {
694 return nil, err
695 }
696
697 convertedSchema := Schema{}
698 for _, f := range rawSchema {
699 convField := bqToFieldSchema(f)
700
701 validType, err := validateKnownType(convField.Type)
702 if err != nil {
703 return nil, err
704 }
705 convField.Type = validType
706 convertedSchema = append(convertedSchema, convField)
707 }
708 return convertedSchema, nil
709 }
710
// noStructError is returned when schema inference is attempted on a type
// that is neither a struct nor a pointer to a struct.
type noStructError struct {
	typ reflect.Type
}

// Error implements the error interface.
func (err noStructError) Error() string {
	const format = "bigquery: can only infer schema from struct or pointer to struct, not %s"
	return fmt.Sprintf(format, err.typ)
}
718
// badNullableError is returned when the nullable tag option is used on a
// field whose type is neither []byte nor a pointer to a struct.
type badNullableError struct {
	name string
	typ  reflect.Type
}

// Error implements the error interface.
func (err badNullableError) Error() string {
	const format = `bigquery: field %q of type %s: use "nullable" only for []byte and struct pointers; for all other types, use a NullXXX type`
	return fmt.Sprintf(format, err.name, err.typ)
}
727
// badJSONError is returned when the json tag option is used on a field
// whose type is neither a struct nor a pointer to a struct.
type badJSONError struct {
	name string
	typ  reflect.Type
}

// Error implements the error interface.
func (err badJSONError) Error() string {
	const format = `bigquery: field %q of type %s: use "json" only for struct and struct pointers`
	return fmt.Sprintf(format, err.name, err.typ)
}
736
// unsupportedFieldTypeError is returned when a field's Go type has no
// corresponding field type during schema inference.
type unsupportedFieldTypeError struct {
	name string
	typ  reflect.Type
}

// Error implements the error interface.
func (err unsupportedFieldTypeError) Error() string {
	const format = "bigquery: field %q: type %s is not supported"
	return fmt.Sprintf(format, err.name, err.typ)
}
745
View as plain text