...

Source file src/cloud.google.com/go/bigquery/value.go

Documentation: cloud.google.com/go/bigquery

     1  // Copyright 2015 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package bigquery
    16  
    17  import (
    18  	"encoding/base64"
    19  	"errors"
    20  	"fmt"
    21  	"math/big"
    22  	"reflect"
    23  	"strconv"
    24  	"strings"
    25  	"time"
    26  
    27  	"cloud.google.com/go/civil"
    28  	bq "google.golang.org/api/bigquery/v2"
    29  )
    30  
    31  // Value stores the contents of a single cell from a BigQuery result.
        // It is an empty interface, so it can hold any Go value. Within this
        // file a nil Value stands for a NULL cell, a nested record is carried
        // as a []Value, and a repeated field is carried as a []Value of elements.
    32  type Value interface{}
    33  
    34  // ValueLoader stores a slice of Values representing a result row from a Read operation.
    35  // See RowIterator.Next for more information.
        //
        // Load is handed the row's values in v together with the schema s that
        // describes them. The loaders defined in this file pair v[i] with s[i].
    36  type ValueLoader interface {
    37  	Load(v []Value, s Schema) error
    38  }
    39  
    40  // valueList converts a []Value to implement ValueLoader.
    41  type valueList []Value
    42  
    43  // Load stores a sequence of values in a valueList.
    44  // It resets the slice length to zero, then appends each value to it.
    45  func (vs *valueList) Load(v []Value, _ Schema) error {
    46  	*vs = append((*vs)[:0], v...)
    47  	return nil
    48  }
    49  
    50  // valueMap converts a map[string]Value to implement ValueLoader.
    51  type valueMap map[string]Value
    52  
    53  // Load stores a sequence of values in a valueMap.
    54  func (vm *valueMap) Load(v []Value, s Schema) error {
    55  	if *vm == nil {
    56  		*vm = map[string]Value{}
    57  	}
    58  	loadMap(*vm, v, s)
    59  	return nil
    60  }
    61  
    62  func loadMap(m map[string]Value, vals []Value, s Schema) {
    63  	for i, f := range s {
    64  		val := vals[i]
    65  		var v interface{}
    66  		switch {
    67  		case val == nil:
    68  			v = val
    69  		case f.Schema == nil:
    70  			v = val
    71  		case !f.Repeated:
    72  			m2 := map[string]Value{}
    73  			loadMap(m2, val.([]Value), f.Schema)
    74  			v = m2
    75  		default: // repeated and nested
    76  			sval := val.([]Value)
    77  			vs := make([]Value, len(sval))
    78  			for j, e := range sval {
    79  				m2 := map[string]Value{}
    80  				loadMap(m2, e.([]Value), f.Schema)
    81  				vs[j] = m2
    82  			}
    83  			v = vs
    84  		}
    85  
    86  		m[f.Name] = v
    87  	}
    88  }
    89  
        // structLoader loads rows into a struct. Its set method records the
        // destination struct pointer and, on first use, compiles the schema into
        // ops; its Load method then runs those ops against the row values.
    90  type structLoader struct {
    91  	typ reflect.Type // type of struct
    92  	err error // error from compiling the schema; once set, set and Load return it
    93  	ops []structLoaderOp // compiled field assignments, produced by compileToOps
    94  
    95  	vstructp reflect.Value // pointer to current struct value; changed by set
    96  }
    97  
    98  // A setFunc is a function that sets a struct field or slice/array
    99  // element to a value.
        // v is the destination and val is the row value to store; a nil val
        // represents NULL. The setters in this file return errNoNulls when the
        // destination type has no way to represent NULL.
   100  type setFunc func(v reflect.Value, val interface{}) error
   101  
   102  // A structLoaderOp instructs the loader to set a struct field to a row value.
   103  type structLoaderOp struct {
   104  	fieldIndex []int // index path of the destination field, as used by reflect.Value.FieldByIndex
   105  	valueIndex int // position of the source value within the row
   106  	setFunc    setFunc // stores one value (or one element, when repeated) into the destination
   107  	repeated   bool // if true, the row value is a []Value and is stored with setRepeated
   108  }
   109  
        // errNoNulls is the sentinel returned by the basic setters when a NULL
        // (nil) value is offered to a destination that cannot represent it.
        // runOps detects it with errors.Is and replaces it with an error that
        // names the offending field.
   110  var errNoNulls = errors.New("bigquery: NULL values cannot be read into structs")
   111  
   112  func setAny(v reflect.Value, x interface{}) error {
   113  	if x == nil {
   114  		return errNoNulls
   115  	}
   116  	v.Set(reflect.ValueOf(x))
   117  	return nil
   118  }
   119  
   120  func setInt(v reflect.Value, x interface{}) error {
   121  	if x == nil {
   122  		return errNoNulls
   123  	}
   124  	xx := x.(int64)
   125  	if v.OverflowInt(xx) {
   126  		return fmt.Errorf("bigquery: value %v overflows struct field of type %v", xx, v.Type())
   127  	}
   128  	v.SetInt(xx)
   129  	return nil
   130  }
   131  
   132  func setUint(v reflect.Value, x interface{}) error {
   133  	if x == nil {
   134  		return errNoNulls
   135  	}
   136  	xx := x.(int64)
   137  	if xx < 0 || v.OverflowUint(uint64(xx)) {
   138  		return fmt.Errorf("bigquery: value %v overflows struct field of type %v", xx, v.Type())
   139  	}
   140  	v.SetUint(uint64(xx))
   141  	return nil
   142  }
   143  
   144  func setFloat(v reflect.Value, x interface{}) error {
   145  	if x == nil {
   146  		return errNoNulls
   147  	}
   148  	xx := x.(float64)
   149  	if v.OverflowFloat(xx) {
   150  		return fmt.Errorf("bigquery: value %v overflows struct field of type %v", xx, v.Type())
   151  	}
   152  	v.SetFloat(xx)
   153  	return nil
   154  }
   155  
   156  func setBool(v reflect.Value, x interface{}) error {
   157  	if x == nil {
   158  		return errNoNulls
   159  	}
   160  	v.SetBool(x.(bool))
   161  	return nil
   162  }
   163  
   164  func setString(v reflect.Value, x interface{}) error {
   165  	if x == nil {
   166  		return errNoNulls
   167  	}
   168  	v.SetString(x.(string))
   169  	return nil
   170  }
   171  
   172  func setGeography(v reflect.Value, x interface{}) error {
   173  	if x == nil {
   174  		return errNoNulls
   175  	}
   176  	v.SetString(x.(string))
   177  	return nil
   178  }
   179  
   180  func setJSON(v reflect.Value, x interface{}) error {
   181  	if x == nil {
   182  		return errNoNulls
   183  	}
   184  	v.SetString(x.(string))
   185  	return nil
   186  }
   187  
   188  func setBytes(v reflect.Value, x interface{}) error {
   189  	if x == nil {
   190  		v.SetBytes(nil)
   191  	} else {
   192  		v.SetBytes(x.([]byte))
   193  	}
   194  	return nil
   195  }
   196  
   197  func setNull(v reflect.Value, x interface{}, build func() interface{}) error {
   198  	if x == nil {
   199  		v.Set(reflect.Zero(v.Type()))
   200  	} else {
   201  		n := build()
   202  		v.Set(reflect.ValueOf(n))
   203  	}
   204  	return nil
   205  }
   206  
   207  // set remembers a value for the next call to Load. The value must be
   208  // a pointer to a struct. (This is checked in RowIterator.Next.)
   209  func (sl *structLoader) set(structp interface{}, schema Schema) error {
   210  	if sl.err != nil {
   211  		return sl.err
   212  	}
   213  	sl.vstructp = reflect.ValueOf(structp)
   214  	typ := sl.vstructp.Type().Elem()
   215  	if sl.typ == nil {
   216  		// First call: remember the type and compile the schema.
   217  		sl.typ = typ
   218  		ops, err := compileToOps(typ, schema)
   219  		if err != nil {
   220  			sl.err = err
   221  			return err
   222  		}
   223  		sl.ops = ops
   224  	} else if sl.typ != typ {
   225  		return fmt.Errorf("bigquery: struct type changed from %s to %s", sl.typ, typ)
   226  	}
   227  	return nil
   228  }
   229  
   230  // compileToOps produces a sequence of operations that will set the fields of a
   231  // value of structType to the contents of a row with schema.
   232  func compileToOps(structType reflect.Type, schema Schema) ([]structLoaderOp, error) {
   233  	var ops []structLoaderOp
   234  	fields, err := fieldCache.Fields(structType)
   235  	if err != nil {
   236  		return nil, err
   237  	}
   238  	for i, schemaField := range schema {
   239  		// Look for an exported struct field with the same name as the schema
   240  		// field, ignoring case (BigQuery column names are case-insensitive,
   241  		// and we want to act like encoding/json anyway).
   242  		structField := fields.Match(schemaField.Name)
   243  		if structField == nil {
   244  			// Ignore schema fields with no corresponding struct field.
   245  			continue
   246  		}
   247  		op := structLoaderOp{
   248  			fieldIndex: structField.Index,
   249  			valueIndex: i,
   250  		}
   251  		t := structField.Type
   252  		if schemaField.Repeated {
   253  			if t.Kind() != reflect.Slice && t.Kind() != reflect.Array {
   254  				return nil, fmt.Errorf("bigquery: repeated schema field %s requires slice or array, but struct field %s has type %s",
   255  					schemaField.Name, structField.Name, t)
   256  			}
   257  			t = t.Elem()
   258  			op.repeated = true
   259  		}
   260  		if schemaField.Type == RecordFieldType {
   261  			// Field can be a struct or a pointer to a struct.
   262  			if t.Kind() == reflect.Ptr {
   263  				t = t.Elem()
   264  			}
   265  			if t.Kind() != reflect.Struct {
   266  				return nil, fmt.Errorf("bigquery: field %s has type %s, expected struct or *struct",
   267  					structField.Name, structField.Type)
   268  			}
   269  			nested, err := compileToOps(t, schemaField.Schema)
   270  			if err != nil {
   271  				return nil, err
   272  			}
   273  			op.setFunc = func(v reflect.Value, val interface{}) error {
   274  				return setNested(nested, v, val)
   275  			}
   276  		} else {
   277  			op.setFunc = determineSetFunc(t, schemaField.Type)
   278  			if op.setFunc == nil {
   279  				return nil, fmt.Errorf("bigquery: schema field %s of type %s is not assignable to struct field %s of type %s",
   280  					schemaField.Name, schemaField.Type, structField.Name, t)
   281  			}
   282  		}
   283  		ops = append(ops, op)
   284  	}
   285  	return ops, nil
   286  }
   287  
   288  // determineSetFunc chooses the best function for setting a field of type ftype
   289  // to a value whose schema field type is stype. It returns nil if stype
   290  // is not assignable to ftype.
   291  // determineSetFunc considers only basic types. See compileToOps for
   292  // handling of repetition and nesting.
   293  func determineSetFunc(ftype reflect.Type, stype FieldType) setFunc {
   294  	switch stype {
   295  	case StringFieldType:
   296  		if ftype.Kind() == reflect.String {
   297  			return setString
   298  		}
   299  		if ftype == typeOfNullString {
   300  			return func(v reflect.Value, x interface{}) error {
   301  				return setNull(v, x, func() interface{} {
   302  					return NullString{StringVal: x.(string), Valid: true}
   303  				})
   304  			}
   305  		}
   306  
   307  	case GeographyFieldType:
   308  		if ftype.Kind() == reflect.String {
   309  			return setGeography
   310  		}
   311  		if ftype == typeOfNullGeography {
   312  			return func(v reflect.Value, x interface{}) error {
   313  				return setNull(v, x, func() interface{} {
   314  					return NullGeography{GeographyVal: x.(string), Valid: true}
   315  				})
   316  			}
   317  		}
   318  
   319  	case JSONFieldType:
   320  		if ftype.Kind() == reflect.String {
   321  			return setJSON
   322  		}
   323  		if ftype == typeOfNullJSON {
   324  			return func(v reflect.Value, x interface{}) error {
   325  				return setNull(v, x, func() interface{} {
   326  					return NullJSON{JSONVal: x.(string), Valid: true}
   327  				})
   328  			}
   329  		}
   330  
   331  	case BytesFieldType:
   332  		if ftype == typeOfByteSlice {
   333  			return setBytes
   334  		}
   335  
   336  	case IntegerFieldType:
   337  		if isSupportedUintType(ftype) {
   338  			return setUint
   339  		} else if isSupportedIntType(ftype) {
   340  			return setInt
   341  		}
   342  		if ftype == typeOfNullInt64 {
   343  			return func(v reflect.Value, x interface{}) error {
   344  				return setNull(v, x, func() interface{} {
   345  					return NullInt64{Int64: x.(int64), Valid: true}
   346  				})
   347  			}
   348  		}
   349  
   350  	case FloatFieldType:
   351  		switch ftype.Kind() {
   352  		case reflect.Float32, reflect.Float64:
   353  			return setFloat
   354  		}
   355  		if ftype == typeOfNullFloat64 {
   356  			return func(v reflect.Value, x interface{}) error {
   357  				return setNull(v, x, func() interface{} {
   358  					return NullFloat64{Float64: x.(float64), Valid: true}
   359  				})
   360  			}
   361  		}
   362  
   363  	case BooleanFieldType:
   364  		if ftype.Kind() == reflect.Bool {
   365  			return setBool
   366  		}
   367  		if ftype == typeOfNullBool {
   368  			return func(v reflect.Value, x interface{}) error {
   369  				return setNull(v, x, func() interface{} {
   370  					return NullBool{Bool: x.(bool), Valid: true}
   371  				})
   372  			}
   373  		}
   374  
   375  	case TimestampFieldType:
   376  		if ftype == typeOfGoTime {
   377  			return setAny
   378  		}
   379  		if ftype == typeOfNullTimestamp {
   380  			return func(v reflect.Value, x interface{}) error {
   381  				return setNull(v, x, func() interface{} {
   382  					return NullTimestamp{Timestamp: x.(time.Time), Valid: true}
   383  				})
   384  			}
   385  		}
   386  
   387  	case DateFieldType:
   388  		if ftype == typeOfDate {
   389  			return setAny
   390  		}
   391  		if ftype == typeOfNullDate {
   392  			return func(v reflect.Value, x interface{}) error {
   393  				return setNull(v, x, func() interface{} {
   394  					return NullDate{Date: x.(civil.Date), Valid: true}
   395  				})
   396  			}
   397  		}
   398  
   399  	case TimeFieldType:
   400  		if ftype == typeOfTime {
   401  			return setAny
   402  		}
   403  		if ftype == typeOfNullTime {
   404  			return func(v reflect.Value, x interface{}) error {
   405  				return setNull(v, x, func() interface{} {
   406  					return NullTime{Time: x.(civil.Time), Valid: true}
   407  				})
   408  			}
   409  		}
   410  
   411  	case DateTimeFieldType:
   412  		if ftype == typeOfDateTime {
   413  			return setAny
   414  		}
   415  		if ftype == typeOfNullDateTime {
   416  			return func(v reflect.Value, x interface{}) error {
   417  				return setNull(v, x, func() interface{} {
   418  					return NullDateTime{DateTime: x.(civil.DateTime), Valid: true}
   419  				})
   420  			}
   421  		}
   422  
   423  	case NumericFieldType:
   424  		if ftype == typeOfRat {
   425  			return func(v reflect.Value, x interface{}) error {
   426  				return setNull(v, x, func() interface{} { return x.(*big.Rat) })
   427  			}
   428  		}
   429  
   430  	case BigNumericFieldType:
   431  		if ftype == typeOfRat {
   432  			return func(v reflect.Value, x interface{}) error {
   433  				return setNull(v, x, func() interface{} { return x.(*big.Rat) })
   434  			}
   435  		}
   436  
   437  	case RangeFieldType:
   438  		if ftype == typeOfRangeValue {
   439  			return func(v reflect.Value, x interface{}) error {
   440  				return setNull(v, x, func() interface{} { return x.(*RangeValue) })
   441  			}
   442  		}
   443  
   444  	}
   445  
   446  	return nil
   447  }
   448  
   449  func (sl *structLoader) Load(values []Value, _ Schema) error {
   450  	if sl.err != nil {
   451  		return sl.err
   452  	}
   453  	return runOps(sl.ops, sl.vstructp.Elem(), values)
   454  }
   455  
   456  // runOps executes a sequence of ops, setting the fields of vstruct to the
   457  // supplied values.
   458  func runOps(ops []structLoaderOp, vstruct reflect.Value, values []Value) error {
   459  	for _, op := range ops {
   460  		field := vstruct.FieldByIndex(op.fieldIndex)
   461  		var err error
   462  		if op.repeated {
   463  			err = setRepeated(field, values[op.valueIndex].([]Value), op.setFunc)
   464  		} else {
   465  			err = op.setFunc(field, values[op.valueIndex])
   466  			if errors.Is(err, errNoNulls) {
   467  				f := vstruct.Type().FieldByIndex(op.fieldIndex)
   468  				err = fmt.Errorf("bigquery: NULL cannot be assigned to field `%s` of type %s", f.Name, f.Type.Name())
   469  			}
   470  		}
   471  		if err != nil {
   472  			return err
   473  		}
   474  	}
   475  	return nil
   476  }
   477  
   478  func setNested(ops []structLoaderOp, v reflect.Value, val interface{}) error {
   479  	// v is either a struct or a pointer to a struct.
   480  	if v.Kind() == reflect.Ptr {
   481  		// If the value is nil, set the pointer to nil.
   482  		if val == nil {
   483  			v.Set(reflect.Zero(v.Type()))
   484  			return nil
   485  		}
   486  		// If the pointer is nil, set it to a zero struct value.
   487  		if v.IsNil() {
   488  			v.Set(reflect.New(v.Type().Elem()))
   489  		}
   490  		v = v.Elem()
   491  	}
   492  	return runOps(ops, v, val.([]Value))
   493  }
   494  
   495  func setRepeated(field reflect.Value, vslice []Value, setElem setFunc) error {
   496  	vlen := len(vslice)
   497  	var flen int
   498  	switch field.Type().Kind() {
   499  	case reflect.Slice:
   500  		// Make a slice of the right size, avoiding allocation if possible.
   501  		switch {
   502  		case field.Len() < vlen:
   503  			field.Set(reflect.MakeSlice(field.Type(), vlen, vlen))
   504  		case field.Len() > vlen:
   505  			field.SetLen(vlen)
   506  		}
   507  		flen = vlen
   508  
   509  	case reflect.Array:
   510  		flen = field.Len()
   511  		if flen > vlen {
   512  			// Set extra elements to their zero value.
   513  			z := reflect.Zero(field.Type().Elem())
   514  			for i := vlen; i < flen; i++ {
   515  				field.Index(i).Set(z)
   516  			}
   517  		}
   518  	default:
   519  		return fmt.Errorf("bigquery: impossible field type %s", field.Type())
   520  	}
   521  	for i, val := range vslice {
   522  		if i < flen { // avoid writing past the end of a short array
   523  			if err := setElem(field.Index(i), val); err != nil {
   524  				return err
   525  			}
   526  		}
   527  	}
   528  	return nil
   529  }
   530  
   531  // A ValueSaver returns a row of data to be inserted into a table.
        // ValuesSaver and StructSaver in this file are implementations.
   532  type ValueSaver interface {
   533  	// Save returns a row to be inserted into a BigQuery table, represented
   534  	// as a map from field name to Value.
   535  	// The insertID governs the best-effort deduplication feature of
   536  	// BigQuery streaming inserts.
   537  	//
   538  	// If the insertID is empty, a random insertID will be generated by
   539  	// this library to facilitate deduplication.
   540  	//
   541  	// If the insertID is set to the sentinel value NoDedupeID, an insertID
   542  	// is not sent.
   543  	//
   544  	// For all other non-empty values, BigQuery will use the provided
   545  	// value for best-effort deduplication.
   546  	Save() (row map[string]Value, insertID string, err error)
   547  }
   548  
   549  // ValuesSaver implements ValueSaver for a slice of Values.
   550  type ValuesSaver struct {
        	// Schema describes Row: Row[i] is saved under the name of Schema[i].
        	// Save returns an error if Schema and Row differ in length.
   551  	Schema Schema
   552  
   553  	// InsertID governs the best-effort deduplication feature of
   554  	// BigQuery streaming inserts.
   555  	//
   556  	// If the InsertID is empty, a random insertID will be generated by
   557  	// this library to facilitate deduplication.
   558  	//
   559  	// If the InsertID is set to the sentinel value NoDedupeID, an insertID
   560  	// is not sent.
   561  	//
   562  	// For all other non-empty values, BigQuery will use the provided
   563  	// value for best-effort deduplication.
   564  	InsertID string
   565  
        	// Row holds the values to insert, in schema order. A nested record
        	// must be a []Value, and a repeated nested record a []Value whose
        	// elements are each a []Value.
   566  	Row []Value
   567  }
   568  
   569  // Save implements ValueSaver.
   570  func (vls *ValuesSaver) Save() (map[string]Value, string, error) {
   571  	m, err := valuesToMap(vls.Row, vls.Schema)
   572  	return m, vls.InsertID, err
   573  }
   574  
   575  func valuesToMap(vs []Value, schema Schema) (map[string]Value, error) {
   576  	if len(vs) != len(schema) {
   577  		return nil, errors.New("schema does not match length of row to be inserted")
   578  	}
   579  
   580  	m := make(map[string]Value)
   581  	for i, fieldSchema := range schema {
   582  		if vs[i] == nil {
   583  			m[fieldSchema.Name] = nil
   584  			continue
   585  		}
   586  		if fieldSchema.Type != RecordFieldType {
   587  			m[fieldSchema.Name] = toUploadValue(vs[i], fieldSchema)
   588  			continue
   589  		}
   590  		// Nested record, possibly repeated.
   591  		vals, ok := vs[i].([]Value)
   592  		if !ok {
   593  			return nil, errors.New("nested record is not a []Value")
   594  		}
   595  		if !fieldSchema.Repeated {
   596  			value, err := valuesToMap(vals, fieldSchema.Schema)
   597  			if err != nil {
   598  				return nil, err
   599  			}
   600  			m[fieldSchema.Name] = value
   601  			continue
   602  		}
   603  		// A repeated nested field is converted into a slice of maps.
   604  		maps := []Value{}
   605  		for _, v := range vals {
   606  			sv, ok := v.([]Value)
   607  			if !ok {
   608  				return nil, errors.New("nested record in slice is not a []Value")
   609  			}
   610  			value, err := valuesToMap(sv, fieldSchema.Schema)
   611  			if err != nil {
   612  				return nil, err
   613  			}
   614  			maps = append(maps, value)
   615  		}
   616  		m[fieldSchema.Name] = maps
   617  	}
   618  	return m, nil
   619  }
   620  
   621  // StructSaver implements ValueSaver for a struct.
   622  // The struct is converted to a map of values by using the values of struct
   623  // fields corresponding to schema fields. Additional and missing
   624  // fields are ignored, as are nested struct pointers that are nil.
        // Struct fields are matched to schema fields by name, ignoring case.
   625  type StructSaver struct {
   626  	// Schema determines what fields of the struct are uploaded. It should
   627  	// match the table's schema.
   628  	// Schema is optional for StructSavers that are passed to Uploader.Put.
   629  	Schema Schema
   630  
   631  	// InsertID governs the best-effort deduplication feature of
   632  	// BigQuery streaming inserts.
   633  	//
   634  	// If the InsertID is empty, a random InsertID will be generated by
   635  	// this library to facilitate deduplication.
   636  	//
   637  	// If the InsertID is set to the sentinel value NoDedupeID, an InsertID
   638  	// is not sent.
   639  	//
   640  	// For all other non-empty values, BigQuery will use the provided
   641  	// value for best-effort deduplication.
   642  	InsertID string
   643  
   644  	// Struct should be a struct or a pointer to a struct.
        	// Save returns an error for any other non-nil kind of value.
   645  	Struct interface{}
   646  }
   647  
   648  // Save implements ValueSaver.
   649  func (ss *StructSaver) Save() (row map[string]Value, insertID string, err error) {
   650  	vstruct := reflect.ValueOf(ss.Struct)
   651  	row, err = structToMap(vstruct, ss.Schema)
   652  	if err != nil {
   653  		return nil, "", err
   654  	}
   655  	return row, ss.InsertID, nil
   656  }
   657  
   658  func structToMap(vstruct reflect.Value, schema Schema) (map[string]Value, error) {
   659  	if vstruct.Kind() == reflect.Ptr {
   660  		vstruct = vstruct.Elem()
   661  	}
   662  	if !vstruct.IsValid() {
   663  		return nil, nil
   664  	}
   665  	m := map[string]Value{}
   666  	if vstruct.Kind() != reflect.Struct {
   667  		return nil, fmt.Errorf("bigquery: type is %s, need struct or struct pointer", vstruct.Type())
   668  	}
   669  	fields, err := fieldCache.Fields(vstruct.Type())
   670  	if err != nil {
   671  		return nil, err
   672  	}
   673  	for _, schemaField := range schema {
   674  		// Look for an exported struct field with the same name as the schema
   675  		// field, ignoring case.
   676  		structField := fields.Match(schemaField.Name)
   677  		if structField == nil {
   678  			continue
   679  		}
   680  		val, err := structFieldToUploadValue(vstruct.FieldByIndex(structField.Index), schemaField)
   681  		if err != nil {
   682  			return nil, err
   683  		}
   684  		// Add the value to the map, unless it is nil.
   685  		if val != nil {
   686  			m[schemaField.Name] = val
   687  		}
   688  	}
   689  	return m, nil
   690  }
   691  
   692  // structFieldToUploadValue converts a struct field to a value suitable for ValueSaver.Save, using
   693  // the schemaField as a guide.
   694  // structFieldToUploadValue is careful to return a true nil interface{} when needed, so its
   695  // caller can easily identify a nil value.
   696  func structFieldToUploadValue(vfield reflect.Value, schemaField *FieldSchema) (interface{}, error) {
   697  	if schemaField.Repeated && (vfield.Kind() != reflect.Slice && vfield.Kind() != reflect.Array) {
   698  		return nil, fmt.Errorf("bigquery: repeated schema field %s requires slice or array, but value has type %s",
   699  			schemaField.Name, vfield.Type())
   700  	}
   701  
   702  	// A non-nested field can be represented by its Go value, except for some types.
   703  	if schemaField.Type != RecordFieldType {
   704  		return toUploadValueReflect(vfield, schemaField), nil
   705  	}
   706  	// A non-repeated nested field is converted into a map[string]Value.
   707  	if !schemaField.Repeated {
   708  		m, err := structToMap(vfield, schemaField.Schema)
   709  		if err != nil {
   710  			return nil, err
   711  		}
   712  		if m == nil {
   713  			return nil, nil
   714  		}
   715  		return m, nil
   716  	}
   717  	// A repeated nested field is converted into a slice of maps.
   718  	// If the field is zero-length (but not nil), we return a zero-length []Value.
   719  	if vfield.IsNil() {
   720  		return nil, nil
   721  	}
   722  	vals := []Value{}
   723  	for i := 0; i < vfield.Len(); i++ {
   724  		m, err := structToMap(vfield.Index(i), schemaField.Schema)
   725  		if err != nil {
   726  			return nil, err
   727  		}
   728  		vals = append(vals, m)
   729  	}
   730  	return vals, nil
   731  }
   732  
   733  func toUploadValue(val interface{}, fs *FieldSchema) interface{} {
   734  	if fs.Type == TimeFieldType || fs.Type == DateTimeFieldType || fs.Type == NumericFieldType || fs.Type == BigNumericFieldType {
   735  		return toUploadValueReflect(reflect.ValueOf(val), fs)
   736  	}
   737  	return val
   738  }
   739  
   740  func toUploadValueReflect(v reflect.Value, fs *FieldSchema) interface{} {
   741  	switch fs.Type {
   742  	case TimeFieldType:
   743  		if v.Type() == typeOfNullTime {
   744  			return v.Interface()
   745  		}
   746  		return formatUploadValue(v, fs, func(v reflect.Value) string {
   747  			return CivilTimeString(v.Interface().(civil.Time))
   748  		})
   749  	case DateTimeFieldType:
   750  		if v.Type() == typeOfNullDateTime {
   751  			return v.Interface()
   752  		}
   753  		return formatUploadValue(v, fs, func(v reflect.Value) string {
   754  			return CivilDateTimeString(v.Interface().(civil.DateTime))
   755  		})
   756  	case NumericFieldType:
   757  		if r, ok := v.Interface().(*big.Rat); ok && r == nil {
   758  			return nil
   759  		}
   760  		return formatUploadValue(v, fs, func(v reflect.Value) string {
   761  			return NumericString(v.Interface().(*big.Rat))
   762  		})
   763  	case BigNumericFieldType:
   764  		if r, ok := v.Interface().(*big.Rat); ok && r == nil {
   765  			return nil
   766  		}
   767  		return formatUploadValue(v, fs, func(v reflect.Value) string {
   768  			return BigNumericString(v.Interface().(*big.Rat))
   769  		})
   770  	case IntervalFieldType:
   771  		if r, ok := v.Interface().(*IntervalValue); ok && r == nil {
   772  			return nil
   773  		}
   774  		return formatUploadValue(v, fs, func(v reflect.Value) string {
   775  			return IntervalString(v.Interface().(*IntervalValue))
   776  		})
   777  	case RangeFieldType:
   778  		return v.Interface()
   779  	default:
   780  		if !fs.Repeated || v.Len() > 0 {
   781  			return v.Interface()
   782  		}
   783  		// The service treats a null repeated field as an error. Return
   784  		// nil to omit the field entirely.
   785  		return nil
   786  	}
   787  }
   788  
   789  func formatUploadValue(v reflect.Value, fs *FieldSchema, cvt func(reflect.Value) string) interface{} {
   790  	if !fs.Repeated {
   791  		return cvt(v)
   792  	}
   793  	if v.Len() == 0 {
   794  		return nil
   795  	}
   796  	s := make([]string, v.Len())
   797  	for i := 0; i < v.Len(); i++ {
   798  		s[i] = cvt(v.Index(i))
   799  	}
   800  	return s
   801  }
   802  
   803  // CivilTimeString returns a string representing a civil.Time in a format compatible
   804  // with BigQuery SQL. It rounds the time to the nearest microsecond and returns a
   805  // string with six digits of sub-second precision.
   806  //
   807  // Use CivilTimeString when using civil.Time in DML, for example in INSERT
   808  // statements.
   809  func CivilTimeString(t civil.Time) string {
   810  	if t.Nanosecond == 0 {
   811  		return t.String()
   812  	}
   813  	micro := (t.Nanosecond + 500) / 1000 // round to nearest microsecond
   814  	t.Nanosecond = 0
   815  	return t.String() + fmt.Sprintf(".%06d", micro)
   816  }
   817  
   818  // CivilDateTimeString returns a string representing a civil.DateTime in a format compatible
   819  // with BigQuery SQL. It separate the date and time with a space, and formats the time
   820  // with CivilTimeString.
   821  //
   822  // Use CivilDateTimeString when using civil.DateTime in DML, for example in INSERT
   823  // statements.
   824  func CivilDateTimeString(dt civil.DateTime) string {
   825  	return dt.Date.String() + " " + CivilTimeString(dt.Time)
   826  }
   827  
   828  // parseCivilDateTime parses a date-time represented in a BigQuery SQL
   829  // compatible format and returns a civil.DateTime.
   830  func parseCivilDateTime(s string) (civil.DateTime, error) {
   831  	parts := strings.Fields(s)
   832  	if len(parts) != 2 {
   833  		return civil.DateTime{}, fmt.Errorf("bigquery: bad DATETIME value %q", s)
   834  	}
   835  	return civil.ParseDateTime(parts[0] + "T" + parts[1])
   836  }
   837  
   838  const (
   839  	// NumericPrecisionDigits is the maximum number of digits in a NUMERIC value.
   840  	NumericPrecisionDigits = 38
   841  
   842  	// NumericScaleDigits is the maximum number of digits after the decimal point in a NUMERIC value.
        	// NumericString formats values with this many fractional digits.
   843  	NumericScaleDigits = 9
   844  
   845  	// BigNumericPrecisionDigits is the maximum number of full digits in a BIGNUMERIC value.
   846  	BigNumericPrecisionDigits = 76
   847  
   848  	// BigNumericScaleDigits is the maximum number of digits after the decimal point in a BIGNUMERIC value.
        	// BigNumericString formats values with this many fractional digits.
   849  	BigNumericScaleDigits = 38
   850  )
   851  
   852  // NumericString returns a string representing a *big.Rat in a format compatible
   853  // with BigQuery SQL. It returns a floating-point literal with 9 digits
   854  // after the decimal point.
   855  func NumericString(r *big.Rat) string {
   856  	return r.FloatString(NumericScaleDigits)
   857  }
   858  
   859  // BigNumericString returns a string representing a *big.Rat in a format compatible with BigQuery
   860  // SQL.  It returns a floating point literal with 38 digits after the decimal point.
   861  func BigNumericString(r *big.Rat) string {
   862  	return r.FloatString(BigNumericScaleDigits)
   863  }
   864  
   865  // IntervalString returns a string  representing an *IntervalValue in a format compatible with
   866  // BigQuery SQL.  It returns an interval literal in canonical format.
   867  func IntervalString(iv *IntervalValue) string {
   868  	return iv.String()
   869  }
   870  
   871  // convertRows converts a series of TableRows into a series of Value slices.
   872  // schema is used to interpret the data from rows; its length must match the
   873  // length of each row.
   874  func convertRows(rows []*bq.TableRow, schema Schema) ([][]Value, error) {
   875  	var rs [][]Value
   876  	for _, r := range rows {
   877  		row, err := convertRow(r, schema)
   878  		if err != nil {
   879  			return nil, err
   880  		}
   881  		rs = append(rs, row)
   882  	}
   883  	return rs, nil
   884  }
   885  
   886  func convertRow(r *bq.TableRow, schema Schema) ([]Value, error) {
   887  	if len(schema) != len(r.F) {
   888  		return nil, errors.New("schema length does not match row length")
   889  	}
   890  	var values []Value
   891  	for i, cell := range r.F {
   892  		fs := schema[i]
   893  		var v Value
   894  		var err error
   895  		if fs.Type == RangeFieldType {
   896  			// interception range conversion here, as we don't propagate range element type more deeply.
   897  			if fs.RangeElementType == nil {
   898  				return nil, errors.New("bigquery: incomplete range schema for conversion")
   899  			}
   900  			v, err = convertRangeValue(cell.V.(string), fs.RangeElementType.Type)
   901  		} else {
   902  			v, err = convertValue(cell.V, fs.Type, fs.Schema)
   903  		}
   904  		if err != nil {
   905  			return nil, err
   906  		}
   907  		values = append(values, v)
   908  	}
   909  	return values, nil
   910  }
   911  
   912  func convertValue(val interface{}, typ FieldType, schema Schema) (Value, error) {
   913  	switch val := val.(type) {
   914  	case nil:
   915  		return nil, nil
   916  	case []interface{}:
   917  		return convertRepeatedRecord(val, typ, schema)
   918  	case map[string]interface{}:
   919  		return convertNestedRecord(val, schema)
   920  	case string:
   921  		return convertBasicType(val, typ)
   922  	default:
   923  		return nil, fmt.Errorf("got value %v; expected a value of type %s", val, typ)
   924  	}
   925  }
   926  
   927  func convertRepeatedRecord(vals []interface{}, typ FieldType, schema Schema) (Value, error) {
   928  	var values []Value
   929  	for _, cell := range vals {
   930  		// each cell contains a single entry, keyed by "v"
   931  		val := cell.(map[string]interface{})["v"]
   932  		v, err := convertValue(val, typ, schema)
   933  		if err != nil {
   934  			return nil, err
   935  		}
   936  		values = append(values, v)
   937  	}
   938  	return values, nil
   939  }
   940  
   941  func convertNestedRecord(val map[string]interface{}, schema Schema) (Value, error) {
   942  	// convertNestedRecord is similar to convertRow, as a record has the same structure as a row.
   943  
   944  	// Nested records are wrapped in a map with a single key, "f".
   945  	record := val["f"].([]interface{})
   946  	if len(record) != len(schema) {
   947  		return nil, errors.New("schema length does not match record length")
   948  	}
   949  
   950  	var values []Value
   951  	for i, cell := range record {
   952  		// each cell contains a single entry, keyed by "v"
   953  		val := cell.(map[string]interface{})["v"]
   954  		fs := schema[i]
   955  		v, err := convertValue(val, fs.Type, fs.Schema)
   956  		if err != nil {
   957  			return nil, err
   958  		}
   959  		values = append(values, v)
   960  	}
   961  	return values, nil
   962  }
   963  
   964  // convertBasicType returns val as an interface with a concrete type specified by typ.
   965  func convertBasicType(val string, typ FieldType) (Value, error) {
   966  	switch typ {
   967  	case StringFieldType:
   968  		return val, nil
   969  	case BytesFieldType:
   970  		return base64.StdEncoding.DecodeString(val)
   971  	case IntegerFieldType:
   972  		return strconv.ParseInt(val, 10, 64)
   973  	case FloatFieldType:
   974  		return strconv.ParseFloat(val, 64)
   975  	case BooleanFieldType:
   976  		return strconv.ParseBool(val)
   977  	case TimestampFieldType:
   978  		i, err := strconv.ParseInt(val, 10, 64)
   979  		if err != nil {
   980  			return nil, err
   981  		}
   982  		return time.UnixMicro(i).UTC(), nil
   983  	case DateFieldType:
   984  		return civil.ParseDate(val)
   985  	case TimeFieldType:
   986  		return civil.ParseTime(val)
   987  	case DateTimeFieldType:
   988  		return civil.ParseDateTime(val)
   989  	case NumericFieldType:
   990  		r, ok := (&big.Rat{}).SetString(val)
   991  		if !ok {
   992  			return nil, fmt.Errorf("bigquery: invalid NUMERIC value %q", val)
   993  		}
   994  		return Value(r), nil
   995  	case BigNumericFieldType:
   996  		r, ok := (&big.Rat{}).SetString(val)
   997  		if !ok {
   998  			return nil, fmt.Errorf("bigquery: invalid BIGNUMERIC value %q", val)
   999  		}
  1000  		return Value(r), nil
  1001  	case GeographyFieldType:
  1002  		return val, nil
  1003  	case JSONFieldType:
  1004  		return val, nil
  1005  	case IntervalFieldType:
  1006  		i, err := ParseInterval(val)
  1007  		if err != nil {
  1008  			return nil, fmt.Errorf("bigquery: invalid INTERVAL value %q", val)
  1009  		}
  1010  		return Value(i), nil
  1011  	default:
  1012  		return nil, fmt.Errorf("unrecognized type: %s", typ)
  1013  	}
  1014  }
  1015  
// unboundedRangeSentinel is how BQ declares an unbounded RANGE: when the
// start or end portion of a range literal equals this string,
// convertRangeValue leaves the corresponding bound of the RangeValue unset.
var unboundedRangeSentinel = "UNBOUNDED"
  1018  
  1019  // convertRangeValue aids in parsing the compound RANGE api data representation.
  1020  // The format for a range value is: "[startval, endval)"
  1021  func convertRangeValue(val string, elementType FieldType) (Value, error) {
  1022  	supported := false
  1023  	for _, t := range []FieldType{DateFieldType, DateTimeFieldType, TimestampFieldType} {
  1024  		if elementType == t {
  1025  			supported = true
  1026  			break
  1027  		}
  1028  	}
  1029  	if !supported {
  1030  		return nil, fmt.Errorf("bigquery: invalid RANGE element type %q", elementType)
  1031  	}
  1032  	if !strings.HasPrefix(val, "[") || !strings.HasSuffix(val, ")") {
  1033  		return nil, fmt.Errorf("bigquery: invalid RANGE value %q", val)
  1034  	}
  1035  	// trim the leading/trailing characters
  1036  	val = val[1 : len(val)-1]
  1037  	parts := strings.Split(val, ", ")
  1038  	if len(parts) != 2 {
  1039  		return nil, fmt.Errorf("bigquery: invalid RANGE value %q", val)
  1040  	}
  1041  	rv := &RangeValue{}
  1042  	if parts[0] != unboundedRangeSentinel {
  1043  		sv, err := convertBasicType(parts[0], elementType)
  1044  		if err != nil {
  1045  			return nil, fmt.Errorf("bigquery: invalid RANGE start value %q", parts[0])
  1046  		}
  1047  		rv.Start = sv
  1048  	}
  1049  	if parts[1] != unboundedRangeSentinel {
  1050  		ev, err := convertBasicType(parts[1], elementType)
  1051  		if err != nil {
  1052  			return nil, fmt.Errorf("bigquery: invalid RANGE end value %q", parts[1])
  1053  		}
  1054  		rv.End = ev
  1055  	}
  1056  	return rv, nil
  1057  }
  1058  

View as plain text