...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/adapt/schemaconversion.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter/adapt

     1  // Copyright 2021 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package adapt
    16  
    17  import (
    18  	"fmt"
    19  
    20  	"cloud.google.com/go/bigquery"
    21  	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    22  )
    23  
    24  var fieldTypeMap = map[bigquery.FieldType]storagepb.TableFieldSchema_Type{
    25  	bigquery.StringFieldType:     storagepb.TableFieldSchema_STRING,
    26  	bigquery.BytesFieldType:      storagepb.TableFieldSchema_BYTES,
    27  	bigquery.IntegerFieldType:    storagepb.TableFieldSchema_INT64,
    28  	bigquery.FloatFieldType:      storagepb.TableFieldSchema_DOUBLE,
    29  	bigquery.BooleanFieldType:    storagepb.TableFieldSchema_BOOL,
    30  	bigquery.TimestampFieldType:  storagepb.TableFieldSchema_TIMESTAMP,
    31  	bigquery.RecordFieldType:     storagepb.TableFieldSchema_STRUCT,
    32  	bigquery.DateFieldType:       storagepb.TableFieldSchema_DATE,
    33  	bigquery.TimeFieldType:       storagepb.TableFieldSchema_TIME,
    34  	bigquery.DateTimeFieldType:   storagepb.TableFieldSchema_DATETIME,
    35  	bigquery.NumericFieldType:    storagepb.TableFieldSchema_NUMERIC,
    36  	bigquery.BigNumericFieldType: storagepb.TableFieldSchema_BIGNUMERIC,
    37  	bigquery.GeographyFieldType:  storagepb.TableFieldSchema_GEOGRAPHY,
    38  	bigquery.RangeFieldType:      storagepb.TableFieldSchema_RANGE,
    39  }
    40  
    41  func bqFieldToProto(in *bigquery.FieldSchema) (*storagepb.TableFieldSchema, error) {
    42  	if in == nil {
    43  		return nil, nil
    44  	}
    45  	out := &storagepb.TableFieldSchema{
    46  		Name:        in.Name,
    47  		Description: in.Description,
    48  	}
    49  
    50  	// Type conversion.
    51  	typ, ok := fieldTypeMap[in.Type]
    52  	if !ok {
    53  		return nil, fmt.Errorf("could not convert field (%s) due to unknown type value: %s", in.Name, in.Type)
    54  	}
    55  	out.Type = typ
    56  
    57  	// Mode conversion.  Repeated trumps required.
    58  	out.Mode = storagepb.TableFieldSchema_NULLABLE
    59  	if in.Repeated {
    60  		out.Mode = storagepb.TableFieldSchema_REPEATED
    61  	}
    62  	if !in.Repeated && in.Required {
    63  		out.Mode = storagepb.TableFieldSchema_REQUIRED
    64  	}
    65  
    66  	if in.RangeElementType != nil {
    67  		eleType, ok := fieldTypeMap[in.RangeElementType.Type]
    68  		if !ok {
    69  			return nil, fmt.Errorf("could not convert rante element type in %s: %q", in.Name, in.Type)
    70  		}
    71  		out.RangeElementType = &storagepb.TableFieldSchema_FieldElementType{
    72  			Type: eleType,
    73  		}
    74  	}
    75  
    76  	for _, s := range in.Schema {
    77  		subField, err := bqFieldToProto(s)
    78  		if err != nil {
    79  			return nil, err
    80  		}
    81  		out.Fields = append(out.Fields, subField)
    82  	}
    83  	return out, nil
    84  }
    85  
    86  func protoToBQField(in *storagepb.TableFieldSchema) (*bigquery.FieldSchema, error) {
    87  	if in == nil {
    88  		return nil, nil
    89  	}
    90  	out := &bigquery.FieldSchema{
    91  		Name:        in.GetName(),
    92  		Description: in.GetDescription(),
    93  		Repeated:    in.GetMode() == storagepb.TableFieldSchema_REPEATED,
    94  		Required:    in.GetMode() == storagepb.TableFieldSchema_REQUIRED,
    95  	}
    96  
    97  	typeResolved := false
    98  	for k, v := range fieldTypeMap {
    99  		if v == in.GetType() {
   100  			out.Type = k
   101  			typeResolved = true
   102  			break
   103  		}
   104  	}
   105  	if !typeResolved {
   106  		return nil, fmt.Errorf("could not convert proto type to bigquery type: %v", in.GetType().String())
   107  	}
   108  
   109  	if in.GetRangeElementType() != nil {
   110  		eleType := in.GetRangeElementType().GetType()
   111  		ret := &bigquery.RangeElementType{}
   112  		typeResolved := false
   113  		for k, v := range fieldTypeMap {
   114  			if v == eleType {
   115  				ret.Type = k
   116  				typeResolved = true
   117  				break
   118  			}
   119  		}
   120  		if !typeResolved {
   121  			return nil, fmt.Errorf("could not convert proto range element type to bigquery type: %v", eleType.String())
   122  		}
   123  		out.RangeElementType = ret
   124  	}
   125  
   126  	for _, s := range in.Fields {
   127  		subField, err := protoToBQField(s)
   128  		if err != nil {
   129  			return nil, err
   130  		}
   131  		out.Schema = append(out.Schema, subField)
   132  	}
   133  	return out, nil
   134  }
   135  
   136  // BQSchemaToStorageTableSchema converts a bigquery Schema into the protobuf-based TableSchema used
   137  // by the BigQuery Storage WriteClient.
   138  func BQSchemaToStorageTableSchema(in bigquery.Schema) (*storagepb.TableSchema, error) {
   139  	if in == nil {
   140  		return nil, nil
   141  	}
   142  	out := &storagepb.TableSchema{}
   143  	for _, s := range in {
   144  		converted, err := bqFieldToProto(s)
   145  		if err != nil {
   146  			return nil, err
   147  		}
   148  		out.Fields = append(out.Fields, converted)
   149  	}
   150  	return out, nil
   151  }
   152  
   153  // StorageTableSchemaToBQSchema converts a TableSchema from the BigQuery Storage WriteClient
   154  // into the equivalent BigQuery Schema.
   155  func StorageTableSchemaToBQSchema(in *storagepb.TableSchema) (bigquery.Schema, error) {
   156  	if in == nil {
   157  		return nil, nil
   158  	}
   159  	var out bigquery.Schema
   160  	for _, s := range in.Fields {
   161  		converted, err := protoToBQField(s)
   162  		if err != nil {
   163  			return nil, err
   164  		}
   165  		out = append(out, converted)
   166  	}
   167  	return out, nil
   168  }
   169  

View as plain text